
Use the optimized summarization prompt

xueyiming, 3 weeks ago
Commit f5a76027c3

+ 6 - 15
applications/api/qwen.py

@@ -1,11 +1,11 @@
+import asyncio
 import dashscope
 
-
 class QwenClient:
     def __init__(self):
         self.api_key = "sk-1022fe8e15ff4e0e9abc20541b281165"
 
-    def search_and_chat(
+    async def search_and_chat(
         self,
         model="qwen3-max",
         system_prompt="You are a helpful assistant.",
@@ -13,18 +13,9 @@ class QwenClient:
         search_strategy="max",
     ):
         """
-        Search and chat
-
-        Args:
-            model: model name, defaults to qwen3-max
-            system_prompt: the system prompt
-            user_prompt: the user prompt
-            search_strategy: search strategy; one of: turbo, max, agent
-
-        Returns:
-            dict: the reply content and the search results
+        Search and chat asynchronously (an async wrapper)
         """
-        try:
+        def _call_api():
             messages = [
                 {"role": "system", "content": system_prompt},
                 {"role": "user", "content": user_prompt},
@@ -54,8 +45,8 @@ class QwenClient:
 
             return {"content": content, "search_results": search_results}
 
-        except Exception as e:
-            raise Exception(f"QwenClient search_and_chat failed: {str(e)}")
+        # 🔹 Wrap the synchronous logic in asyncio.to_thread so it runs as an async task
+        return await asyncio.to_thread(_call_api)
 
 
 if __name__ == "__main__":
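With `search_and_chat` now a coroutine, the `__main__` block above needs an event loop to drive it. A minimal sketch, assuming only the public surface shown in this diff (the prompt text is illustrative):

```python
import asyncio

from applications.api.qwen import QwenClient


async def main():
    client = QwenClient()
    # dashscope's blocking call runs in a worker thread via asyncio.to_thread,
    # so awaiting it does not stall the event loop.
    res = await client.search_and_chat(
        user_prompt="Summarize this week's RAG news", search_strategy="max"
    )
    print(res["content"])
    print(res["search_results"])


if __name__ == "__main__":
    asyncio.run(main())
```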

+ 30 - 4
applications/utils/chat/rag_chat_agent.py

@@ -1,6 +1,8 @@
+import asyncio
 import json
 
 from applications.api import fetch_deepseek_completion
+from applications.prompts import build_rag_prompt
 
 
 class RAGChatAgent:
@@ -113,7 +115,6 @@ class RAGChatAgent:
         1. **RAG search answer**:
         Question: {query}
         Summary: {chat_res["summary"]}
-        Relevance score: {chat_res["relevance_score"]}
         Status: {"answerable" if chat_res["status"] == 1 else "unable to answer"}
 
         2. **AI search results**:
@@ -122,14 +123,12 @@ class RAGChatAgent:
         Search results: {json.dumps(search_res["search_results"], ensure_ascii=False)}
 
         Based on these two results, weigh them and, where possible, produce a better answer. You may combine `chat_res` and `search_res`, or pick whichever of the two is more suitable. If there is not enough information to answer, use your own existing knowledge.
-        Based on the answer, summarize the tools used in it, with their names and purposes; if no tools were involved, no summary is needed.
 
         Return a JSON result in the following format:
         {{
             "result": "<the final answer, in markdown format>",
             "relevance_score": <a decimal between 0 and 1 indicating how relevant the summary is to the question>,
-            "status": <1 means a good answer, 0 means a poor answer>,
-            "used_tools": ["tool name 1: purpose", "tool name 2: purpose"]
+            "status": <1 means a good answer, 0 means a poor answer>
         """
 
@@ -181,3 +180,30 @@ class RAGChatAgent:
             model="DeepSeek-V3", prompt=prompt, output_type="json"
         )
         return response
+
+    async def get_chat_res(self, question, raw_list):
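+        # 1) Build one map prompt per retrieved chunk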
+        pack = build_rag_prompt(question, raw_list, mode="map")
+        user_list = pack["user_list"]
+
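+        # 2) Map: fan the per-chunk completions out concurrently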
+        map_raw_outputs = await asyncio.gather(*[
+            fetch_deepseek_completion(model="default", prompt=prompt, output_type="json")
+            for prompt in user_list
+        ])
+
+        # 3) Reduce: condense the mapped outputs into a single draft answer
+        mapped_results_json_list = json.dumps(map_raw_outputs, ensure_ascii=False)
+        reduce_pack = build_rag_prompt(
+            question, [], mode="reduce", mapped_results=mapped_results_json_list
+        )
+        reduce_response = await fetch_deepseek_completion(
+            model="default", prompt=reduce_pack['system'] + reduce_pack['user']
+        )
+        # 4) Merge: refine the draft against the source chunks into the final answer
+        merge_pack = build_rag_prompt(question, raw_list, mode="merge", draft_answer=reduce_response)
+        final = await fetch_deepseek_completion(
+            model="default", prompt=merge_pack['system'] + merge_pack['user'], output_type="json"
+        )
+        status = final['status']
+        final_answer = final['final_answer_markdown']
+        return {"summary": final_answer, "status": status}
+
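Taken together, `get_chat_res` is a map/reduce/merge pipeline: `build_rag_prompt` emits one map prompt per chunk, the mapped JSON outputs are condensed into a draft answer, and a merge pass grades the draft and returns `final_answer_markdown` plus a `status` flag. A minimal usage sketch, assuming chunks shaped like the `query_search` results below (contents are placeholders):

```python
import asyncio

from applications.utils.chat.rag_chat_agent import RAGChatAgent


async def demo():
    agent = RAGChatAgent()
    # Placeholder chunks; in production these come from query_search().
    raw_list = [
        {"docId": "doc-1", "content": "Hybrid search combines BM25 with vectors."},
    ]
    res = await agent.get_chat_res("What is hybrid search?", raw_list)
    # res == {"summary": "<markdown answer>", "status": 1 or 0}
    print(res["status"], res["summary"])


if __name__ == "__main__":
    asyncio.run(demo())
```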

+ 10 - 14
applications/utils/task/async_task.py

@@ -1,3 +1,4 @@
+import asyncio
 import json
 import os
 import uuid
@@ -79,7 +80,7 @@ async def handle_books():
         print(f"处理请求失败,错误: {e}")
 
 
-async def process_question(question, query_text, rag_chat_agent):
+async def process_question(question, resource, qwen_client, rag_chat_agent):
     try:
         dataset_id_strs = "11,12"
         dataset_ids = dataset_id_strs.split(",")
@@ -92,19 +93,15 @@ async def process_question(question, query_text, rag_chat_agent):
             search_type=search_type,
         )
 
-        resource = get_resource_manager()
         chat_result_mapper = ChatResult(resource.mysql_client)
-
-        # Run the chat with DeepSeek asynchronously
-        chat_result = await rag_chat_agent.chat_with_deepseek(question, query_results)
-
-        # # Decide whether a study task needs to run
+        chat_task = rag_chat_agent.get_chat_res(question, query_results)
+        llm_task = qwen_client.search_and_chat(
+            user_prompt=question, search_strategy="agent"
+        )
+        chat_result, llm_search = await asyncio.gather(chat_task, llm_task)
         study_task_id = None
         if chat_result["status"] == 0:
             study_task_id = study(question)["task_id"]
-
-        qwen_client = QwenClient()
-        llm_search = qwen_client.search_and_chat(user_prompt=question)
         decision = await rag_chat_agent.make_decision(question, chat_result, llm_search)
 
         # Build the response payload
@@ -112,8 +109,7 @@ async def process_question(question, query_text, rag_chat_agent):
             "query": question,
             "result": decision["result"],
             "status": decision["status"],
-            "relevance_score": decision["relevance_score"],
-            # "used_tools": decision["used_tools"],
+            "relevance_score": decision["relevance_score"]
         }
 
         # Insert into the database
@@ -122,7 +118,7 @@ async def process_question(question, query_text, rag_chat_agent):
             dataset_id_strs,
             json.dumps(query_results, ensure_ascii=False),
             chat_result["summary"],
-            chat_result["relevance_score"],
+            decision["relevance_score"],
             chat_result["status"],
             llm_search["content"],
             json.dumps(llm_search["search_results"], ensure_ascii=False),
@@ -188,7 +184,6 @@ async def query_search(
     resource = get_resource_manager()
     content_chunk_mapper = ContentChunks(resource.mysql_client)
     res = []
-    print(json.dumps(response["results"], ensure_ascii=False, indent=2))
     for result in response["results"]:
         content_chunks = await content_chunk_mapper.select_chunk_content(
             doc_id=result["doc_id"], chunk_id=result["chunk_id"]
@@ -197,6 +192,7 @@ async def query_search(
             content_chunk = content_chunks[0]
             res.append(
                 {
+                    "id": content_chunk["id"],
                     "docId": content_chunk["doc_id"],
                     "content": content_chunk["text"],
                     "contentSummary": content_chunk["summary"],

+ 5 - 64
mcp_server/server.py

@@ -10,7 +10,7 @@ from applications.utils.chat import RAGChatAgent
 from applications.utils.mysql import ChatResult
 from applications.api.qwen import QwenClient
 from applications.utils.spider.study import study
-from applications.utils.task.async_task import query_search
+from applications.utils.task.async_task import query_search, process_question
 
 
 def create_mcp_server() -> Server:
@@ -53,67 +53,6 @@ def create_mcp_server() -> Server:
 
     return app
 
-
-async def process_question(question, query_text, rag_chat_agent):
-    try:
-        dataset_id_strs = "11,12"
-        dataset_ids = dataset_id_strs.split(",")
-        search_type = "hybrid"
-
-        # Run the query task
-        query_results = await query_search(
-            query_text=question,
-            filters={"dataset_id": dataset_ids},
-            search_type=search_type,
-        )
-
-        resource = get_resource_manager()
-        chat_result_mapper = ChatResult(resource.mysql_client)
-
-        # Run the chat with DeepSeek asynchronously
-        chat_result = await rag_chat_agent.chat_with_deepseek(question, query_results)
-
-        # # Decide whether a study task needs to run
-        study_task_id = None
-        if chat_result["status"] == 0:
-            study_task_id = study(question)["task_id"]
-
-        qwen_client = QwenClient()
-        llm_search = qwen_client.search_and_chat(
-            user_prompt=question, search_strategy="agent"
-        )
-
-        # Run the decision logic
-        decision = await rag_chat_agent.make_decision(question, chat_result, llm_search)
-
-        # Build the response payload
-        data = {
-            "query": question,
-            "result": decision["result"],
-            "status": decision["status"],
-            "relevance_score": decision["relevance_score"],
-        }
-
-        # Insert into the database
-        await chat_result_mapper.insert_chat_result(
-            question,
-            dataset_id_strs,
-            json.dumps(query_results, ensure_ascii=False),
-            chat_result["summary"],
-            chat_result["relevance_score"],
-            chat_result["status"],
-            llm_search["content"],
-            json.dumps(llm_search["search_results"], ensure_ascii=False),
-            1,
-            decision["result"],
-            study_task_id,
-        )
-        return data
-    except Exception as e:
-        print(f"Error processing question: {question}. Error: {str(e)}")
-        return {"query": question, "error": str(e)}
-
-
 async def rag_search(query_text: str):
     rag_chat_agent = RAGChatAgent()
     split_questions = []
@@ -121,12 +60,14 @@ async def rag_search(query_text: str):
     # split_questions = spilt_query["split_questions"]
     split_questions.append(query_text)
 
+    resource = get_resource_manager()
+    qwen_client = QwenClient()
+    rag_chat_agent = RAGChatAgent()
     # Process each question in parallel with asyncio.gather
     tasks = [
-        process_question(question, query_text, rag_chat_agent)
+        process_question(question, resource, qwen_client, rag_chat_agent)
         for question in split_questions
     ]
-
     # Wait for all tasks to finish and collect the results
     data_list = await asyncio.gather(*tasks)
     return data_list

+ 8 - 9
routes/blueprint.py

@@ -374,10 +374,11 @@ async def chat():
 
     rag_chat_agent = RAGChatAgent()
     qwen_client = QwenClient()
-    chat_result = await rag_chat_agent.chat_with_deepseek(query_text, query_results)
-    llm_search = qwen_client.search_and_chat(
+    chat_task = rag_chat_agent.get_chat_res(query_text, query_results)
+    llm_task = qwen_client.search_and_chat(
         user_prompt=query_text, search_strategy="agent"
     )
+    chat_result, llm_search = await asyncio.gather(chat_task, llm_task)
     decision = await rag_chat_agent.make_decision(query_text, chat_result, llm_search)
     data = {
         "results": query_results,
@@ -391,7 +392,7 @@ async def chat():
         dataset_id_strs,
         json.dumps(query_results, ensure_ascii=False),
         chat_result["summary"],
-        chat_result["relevance_score"],
+        decision["relevance_score"],
         chat_result["status"],
         llm_search["content"],
         json.dumps(llm_search["search_results"], ensure_ascii=False),
@@ -481,18 +482,16 @@ async def delete_task():
 async def rag_search():
     body = await request.get_json()
     query_text = body.get("queryText")
-    rag_chat_agent = RAGChatAgent()
-    # spilt_query = await rag_chat_agent.split_query(query_text)
-    # split_questions = spilt_query["split_questions"]
-    # split_questions.append(query_text)
     split_questions = []
     split_questions.append(query_text)
+    resource = get_resource_manager()
+    qwen_client = QwenClient()
+    rag_chat_agent = RAGChatAgent()
     # Process each question in parallel with asyncio.gather
     tasks = [
-        process_question(question, query_text, rag_chat_agent)
+        process_question(question, resource, qwen_client, rag_chat_agent)
         for question in split_questions
     ]
-
     # Wait for all tasks to finish and collect the results
     data_list = await asyncio.gather(*tasks)
     return jsonify({"status_code": 200, "detail": "success", "data": data_list})
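
For reference, a successful call to this route returns a payload shaped roughly as follows; the field names come from `process_question`, the values are illustrative:

```json
{
  "status_code": 200,
  "detail": "success",
  "data": [
    {
      "query": "what is hybrid search",
      "result": "<final answer in markdown>",
      "status": 1,
      "relevance_score": 0.82
    }
  ]
}
```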