
Merge branch 'feature/luojunhui/2025-10-16-chat-improve' of Server/rag_server into master

xueyiming · 3 weeks ago
commit 0fca4203a3

+ 6 - 15
applications/api/qwen.py

@@ -1,11 +1,11 @@
+import asyncio
 import dashscope
 
-
 class QwenClient:
     def __init__(self):
         self.api_key = "sk-1022fe8e15ff4e0e9abc20541b281165"
 
-    def search_and_chat(
+    async def search_and_chat(
         self,
         model="qwen3-max",
         system_prompt="You are a helpful assistant.",
@@ -13,18 +13,9 @@ class QwenClient:
         search_strategy="max",
     ):
         """
-        Search and chat.
-
-        Args:
-            model: model name, defaults to qwen3-max
-            system_prompt: the system prompt
-            user_prompt: the user prompt
-            search_strategy: search strategy; one of: turbo, max, agent
-
-        Returns:
-            dict: the reply content and the search results
+        Search and chat, wrapped as an async call.
         """
-        try:
+        def _call_api():
             messages = [
                 {"role": "system", "content": system_prompt},
                 {"role": "user", "content": user_prompt},
@@ -54,8 +45,8 @@ class QwenClient:
 
             return {"content": content, "search_results": search_results}
 
-        except Exception as e:
-            raise Exception(f"QwenClient search_and_chat失败: {str(e)}")
+        # wrap the synchronous logic with asyncio.to_thread so it runs as an async task
+        return await asyncio.to_thread(_call_api)
 
 
 if __name__ == "__main__":
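
The change keeps the DashScope request synchronous inside _call_api and defers it to a worker thread. A minimal sketch of the same pattern, with _blocking_request as a stand-in for the real DashScope call:

import asyncio

def _blocking_request(prompt: str) -> dict:
    # stand-in for the synchronous DashScope request
    return {"content": f"echo: {prompt}", "search_results": []}

async def search_and_chat(prompt: str) -> dict:
    # asyncio.to_thread (Python 3.9+) runs the sync function on the default
    # thread pool, so the event loop stays free for other coroutines
    return await asyncio.to_thread(_blocking_request, prompt)

if __name__ == "__main__":
    print(asyncio.run(search_and_chat("hello")))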

+ 3 - 0
applications/prompts/__init__.py

@@ -1 +1,4 @@
 from .build_graph import extract_entity_and_graph
+from .chat_prompts import build_rag_prompt
+
+

+ 202 - 0
applications/prompts/chat_prompts.py

@@ -0,0 +1,202 @@
+from typing import List, Dict, Optional, Any
+
+
+def map_prompt(question: str, format_context: str) -> str:
+    prompt = (
+        f"【任务说明】:针对下方单个文档片段,提炼与“问题”直接相关的‘可被引用的事实要点’\n"
+        f"【问题】: {question}\n"
+        f"【输入】:{format_context}\n"
+        "【输出】:只输出 JSON,不要多余文本,格式如下:\n"
+        "{\n"
+        '  "id": "id",\n'
+        '  "claims": [\n'
+        '    {"point": "事实要点1(尽量原文转述/精准改写)"},\n'
+        '    {"point": "事实要点2"}\n'
+        '  ],\n'
+        '  "conflicts_or_limits": ["该片段的限制/含糊点(如时间、定义口径、版本号等)"]\n'
+        "}"
+    )
+    return prompt
+
+
+def reduce_prompt(question: str, mapped_results_json_list: Optional[str]) -> str:
+    prompt = (
+        "【任务】:合并多份 Map 结果,完成以下三点:\n"
+        "1) 去重并合并同义要点;\n"
+        "2) 标注并归纳冲突点;\n"
+        "3) 输出最终回答(含引用)。\n"
+        f"【问题】:{question}\n"
+        f"【Map 结果】:{mapped_results_json_list or '[]'}\n"
+        "【输出(Markdown)】:\n"
+        "- 简要结论(2-4句)\n"
+        "- 关键要点(每点附主要引用,如 [Chunk-12] [Chunk-4],用 map 结果中的 id 来表示)\n"
+        "- 证据信息不一致(如有):列出冲突内容、涉及的 doc_id、可能原因\n"
+        "- 信息缺口(如有)\n"
+    )
+    return prompt
+
+
+def verify_prompt(question: str, draft: str, formatted_contexts: str) -> str:
+    prompt = (
+        "【任务】: 对“初稿答案”的关键断言逐条核验,严格限定仅使用上下文;"
+        "请标注每条断言是否被证据‘支持/相矛盾/证据不足’,必要时修正结论。\n"
+        f"【问题】\n{question}\n"
+        f"【初稿答案】\n{draft}\n"
+        f"【上下文】\n{formatted_contexts}\n"
+        "【输出(只输出 JSON)】\n"
+        "{\n"
+        '  "verdicts": [\n'
+        '    {"claim": "断言内容", "status": "supported|contradicted|insufficient", "citations": ["[Chunk-{id}]"]}\n'
+        "  ],\n"
+        '  "final_answer": "(如需修正,请给出修正后的简明答案,并附引用)"\n'
+        "}"
+    )
+    return prompt
+
+
+def merge_prompt(question: str, rag_answer: str, formatted_contexts: str) -> str:
+    """
+    Merge strategy:
+    1) First judge whether the RAG draft is sufficient, partially sufficient, or insufficient;
+    2) If partial or insufficient, let the general-purpose model (common knowledge and
+       public sources allowed) add external points, each marked [EXT];
+    3) Produce the fused final answer, preferring conclusions already backed by evidence
+       and tagged [Chunk-{id}];
+    4) Emit an audit section separating RAG citations from external additions, and list
+       the remaining information gaps.
+    """
+    prompt = (
+        "【任务】:先判断 RAG 初稿是否足以回答问题;若不足或仅部分覆盖,则在“允许使用常识/公开资料”的前提下补充,并与上下文证据进行“无缝融合”,产出一份单一成稿(禁止把上下文与外部补充分栏或分章节呈现)。\n"
+        "【必须遵守】\n"
+        "- 严禁以“基于上下文/外部补充/对标/工具列表”等标题分隔来源内容;最终仅保留一份融合后的成稿。\n"
+        "- 句内引用:凡是有依据的关键信息,紧随其后附上来源标记;上下文证据用 [Chunk-{id}],外部补充用 [EXT],可选在括号中加来源类型(如[EXT-百科]、[EXT-官网])。\n"
+        "- 证据优先级:优先采用【上下文】已被证实的结论;若与外部资料冲突,优先保留【上下文】并在句末以 [EXT-Conflict] 标注冲突存在。\n"
+        "- 去重与融合:对上下文与外部的相似要点进行合并表述,避免重复罗列;统一术语与口径。\n"
+        "- 若信息仍不足,明确列出“信息缺口”。\n"
+        "- 最终答案必须是可直接给用户的完整回应(可含小标题/列表),但不得按照来源划分结构。\n\n"
+        f"【问题】\n{question}\n\n"
+        f"【RAG 初稿】\n{rag_answer}\n\n"
+        f"【上下文】\n{formatted_contexts}\n\n"
+        # "【写作要求】\n"
+        # "- 开头 1–2 句给出结论或核心抓手;随后组织为自然段或要点清单(3–8 条)。\n"
+        # "- 每条关键结论或数据点后紧跟其来源标记;能不标就不写空标注。\n"
+        # "- 语气务实、可执行,避免流水账与堆砌名词。\n"
+        # "- 禁止出现“基于上下文…/外部补充…”等来源分栏表述。\n\n"
+        "【输出(只输出 JSON)】\n"
+        "{\n"
+        '  "status": "判断最终结果是否能回答用户问题,若能回答输出 1,若不能则输出 0",\n'
+        '  "reason": "判断的理由(例如:核心问题是否被覆盖、关键数据/定义是否缺失)",\n'
+        '  "final_answer_markdown": "融合后的最终答案,不要输出信息缺口(每一条信息需标注 [Chunk-{id}] 与 [EXT])",\n'
+        '  "audit": {\n'
+        '    "rag_citations": ["[Chunk-12]", "[Chunk-4]"],\n'
+        '    "external_claims": [\n'
+        '      {"claim":"外部补充要点1","note":"来源类型/可信度(如:官网/百科/标准) [EXT]"},\n'
+        '      {"claim":"外部补充要点2","note":"… [EXT]"}\n'
+        '    ],\n'
+        '    "gaps": ["仍然缺失或无法确认的信息"]\n'
+        "  }\n"
+        "}\n"
+        "【合规自检(执行后自检,不输出给用户)】:若发现答案结构仍以来源分栏或出现“外部补充/基于上下文”等分栏标题,则立即重写为单一成稿。"
+    )
+
+    return prompt
+
+
+def build_rag_prompt(
+    question: str,
+    contexts: List[Dict],
+    mode: str = "single",  # single | map | reduce | rerank | verify | merge
+    max_chars_per_chunk: int = 800,
+    draft_answer: Optional[str] = None,    # for verify/merge: the RAG draft answer
+    mapped_results: Optional[str] = None,  # for reduce: JSON list of Map outputs
+) -> Dict[str, Any]:
+    """
+    Build the prompts needed for the RAG aggregation stage.
+    The return shape depends on mode:
+      - single: {"system": str, "user": str}
+      - map:    {"system": str, "user_list": List[str]}   # one Map prompt per chunk
+      - reduce: {"system": str, "user": str}
+      - rerank: {"system": str, "user": str}
+      - verify: {"system": str, "user": str}
+      - merge:  {"system": str, "user": str}
+    """
+
+    # —— helpers ——
+    def _trim(text: str, limit: int) -> str:
+        text = (text or "").strip().replace("\n", " ")
+        return text if len(text) <= limit else text[: max(0, limit - 1)] + "…"
+
+    def _format_each_chunk(chunk: Dict) -> str:
+        # tolerate either result field name: content / text
+        content = chunk.get("content") or chunk.get("text") or ""
+        bits = [f"Chunk id={chunk.get('id', '')}"]
+        if "score" in chunk and chunk["score"] is not None:
+            try:
+                bits.append(f"score={round(float(chunk['score']), 4)}")
+            except Exception:
+                pass
+        if chunk.get("url"):
+            bits.append(f"url={chunk['url']}")
+        prefix = "[" + " ".join(bits) + "]"
+        snippet = _trim(content, max_chars_per_chunk)
+        return f"{prefix}\n{snippet}\n"
+
+    def _format_contexts(chunks: List[Dict]) -> str:
+        return "\n".join(_format_each_chunk(c) for c in chunks).strip()
+
+    # —— shared system constraints (the prompt text itself is in Chinese) ——
+    system_text = (
+        "你是一位“基于证据的助手”。你必须只使用我提供的【上下文】来回答:\n"
+        "- 不得使用外部常识或臆测;\n"
+        "- 若上下文不足,请明确输出“信息不足”,并指出缺失的信息类型;\n"
+        "- 对关键结论附 [Chunk-{id}] 形式的出处;\n"
+        "- 如存在冲突证据,请列出冲突并给出谨慎结论与采信依据;\n"
+        "- 用中文回答,保持简洁、结构化。"
+    )
+
+    formatted_contexts = _format_contexts(contexts)
+
+    # —— dispatch by mode ——
+    if mode == "single":
+        user_text = (
+            f"【问题】\n{question}\n\n"
+            f"【上下文(已按相关性排序)】\n{formatted_contexts}\n\n"
+            "【请按以下结构作答】\n"
+            "1) 简要结论(2-4句)\n"
+            "2) 关键要点(每点附1-2个引用,如 [C3])\n"
+            "3) 证据信息不一致(如有)\n"
+            "4) 信息缺口(如有)"
+        )
+        return {"system": system_text, "user": user_text}
+
+    if mode == "map":
+        map_user_list = []
+        for ctx in contexts:
+            format_context = _format_each_chunk(ctx)
+            map_user_list.append(map_prompt(question, format_context))
+        return {"system": system_text, "user_list": map_user_list}
+
+    if mode == "reduce":
+        return {"system": system_text, "user": reduce_prompt(question, mapped_results)}
+
+    if mode == "rerank":
+        rerank_system = "你是一位严谨的重排器。请只输出 JSON。"
+        rerank_user = (
+            f"请比较下列候选段与问题“{question}”的相关性,仅打分并排序(不做总结)。\n"
+            "评分标准(由高到低):直接回答性 > 主题一致性 > 细节重合度 > 时间匹配。\n\n"
+            f"【候选段】\n{formatted_contexts}\n\n"
+            "【只输出 JSON,格式如下(按 score 从高到低)】\n"
+            '[{"id":"DOC_ID","score":X.X}]'
+        )
+        return {"system": rerank_system, "user": rerank_user}
+
+    if mode == "verify":
+        draft = draft_answer or "(此处为初稿答案)"
+        return {"system": system_text, "user": verify_prompt(question, draft, formatted_contexts)}
+
+    if mode == "merge":
+        if not draft_answer:
+            raise ValueError("merge 模式需要提供 draft_answer(即 RAG 初稿答案)。")
+        return {"system": system_text, "user": merge_prompt(question, draft_answer, formatted_contexts)}
+
+    raise ValueError(f"不支持的模式:{mode}")
+
+
+__all__ = ["build_rag_prompt"]
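
For orientation, a hypothetical call in the two modes used downstream; the chunk dicts below are made up, while real ones come from query_search and carry id / content / score:

from applications.prompts import build_rag_prompt

chunks = [
    {"id": 12, "content": "first retrieved passage ...", "score": 0.91},
    {"id": 4, "text": "second retrieved passage ...", "score": 0.77},  # "text" is accepted too
]

single_pack = build_rag_prompt("example question", chunks, mode="single")
# -> {"system": "...", "user": "..."}

map_pack = build_rag_prompt("example question", chunks, mode="map")
# -> {"system": "...", "user_list": [<prompt for chunk 12>, <prompt for chunk 4>]}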

+ 29 - 6
applications/utils/chat/rag_chat_agent.py

@@ -1,9 +1,8 @@
 import asyncio
 import json
-from typing import List
 
-from applications.config import Chunk
 from applications.api import fetch_deepseek_completion
+from applications.prompts import build_rag_prompt
 
 
 class RAGChatAgent:
@@ -116,7 +115,6 @@ class RAGChatAgent:
         1. **RAG 搜索回答**:
         问题: {query}
         总结: {chat_res["summary"]}
-        相关度评分: {chat_res["relevance_score"]}
         状态: {"可以回答" if chat_res["status"] == 1 else "无法回答"}
 
         2. **AI 搜索结果**:
@@ -125,14 +123,12 @@ class RAGChatAgent:
         搜索结果: {json.dumps(search_res["search_results"], ensure_ascii=False)}
         
         基于这两个结果,请你综合判断并生成一个更好的答案,如果可能的话。你可以选择结合 `chat_res` 和 `search_res`,或者选择其中更合适的一个进行回答。如果没有足够的信息可以回答,请用你自己已有的知识回答。
-        基于回答的结果,总结回答的答案中使用的工具,名称以及用途,如果没有涉及到工具的使用,则不需要总结
 
         请返回以下格式的 JSON 结果:
         {{
             "result": "<最终的答案,输出markdown格式>",
             "relevance_score": <0到1之间的小数,表示总结与问题的相关度>,
-            "status": <1代表回答的好,0代表回答的不好>,
-            "used_tools": ["工具名1: 用途描述", "工具名2: 用途描述"]
+            "status": <1代表回答的好,0代表回答的不好>
         }}
         """
 
@@ -184,3 +180,30 @@ class RAGChatAgent:
             model="DeepSeek-V3", prompt=prompt, output_type="json"
         )
         return response
+
+    async def get_chat_res(self, question, raw_list):
+        pack = build_rag_prompt(question, raw_list, mode="map")
+        user_list = pack["user_list"]
+
+        map_raw_outputs = await asyncio.gather(*[
+            fetch_deepseek_completion(model="default", prompt=prompt, output_type="json")
+            for prompt in user_list
+        ])
+
+        # Reduce: consolidate the per-chunk Map outputs into one draft
+        mapped_results_json_list = json.dumps(map_raw_outputs, ensure_ascii=False)
+        reduce_pack = build_rag_prompt(
+            question, [], mode="reduce", mapped_results=mapped_results_json_list
+        )
+        reduce_response = await fetch_deepseek_completion(
+            model="default", prompt=reduce_pack['system'] + reduce_pack['user']
+        )
+        # Merge: fuse the reduced draft with the retrieved context into the final answer
+        merge_pack = build_rag_prompt(question, raw_list, mode="merge", draft_answer=reduce_response)
+        final = await fetch_deepseek_completion(
+            model="default", prompt=merge_pack['system'] + merge_pack['user'], output_type="json"
+        )
+        status = final['status']
+        final_answer = final['final_answer_markdown']
+        return {"summary": final_answer, "status": status}
+
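
Taken together, get_chat_res runs a Map → Reduce → Merge pipeline: per-chunk claim extraction in parallel, consolidation into a draft, then fusion with the retrieved context and external knowledge. A hypothetical call, assuming a running event loop and chunks shaped like the query_search output:

agent = RAGChatAgent()
chunks = [{"id": 12, "content": "retrieved passage ...", "score": 0.9}]  # illustrative only

result = await agent.get_chat_res("example question", chunks)
# result == {"summary": "<fused markdown answer>", "status": 1}
# a status of 0 is what triggers a study task upstream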

+ 10 - 14
applications/utils/task/async_task.py

@@ -1,3 +1,4 @@
+import asyncio
 import json
 import os
 import uuid
@@ -79,7 +80,7 @@ async def handle_books():
         print(f"处理请求失败,错误: {e}")
 
 
-async def process_question(question, query_text, rag_chat_agent):
+async def process_question(question, resource, qwen_client, rag_chat_agent):
     try:
         dataset_id_strs = "11,12"
         dataset_ids = dataset_id_strs.split(",")
@@ -92,19 +93,15 @@ async def process_question(question, query_text, rag_chat_agent):
             search_type=search_type,
         )
 
-        resource = get_resource_manager()
         chat_result_mapper = ChatResult(resource.mysql_client)
-
-        # run the chat-with-deepseek conversation asynchronously
-        chat_result = await rag_chat_agent.chat_with_deepseek(question, query_results)
-
-        # # decide whether a study task needs to run
+        chat_task = rag_chat_agent.get_chat_res(question, query_results)
+        llm_task = qwen_client.search_and_chat(
+            user_prompt=question, search_strategy="agent"
+        )
+        chat_result, llm_search = await asyncio.gather(chat_task, llm_task)
         study_task_id = None
         if chat_result["status"] == 0:
             study_task_id = study(question)["task_id"]
-
-        qwen_client = QwenClient()
-        llm_search = qwen_client.search_and_chat(user_prompt=question)
         decision = await rag_chat_agent.make_decision(question, chat_result, llm_search)
 
         # build the response payload
@@ -112,8 +109,7 @@ async def process_question(question, query_text, rag_chat_agent):
             "query": question,
             "result": decision["result"],
             "status": decision["status"],
-            "relevance_score": decision["relevance_score"],
-            # "used_tools": decision["used_tools"],
+            "relevance_score": decision["relevance_score"]
         }
 
         # insert into the database
@@ -122,7 +118,7 @@ async def process_question(question, query_text, rag_chat_agent):
             dataset_id_strs,
             json.dumps(query_results, ensure_ascii=False),
             chat_result["summary"],
-            chat_result["relevance_score"],
+            decision["relevance_score"],
             chat_result["status"],
             llm_search["content"],
             json.dumps(llm_search["search_results"], ensure_ascii=False),
@@ -188,7 +184,6 @@ async def query_search(
     resource = get_resource_manager()
     content_chunk_mapper = ContentChunks(resource.mysql_client)
     res = []
-    print(json.dumps(response["results"], ensure_ascii=False, indent=2))
     for result in response["results"]:
         content_chunks = await content_chunk_mapper.select_chunk_content(
             doc_id=result["doc_id"], chunk_id=result["chunk_id"]
@@ -197,6 +192,7 @@ async def query_search(
             content_chunk = content_chunks[0]
             res.append(
                 {
+                    "id": content_chunk["id"],
                     "docId": content_chunk["doc_id"],
                     "content": content_chunk["text"],
                     "contentSummary": content_chunk["summary"],

+ 5 - 64
mcp_server/server.py

@@ -10,7 +10,7 @@ from applications.utils.chat import RAGChatAgent
 from applications.utils.mysql import ChatResult
 from applications.api.qwen import QwenClient
 from applications.utils.spider.study import study
-from applications.utils.task.async_task import query_search
+from applications.utils.task.async_task import query_search, process_question
 
 
 def create_mcp_server() -> Server:
@@ -53,67 +53,6 @@ def create_mcp_server() -> Server:
 
     return app
 
-
-async def process_question(question, query_text, rag_chat_agent):
-    try:
-        dataset_id_strs = "11,12"
-        dataset_ids = dataset_id_strs.split(",")
-        search_type = "hybrid"
-
-        # run the retrieval query
-        query_results = await query_search(
-            query_text=question,
-            filters={"dataset_id": dataset_ids},
-            search_type=search_type,
-        )
-
-        resource = get_resource_manager()
-        chat_result_mapper = ChatResult(resource.mysql_client)
-
-        # run the chat-with-deepseek conversation asynchronously
-        chat_result = await rag_chat_agent.chat_with_deepseek(question, query_results)
-
-        # # decide whether a study task needs to run
-        study_task_id = None
-        if chat_result["status"] == 0:
-            study_task_id = study(question)["task_id"]
-
-        qwen_client = QwenClient()
-        llm_search = qwen_client.search_and_chat(
-            user_prompt=question, search_strategy="agent"
-        )
-
-        # run the decision logic
-        decision = await rag_chat_agent.make_decision(question, chat_result, llm_search)
-
-        # build the response payload
-        data = {
-            "query": question,
-            "result": decision["result"],
-            "status": decision["status"],
-            "relevance_score": decision["relevance_score"],
-        }
-
-        # insert into the database
-        await chat_result_mapper.insert_chat_result(
-            question,
-            dataset_id_strs,
-            json.dumps(query_results, ensure_ascii=False),
-            chat_result["summary"],
-            chat_result["relevance_score"],
-            chat_result["status"],
-            llm_search["content"],
-            json.dumps(llm_search["search_results"], ensure_ascii=False),
-            1,
-            decision["result"],
-            study_task_id,
-        )
-        return data
-    except Exception as e:
-        print(f"Error processing question: {question}. Error: {str(e)}")
-        return {"query": question, "error": str(e)}
-
-
 async def rag_search(query_text: str):
     rag_chat_agent = RAGChatAgent()
     split_questions = []
@@ -121,12 +60,14 @@ async def rag_search(query_text: str):
     # split_questions = spilt_query["split_questions"]
     split_questions.append(query_text)
 
+    resource = get_resource_manager()
+    qwen_client = QwenClient()
+    rag_chat_agent = RAGChatAgent()
     # process each question concurrently with asyncio.gather
     tasks = [
-        process_question(question, query_text, rag_chat_agent)
+        process_question(question, resource, qwen_client, rag_chat_agent)
         for question in split_questions
     ]
-
     # wait for all tasks to finish and collect the results
     data_list = await asyncio.gather(*tasks)
     return data_list

+ 8 - 9
routes/blueprint.py

@@ -374,10 +374,11 @@ async def chat():
 
     rag_chat_agent = RAGChatAgent()
     qwen_client = QwenClient()
-    chat_result = await rag_chat_agent.chat_with_deepseek(query_text, query_results)
-    llm_search = qwen_client.search_and_chat(
+    chat_task = rag_chat_agent.get_chat_res(query_text, query_results)
+    llm_task = qwen_client.search_and_chat(
         user_prompt=query_text, search_strategy="agent"
     )
+    chat_result, llm_search = await asyncio.gather(chat_task, llm_task)
     decision = await rag_chat_agent.make_decision(query_text, chat_result, llm_search)
     data = {
         "results": query_results,
@@ -391,7 +392,7 @@ async def chat():
         dataset_id_strs,
         json.dumps(query_results, ensure_ascii=False),
         chat_result["summary"],
-        chat_result["relevance_score"],
+        decision["relevance_score"],
         chat_result["status"],
         llm_search["content"],
         json.dumps(llm_search["search_results"], ensure_ascii=False),
@@ -481,18 +482,16 @@ async def delete_task():
 async def rag_search():
     body = await request.get_json()
     query_text = body.get("queryText")
-    rag_chat_agent = RAGChatAgent()
-    # spilt_query = await rag_chat_agent.split_query(query_text)
-    # split_questions = spilt_query["split_questions"]
-    # split_questions.append(query_text)
     split_questions = []
     split_questions.append(query_text)
+    resource = get_resource_manager()
+    qwen_client = QwenClient()
+    rag_chat_agent = RAGChatAgent()
     # process each question concurrently with asyncio.gather
     tasks = [
-        process_question(question, query_text, rag_chat_agent)
+        process_question(question, resource, qwen_client, rag_chat_agent)
         for question in split_questions
     ]
-
     # wait for all tasks to finish and collect the results
     data_list = await asyncio.gather(*tasks)
     return jsonify({"status_code": 200, "detail": "success", "data": data_list})