# async_task.py

import asyncio
import json
import os
import uuid

from applications.api import get_basic_embedding
from applications.api.qwen import QwenClient
from applications.async_task import ChunkBooksTask
from applications.config import BASE_MILVUS_SEARCH_PARAMS, DEFAULT_MODEL
from applications.resource import get_resource_manager
from applications.search import HybridSearch
from applications.utils.mysql import Books, ChatResult, ContentChunks
from applications.utils.oss.oss_client import OSSClient
from applications.utils.pdf.book_extract import book_extract
from applications.utils.spider.study import study


async def handle_books():
    try:
        # Get the resource manager and clients
        resource = get_resource_manager()
        books_mapper = Books(resource.mysql_client)
        oss_client = OSSClient()
        # Fetch the list of books awaiting processing
        books = await books_mapper.select_init_books()
        for book in books:
            book_id = book.get("book_id")
            doc_id = book.get("doc_id")
            # Read the extraction status (0 = pending, 1 = in progress, 99 = failed)
            extract_status = (await books_mapper.select_book_extract_status(book_id))[
                0
            ].get("extract_status")
            if extract_status == 0:
                # Mark extraction as in progress
                await books_mapper.update_book_extract_status(book_id, 1)
                book_path = os.path.join("/tmp", book_id + ".pdf")
                if not os.path.exists(book_path):
                    oss_path = f"rag/pdfs/{book_id}.pdf"
                    try:
                        # Download the book file from OSS
                        await oss_client.download_file(oss_path, book_path)
                    except Exception:
                        continue  # Skip this book if the download fails
                try:
                    # Extract the book content from the PDF
                    res = await book_extract(book_path, book_id)
                    if res:
                        content_list = (
                            res.get("results", {})
                            .get(book_id, {})
                            .get("content_list", [])
                        )
                        if content_list:
                            # Persist the extraction result; content_list is already
                            # deserialized, so dump it directly (the original
                            # json.loads(content_list) would raise on a list)
                            await books_mapper.update_book_extract_result(
                                book_id,
                                json.dumps(content_list, ensure_ascii=False),
                            )
                except Exception:
                    await books_mapper.update_book_extract_status(book_id, 99)
                    continue  # Skip this book if extraction fails
                # Chunk the extracted document
                chunk_task = ChunkBooksTask(doc_id=doc_id, resource=resource)
                body = {"book_id": book_id}
                await chunk_task.deal(body)  # Run the chunking task
    except Exception as e:
        # Catch-all so one bad batch does not crash the worker
        print(f"Failed to process request, error: {e}")
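

# Usage sketch (assumption, not part of the original pipeline): handle_books is a
# batch job with no return value, so a caller would typically schedule it on the
# event loop. The wrapper name and interval below are hypothetical.
async def _run_handle_books_periodically(interval_seconds: int = 600):
    while True:
        await handle_books()
        await asyncio.sleep(interval_seconds)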


async def process_question(question, resource, qwen_client, rag_chat_agent):
    try:
        dataset_id_strs = "11,12"
        dataset_ids = dataset_id_strs.split(",")
        search_type = "hybrid"
        # Run the retrieval query
        query_results = await query_search(
            query_text=question,
            filters={"dataset_id": dataset_ids},
            search_type=search_type,
        )
        chat_result_mapper = ChatResult(resource.mysql_client)
        # Run the RAG chat and the LLM search concurrently
        chat_task = rag_chat_agent.get_chat_res(question, query_results)
        llm_task = qwen_client.search_and_chat(
            user_prompt=question, search_strategy="agent"
        )
        chat_result, llm_search = await asyncio.gather(chat_task, llm_task)
        study_task_id = None
        if chat_result["status"] == 0:
            study_task_id = study(question)["task_id"]
        decision = await rag_chat_agent.make_decision(question, chat_result, llm_search)
        # Build the response payload
        data = {
            "query": question,
            "result": decision["result"],
            "status": decision["status"],
            "relevance_score": decision["relevance_score"],
        }
        # Persist the chat result
        await chat_result_mapper.insert_chat_result(
            question,
            dataset_id_strs,
            json.dumps(query_results, ensure_ascii=False),
            chat_result["summary"],
            decision["relevance_score"],
            chat_result["status"],
            llm_search["content"],
            json.dumps(llm_search["search_results"], ensure_ascii=False),
            1,
            decision["result"],
            study_task_id,
        )
        return data
    except Exception as e:
        print(f"Error processing question: {question}. Error: {str(e)}")
        return {"query": question, "error": str(e)}
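

# Usage sketch (assumption): process_question handles each question independently
# and already catches its own errors, so a batch can be fanned out with
# asyncio.gather. resource/qwen_client/rag_chat_agent are constructed by the real
# call sites; they are passed through untouched here.
async def _process_questions(questions, resource, qwen_client, rag_chat_agent):
    tasks = [
        process_question(q, resource, qwen_client, rag_chat_agent)
        for q in questions
    ]
    return await asyncio.gather(*tasks)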


async def query_search(
    query_text,
    filters=None,
    search_type="",
    anns_field="vector_text",
    search_params=BASE_MILVUS_SEARCH_PARAMS,
    _source=False,
    es_size=10000,
    sort_by=None,
    milvus_size=20,
    limit=10,
):
    if filters is None:
        filters = {}
    # Embed the query text
    query_vector = await get_basic_embedding(text=query_text, model=DEFAULT_MODEL)
    resource = get_resource_manager()
    search_engine = HybridSearch(
        milvus_pool=resource.milvus_client,
        es_pool=resource.es_client,
        graph_pool=resource.graph_client,
    )
    try:
        match search_type:
            case "base":
                # Pure vector search returns the raw response as-is
                response = await search_engine.base_vector_search(
                    query_vec=query_vector,
                    anns_field=anns_field,
                    search_params=search_params,
                    limit=limit,
                )
                return response
            case "hybrid":
                # Hybrid search falls through to the enrichment step below
                response = await search_engine.hybrid_search(
                    filters=filters,
                    query_vec=query_vector,
                    anns_field=anns_field,
                    search_params=search_params,
                    es_size=es_size,
                    sort_by=sort_by,
                    milvus_size=milvus_size,
                )
            case "strategy":
                return None
            case _:
                return None
    except Exception:
        return None
    if response is None:
        return None
    # Enrich each hit with its chunk content from MySQL
    content_chunk_mapper = ContentChunks(resource.mysql_client)
    res = []
    for result in response["results"]:
        content_chunks = await content_chunk_mapper.select_chunk_content(
            doc_id=result["doc_id"], chunk_id=result["chunk_id"]
        )
        if content_chunks:
            content_chunk = content_chunks[0]
            res.append(
                {
                    "id": content_chunk["id"],
                    "docId": content_chunk["doc_id"],
                    "content": content_chunk["text"],
                    "contentSummary": content_chunk["summary"],
                    "score": result["score"],
                    "datasetId": content_chunk["dataset_id"],
                }
            )
    return res[:limit]
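

# Usage sketch (assumption): a minimal hybrid query against the same two datasets
# hard-coded in process_question above; the query text is a placeholder.
async def _example_query_search():
    results = await query_search(
        query_text="example question",
        filters={"dataset_id": ["11", "12"]},
        search_type="hybrid",
        limit=5,
    )
    return results or []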