mapper.py

import json
from datetime import datetime
from typing import Optional

from applications.config import Chunk


class TaskConst:
    """Processing status codes shared by the mappers in this module."""

    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    FINISHED_STATUS = 2
    FAILED_STATUS = 3


class BaseMySQLClient:
    """Thin base class holding the shared async MySQL connection pool."""

    def __init__(self, pool):
        self.pool = pool


class Dataset(BaseMySQLClient):
    """Mapper for the `dataset` table."""

    async def update_dataset_status(self, dataset_id, ori_status, new_status):
        query = """
            UPDATE dataset SET status = %s WHERE id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(new_status, dataset_id, ori_status),
        )

    async def select_dataset(self, status=1):
        query = """
            SELECT * FROM dataset WHERE status = %s;
        """
        return await self.pool.async_fetch(query=query, params=(status,))

    async def add_dataset(self, name):
        query = """
            INSERT INTO dataset (name, created_at, updated_at, status)
            VALUES (%s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query,
            params=(name, datetime.now(), datetime.now(), 1),
        )

    async def select_dataset_by_id(self, id, status=1):
        query = """
            SELECT * FROM dataset WHERE id = %s AND status = %s;
        """
        return await self.pool.async_fetch(query=query, params=(id, status))


class Contents(BaseMySQLClient):
    """Mapper for the `contents` table (one row per document)."""

    async def insert_content(self, doc_id, text, text_type, title, dataset_id):
        query = """
            INSERT IGNORE INTO contents
                (doc_id, text, text_type, title, dataset_id)
            VALUES (%s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query, params=(doc_id, text, text_type, title, dataset_id)
        )

    async def update_content_status(self, doc_id, ori_status, new_status):
        query = """
            UPDATE contents
            SET status = %s
            WHERE doc_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, ori_status)
        )

    async def update_dataset_status(self, dataset_id, ori_status, new_status):
        query = """
            UPDATE contents
            SET status = %s
            WHERE dataset_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, dataset_id, ori_status)
        )

    async def update_doc_status(self, doc_id, ori_status, new_status):
        """
        Update the usage status (`doc_status`) of a single document.
        :param doc_id: document identifier
        :param ori_status: expected current status (optimistic guard)
        :param new_status: status to set
        :return: result of the underlying async_save call
        """
        query = """
            UPDATE contents
            SET doc_status = %s
            WHERE doc_id = %s AND doc_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, ori_status)
        )

    async def select_count(self, dataset_id, doc_status=1):
        query = """
            SELECT COUNT(*) AS count FROM contents WHERE dataset_id = %s AND doc_status = %s;
        """
        rows = await self.pool.async_fetch(query=query, params=(dataset_id, doc_status))
        return rows[0]["count"] if rows else 0

    async def select_content_by_doc_id(self, doc_id):
        query = """
            SELECT * FROM contents WHERE doc_id = %s;
        """
        return await self.pool.async_fetch(query=query, params=(doc_id,))

    async def select_contents(
        self,
        page_num: int,
        page_size: int,
        order_by: Optional[dict] = None,
        dataset_id: Optional[int] = None,
        doc_status: int = 1,
    ):
        """
        Paginated query over the `contents` table.
        :param page_num: page number, starting from 1
        :param page_size: rows per page
        :param order_by: ordering spec, e.g. {"id": "desc"} or {"created_at": "asc"}
        :param dataset_id: optional dataset ID filter
        :param doc_status: document status filter (default 1)
        :return: dict with entities, total_count, page, page_size, total_pages
        """
        # Avoid a mutable default argument; fall back to newest-first ordering.
        order_by = order_by or {"id": "desc"}
        offset = (page_num - 1) * page_size

        # Build the WHERE clause dynamically.
        where_clauses = ["doc_status = %s"]
        params = [doc_status]
        if dataset_id is not None:
            where_clauses.append("dataset_id = %s")
            params.append(dataset_id)
        where_sql = " AND ".join(where_clauses)

        # Build the ORDER BY clause. The field and direction are interpolated
        # into the SQL, so `order_by` must come from trusted code, not user input.
        order_field, order_direction = list(order_by.items())[0]
        order_sql = f"ORDER BY {order_field} {order_direction.upper()}"

        # Total row count for the current filter.
        count_query = f"SELECT COUNT(*) AS total_count FROM contents WHERE {where_sql};"
        count_result = await self.pool.async_fetch(query=count_query, params=tuple(params))
        total_count = count_result[0]["total_count"] if count_result else 0

        # Fetch the requested page.
        query = f"""
            SELECT * FROM contents
            WHERE {where_sql}
            {order_sql}
            LIMIT %s OFFSET %s;
        """
        params.extend([page_size, offset])
        entities = await self.pool.async_fetch(query=query, params=tuple(params))

        total_pages = (total_count + page_size - 1) // page_size  # ceiling division
        return {
            "entities": entities,
            "total_count": total_count,
            "page": page_num,
            "page_size": page_size,
            "total_pages": total_pages,
        }
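
# Usage sketch (illustrative only, not part of the original mappers): `pool`
# below is a placeholder for the async MySQL pool these mappers are built
# around, exposing async_fetch/async_save as used above; the returned numbers
# are made up for illustration.
#
#     contents = Contents(pool)
#     page = await contents.select_contents(page_num=2, page_size=50, dataset_id=7)
#     # page -> {"entities": [...], "total_count": 1234, "page": 2,
#     #          "page_size": 50, "total_pages": 25}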


class ContentChunks(BaseMySQLClient):
    """Mapper for the `content_chunks` table (one row per chunk of a document)."""

    async def insert_chunk(self, chunk: Chunk) -> int:
        query = """
            INSERT IGNORE INTO content_chunks
                (chunk_id, doc_id, text, tokens, topic_purity, text_type, dataset_id)
            VALUES (%s, %s, %s, %s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=query,
            params=(
                chunk.chunk_id,
                chunk.doc_id,
                chunk.text,
                chunk.tokens,
                chunk.topic_purity,
                chunk.text_type,
                chunk.dataset_id,
            ),
        )

    async def update_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
        query = """
            UPDATE content_chunks
            SET chunk_status = %s
            WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, chunk_id, ori_status)
        )

    async def update_embedding_status(self, doc_id, chunk_id, ori_status, new_status):
        query = """
            UPDATE content_chunks
            SET embedding_status = %s
            WHERE doc_id = %s AND chunk_id = %s AND embedding_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, chunk_id, ori_status)
        )

    async def set_chunk_result(self, chunk: Chunk, ori_status, new_status):
        # Persist derived chunk metadata; list-valued fields are serialized to JSON.
        query = """
            UPDATE content_chunks
            SET summary = %s, topic = %s, domain = %s, task_type = %s, concepts = %s,
                keywords = %s, questions = %s, chunk_status = %s, entities = %s
            WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(
                chunk.summary,
                chunk.topic,
                chunk.domain,
                chunk.task_type,
                json.dumps(chunk.concepts),
                json.dumps(chunk.keywords),
                json.dumps(chunk.questions),
                new_status,
                json.dumps(chunk.entities),
                chunk.doc_id,
                chunk.chunk_id,
                ori_status,
            ),
        )

    async def update_es_status(self, doc_id, chunk_id, ori_status, new_status):
        query = """
            UPDATE content_chunks
            SET es_status = %s
            WHERE doc_id = %s AND chunk_id = %s AND es_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, chunk_id, ori_status)
        )

    async def update_doc_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
        query = """
            UPDATE content_chunks
            SET status = %s
            WHERE doc_id = %s AND chunk_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, chunk_id, ori_status)
        )

    async def update_doc_status(self, doc_id, ori_status, new_status):
        query = """
            UPDATE content_chunks
            SET status = %s
            WHERE doc_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, doc_id, ori_status)
        )

    async def update_dataset_status(self, dataset_id, ori_status, new_status):
        query = """
            UPDATE content_chunks
            SET status = %s
            WHERE dataset_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, dataset_id, ori_status)
        )

    async def select_chunk_content(self, doc_id, chunk_id, status=1):
        query = """
            SELECT * FROM content_chunks WHERE doc_id = %s AND chunk_id = %s AND status = %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(doc_id, chunk_id, status)
        )
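

# --- Example (not part of the original module) -------------------------------
# A minimal sketch of how these mappers might be wired together. It assumes the
# pool object is created elsewhere and exposes async_fetch/async_save with the
# call signatures used above; the doc/chunk identifiers below are placeholders.
async def _example_workflow(pool):
    dataset_mapper = Dataset(pool)
    chunk_mapper = ContentChunks(pool)

    # Register a dataset and look the active datasets up again.
    await dataset_mapper.add_dataset(name="demo-dataset")
    rows = await dataset_mapper.select_dataset(status=1)
    dataset_id = rows[0]["id"] if rows else None

    # Drive one chunk through the status machine defined by TaskConst:
    # INIT -> PROCESSING while it is being enriched, then PROCESSING -> FINISHED.
    await chunk_mapper.update_chunk_status(
        doc_id="doc-0001",
        chunk_id=0,
        ori_status=TaskConst.INIT_STATUS,
        new_status=TaskConst.PROCESSING_STATUS,
    )
    await chunk_mapper.update_chunk_status(
        doc_id="doc-0001",
        chunk_id=0,
        ori_status=TaskConst.PROCESSING_STATUS,
        new_status=TaskConst.FINISHED_STATUS,
    )
    return dataset_id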