mapper.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. import json
  2. from applications.config import Chunk
  3. class TaskConst:
  4. INIT_STATUS = 0
  5. PROCESSING_STATUS = 1
  6. FINISHED_STATUS = 2
  7. FAILED_STATUS = 3
  8. class BaseMySQLClient:
  9. def __init__(self, pool):
  10. self.pool = pool
  11. class Contents(BaseMySQLClient):
  12. async def insert_content(self, doc_id, text, text_type, title, dataset_id):
  13. query = """
  14. INSERT IGNORE INTO contents
  15. (doc_id, text, text_type, title, dataset_id)
  16. VALUES (%s, %s, %s, %s, %s);
  17. """
  18. return await self.pool.async_save(
  19. query=query, params=(doc_id, text, text_type, title, dataset_id)
  20. )
  21. async def update_content_status(self, doc_id, ori_status, new_status):
  22. query = """
  23. UPDATE contents
  24. SET status = %s
  25. WHERE doc_id = %s AND status = %s;
  26. """
  27. return await self.pool.async_save(
  28. query=query, params=(new_status, doc_id, ori_status)
  29. )
  30. class ContentChunks(BaseMySQLClient):
  31. async def insert_chunk(self, chunk: Chunk) -> int:
  32. query = """
  33. INSERT IGNORE INTO content_chunks
  34. (chunk_id, doc_id, text, tokens, topic_purity, text_type, dataset_id)
  35. VALUES (%s, %s, %s, %s, %s, %s, %s);
  36. """
  37. return await self.pool.async_save(
  38. query=query,
  39. params=(
  40. chunk.chunk_id,
  41. chunk.doc_id,
  42. chunk.text,
  43. chunk.tokens,
  44. chunk.topic_purity,
  45. chunk.text_type,
  46. chunk.dataset_id,
  47. ),
  48. )
  49. async def update_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
  50. query = """
  51. UPDATE content_chunks
  52. SET chunk_status = %s
  53. WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s;
  54. """
  55. return await self.pool.async_save(
  56. query=query, params=(new_status, doc_id, chunk_id, ori_status)
  57. )
  58. async def update_embedding_status(self, doc_id, chunk_id, ori_status, new_status):
  59. query = """
  60. UPDATE content_chunks
  61. SET embedding_status = %s
  62. WHERE doc_id = %s AND chunk_id = %s AND embedding_status = %s;
  63. """
  64. return await self.pool.async_save(
  65. query=query, params=(new_status, doc_id, chunk_id, ori_status)
  66. )
  67. async def set_chunk_result(self, chunk: Chunk, ori_status, new_status):
  68. query = """
  69. UPDATE content_chunks
  70. SET summary = %s, topic = %s, domain = %s, task_type = %s, concepts = %s,
  71. keywords = %s, questions = %s, chunk_status = %s, entities = %s
  72. WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s;
  73. """
  74. return await self.pool.async_save(
  75. query=query,
  76. params=(
  77. chunk.summary,
  78. chunk.topic,
  79. chunk.domain,
  80. chunk.task_type,
  81. json.dumps(chunk.concepts),
  82. json.dumps(chunk.keywords),
  83. json.dumps(chunk.questions),
  84. new_status,
  85. json.dumps(chunk.entities),
  86. chunk.doc_id,
  87. chunk.chunk_id,
  88. ori_status,
  89. ),
  90. )
  91. async def update_es_status(self, doc_id, chunk_id, ori_status, new_status):
  92. query = """
  93. UPDATE content_chunks SET es_status = %s
  94. WHERE doc_id = %s AND chunk_id = %s AND es_status = %s;
  95. """
  96. return await self.pool.async_save(
  97. query=query, params=(new_status, doc_id, chunk_id, ori_status)
  98. )