mapper.py

import json
from datetime import datetime

from applications.config import Chunk


class TaskConst:
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    FINISHED_STATUS = 2
    FAILED_STATUS = 3

    CHUNK_USEFUL_STATUS = 1


class BaseMySQLClient(TaskConst):
    def __init__(self, pool):
        self.pool = pool


class Dataset(BaseMySQLClient):
    async def update_dataset_status(self, dataset_id, ori_status, new_status):
        query = """
            UPDATE dataset SET status = %s WHERE id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(new_status, dataset_id, ori_status),
        )

    async def select_dataset(self, status=1):
        query = """
            SELECT * FROM dataset WHERE status = %s;
        """
        return await self.pool.async_fetch(query=query, params=(status,))

    async def add_dataset(self, name):
        query = """
            INSERT INTO dataset (name, created_at, updated_at, status)
            VALUES (%s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query, params=(name, datetime.now(), datetime.now(), 1)
        )

    async def select_dataset_by_id(self, id, status=1):
        query = """
            SELECT * FROM dataset WHERE id = %s AND status = %s;
        """
        return await self.pool.async_fetch(query=query, params=(id, status))


class Contents(BaseMySQLClient):
    async def insert_content(self, doc_id, text, text_type, title, dataset_id):
        query = """
            INSERT IGNORE INTO contents
                (doc_id, text, text_type, title, dataset_id)
            VALUES (%s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query, params=(doc_id, text, text_type, title, dataset_id)
        )

    async def update_content_status(self, doc_id, ori_status, new_status):
        query = """
            UPDATE contents
            SET status = %s
            WHERE doc_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, ori_status)
        )

    async def update_dataset_status(self, dataset_id, ori_status, new_status):
        query = """
            UPDATE contents
            SET status = %s
            WHERE dataset_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, dataset_id, ori_status)
        )

    async def update_doc_status(self, doc_id, ori_status, new_status):
        """
        Update the usage status (doc_status) of a single document.
        :param doc_id: document identifier
        :param ori_status: expected current doc_status
        :param new_status: doc_status to set
        :return:
        """
        query = """
            UPDATE contents
            SET doc_status = %s
            WHERE doc_id = %s AND doc_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, ori_status)
        )

    async def select_count(self, dataset_id, doc_status=1):
        query = """
            SELECT COUNT(*) AS count FROM contents WHERE dataset_id = %s AND doc_status = %s;
        """
        rows = await self.pool.async_fetch(query=query, params=(dataset_id, doc_status))
        return rows[0]["count"] if rows else 0

    async def select_content_by_doc_id(self, doc_id):
        query = """
            SELECT * FROM contents WHERE doc_id = %s;
        """
        return await self.pool.async_fetch(query=query, params=(doc_id,))

    async def select_contents(
        self,
        page_num: int,
        page_size: int,
        order_by: dict = {"id": "desc"},
        dataset_id: int = None,
        doc_status: int = 1,
    ):
        """
        Paginated query over the contents table, returning pagination metadata.
        :param page_num: page number, starting from 1
        :param page_size: number of rows per page
        :param order_by: ordering condition, e.g. {"id": "desc"} or {"created_at": "asc"}
        :param dataset_id: dataset ID (optional filter)
        :param doc_status: document status (defaults to 1)
        :return: dict containing entities, total_count, page, page_size, total_pages
        """
        offset = (page_num - 1) * page_size

        # build the WHERE clause dynamically
        where_clauses = ["doc_status = %s"]
        params = [doc_status]
        if dataset_id:
            where_clauses.append("dataset_id = %s")
            params.append(dataset_id)
        where_sql = " AND ".join(where_clauses)

        # build the ORDER BY clause dynamically
        order_field, order_direction = list(order_by.items())[0]
        order_sql = f"ORDER BY {order_field} {order_direction.upper()}"

        # total row count
        count_query = f"SELECT COUNT(*) AS total_count FROM contents WHERE {where_sql};"
        count_result = await self.pool.async_fetch(
            query=count_query, params=tuple(params)
        )
        total_count = count_result[0]["total_count"] if count_result else 0

        # fetch the requested page
        query = f"""
            SELECT * FROM contents
            WHERE {where_sql}
            {order_sql}
            LIMIT %s OFFSET %s;
        """
        params.extend([page_size, offset])
        entities = await self.pool.async_fetch(query=query, params=tuple(params))

        total_pages = (total_count + page_size - 1) // page_size  # ceiling division
        return {
            "entities": entities,
            "total_count": total_count,
            "page": page_num,
            "page_size": page_size,
            "total_pages": total_pages,
        }
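
# Usage sketch for Contents.select_contents (illustrative only; `pool` stands for
# any object exposing async_fetch/async_save, such as the project's MySQL pool):
#
#     page = await Contents(pool).select_contents(
#         page_num=2, page_size=50, order_by={"created_at": "asc"}, dataset_id=7
#     )
#     # page == {"entities": [...], "total_count": 123, "page": 2,
#     #          "page_size": 50, "total_pages": 3}
#
# Note that the order_by column name is interpolated directly into the SQL, so it
# should only come from trusted, whitelisted values.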


class ContentChunks(BaseMySQLClient):
    async def insert_chunk(self, chunk: Chunk) -> int:
        query = """
            INSERT IGNORE INTO content_chunks
                (chunk_id, doc_id, text, tokens, topic_purity, text_type, dataset_id, status)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query,
            params=(
                chunk.chunk_id,
                chunk.doc_id,
                chunk.text,
                chunk.tokens,
                chunk.topic_purity,
                chunk.text_type,
                chunk.dataset_id,
                chunk.status,
            ),
        )

    async def update_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
        query = """
            UPDATE content_chunks
            SET chunk_status = %s
            WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(new_status, doc_id, chunk_id, ori_status, self.CHUNK_USEFUL_STATUS),
        )

    async def update_embedding_status(self, doc_id, chunk_id, ori_status, new_status):
        query = """
            UPDATE content_chunks
            SET embedding_status = %s
            WHERE doc_id = %s AND chunk_id = %s AND embedding_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, chunk_id, ori_status)
        )

    async def set_chunk_result(self, chunk: Chunk, ori_status, new_status):
        query = """
            UPDATE content_chunks
            SET summary = %s, topic = %s, domain = %s, task_type = %s, concepts = %s,
                keywords = %s, questions = %s, chunk_status = %s, entities = %s
            WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(
                chunk.summary,
                chunk.topic,
                chunk.domain,
                chunk.task_type,
                json.dumps(chunk.concepts),
                json.dumps(chunk.keywords),
                json.dumps(chunk.questions),
                new_status,
                json.dumps(chunk.entities),
                chunk.doc_id,
                chunk.chunk_id,
                ori_status,
            ),
        )

    async def update_es_status(self, doc_id, chunk_id, ori_status, new_status):
        query = """
            UPDATE content_chunks
            SET es_status = %s
            WHERE doc_id = %s AND chunk_id = %s AND es_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, chunk_id, ori_status)
        )

    async def update_doc_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
        query = """
            UPDATE content_chunks
            SET status = %s
            WHERE doc_id = %s AND chunk_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, chunk_id, ori_status)
        )

    async def update_doc_status(self, doc_id, ori_status, new_status):
        query = """
            UPDATE content_chunks
            SET status = %s
            WHERE doc_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, ori_status)
        )

    async def update_dataset_status(self, dataset_id, ori_status, new_status):
        query = """
            UPDATE content_chunks
            SET status = %s
            WHERE dataset_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, dataset_id, ori_status)
        )

    async def select_chunk_content(self, doc_id, chunk_id, status=1):
        query = """
            SELECT * FROM content_chunks WHERE doc_id = %s AND chunk_id = %s AND status = %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(doc_id, chunk_id, status)
        )
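

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original mappers). The pool below is a
# hypothetical stand-in: any object exposing async_fetch(query, params) and
# async_save(query, params) coroutines, like the project's real MySQL pool,
# can be passed to the classes above.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    class _EchoPool:
        """Hypothetical pool that just prints the SQL it would execute."""

        async def async_fetch(self, query, params=None):
            print("FETCH:", " ".join(query.split()), params)
            return []

        async def async_save(self, query, params=None):
            print("SAVE:", " ".join(query.split()), params)
            return 1

    async def _demo():
        pool = _EchoPool()
        datasets = Dataset(pool)
        contents = Contents(pool)
        chunks = ContentChunks(pool)

        # create a dataset, register a document, then page through its contents
        await datasets.add_dataset("demo-dataset")
        await contents.insert_content("doc-1", "hello world", 1, "demo title", 1)
        page = await contents.select_contents(page_num=1, page_size=20, dataset_id=1)
        print(page["total_count"], page["total_pages"])

        # move a chunk between statuses using the shared constants
        # (assumed lifecycle: INIT -> PROCESSING -> FINISHED/FAILED)
        await chunks.update_chunk_status(
            doc_id="doc-1",
            chunk_id=0,
            ori_status=chunks.INIT_STATUS,
            new_status=chunks.PROCESSING_STATUS,
        )

    asyncio.run(_demo())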