mapper.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. import json
  2. from applications.config import Chunk
  class TaskConst:
      """Integer status codes, inherited by every mapper through BaseMySQLClient.

      The four task-level codes are the consecutive integers 0-3.
      CHUNK_USEFUL_STATUS is 1, the same value as PROCESSING_STATUS.
      """

      # Task-level status codes, in order: 0, 1, 2, 3.
      INIT_STATUS, PROCESSING_STATUS, FINISHED_STATUS, FAILED_STATUS = range(4)

      # Value that ContentChunks.update_chunk_status requires in the
      # content_chunks.status column before it will update a row.
      CHUNK_USEFUL_STATUS = 1
  class BaseMySQLClient(TaskConst):
      """Common base for the table mappers in this module.

      Holds the connection pool object. Subclasses await its ``async_fetch``
      and ``async_save`` methods, passing ``query=`` and ``params=`` keywords.
      The status constants come from ``TaskConst``.
      """

      def __init__(self, pool):
          """Keep a reference to ``pool`` for the subclass query methods."""
          super().__init__()
          # NOTE(review): the pool's concrete type is not visible in this file.
          self.pool = pool
  class Dataset(BaseMySQLClient):
      """Data-access helpers for the ``dataset`` table.

      Each method issues one parameterized statement through ``self.pool`` and
      returns the pool's result unchanged.
      """

      async def update_dataset_status(self, dataset_id, ori_status, new_status):
          """Set ``status`` to ``new_status`` for dataset ``dataset_id``.

          The row is only changed while its current status equals
          ``ori_status``; otherwise the UPDATE matches nothing.

          :return: the value returned by ``pool.async_save``.
          """
          sql = (
              "\n"
              "UPDATE dataset SET status = %s WHERE id = %s AND status = %s;\n"
          )
          bind = (new_status, dataset_id, ori_status)
          return await self.pool.async_save(query=sql, params=bind)

      async def select_dataset(self, status=1):
          """Fetch every dataset row whose ``status`` equals ``status`` (default 1)."""
          sql = "\nSELECT * FROM dataset WHERE status = %s;\n"
          return await self.pool.async_fetch(query=sql, params=(status,))

      async def add_dataset(self, name):
          """Insert a dataset row, supplying only the ``name`` column.

          :return: the value returned by ``pool.async_save``.
          """
          sql = "\nINSERT INTO dataset (name) VALUES (%s);\n"
          return await self.pool.async_save(query=sql, params=(name,))

      async def select_dataset_by_id(self, id_, status: int = 1):
          """Fetch the dataset row(s) with ``id = id_`` and the given ``status``."""
          sql = "\nSELECT * FROM dataset WHERE id = %s AND status = %s;\n"
          bind = (id_, status)
          return await self.pool.async_fetch(query=sql, params=bind)
  class Contents(BaseMySQLClient):
      """Data-access helpers for the ``contents`` table."""

      async def insert_content(self, doc_id, text, text_type, title, dataset_id):
          """Insert one content row using MySQL ``INSERT IGNORE``.

          With ``INSERT IGNORE``, rows that would raise an ignorable error (e.g. a
          duplicate key) are skipped rather than failing the call.

          :return: the value returned by ``pool.async_save``.
          """
          query = (
              "\n"
              "INSERT IGNORE INTO contents\n"
              "(doc_id, text, text_type, title, dataset_id)\n"
              "VALUES (%s, %s, %s, %s, %s);\n"
          )
          return await self.pool.async_save(
              query=query, params=(doc_id, text, text_type, title, dataset_id)
          )

      async def update_content_status(self, doc_id, ori_status, new_status):
          """Set ``status`` to ``new_status`` for ``doc_id`` if it is ``ori_status``."""
          query = (
              "\n"
              "UPDATE contents\n"
              "SET status = %s\n"
              "WHERE doc_id = %s AND status = %s;\n"
          )
          return await self.pool.async_save(
              query=query, params=(new_status, doc_id, ori_status)
          )

      async def update_dataset_status(self, dataset_id, ori_status, new_status):
          """Set ``status`` to ``new_status`` on every row of ``dataset_id``
          whose current ``status`` is ``ori_status``."""
          query = (
              "\n"
              "UPDATE contents\n"
              "SET status = %s\n"
              "WHERE dataset_id = %s AND status = %s;\n"
          )
          return await self.pool.async_save(
              query=query, params=(new_status, dataset_id, ori_status)
          )

      async def update_doc_status(self, doc_id, ori_status, new_status):
          """Change the ``doc_status`` column of a document.

          Only rows whose current ``doc_status`` equals ``ori_status`` are updated.

          :param doc_id: document identifier.
          :param ori_status: expected current ``doc_status``.
          :param new_status: value to write.
          :return: the value returned by ``pool.async_save``.
          """
          query = (
              "\n"
              "UPDATE contents SET doc_status = %s WHERE doc_id = %s AND doc_status = %s;\n"
          )
          return await self.pool.async_save(
              query=query, params=(new_status, doc_id, ori_status)
          )

      async def select_count(self, dataset_id, doc_status=1):
          """Return how many rows of ``dataset_id`` have the given ``doc_status``.

          Returns 0 when the fetch yields no rows.
          """
          query = (
              "\n"
              "SELECT count(*) AS count FROM contents WHERE dataset_id = %s AND doc_status = %s;\n"
          )
          rows = await self.pool.async_fetch(query=query, params=(dataset_id, doc_status))
          return rows[0]["count"] if rows else 0

      async def select_content_by_doc_id(self, doc_id):
          """Fetch the ``contents`` row(s) with the given ``doc_id``."""
          query = "SELECT * FROM contents WHERE doc_id = %s;"
          return await self.pool.async_fetch(query=query, params=(doc_id,))

      @staticmethod
      def _build_order_sql(order_by):
          """Build a safe ``ORDER BY`` clause from the first entry of ``order_by``.

          Column names and sort directions cannot be sent as bind parameters, so
          they are validated here instead of being spliced in verbatim. The
          column must be a plain ASCII identifier (it is then backtick-quoted).
          The direction must be ASC or DESC (case-insensitive).

          :raises ValueError: if the column or direction fails validation.
          """
          import re

          field, direction = next(iter(order_by.items()))
          field = str(field)
          direction = str(direction).strip().upper()
          if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", field):
              raise ValueError(f"invalid order_by column: {field!r}")
          if direction not in ("ASC", "DESC"):
              raise ValueError(f"invalid order_by direction: {direction!r}")
          return f"ORDER BY `{field}` {direction}"

      async def select_contents(
          self,
          page_num: int,
          page_size: int,
          order_by=None,
          dataset_id: int = None,
          doc_status: int = 1,
      ):
          """Return one page of ``contents`` rows together with pagination info.

          :param page_num: 1-based page number; must be >= 1.
          :param page_size: rows per page; must be >= 1.
          :param order_by: mapping ``{column: "asc" | "desc"}``, e.g.
              ``{"id": "desc"}``. Only the first entry is used. ``None`` or an
              empty mapping means ``{"id": "desc"}``. The column must be a plain
              ASCII identifier.
          :param dataset_id: restrict to this dataset. A falsy value (None or 0)
              applies no dataset filter.
          :param doc_status: required ``doc_status`` value (default 1).
          :return: dict with ``entities``, ``total_count``, ``page``,
              ``page_size`` and ``total_pages``.
          :raises ValueError: for a page number/size below 1, or an ``order_by``
              column/direction that fails validation.
          """
          if page_num < 1 or page_size < 1:
              # A negative OFFSET is a SQL error and page_size 0 would divide by zero below.
              raise ValueError(
                  f"page_num and page_size must be >= 1, got {page_num!r}, {page_size!r}"
              )
          if not order_by:
              order_by = {"id": "desc"}
          # Validate ordering before any query runs so bad input never reaches SQL.
          order_sql = self._build_order_sql(order_by)
          offset = (page_num - 1) * page_size

          # WHERE is assembled from fixed fragments; all values stay bound parameters.
          where_clauses = ["doc_status = %s"]
          params = [doc_status]
          if dataset_id:
              where_clauses.append("dataset_id = %s")
              params.append(dataset_id)
          where_sql = " AND ".join(where_clauses)

          count_query = f"SELECT COUNT(*) as total_count FROM contents WHERE {where_sql};"
          count_result = await self.pool.async_fetch(
              query=count_query, params=tuple(params)
          )
          total_count = count_result[0]["total_count"] if count_result else 0

          query = (
              "\n"
              "SELECT * FROM contents\n"
              f"WHERE {where_sql}\n"
              f"{order_sql}\n"
              "LIMIT %s OFFSET %s;\n"
          )
          entities = await self.pool.async_fetch(
              query=query, params=(*params, page_size, offset)
          )
          # Integer ceiling division.
          total_pages = (total_count + page_size - 1) // page_size
          return {
              "entities": entities,
              "total_count": total_count,
              "page": page_num,
              "page_size": page_size,
              "total_pages": total_pages,
          }
  class ContentChunks(BaseMySQLClient):
      """Data-access helpers for the ``content_chunks`` table.

      Writes go through ``_save``. Every method that takes ``ori_status`` only
      updates a row whose corresponding status column currently equals
      ``ori_status``.
      """

      async def _save(self, sql, *values):
          """Run a write statement with ``values`` bound in positional order."""
          return await self.pool.async_save(query=sql, params=values)

      async def insert_chunk(self, chunk: Chunk) -> int:
          """Insert ``chunk`` with MySQL ``INSERT IGNORE``.

          :return: the value returned by ``pool.async_save``.
          """
          sql = (
              "\n"
              "INSERT IGNORE INTO content_chunks\n"
              "(chunk_id, doc_id, text, tokens, topic_purity, text_type, dataset_id, status)\n"
              "VALUES (%s, %s, %s, %s, %s, %s, %s, %s);\n"
          )
          return await self._save(
              sql,
              chunk.chunk_id,
              chunk.doc_id,
              chunk.text,
              chunk.tokens,
              chunk.topic_purity,
              chunk.text_type,
              chunk.dataset_id,
              chunk.status,
          )

      async def update_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
          """Move ``chunk_status`` from ``ori_status`` to ``new_status``.

          Additionally requires ``status`` to equal ``CHUNK_USEFUL_STATUS``.
          """
          sql = (
              "\n"
              "UPDATE content_chunks\n"
              "SET chunk_status = %s\n"
              "WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s and status = %s;\n"
          )
          return await self._save(
              sql, new_status, doc_id, chunk_id, ori_status, self.CHUNK_USEFUL_STATUS
          )

      async def update_embedding_status(self, doc_id, chunk_id, ori_status, new_status):
          """Move ``embedding_status`` from ``ori_status`` to ``new_status``."""
          sql = (
              "\n"
              "UPDATE content_chunks\n"
              "SET embedding_status = %s\n"
              "WHERE doc_id = %s AND chunk_id = %s AND embedding_status = %s;\n"
          )
          return await self._save(sql, new_status, doc_id, chunk_id, ori_status)

      async def set_chunk_result(self, chunk: Chunk, ori_status, new_status):
          """Store the analysis fields of ``chunk`` and move ``chunk_status``.

          ``concepts``, ``keywords``, ``questions`` and ``entities`` are
          JSON-encoded with ``json.dumps`` defaults before being written.
          """
          sql = (
              "\n"
              "UPDATE content_chunks\n"
              "SET summary = %s, topic = %s, domain = %s, task_type = %s, concepts = %s,\n"
              "keywords = %s, questions = %s, chunk_status = %s, entities = %s\n"
              "WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s;\n"
          )
          to_json = json.dumps
          return await self._save(
              sql,
              chunk.summary,
              chunk.topic,
              chunk.domain,
              chunk.task_type,
              to_json(chunk.concepts),
              to_json(chunk.keywords),
              to_json(chunk.questions),
              new_status,
              to_json(chunk.entities),
              chunk.doc_id,
              chunk.chunk_id,
              ori_status,
          )

      async def update_es_status(self, doc_id, chunk_id, ori_status, new_status):
          """Move ``es_status`` from ``ori_status`` to ``new_status``."""
          sql = (
              "\n"
              "UPDATE content_chunks SET es_status = %s\n"
              "WHERE doc_id = %s AND chunk_id = %s AND es_status = %s;\n"
          )
          return await self._save(sql, new_status, doc_id, chunk_id, ori_status)

      async def update_doc_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
          """Move ``status`` of one chunk from ``ori_status`` to ``new_status``."""
          sql = (
              "\n"
              "UPDATE content_chunks set status = %s\n"
              "WHERE doc_id = %s AND chunk_id = %s AND status = %s;\n"
          )
          return await self._save(sql, new_status, doc_id, chunk_id, ori_status)

      async def update_doc_status(self, doc_id, ori_status, new_status):
          """Move ``status`` of every chunk of ``doc_id`` from ``ori_status``."""
          sql = (
              "\n"
              "UPDATE content_chunks set status = %s\n"
              "WHERE doc_id = %s AND status = %s;\n"
          )
          return await self._save(sql, new_status, doc_id, ori_status)

      async def update_dataset_status(self, dataset_id, ori_status, new_status):
          """Move ``status`` of every chunk in ``dataset_id`` from ``ori_status``."""
          sql = (
              "\n"
              "UPDATE content_chunks set status = %s\n"
              "WHERE dataset_id = %s AND status = %s;\n"
          )
          return await self._save(sql, new_status, dataset_id, ori_status)

      async def select_chunk_content(self, doc_id, chunk_id):
          """Fetch the row(s) matching ``doc_id`` and ``chunk_id``."""
          sql = "\nSELECT * FROM content_chunks WHERE doc_id = %s AND chunk_id = %s;\n"
          return await self.pool.async_fetch(query=sql, params=(doc_id, chunk_id))