mapper.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. import json
  2. from applications.config import Chunk
  class TaskConst:
      """Integer status codes for a task.

      NOTE(review): presumably these are the values passed as ``ori_status`` /
      ``new_status`` to the mapper methods in this module -- confirm against
      the callers.
      """

      INIT_STATUS: int = 0
      PROCESSING_STATUS: int = 1
      FINISHED_STATUS: int = 2
      FAILED_STATUS: int = 3
  class BaseMySQLClient:
      """Base class for table mappers; it only stores the pool it is given."""

      def __init__(self, pool):
          """Keep a reference to ``pool`` for use by subclasses.

          Args:
              pool: Object used to run statements. Subclasses in this module
                  call its awaitable ``async_save(query=..., params=...)``.
          """
          self.pool = pool
  class Contents(BaseMySQLClient):
      """Write operations against the ``contents`` table."""

      async def insert_content(self, doc_id, text, text_type):
          """Insert one row into ``contents``.

          The statement is ``INSERT IGNORE``, so MySQL downgrades ignorable
          errors such as a duplicate key to warnings and skips the row.

          Args:
              doc_id: Value for the ``doc_id`` column.
              text: Value for the ``text`` column.
              text_type: Value for the ``text_type`` column.

          Returns:
              Whatever ``self.pool.async_save`` returns (presumably the
              affected-row count -- confirm against the pool implementation).
          """
          sql = """
              INSERT IGNORE INTO contents
                  (doc_id, text, text_type)
              VALUES (%s, %s, %s);
          """
          row = (doc_id, text, text_type)
          return await self.pool.async_save(query=sql, params=row)

      async def update_content_status(self, doc_id, ori_status, new_status):
          """Move a document's ``status`` from ``ori_status`` to ``new_status``.

          The ``WHERE`` clause also matches on the current status, so the row
          is only changed while it is still in ``ori_status``.

          Args:
              doc_id: Identifies the row to update.
              ori_status: Status the row must currently have.
              new_status: Status to write.

          Returns:
              Whatever ``self.pool.async_save`` returns (presumably the
              affected-row count -- confirm against the pool implementation).
          """
          sql = """
              UPDATE contents
              SET status = %s
              WHERE doc_id = %s AND status = %s;
          """
          # Order follows the placeholders: SET value first, then WHERE values.
          bound = (new_status, doc_id, ori_status)
          return await self.pool.async_save(query=sql, params=bound)
  class ContentChunks(BaseMySQLClient):
      """Write operations against the ``content_chunks`` table."""

      async def insert_chunk(self, chunk: Chunk) -> int:
          """Insert one chunk row into ``content_chunks``.

          The statement is ``INSERT IGNORE``, so MySQL downgrades ignorable
          errors such as a duplicate key to warnings and skips the row.

          Args:
              chunk: Source of the ``chunk_id``, ``doc_id``, ``text``,
                  ``tokens``, ``topic_purity`` and ``text_type`` values.

          Returns:
              The result of ``self.pool.async_save`` (annotated ``int``;
              presumably the affected-row count -- confirm).
          """
          sql = """
              INSERT IGNORE INTO content_chunks
                  (chunk_id, doc_id, text, tokens, topic_purity, text_type)
              VALUES (%s, %s, %s, %s, %s, %s);
          """
          row = (
              chunk.chunk_id,
              chunk.doc_id,
              chunk.text,
              chunk.tokens,
              chunk.topic_purity,
              chunk.text_type,
          )
          return await self.pool.async_save(query=sql, params=row)

      async def update_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
          """Move a chunk's ``chunk_status`` from ``ori_status`` to ``new_status``.

          The row is only changed while ``chunk_status`` still equals
          ``ori_status``, because the ``WHERE`` clause matches on it.

          Args:
              doc_id: Document the chunk belongs to.
              chunk_id: Chunk to update.
              ori_status: Status the row must currently have.
              new_status: Status to write.

          Returns:
              Whatever ``self.pool.async_save`` returns.
          """
          sql = """
              UPDATE content_chunks
              SET chunk_status = %s
              WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s;
          """
          bound = (new_status, doc_id, chunk_id, ori_status)
          return await self.pool.async_save(query=sql, params=bound)

      async def update_embedding_status(self, doc_id, chunk_id, ori_status, new_status):
          """Move ``embedding_status`` from ``ori_status`` to ``new_status``.

          The row is only changed while ``embedding_status`` still equals
          ``ori_status``, because the ``WHERE`` clause matches on it.

          Args:
              doc_id: Document the chunk belongs to.
              chunk_id: Chunk to update.
              ori_status: Status the row must currently have.
              new_status: Status to write.

          Returns:
              Whatever ``self.pool.async_save`` returns.
          """
          sql = """
              UPDATE content_chunks
              SET embedding_status = %s
              WHERE doc_id = %s AND chunk_id = %s AND embedding_status = %s;
          """
          bound = (new_status, doc_id, chunk_id, ori_status)
          return await self.pool.async_save(query=sql, params=bound)

      async def set_chunk_result(self, chunk: Chunk, ori_status, new_status):
          """Store a chunk's result fields and advance its ``chunk_status``.

          ``concepts``, ``keywords``, ``questions`` and ``entities`` are
          written as JSON text via ``json.dumps``; the other fields are passed
          through unchanged. The row is only changed while ``chunk_status``
          still equals ``ori_status``.

          Args:
              chunk: Source of the result fields and of the ``doc_id`` /
                  ``chunk_id`` that identify the row.
              ori_status: Status the row must currently have.
              new_status: Status to write.

          Returns:
              Whatever ``self.pool.async_save`` returns.
          """
          sql = """
              UPDATE content_chunks
              SET summary = %s, topic = %s, domain = %s, task_type = %s, concepts = %s,
                  keywords = %s, questions = %s, chunk_status = %s, entities = %s
              WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s;
          """
          # Values for the SET clause, in placeholder order.
          assignments = (
              chunk.summary,
              chunk.topic,
              chunk.domain,
              chunk.task_type,
              json.dumps(chunk.concepts),
              json.dumps(chunk.keywords),
              json.dumps(chunk.questions),
              new_status,
              json.dumps(chunk.entities),
          )
          # Values for the WHERE clause, in placeholder order.
          row_filter = (chunk.doc_id, chunk.chunk_id, ori_status)
          return await self.pool.async_save(
              query=sql, params=assignments + row_filter
          )
  87. )