mapper.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. import json
  2. from applications.config import Chunk
  3. class TaskConst:
  4. INIT_STATUS = 0
  5. PROCESSING_STATUS = 1
  6. FINISHED_STATUS = 2
  7. FAILED_STATUS = 3
  8. CHUNK_USEFUL_STATUS = 1
  9. class BaseMySQLClient(TaskConst):
  10. def __init__(self, pool):
  11. self.pool = pool
  12. class Dataset(BaseMySQLClient):
  13. async def update_dataset_status(self, dataset_id, ori_status, new_status):
  14. query = """
  15. UPDATE dataset set status = %s where id = %s and status = %s;
  16. """
  17. return await self.pool.async_save(
  18. query=query,
  19. params=(new_status, dataset_id, ori_status),
  20. )
  21. class Contents(BaseMySQLClient):
  22. async def insert_content(self, doc_id, text, text_type, title, dataset_id):
  23. query = """
  24. INSERT IGNORE INTO contents
  25. (doc_id, text, text_type, title, dataset_id)
  26. VALUES (%s, %s, %s, %s, %s);
  27. """
  28. return await self.pool.async_save(
  29. query=query, params=(doc_id, text, text_type, title, dataset_id)
  30. )
  31. async def update_content_status(self, doc_id, ori_status, new_status):
  32. query = """
  33. UPDATE contents
  34. SET status = %s
  35. WHERE doc_id = %s AND status = %s;
  36. """
  37. return await self.pool.async_save(
  38. query=query, params=(new_status, doc_id, ori_status)
  39. )
  40. async def update_dataset_status(self, dataset_id, ori_status, new_status):
  41. query = """
  42. UPDATE contents
  43. SET status = %s
  44. WHERE dataset_id = %s AND status = %s;
  45. """
  46. return await self.pool.async_save(
  47. query=query, params=(new_status, dataset_id, ori_status)
  48. )
  49. async def update_doc_status(self, doc_id, ori_status, new_status):
  50. """
  51. this function is to change the using status of each document
  52. :param doc_id:
  53. :param ori_status:
  54. :param new_status:
  55. :return:
  56. """
  57. query = """
  58. UPDATE contents
  59. SET doc_status = %s WHERE doc_id = %s and doc_status = %s;
  60. """
  61. return await self.pool.async_save(
  62. query=query, params=(new_status, doc_id, ori_status)
  63. )
  64. class ContentChunks(BaseMySQLClient):
  65. async def insert_chunk(self, chunk: Chunk) -> int:
  66. query = """
  67. INSERT IGNORE INTO content_chunks
  68. (chunk_id, doc_id, text, tokens, topic_purity, text_type, dataset_id, status)
  69. VALUES (%s, %s, %s, %s, %s, %s, %s, %s);
  70. """
  71. return await self.pool.async_save(
  72. query=query,
  73. params=(
  74. chunk.chunk_id,
  75. chunk.doc_id,
  76. chunk.text,
  77. chunk.tokens,
  78. chunk.topic_purity,
  79. chunk.text_type,
  80. chunk.dataset_id,
  81. chunk.status,
  82. ),
  83. )
  84. async def update_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
  85. query = """
  86. UPDATE content_chunks
  87. SET chunk_status = %s
  88. WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s and status = %s;
  89. """
  90. return await self.pool.async_save(
  91. query=query, params=(new_status, doc_id, chunk_id, ori_status, self.CHUNK_USEFUL_STATUS)
  92. )
  93. async def update_embedding_status(self, doc_id, chunk_id, ori_status, new_status):
  94. query = """
  95. UPDATE content_chunks
  96. SET embedding_status = %s
  97. WHERE doc_id = %s AND chunk_id = %s AND embedding_status = %s;
  98. """
  99. return await self.pool.async_save(
  100. query=query, params=(new_status, doc_id, chunk_id, ori_status)
  101. )
  102. async def set_chunk_result(self, chunk: Chunk, ori_status, new_status):
  103. query = """
  104. UPDATE content_chunks
  105. SET summary = %s, topic = %s, domain = %s, task_type = %s, concepts = %s,
  106. keywords = %s, questions = %s, chunk_status = %s, entities = %s
  107. WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s;
  108. """
  109. return await self.pool.async_save(
  110. query=query,
  111. params=(
  112. chunk.summary,
  113. chunk.topic,
  114. chunk.domain,
  115. chunk.task_type,
  116. json.dumps(chunk.concepts),
  117. json.dumps(chunk.keywords),
  118. json.dumps(chunk.questions),
  119. new_status,
  120. json.dumps(chunk.entities),
  121. chunk.doc_id,
  122. chunk.chunk_id,
  123. ori_status,
  124. ),
  125. )
  126. async def update_es_status(self, doc_id, chunk_id, ori_status, new_status):
  127. query = """
  128. UPDATE content_chunks SET es_status = %s
  129. WHERE doc_id = %s AND chunk_id = %s AND es_status = %s;
  130. """
  131. return await self.pool.async_save(
  132. query=query, params=(new_status, doc_id, chunk_id, ori_status)
  133. )
  134. async def update_doc_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
  135. query = """
  136. UPDATE content_chunks set status = %s
  137. WHERE doc_id = %s AND chunk_id = %s AND status = %s;
  138. """
  139. return await self.pool.async_save(
  140. query=query, params=(new_status, doc_id, chunk_id, ori_status)
  141. )
  142. async def update_doc_status(self, doc_id, ori_status, new_status):
  143. query = """
  144. UPDATE content_chunks set status = %s
  145. WHERE doc_id = %s AND status = %s;
  146. """
  147. return await self.pool.async_save(
  148. query=query, params=(new_status, doc_id, ori_status)
  149. )
  150. async def update_dataset_status(self, dataset_id, ori_status, new_status):
  151. query = """
  152. UPDATE content_chunks set status = %s
  153. WHERE dataset_id = %s AND status = %s;
  154. """
  155. return await self.pool.async_save(
  156. query=query, params=(new_status, dataset_id, ori_status)
  157. )