content_service.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. import json
  2. import requests
  3. from core.database import DBHelper
  4. from data_models.contents import Contents
  5. def get_contents(page_num, page_size, dataset_id):
  6. db_helper = DBHelper()
  7. res = db_helper.get_paginated(Contents, page_num, page_size, order_by={'id': 'desc'}, dataset_id=dataset_id,
  8. doc_status=1)
  9. data = []
  10. for entity in res["entities"]:
  11. data.append({'text': entity.text, 'title': entity.title, 'doc_id': entity.doc_id})
  12. res['entities'] = data
  13. return res
  14. def get_content(doc_id):
  15. db_helper = DBHelper()
  16. content = db_helper.get(Contents, doc_id=doc_id)
  17. data = {'title': content.title, 'text': content.text, 'doc_id': content.doc_id}
  18. return data
  19. def add_contents(content_param):
  20. try:
  21. response = requests.post(
  22. url='http://61.48.133.26:8001/api/chunk',
  23. json={
  24. "text": content_param.text,
  25. "title": content_param.title,
  26. "dataset_id": content_param.datasetId},
  27. headers={"Content-Type": "application/json"},
  28. )
  29. doc_id = response.json()['doc_id']
  30. if doc_id:
  31. return True
  32. except Exception as e:
  33. print(e)
  34. return False
  35. if __name__ == '__main__':
  36. print(json.dumps(get_contents(1, 10), ensure_ascii=False))