# mapper.py (12 KB)
  1. import json
  2. from applications.config import Chunk
  3. class TaskConst:
  4. INIT_STATUS = 0
  5. PROCESSING_STATUS = 1
  6. FINISHED_STATUS = 2
  7. FAILED_STATUS = 3
  8. CHUNK_USEFUL_STATUS = 1
  9. class BaseMySQLClient(TaskConst):
  10. def __init__(self, pool):
  11. self.pool = pool
  12. class Dataset(BaseMySQLClient):
  13. async def update_dataset_status(self, dataset_id, ori_status, new_status):
  14. query = """
  15. UPDATE dataset SET status = %s WHERE id = %s AND status = %s;
  16. """
  17. return await self.pool.async_save(
  18. query=query, params=(new_status, dataset_id, ori_status)
  19. )
  20. async def select_dataset(self, status=1):
  21. query = """
  22. SELECT * FROM dataset WHERE status = %s;
  23. """
  24. return await self.pool.async_fetch(query=query, params=(status,))
  25. async def add_dataset(self, name):
  26. query = """
  27. INSERT INTO dataset (name) VALUES (%s);
  28. """
  29. return await self.pool.async_save(query=query, params=(name,))
  30. async def select_dataset_by_id(self, id_, status: int = 1):
  31. query = """
  32. SELECT * FROM dataset WHERE id = %s AND status = %s;
  33. """
  34. return await self.pool.async_fetch(query=query, params=(id_, status))
  35. class Contents(BaseMySQLClient):
  36. async def insert_content(self, doc_id, text, text_type, title, dataset_id):
  37. query = """
  38. INSERT IGNORE INTO contents
  39. (doc_id, text, text_type, title, dataset_id)
  40. VALUES (%s, %s, %s, %s, %s);
  41. """
  42. return await self.pool.async_save(
  43. query=query, params=(doc_id, text, text_type, title, dataset_id)
  44. )
  45. async def update_content_info(self, doc_id, text, text_type, title, dataset_id):
  46. query = """
  47. UPDATE contents
  48. SET text = %s, text_type = %s, title = %s, dataset_id = %s, status = %s
  49. WHERE doc_id = %s;
  50. """
  51. return await self.pool.async_save(
  52. query=query,
  53. params=(text, text_type, title, dataset_id, self.INIT_STATUS, doc_id),
  54. )
  55. async def update_content_status(self, doc_id, ori_status, new_status):
  56. query = """
  57. UPDATE contents
  58. SET status = %s
  59. WHERE doc_id = %s AND status = %s;
  60. """
  61. return await self.pool.async_save(
  62. query=query, params=(new_status, doc_id, ori_status)
  63. )
  64. async def update_dataset_status(self, dataset_id, ori_status, new_status):
  65. query = """
  66. UPDATE contents
  67. SET status = %s
  68. WHERE dataset_id = %s AND status = %s;
  69. """
  70. return await self.pool.async_save(
  71. query=query, params=(new_status, dataset_id, ori_status)
  72. )
  73. async def update_doc_status(self, doc_id, ori_status, new_status):
  74. """
  75. this function is to change the using status of each document
  76. :param doc_id:
  77. :param ori_status:
  78. :param new_status:
  79. :return:
  80. """
  81. query = """
  82. UPDATE contents SET doc_status = %s WHERE doc_id = %s AND doc_status = %s;
  83. """
  84. return await self.pool.async_save(
  85. query=query, params=(new_status, doc_id, ori_status)
  86. )
  87. async def select_count(self, dataset_id, doc_status=1):
  88. query = """
  89. SELECT count(*) AS count FROM contents WHERE dataset_id = %s AND doc_status = %s;
  90. """
  91. rows = await self.pool.async_fetch(query=query, params=(dataset_id, doc_status))
  92. return rows[0]["count"] if rows else 0
  93. async def select_content_by_doc_id(self, doc_id):
  94. query = """SELECT * FROM contents WHERE doc_id = %s;"""
  95. return await self.pool.async_fetch(query=query, params=(doc_id,))
  96. async def select_contents(
  97. self,
  98. page_num: int,
  99. page_size: int,
  100. order_by=None,
  101. dataset_id: int = None,
  102. doc_status: int = 1,
  103. ):
  104. """
  105. 分页查询 contents 表,并返回分页信息
  106. :param page_num: 页码,从 1 开始
  107. :param page_size: 每页数量
  108. :param order_by: 排序条件,例如 {"id": "desc"} 或 {"created_at": "asc"}
  109. :param dataset_id: 数据集 ID
  110. :param doc_status: 文档状态(默认 1)
  111. :return: dict,包含 entities、total_count、page、page_size、total_pages
  112. """
  113. if order_by is None:
  114. order_by = {"id": "desc"}
  115. offset = (page_num - 1) * page_size
  116. # 动态拼接 where 条件
  117. where_clauses = ["doc_status = %s"]
  118. params = [doc_status]
  119. if dataset_id:
  120. where_clauses.append("dataset_id = %s")
  121. params.append(dataset_id)
  122. where_sql = " AND ".join(where_clauses)
  123. # 动态拼接 order by
  124. order_field, order_direction = list(order_by.items())[0]
  125. order_sql = f"ORDER BY {order_field} {order_direction.upper()}"
  126. # 查询总数
  127. count_query = f"SELECT COUNT(*) as total_count FROM contents WHERE {where_sql};"
  128. count_result = await self.pool.async_fetch(
  129. query=count_query, params=tuple(params)
  130. )
  131. total_count = count_result[0]["total_count"] if count_result else 0
  132. # 查询分页数据
  133. query = f"""
  134. SELECT * FROM contents
  135. WHERE {where_sql}
  136. {order_sql}
  137. LIMIT %s OFFSET %s;
  138. """
  139. params.extend([page_size, offset])
  140. entities = await self.pool.async_fetch(query=query, params=tuple(params))
  141. total_pages = (total_count + page_size - 1) // page_size # 向上取整
  142. return {
  143. "entities": entities,
  144. "total_count": total_count,
  145. "page": page_num,
  146. "page_size": page_size,
  147. "total_pages": total_pages,
  148. }
  149. class ContentChunks(BaseMySQLClient):
  150. async def insert_chunk(self, chunk: Chunk) -> int:
  151. query = """
  152. INSERT IGNORE INTO content_chunks
  153. (chunk_id, doc_id, text, tokens, topic_purity, text_type, dataset_id, status)
  154. VALUES (%s, %s, %s, %s, %s, %s, %s, %s);
  155. """
  156. return await self.pool.async_save(
  157. query=query,
  158. params=(
  159. chunk.chunk_id,
  160. chunk.doc_id,
  161. chunk.text,
  162. chunk.tokens,
  163. chunk.topic_purity,
  164. chunk.text_type,
  165. chunk.dataset_id,
  166. chunk.status,
  167. ),
  168. )
  169. async def update_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
  170. query = """
  171. UPDATE content_chunks
  172. SET chunk_status = %s
  173. WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s and status = %s;
  174. """
  175. return await self.pool.async_save(
  176. query=query,
  177. params=(new_status, doc_id, chunk_id, ori_status, self.CHUNK_USEFUL_STATUS),
  178. )
  179. async def update_embedding_status(self, doc_id, chunk_id, ori_status, new_status):
  180. query = """
  181. UPDATE content_chunks
  182. SET embedding_status = %s
  183. WHERE doc_id = %s AND chunk_id = %s AND embedding_status = %s;
  184. """
  185. return await self.pool.async_save(
  186. query=query, params=(new_status, doc_id, chunk_id, ori_status)
  187. )
  188. async def set_chunk_result(self, chunk: Chunk, ori_status, new_status):
  189. query = """
  190. UPDATE content_chunks
  191. SET summary = %s, topic = %s, domain = %s, task_type = %s, concepts = %s,
  192. keywords = %s, questions = %s, chunk_status = %s, entities = %s
  193. WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s;
  194. """
  195. return await self.pool.async_save(
  196. query=query,
  197. params=(
  198. chunk.summary,
  199. chunk.topic,
  200. chunk.domain,
  201. chunk.task_type,
  202. json.dumps(chunk.concepts),
  203. json.dumps(chunk.keywords),
  204. json.dumps(chunk.questions),
  205. new_status,
  206. json.dumps(chunk.entities),
  207. chunk.doc_id,
  208. chunk.chunk_id,
  209. ori_status,
  210. ),
  211. )
  212. async def update_es_status(self, doc_id, chunk_id, ori_status, new_status):
  213. query = """
  214. UPDATE content_chunks SET es_status = %s
  215. WHERE doc_id = %s AND chunk_id = %s AND es_status = %s;
  216. """
  217. return await self.pool.async_save(
  218. query=query, params=(new_status, doc_id, chunk_id, ori_status)
  219. )
  220. async def update_doc_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
  221. query = """
  222. UPDATE content_chunks set status = %s
  223. WHERE doc_id = %s AND chunk_id = %s AND status = %s;
  224. """
  225. return await self.pool.async_save(
  226. query=query, params=(new_status, doc_id, chunk_id, ori_status)
  227. )
  228. async def update_doc_status(self, doc_id, ori_status, new_status):
  229. query = """
  230. UPDATE content_chunks set status = %s
  231. WHERE doc_id = %s AND status = %s;
  232. """
  233. return await self.pool.async_save(
  234. query=query, params=(new_status, doc_id, ori_status)
  235. )
  236. async def update_dataset_status(self, dataset_id, ori_status, new_status):
  237. query = """
  238. UPDATE content_chunks set status = %s
  239. WHERE dataset_id = %s AND status = %s;
  240. """
  241. return await self.pool.async_save(
  242. query=query, params=(new_status, dataset_id, ori_status)
  243. )
  244. async def select_chunk_content(self, doc_id, chunk_id):
  245. query = """
  246. SELECT * FROM content_chunks WHERE doc_id = %s AND chunk_id = %s;
  247. """
  248. return await self.pool.async_fetch(query=query, params=(doc_id, chunk_id))
  249. async def select_chunk_contents(
  250. self,
  251. page_num: int,
  252. page_size: int,
  253. order_by: dict = {"chunk_id": "asc"},
  254. doc_id: str = None,
  255. doc_status: int = None,
  256. ):
  257. offset = (page_num - 1) * page_size
  258. # 动态拼接 where 条件
  259. where_clauses = []
  260. params = []
  261. if doc_id:
  262. where_clauses.append("doc_id = %s")
  263. params.append(doc_id)
  264. if doc_status:
  265. where_clauses.append("doc_status = %s")
  266. params.append(doc_status)
  267. where_sql = " AND ".join(where_clauses)
  268. # 动态拼接 order by
  269. order_field, order_direction = list(order_by.items())[0]
  270. order_sql = f"ORDER BY {order_field} {order_direction.upper()}"
  271. # 查询总数
  272. count_query = (
  273. f"SELECT COUNT(*) as total_count FROM content_chunks WHERE {where_sql};"
  274. )
  275. count_result = await self.pool.async_fetch(
  276. query=count_query, params=tuple(params)
  277. )
  278. total_count = count_result[0]["total_count"] if count_result else 0
  279. # 查询分页数据
  280. query = f"""
  281. SELECT * FROM content_chunks
  282. WHERE {where_sql}
  283. {order_sql}
  284. LIMIT %s OFFSET %s;
  285. """
  286. params.extend([page_size, offset])
  287. entities = await self.pool.async_fetch(query=query, params=tuple(params))
  288. total_pages = (total_count + page_size - 1) // page_size # 向上取整
  289. print(total_pages)
  290. return {
  291. "entities": entities,
  292. "total_count": total_count,
  293. "page": page_num,
  294. "page_size": page_size,
  295. "total_pages": total_pages,
  296. }
  297. class ChatResult(BaseMySQLClient):
  298. async def insert_chat_result(
  299. self, query_text, dataset_ids, search_res, chat_res, score
  300. ):
  301. query = """
  302. INSERT INTO chat_res
  303. (query, dataset_ids, search_res, chat_res, score)
  304. VALUES (%s, %s, %s, %s, %s);
  305. """
  306. return await self.pool.async_save(
  307. query=query, params=(query_text, dataset_ids, search_res, chat_res, score)
  308. )