# mapper.py (2.8 KB)
  1. import json
  2. from applications.config import Chunk
  class TaskConst:
      """Integer status codes for the lifecycle of a task.

      NOTE(review): presumably these are the values passed as ``ori_status`` /
      ``new_status`` to the MySQL clients in this module and stored in their
      status columns -- confirm against callers.
      """

      # Kept as plain ints (not an Enum) so they can be handed to the DB
      # driver as query parameters without any conversion.
      INIT_STATUS = 0
      PROCESSING_STATUS = 1
      FINISHED_STATUS = 2
      FAILED_STATUS = 3
  class BaseMySQLClient:
      """Base class for table-specific MySQL clients; it only holds the pool."""

      def __init__(self, pool):
          """Keep a reference to the pool that subclasses run their queries on.

          Args:
              pool: Object exposing an awaitable
                  ``async_save(query=..., params=...)``; that is the only
                  method the subclasses in this module call on it.
          """
          self.pool = pool
  class Contents(BaseMySQLClient):
      """Write queries against the ``contents`` table."""

      async def insert_content(self, doc_id, text) -> int:
          """Insert one ``(doc_id, text)`` row into ``contents``.

          The statement is ``INSERT IGNORE``, so a row that would violate a
          unique key is skipped by MySQL instead of raising an error.

          Args:
              doc_id: Value stored in the ``doc_id`` column.
              text: Value stored in the ``text`` column.

          Returns:
              The result of ``pool.async_save`` (annotated ``int`` to agree
              with ``ContentChunks.insert_chunk``; presumably the affected-row
              count -- confirm against the pool implementation).
          """
          query = """
              INSERT IGNORE INTO contents
                  (doc_id, text)
              VALUES (%s, %s);
          """
          return await self.pool.async_save(query=query, params=(doc_id, text))

      async def update_content_status(self, doc_id, ori_status, new_status) -> int:
          """Move a document from ``ori_status`` to ``new_status``.

          The ``WHERE`` clause also matches on the current status, so the row
          is only changed when it is still in ``ori_status``.

          Args:
              doc_id: Document whose row is updated.
              ori_status: Status the row must currently have.
              new_status: Status written to the row.

          Returns:
              The result of ``pool.async_save`` (presumably the affected-row
              count -- confirm against the pool implementation).
          """
          query = """
              UPDATE contents
              SET status = %s
              WHERE doc_id = %s AND status = %s;
          """
          # Parameter order follows the placeholders: SET first, then WHERE.
          return await self.pool.async_save(
              query=query, params=(new_status, doc_id, ori_status)
          )
  class ContentChunks(BaseMySQLClient):
      """Write queries against the ``content_chunks`` table."""

      async def insert_chunk(self, chunk: Chunk) -> int:
          """Insert the base fields of ``chunk`` into ``content_chunks``.

          The statement is ``INSERT IGNORE``, so a row that would violate a
          unique key is skipped by MySQL instead of raising an error.

          Args:
              chunk: Source of ``chunk_id``, ``doc_id``, ``text``, ``tokens``
                  and ``topic_purity``.

          Returns:
              The result of ``pool.async_save``.
          """
          query = """
              INSERT IGNORE INTO content_chunks
                  (chunk_id, doc_id, text, tokens, topic_purity)
              VALUES (%s, %s, %s, %s, %s);
          """
          return await self.pool.async_save(
              query=query,
              params=(
                  chunk.chunk_id,
                  chunk.doc_id,
                  chunk.text,
                  chunk.tokens,
                  chunk.topic_purity,
              ),
          )

      async def update_chunk_status(
          self, doc_id, chunk_id, ori_status, new_status
      ) -> int:
          """Move a chunk's ``chunk_status`` from ``ori_status`` to ``new_status``.

          The row is only changed when its ``chunk_status`` still equals
          ``ori_status``.

          Args:
              doc_id: Document the chunk belongs to.
              chunk_id: Chunk to update.
              ori_status: Value ``chunk_status`` must currently have.
              new_status: Value written to ``chunk_status``.

          Returns:
              The result of ``pool.async_save`` (presumably the affected-row
              count -- confirm against the pool implementation).
          """
          query = """
              UPDATE content_chunks
              SET chunk_status = %s
              WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s;
          """
          return await self.pool.async_save(
              query=query, params=(new_status, doc_id, chunk_id, ori_status)
          )

      async def update_embedding_status(
          self, doc_id, chunk_id, ori_status, new_status
      ) -> int:
          """Move a chunk's ``embedding_status`` from ``ori_status`` to ``new_status``.

          The row is only changed when its ``embedding_status`` still equals
          ``ori_status``.

          Args:
              doc_id: Document the chunk belongs to.
              chunk_id: Chunk to update.
              ori_status: Value ``embedding_status`` must currently have.
              new_status: Value written to ``embedding_status``.

          Returns:
              The result of ``pool.async_save`` (presumably the affected-row
              count -- confirm against the pool implementation).
          """
          query = """
              UPDATE content_chunks
              SET embedding_status = %s
              WHERE doc_id = %s AND chunk_id = %s AND embedding_status = %s;
          """
          return await self.pool.async_save(
              query=query, params=(new_status, doc_id, chunk_id, ori_status)
          )

      async def set_chunk_result(self, chunk: Chunk, ori_status, new_status) -> int:
          """Store the analysis fields of ``chunk`` and advance ``chunk_status``.

          ``concepts``, ``keywords`` and ``questions`` are serialized with
          ``json.dumps`` before being written. The row is only changed when
          its ``chunk_status`` still equals ``ori_status``.

          Args:
              chunk: Source of ``summary``, ``topic``, ``domain``,
                  ``task_type``, ``concepts``, ``keywords``, ``questions``,
                  ``doc_id`` and ``chunk_id``.
              ori_status: Value ``chunk_status`` must currently have.
              new_status: Value written to ``chunk_status``.

          Returns:
              The result of ``pool.async_save`` (presumably the affected-row
              count -- confirm against the pool implementation).
          """
          query = """
              UPDATE content_chunks
              SET summary = %s, topic = %s, domain = %s, task_type = %s, concepts = %s, keywords = %s, questions = %s, chunk_status = %s
              WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s;
          """
          return await self.pool.async_save(
              query=query,
              params=(
                  # SET clause, in placeholder order.
                  chunk.summary,
                  chunk.topic,
                  chunk.domain,
                  chunk.task_type,
                  json.dumps(chunk.concepts),
                  json.dumps(chunk.keywords),
                  json.dumps(chunk.questions),
                  new_status,
                  # WHERE clause.
                  chunk.doc_id,
                  chunk.chunk_id,
                  ori_status,
              ),
          )