content_service.py 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. import json
  2. import requests
  3. from core.database import DBHelper
  4. from data_models.contents import Contents
  5. def get_contents(page_num, page_size, dataset_id):
  6. db_helper = DBHelper()
  7. res = db_helper.get_paginated(Contents, page_num, page_size, order_by={'id': 'desc'}, dataset_id=dataset_id)
  8. data = []
  9. for entity in res["entities"]:
  10. data.append({'text': entity.text, 'title': entity.title, 'doc_id': entity.doc_id})
  11. res['entities'] = data
  12. return res
  13. def get_content(doc_id):
  14. db_helper = DBHelper()
  15. content = db_helper.get(Contents, doc_id=doc_id)
  16. data = {'title': content.title, 'text': content.text, 'doc_id': content.doc_id}
  17. return data
  18. def add_contents(content_param):
  19. try:
  20. response = requests.post(
  21. url='http://61.48.133.26:8001/api/chunk',
  22. json={
  23. "text": content_param.text,
  24. "title": content_param.title,
  25. "dataset_id": content_param.datasetId},
  26. headers={"Content-Type": "application/json"},
  27. )
  28. doc_id = response.json()['doc_id']
  29. if doc_id:
  30. return True
  31. except Exception as e:
  32. print(e)
  33. return False
  34. if __name__ == '__main__':
  35. print(json.dumps(get_contents(1, 10), ensure_ascii=False))