# async_task.py

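"""Async task helpers for the RAG pipeline.

Contains three coroutines:

- handle_books: pulls unprocessed books from MySQL, downloads the PDF from
  OSS, extracts its content, and kicks off a ChunkBooksTask.
- process_question: runs a hybrid retrieval query, chats with DeepSeek and
  Qwen, makes a final answer decision, and persists the chat result.
- query_search: embeds the query text, dispatches to base vector search or
  hybrid search, and enriches hits with chunk content from MySQL.
"""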
import json
import os
import uuid

from applications.api import get_basic_embedding
from applications.api.qwen import QwenClient
from applications.async_task import ChunkBooksTask
from applications.config import BASE_MILVUS_SEARCH_PARAMS, DEFAULT_MODEL
from applications.resource import get_resource_manager
from applications.search import HybridSearch
from applications.utils.mysql import Books, ChatResult, ContentChunks
from applications.utils.oss.oss_client import OSSClient
from applications.utils.pdf.book_extract import book_extract
from applications.utils.spider.study import study


async def handle_books():
    try:
        # Get the resource manager and clients
        resource = get_resource_manager()
        books_mapper = Books(resource.mysql_client)
        oss_client = OSSClient()

        # Fetch the list of books waiting to be processed
        books = await books_mapper.select_init_books()
        for book in books:
            book_id = book.get("book_id")

            # Look up the current extraction status
            extract_status = (await books_mapper.select_book_extract_status(book_id))[
                0
            ].get("extract_status")
            if extract_status == 0:
                # Mark the book as "extracting"
                await books_mapper.update_book_extract_status(book_id, 1)

                book_path = os.path.join("/tmp", book_id)
                if not os.path.exists(book_path):
                    oss_path = f"rag/pdfs/{book_id}"
                    try:
                        # Download the book file from OSS
                        await oss_client.download_file(oss_path, book_path)
                    except Exception:
                        continue  # Skip this book if the download fails

                try:
                    # Extract the book content
                    res = await book_extract(book_path, book_id)
                    if res:
                        content_list = (
                            res.get("results", {})
                            .get(book_id, {})
                            .get("content_list", [])
                        )
                        if content_list:
                            # Persist the extraction result
                            await books_mapper.update_book_extract_result(
                                book_id, content_list
                            )

                            # Create a document ID and hand off to the chunking task
                            doc_id = f"doc-{uuid.uuid4()}"
                            chunk_task = ChunkBooksTask(
                                doc_id=doc_id, resource=resource
                            )
                            body = {"book_id": book_id}
                            await chunk_task.deal(body)  # Run the chunking task asynchronously
                except Exception:
                    continue  # Skip this book if extraction fails
    except Exception as e:
        # Catch-all for unexpected failures
        print(f"Failed to process request, error: {e}")


async def process_question(question, query_text, rag_chat_agent):
    try:
        dataset_id_strs = "11,12"
        dataset_ids = dataset_id_strs.split(",")
        search_type = "hybrid"

        # Run the retrieval query
        query_results = await query_search(
            query_text=question,
            filters={"dataset_id": dataset_ids},
            search_type=search_type,
        )

        resource = get_resource_manager()
        chat_result_mapper = ChatResult(resource.mysql_client)

        # Chat with DeepSeek asynchronously
        chat_result = await rag_chat_agent.chat_with_deepseek(question, query_results)

        # Decide whether a study task needs to be triggered
        study_task_id = None
        if chat_result["status"] == 0:
            study_task_id = study(question)["task_id"]

        qwen_client = QwenClient()
        llm_search = qwen_client.search_and_chat(user_prompt=question)
        decision = await rag_chat_agent.make_decision(question, chat_result, llm_search)

        # Build the response payload
        data = {
            "query": question,
            "result": decision["result"],
            "status": decision["status"],
            "relevance_score": decision["relevance_score"],
            # "used_tools": decision["used_tools"],
        }

        # Insert the chat result into the database
        await chat_result_mapper.insert_chat_result(
            question,
            dataset_id_strs,
            json.dumps(query_results, ensure_ascii=False),
            chat_result["summary"],
            chat_result["relevance_score"],
            chat_result["status"],
            llm_search["content"],
            json.dumps(llm_search["search_results"], ensure_ascii=False),
            1,
            decision["result"],
            study_task_id,
        )
        return data
    except Exception as e:
        print(f"Error processing question: {question}. Error: {str(e)}")
        return {"query": question, "error": str(e)}


async def query_search(
    query_text,
    filters=None,
    search_type="",
    anns_field="vector_text",
    search_params=BASE_MILVUS_SEARCH_PARAMS,
    _source=False,
    es_size=10000,
    sort_by=None,
    milvus_size=20,
    limit=10,
):
    if filters is None:
        filters = {}

    # Embed the query text and build the hybrid search engine
    query_vector = await get_basic_embedding(text=query_text, model=DEFAULT_MODEL)
    resource = get_resource_manager()
    search_engine = HybridSearch(
        milvus_pool=resource.milvus_client,
        es_pool=resource.es_client,
        graph_pool=resource.graph_client,
    )

    try:
        match search_type:
            case "base":
                # Plain vector search: return the raw engine response
                response = await search_engine.base_vector_search(
                    query_vec=query_vector,
                    anns_field=anns_field,
                    search_params=search_params,
                    limit=limit,
                )
                return response
            case "hybrid":
                response = await search_engine.hybrid_search(
                    filters=filters,
                    query_vec=query_vector,
                    anns_field=anns_field,
                    search_params=search_params,
                    es_size=es_size,
                    sort_by=sort_by,
                    milvus_size=milvus_size,
                )
            case "strategy":
                return None
            case _:
                return None
    except Exception:
        return None

    if response is None:
        return None

    # Enrich hybrid search hits with chunk content stored in MySQL
    content_chunk_mapper = ContentChunks(resource.mysql_client)
    res = []
    for result in response["results"]:
        content_chunks = await content_chunk_mapper.select_chunk_content(
            doc_id=result["doc_id"], chunk_id=result["chunk_id"]
        )
        if content_chunks:
            content_chunk = content_chunks[0]
            res.append(
                {
                    "docId": content_chunk["doc_id"],
                    "content": content_chunk["text"],
                    "contentSummary": content_chunk["summary"],
                    "score": result["score"],
                    "datasetId": content_chunk["dataset_id"],
                }
            )
    return res[:limit]
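
# A minimal usage sketch for query_search, assuming the resource manager's
# MySQL/Milvus/ES/graph clients have already been initialized by the
# application; the query text and dataset ids below are placeholders.
#
# import asyncio
#
# async def _demo():
#     hits = await query_search(
#         query_text="What is retrieval-augmented generation?",
#         filters={"dataset_id": ["11", "12"]},
#         search_type="hybrid",
#         limit=5,
#     )
#     print(hits)
#
# if __name__ == "__main__":
#     asyncio.run(_demo())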