mapper.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. import json
  2. from applications.config import Chunk
  3. class TaskConst:
  4. INIT_STATUS = 0
  5. PROCESSING_STATUS = 1
  6. FINISHED_STATUS = 2
  7. FAILED_STATUS = 3
  8. CHUNK_USEFUL_STATUS = 1
  9. class BaseMySQLClient(TaskConst):
  10. def __init__(self, pool):
  11. self.pool = pool
  12. class Dataset(BaseMySQLClient):
  13. async def update_dataset_status(self, dataset_id, ori_status, new_status):
  14. query = """
  15. UPDATE dataset SET status = %s WHERE id = %s AND status = %s;
  16. """
  17. return await self.pool.async_save(
  18. query=query, params=(new_status, dataset_id, ori_status)
  19. )
  20. async def select_dataset(self, status=1):
  21. query = """
  22. SELECT * FROM dataset WHERE status = %s;
  23. """
  24. return await self.pool.async_fetch(query=query, params=(status,))
  25. async def add_dataset(self, name):
  26. query = """
  27. INSERT INTO dataset (name) VALUES (%s);
  28. """
  29. return await self.pool.async_save(query=query, params=(name,))
  30. async def select_dataset_by_id(self, id_, status: int = 1):
  31. query = """
  32. SELECT * FROM dataset WHERE id = %s AND status = %s;
  33. """
  34. return await self.pool.async_fetch(query=query, params=(id_, status))
  35. async def select_dataset_by_name(self, name, status: int = 1):
  36. query = """
  37. SELECT * FROM dataset WHERE name = %s AND status = %s;
  38. """
  39. return await self.pool.async_fetch(query=query, params=(name, status))
  40. class Contents(BaseMySQLClient):
  41. async def insert_content(self, doc_id, text, text_type, title, dataset_id):
  42. query = """
  43. INSERT IGNORE INTO contents
  44. (doc_id, text, text_type, title, dataset_id)
  45. VALUES (%s, %s, %s, %s, %s);
  46. """
  47. return await self.pool.async_save(
  48. query=query, params=(doc_id, text, text_type, title, dataset_id)
  49. )
  50. async def update_content_info(self, doc_id, text, text_type, title, dataset_id):
  51. query = """
  52. UPDATE contents
  53. SET text = %s, text_type = %s, title = %s, dataset_id = %s, status = %s
  54. WHERE doc_id = %s;
  55. """
  56. return await self.pool.async_save(
  57. query=query,
  58. params=(text, text_type, title, dataset_id, self.INIT_STATUS, doc_id),
  59. )
  60. async def update_content_status(self, doc_id, ori_status, new_status):
  61. query = """
  62. UPDATE contents
  63. SET status = %s
  64. WHERE doc_id = %s AND status = %s;
  65. """
  66. return await self.pool.async_save(
  67. query=query, params=(new_status, doc_id, ori_status)
  68. )
  69. async def update_dataset_status(self, dataset_id, ori_status, new_status):
  70. query = """
  71. UPDATE contents
  72. SET status = %s
  73. WHERE dataset_id = %s AND status = %s;
  74. """
  75. return await self.pool.async_save(
  76. query=query, params=(new_status, dataset_id, ori_status)
  77. )
  78. async def update_doc_status(self, doc_id, ori_status, new_status):
  79. """
  80. this function is to change the using status of each document
  81. :param doc_id:
  82. :param ori_status:
  83. :param new_status:
  84. :return:
  85. """
  86. query = """
  87. UPDATE contents SET doc_status = %s WHERE doc_id = %s AND doc_status = %s;
  88. """
  89. return await self.pool.async_save(
  90. query=query, params=(new_status, doc_id, ori_status)
  91. )
  92. async def select_count(self, dataset_id, doc_status=1):
  93. query = """
  94. SELECT count(*) AS count FROM contents WHERE dataset_id = %s AND doc_status = %s;
  95. """
  96. rows = await self.pool.async_fetch(query=query, params=(dataset_id, doc_status))
  97. return rows[0]["count"] if rows else 0
  98. async def select_content_by_doc_id(self, doc_id):
  99. query = """SELECT * FROM contents WHERE doc_id = %s;"""
  100. return await self.pool.async_fetch(query=query, params=(doc_id,))
  101. async def select_contents(
  102. self,
  103. page_num: int,
  104. page_size: int,
  105. order_by=None,
  106. dataset_id: int = None,
  107. doc_status: int = 1,
  108. ):
  109. """
  110. 分页查询 contents 表,并返回分页信息
  111. :param page_num: 页码,从 1 开始
  112. :param page_size: 每页数量
  113. :param order_by: 排序条件,例如 {"id": "desc"} 或 {"created_at": "asc"}
  114. :param dataset_id: 数据集 ID
  115. :param doc_status: 文档状态(默认 1)
  116. :return: dict,包含 entities、total_count、page、page_size、total_pages
  117. """
  118. if order_by is None:
  119. order_by = {"id": "desc"}
  120. offset = (page_num - 1) * page_size
  121. # 动态拼接 where 条件
  122. where_clauses = ["doc_status = %s"]
  123. params = [doc_status]
  124. if dataset_id:
  125. where_clauses.append("dataset_id = %s")
  126. params.append(dataset_id)
  127. where_sql = " AND ".join(where_clauses)
  128. # 动态拼接 order by
  129. order_field, order_direction = list(order_by.items())[0]
  130. order_sql = f"ORDER BY {order_field} {order_direction.upper()}"
  131. # 查询总数
  132. count_query = f"SELECT COUNT(*) as total_count FROM contents WHERE {where_sql};"
  133. count_result = await self.pool.async_fetch(
  134. query=count_query, params=tuple(params)
  135. )
  136. total_count = count_result[0]["total_count"] if count_result else 0
  137. # 查询分页数据
  138. query = f"""
  139. SELECT * FROM contents
  140. WHERE {where_sql}
  141. {order_sql}
  142. LIMIT %s OFFSET %s;
  143. """
  144. params.extend([page_size, offset])
  145. entities = await self.pool.async_fetch(query=query, params=tuple(params))
  146. total_pages = (total_count + page_size - 1) // page_size # 向上取整
  147. return {
  148. "entities": entities,
  149. "total_count": total_count,
  150. "page": page_num,
  151. "page_size": page_size,
  152. "total_pages": total_pages,
  153. }
  154. class ContentChunks(BaseMySQLClient):
  155. async def insert_chunk(self, chunk: Chunk) -> int:
  156. query = """
  157. INSERT IGNORE INTO content_chunks
  158. (chunk_id, doc_id, text, tokens, topic_purity, text_type, dataset_id, status)
  159. VALUES (%s, %s, %s, %s, %s, %s, %s, %s);
  160. """
  161. return await self.pool.async_save(
  162. query=query,
  163. params=(
  164. chunk.chunk_id,
  165. chunk.doc_id,
  166. chunk.text,
  167. chunk.tokens,
  168. chunk.topic_purity,
  169. chunk.text_type,
  170. chunk.dataset_id,
  171. chunk.status,
  172. ),
  173. )
  174. async def update_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
  175. query = """
  176. UPDATE content_chunks
  177. SET chunk_status = %s
  178. WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s and status = %s;
  179. """
  180. return await self.pool.async_save(
  181. query=query,
  182. params=(new_status, doc_id, chunk_id, ori_status, self.CHUNK_USEFUL_STATUS),
  183. )
  184. async def update_embedding_status(self, doc_id, chunk_id, ori_status, new_status):
  185. query = """
  186. UPDATE content_chunks
  187. SET embedding_status = %s
  188. WHERE doc_id = %s AND chunk_id = %s AND embedding_status = %s;
  189. """
  190. return await self.pool.async_save(
  191. query=query, params=(new_status, doc_id, chunk_id, ori_status)
  192. )
  193. async def set_chunk_result(self, chunk: Chunk, ori_status, new_status):
  194. query = """
  195. UPDATE content_chunks
  196. SET summary = %s, topic = %s, domain = %s, task_type = %s, concepts = %s,
  197. keywords = %s, questions = %s, chunk_status = %s, entities = %s
  198. WHERE doc_id = %s AND chunk_id = %s AND chunk_status = %s;
  199. """
  200. return await self.pool.async_save(
  201. query=query,
  202. params=(
  203. chunk.summary,
  204. chunk.topic,
  205. chunk.domain,
  206. chunk.task_type,
  207. json.dumps(chunk.concepts),
  208. json.dumps(chunk.keywords),
  209. json.dumps(chunk.questions),
  210. new_status,
  211. json.dumps(chunk.entities),
  212. chunk.doc_id,
  213. chunk.chunk_id,
  214. ori_status,
  215. ),
  216. )
  217. async def update_es_status(self, doc_id, chunk_id, ori_status, new_status):
  218. query = """
  219. UPDATE content_chunks SET es_status = %s
  220. WHERE doc_id = %s AND chunk_id = %s AND es_status = %s;
  221. """
  222. return await self.pool.async_save(
  223. query=query, params=(new_status, doc_id, chunk_id, ori_status)
  224. )
  225. async def update_doc_chunk_status(self, doc_id, chunk_id, ori_status, new_status):
  226. query = """
  227. UPDATE content_chunks set status = %s
  228. WHERE doc_id = %s AND chunk_id = %s AND status = %s;
  229. """
  230. return await self.pool.async_save(
  231. query=query, params=(new_status, doc_id, chunk_id, ori_status)
  232. )
  233. async def update_doc_status(self, doc_id, ori_status, new_status):
  234. query = """
  235. UPDATE content_chunks set status = %s
  236. WHERE doc_id = %s AND status = %s;
  237. """
  238. return await self.pool.async_save(
  239. query=query, params=(new_status, doc_id, ori_status)
  240. )
  241. async def update_dataset_status(self, dataset_id, ori_status, new_status):
  242. query = """
  243. UPDATE content_chunks set status = %s
  244. WHERE dataset_id = %s AND status = %s;
  245. """
  246. return await self.pool.async_save(
  247. query=query, params=(new_status, dataset_id, ori_status)
  248. )
  249. async def select_chunk_content(self, doc_id, chunk_id):
  250. query = """
  251. SELECT * FROM content_chunks WHERE doc_id = %s AND chunk_id = %s;
  252. """
  253. return await self.pool.async_fetch(query=query, params=(doc_id, chunk_id))
  254. async def select_chunk_contents(
  255. self,
  256. page_num: int,
  257. page_size: int,
  258. order_by: dict = {"chunk_id": "asc"},
  259. doc_id: str = None,
  260. doc_status: int = None,
  261. ):
  262. offset = (page_num - 1) * page_size
  263. # 动态拼接 where 条件
  264. where_clauses = []
  265. params = []
  266. if doc_id:
  267. where_clauses.append("doc_id = %s")
  268. params.append(doc_id)
  269. if doc_status:
  270. where_clauses.append("doc_status = %s")
  271. params.append(doc_status)
  272. where_sql = " AND ".join(where_clauses)
  273. # 动态拼接 order by
  274. order_field, order_direction = list(order_by.items())[0]
  275. order_sql = f"ORDER BY {order_field} {order_direction.upper()}"
  276. # 查询总数
  277. count_query = (
  278. f"SELECT COUNT(*) as total_count FROM content_chunks WHERE {where_sql};"
  279. )
  280. count_result = await self.pool.async_fetch(
  281. query=count_query, params=tuple(params)
  282. )
  283. total_count = count_result[0]["total_count"] if count_result else 0
  284. # 查询分页数据
  285. query = f"""
  286. SELECT * FROM content_chunks
  287. WHERE {where_sql}
  288. {order_sql}
  289. LIMIT %s OFFSET %s;
  290. """
  291. params.extend([page_size, offset])
  292. entities = await self.pool.async_fetch(query=query, params=tuple(params))
  293. total_pages = (total_count + page_size - 1) // page_size # 向上取整
  294. print(total_pages)
  295. return {
  296. "entities": entities,
  297. "total_count": total_count,
  298. "page": page_num,
  299. "page_size": page_size,
  300. "total_pages": total_pages,
  301. }
  302. class ChatResult(BaseMySQLClient):
  303. async def insert_chat_result(
  304. self, query_text, dataset_ids, search_res, chat_res, score, has_answer
  305. ):
  306. query = """
  307. INSERT INTO chat_res
  308. (query, dataset_ids, search_res, chat_res, score, has_answer)
  309. VALUES (%s, %s, %s, %s, %s, %s);
  310. """
  311. return await self.pool.async_save(
  312. query=query,
  313. params=(query_text, dataset_ids, search_res, chat_res, score, has_answer),
  314. )