mapper.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. import json
  2. from applications.config import Chunk
  3. class TaskConst:
  4. INIT_STATUS = 0
  5. PROCESSING_STATUS = 1
  6. FINISHED_STATUS = 2
  7. FAILED_STATUS = 3
  8. class BaseMySQLClient:
  9. def __init__(self, pool):
  10. self.pool = pool
  11. class Dataset(BaseMySQLClient):
  12. async def update_dataset_status(self, dataset_id, ori_status, new_status):
  13. query = """
  14. UPDATE dataset set status = %s where id = %s and status = %s;
  15. """
  16. return await self.pool.async_save(
  17. query=query,
  18. params=(new_status, dataset_id, ori_status),
  19. )
  20. class Contents(BaseMySQLClient):
  21. async def insert_content(self, doc_id, text, text_type, title, dataset_id):
  22. query = """
  23. INSERT IGNORE INTO contents
  24. (doc_id, text, text_type, title, dataset_id)
  25. VALUES (%s, %s, %s, %s, %s);
  26. """
  27. return await self.pool.async_save(
  28. query=query, params=(doc_id, text, text_type, title, dataset_id)
  29. )
  30. async def update_content_status(self, doc_id, ori_status, new_status):
  31. query = """
  32. UPDATE contents
  33. SET status = %s
  34. WHERE doc_id = %s AND status = %s;
  35. """
  36. return await self.pool.async_save(
  37. query=query, params=(new_status, doc_id, ori_status)
  38. )
  39. async def update_dataset_status(self, dataset_id, ori_status, new_status):
  40. query = """
  41. UPDATE contents
  42. SET status = %s
  43. WHERE dataset_id = %s AND status = %s;
  44. """
  45. return await self.pool.async_save(
  46. query=query, params=(new_status, dataset_id, ori_status)
  47. )
  48. async def update_doc_status(self, doc_id, ori_status, new_status):
  49. """
  50. this function is to change the using status of each document
  51. :param doc_id:
  52. :param ori_status:
  53. :param new_status:
  54. :return:
  55. """
  56. query = """
  57. UPDATE contents
  58. SET doc_status = %s WHERE doc_id = %s and doc_status = %s;
  59. """
  60. return await self.pool.async_save(
  61. query=query, params=(new_status, doc_id, ori_status)
  62. )
  63. class ContentChunks(BaseMySQLClient):
  64. async def insert_chunk(self, chunk: Chunk) -> int:
  65. query = """
  66. INSERT IGNORE INTO content_chunks
  67. (chunk_id, doc_id, text, tokens, topic_purity, text_type, dataset_id)
  68. VALUES (%s, %s, %s, %s, %s, %s, %s);
  69. """
  70. return await self.pool.async_save(
  71. query=query,
  72. params=(
  73. chunk.chunk_id,
  74. chunk.doc_id,
  75. chunk.text,
  76. chunk.tokens,
  77. chunk.topic_purity,
  78. chunk.text_type,
  79. chunk.dataset_id,
  80. ),
  81. )
  82. async def update_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
  83. query = """
  84. UPDATE content_chunks
  85. SET chunk_status = %s
  86. WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s;
  87. """
  88. return await self.pool.async_save(
  89. query=query, params=(new_status, doc_id, chunk_id, ori_status)
  90. )
  91. async def update_embedding_status(self, doc_id, chunk_id, ori_status, new_status):
  92. query = """
  93. UPDATE content_chunks
  94. SET embedding_status = %s
  95. WHERE doc_id = %s AND chunk_id = %s AND embedding_status = %s;
  96. """
  97. return await self.pool.async_save(
  98. query=query, params=(new_status, doc_id, chunk_id, ori_status)
  99. )
  100. async def set_chunk_result(self, chunk: Chunk, ori_status, new_status):
  101. query = """
  102. UPDATE content_chunks
  103. SET summary = %s, topic = %s, domain = %s, task_type = %s, concepts = %s,
  104. keywords = %s, questions = %s, chunk_status = %s, entities = %s
  105. WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s;
  106. """
  107. return await self.pool.async_save(
  108. query=query,
  109. params=(
  110. chunk.summary,
  111. chunk.topic,
  112. chunk.domain,
  113. chunk.task_type,
  114. json.dumps(chunk.concepts),
  115. json.dumps(chunk.keywords),
  116. json.dumps(chunk.questions),
  117. new_status,
  118. json.dumps(chunk.entities),
  119. chunk.doc_id,
  120. chunk.chunk_id,
  121. ori_status,
  122. ),
  123. )
  124. async def update_es_status(self, doc_id, chunk_id, ori_status, new_status):
  125. query = """
  126. UPDATE content_chunks SET es_status = %s
  127. WHERE doc_id = %s AND chunk_id = %s AND es_status = %s;
  128. """
  129. return await self.pool.async_save(
  130. query=query, params=(new_status, doc_id, chunk_id, ori_status)
  131. )
  132. async def update_doc_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
  133. query = """
  134. UPDATE content_chunks set status = %s
  135. WHERE doc_id = %s AND chunk_id = %s AND status = %s;
  136. """
  137. return await self.pool.async_save(
  138. query=query, params=(new_status, doc_id, chunk_id, ori_status)
  139. )
  140. async def update_doc_status(self, doc_id, ori_status, new_status):
  141. query = """
  142. UPDATE content_chunks set status = %s
  143. WHERE doc_id = %s AND status = %s;
  144. """
  145. return await self.pool.async_save(
  146. query=query, params=(new_status, doc_id, ori_status)
  147. )
  148. async def update_dataset_status(self, dataset_id, ori_status, new_status):
  149. query = """
  150. UPDATE content_chunks set status = %s
  151. WHERE dataset_id = %s AND status = %s;
  152. """
  153. return await self.pool.async_save(
  154. query=query, params=(new_status, dataset_id, ori_status)
  155. )