Selaa lähdekoodia

Merge branch 'master' into feature/luojunhui/2025-09-28-chunk-improve
merge master

luojunhui 1 viikko sitten
vanhempi
commit
f773fad72a

+ 2 - 2
applications/utils/chat/__init__.py

@@ -1,4 +1,4 @@
-from applications.utils.chat.chat_classifier import ChatClassifier
+from applications.utils.chat.rag_chat_agent import RAGChatAgent
 
 
-__all__ = ["ChatClassifier"]
+__all__ = ["RAGChatAgent"]

+ 0 - 65
applications/utils/chat/chat_classifier.py

@@ -1,65 +0,0 @@
-from typing import List
-
-from applications.config import Chunk
-from applications.api import fetch_deepseek_completion
-
-
-class ChatClassifier:
-    @staticmethod
-    def generate_summary_prompt(query, search_results):
-        """
-        生成总结的prompt。交给AI根据搜索结果判断内容是否对回答问题有帮助,
-        并结合内容生成总结和判断是否能回答问题。
-
-        :param query: 问题
-        :param search_results: 搜索结果列表,每个元素包含 'content', 'contentSummary'
-        :return: 生成的总结prompt
-        """
-
-        prompt = f"问题: {query}\n\n请结合以下搜索结果,生成一个总结:\n"
-
-        weighted_summaries = []
-        weighted_contents = []
-
-        # 将所有搜索结果的摘要和内容按相似度排序
-        for result in search_results:
-            content = result["content"]
-            content_summary = result["contentSummary"]
-
-            weighted_summaries.append(content_summary)
-            weighted_contents.append(content)
-
-        # 拼接加权摘要和内容
-        prompt += "\n-- 内容摘要 --\n"
-        for summary in weighted_summaries:
-            prompt += f"摘要: {summary}\n"
-
-        prompt += "\n-- 内容 --\n"
-        for content in weighted_contents:
-            prompt += f"内容: {content}\n"
-
-        # 约束 AI 输出 JSON
-        prompt += """
-    请根据上述内容判断能否回答问题,并生成一个总结,返回 JSON 格式,结构如下:
-
-    {
-      "query": "<原始问题>",
-      "summary": "<简洁总结>",
-      "relevance_score": <0到1之间的小数,表示总结与问题的相关度>,
-      "status": <判断能否回答这个问题,0代表不能回答,1代表可以回答>
-    }
-
-    注意:
-    - 只输出 JSON,不要额外解释。
-    - relevance_score 数字越大,表示总结和问题越相关。
-    - 请根据问题和给定的搜索结果内容,判断是否能回答该问题。返回一个 0 或 1 的 status,表示能否回答问题。
-    """
-
-        return prompt
-
-    async def chat_with_deepseek(self, query, search_results):
-        prompt = self.generate_summary_prompt(query, search_results)
-        response = await fetch_deepseek_completion(
-            model="DeepSeek-V3", prompt=prompt, output_type="json"
-        )
-        return response

+ 143 - 0
applications/utils/chat/rag_chat_agent.py

@@ -0,0 +1,143 @@
+from typing import List
+
+from applications.config import Chunk
+from applications.api import fetch_deepseek_completion
+
+
class RAGChatAgent:
    """Chat agent that answers a query two ways — via RAG search results and
    via a direct AI web lookup — then asks DeepSeek to pick/merge the better
    answer. All model calls request strict JSON output."""

    @staticmethod
    def generate_summary_prompt(query, search_results):
        """
        Build a summarization prompt: the model judges whether the search
        results can answer *query* and returns a JSON summary.

        :param query: the user question
        :param search_results: list of dicts, each with 'content' and
            'contentSummary' keys
        :return: the formatted prompt string
        """
        prompt = f"问题: {query}\n\n请结合以下搜索结果,生成一个总结:\n"

        # Results are used in the order given — no re-ranking happens here.
        # NOTE(review): callers are presumably expected to pass them already
        # sorted by relevance score — confirm upstream.
        summaries = [result["contentSummary"] for result in search_results]
        contents = [result["content"] for result in search_results]

        prompt += "\n-- 内容摘要 --\n"
        for summary in summaries:
            prompt += f"摘要: {summary}\n"

        prompt += "\n-- 内容 --\n"
        for content in contents:
            prompt += f"内容: {content}\n"

        # Constrain the model to emit JSON only (query/summary/relevance_score/status).
        prompt += """
    请根据上述内容判断能否回答问题,并生成一个总结,返回 JSON 格式,结构如下:

    {
      "query": "<原始问题>",
      "summary": "<简洁总结>",
      "relevance_score": <0到1之间的小数,表示总结与问题的相关度>,
      "status": <判断能否回答这个问题,0代表不能回答,1代表可以回答>
    }

    注意:
    - 只输出 JSON,不要额外解释。
    - relevance_score 数字越大,表示总结和问题越相关。
    - 请根据问题和给定的搜索结果内容,判断是否能回答该问题。返回一个 0 或 1 的 status,表示能否回答问题。
    """

        return prompt

    @staticmethod
    async def _deepseek_json(prompt):
        """Send *prompt* to DeepSeek-V3 and return its parsed JSON response.

        Single point of contact with the completion API so the model name
        and output type are configured in one place.
        """
        return await fetch_deepseek_completion(
            model="DeepSeek-V3", prompt=prompt, output_type="json"
        )

    async def chat_with_deepseek(self, query, search_results):
        """Summarize *search_results* against *query*; returns the model's
        JSON dict (keys: query, summary, relevance_score, status)."""
        prompt = self.generate_summary_prompt(query, search_results)
        return await self._deepseek_json(prompt)

    @staticmethod
    def create_query_prompt(question: str) -> str:
        """
        Build a direct-lookup prompt asking the model to answer *question*
        and report the result as JSON.

        :param question: the question to look up
        :return: the formatted prompt string (model returns question/answer/
            source/status fields)
        """
        prompt = f"""
    你是一个智能助手,能够帮助我查询任何问题并返回答案。你的任务是接收到我给定的问题后,通过网络查询相关的信息,并以 JSON 格式返回查询结果。

    问题:{question}

    请查询并返回该问题的答案。返回的 JSON 应该包含以下字段:
    - "question": 问题的原始文本。
    - "answer": 问题的答案内容。
    - "source": 如果有来源,提供信息来源链接或描述。
    - "status": 表示查询是否成功,"1代表成功,0代表失败"。

    如果无法找到答案,返回 "status": "failure",并且 "answer" 字段应为 "No answer found"。

    例子:
    {{
        "question": "什么是量子计算?",
        "answer": "量子计算是一种基于量子力学原理的计算模型。",
        "source": "https://example.com/quantum-computing",
        "status": 1
    }}
    """
        return prompt

    async def search_with_deepseek(self, query):
        """Ask the model to answer *query* directly; returns its JSON dict
        (keys: question, answer, source, status)."""
        prompt = self.create_query_prompt(query)
        return await self._deepseek_json(prompt)

    @staticmethod
    def select_prompt(chat_res, search_res):
        """
        Build an arbitration prompt: given the RAG summary (*chat_res*) and
        the direct AI answer (*search_res*), the model composes or selects
        the better final answer as JSON (result/relevance_score/status).
        """
        # Fixed: the original template had a stray unmatched `"` before 「。」
        # at the end of the instruction paragraph.
        prompt = f"""
        这是一个问题的回答任务,以下是两个来源的结果:

        1. **RAG 搜索回答**:
        问题: {chat_res["query"]}
        总结: {chat_res["summary"]}
        相关度评分: {chat_res["relevance_score"]}
        状态: {"可以回答" if chat_res["status"] == 1 else "无法回答"}

        2. **AI 搜索结果**:
        问题: {search_res["question"]}
        答案: {search_res["answer"]}
        来源: {search_res["source"]}
        状态: {"可以回答" if search_res["status"] == 1 else "无法回答"}

        基于这两个结果,请你综合判断并生成一个更好的答案,如果可能的话。你可以选择结合 `chat_res` 和 `search_res`,或者选择其中更合适的一个进行回答。如果没有足够的信息可以回答,请用你自己已有的知识回答。

         请返回以下格式的 JSON 结果:
        {{
            "result": "<最终的答案>",
            "relevance_score": <0到1之间的小数,表示总结与问题的相关度>,
            "status": <1代表回答的好,0代表回答的不好>
        }}
        """

        return prompt

    async def select_with_deepseek(self, chat_res, search_res):
        """Arbitrate between the RAG result and the direct AI result; returns
        the model's JSON dict (keys: result, relevance_score, status)."""
        prompt = self.select_prompt(chat_res, search_res)
        return await self._deepseek_json(prompt)

+ 23 - 4
applications/utils/mysql/mapper.py

@@ -37,14 +37,33 @@ class Dataset(BaseMySQLClient):
 
 class ChatResult(BaseMySQLClient):
     async def insert_chat_result(
-        self, query_text, dataset_ids, search_res, chat_res, score, has_answer
+        self,
+        query_text,
+        dataset_ids,
+        search_res,
+        chat_res,
+        score,
+        has_answer,
+        ai_answer,
+        ai_source,
+        ai_status,
     ):
         query = """
                     INSERT INTO chat_res
-                        (query, dataset_ids, search_res, chat_res, score, has_answer) 
-                        VALUES (%s, %s, %s, %s, %s, %s);
+                        (query, dataset_ids, search_res, chat_res, score, has_answer, ai_answer, ai_source, ai_status) 
+                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
                 """
         return await self.pool.async_save(
             query=query,
-            params=(query_text, dataset_ids, search_res, chat_res, score, has_answer),
+            params=(
+                query_text,
+                dataset_ids,
+                search_res,
+                chat_res,
+                score,
+                has_answer,
+                ai_answer,
+                ai_source,
+                ai_status,
+            ),
         )

+ 14 - 35
mcp_server/server.py

@@ -1,3 +1,4 @@
+import asyncio
 import json
 from typing import Any, Dict, List
 
@@ -5,8 +6,8 @@ import mcp.types as types
 from mcp.server.lowlevel import Server
 
 from applications.resource import get_resource_manager
-from applications.utils.chat import ChatClassifier
-from applications.utils.mysql import ContentChunks, Contents, ChatResult
+from applications.utils.chat import RAGChatAgent
+from applications.utils.mysql import ChatResult
 from routes.buleprint import query_search
 
 
@@ -63,48 +64,26 @@ async def rag_search(query_text: str):
     )
 
     resource = get_resource_manager()
-    content_chunk_mapper = ContentChunks(resource.mysql_client)
-    contents_mapper = Contents(resource.mysql_client)
     chat_result_mapper = ChatResult(resource.mysql_client)
-
-    res = []
-    for result in query_results["results"]:
-        content_chunks = await content_chunk_mapper.select_chunk_content(
-            doc_id=result["doc_id"], chunk_id=result["chunk_id"]
-        )
-        contents = await contents_mapper.select_content_by_doc_id(result["doc_id"])
-        if not content_chunks:
-            return {"status_code": 500, "detail": "content_chunk not found", "data": {}}
-        if not contents:
-            return {"status_code": 500, "detail": "contents not found", "data": {}}
-
-        content_chunk = content_chunks[0]
-        content = contents[0]
-        res.append(
-            {
-                "contentChunk": content_chunk["text"],
-                "contentSummary": content_chunk["summary"],
-                "content": content["text"],
-                "score": result["score"],
-            }
-        )
-
-    chat_classifier = ChatClassifier()
-    chat_res = await chat_classifier.chat_with_deepseek(query_text, res)
-
+    rag_chat_agent = RAGChatAgent()
+    chat_res = await rag_chat_agent.chat_with_deepseek(query_text, query_results)
+    deepseek_search = await rag_chat_agent.search_with_deepseek(query_text)
+    select = await rag_chat_agent.select_with_deepseek(chat_res, deepseek_search)
     data = {
-        "result": chat_res["summary"],
-        "status": chat_res["status"],
-        "relevance_score": chat_res["relevance_score"],
+        "result": select["result"],
+        "status": select["status"],
+        "relevance_score": select["relevance_score"],
     }
-
     await chat_result_mapper.insert_chat_result(
         query_text,
         dataset_id_strs,
-        json.dumps(res, ensure_ascii=False),
+        json.dumps(query_results, ensure_ascii=False),
         chat_res["summary"],
         chat_res["relevance_score"],
         chat_res["status"],
+        deepseek_search["answer"],
+        deepseek_search["source"],
+        deepseek_search["status"],
     )
 
     return data

+ 44 - 129
routes/buleprint.py

@@ -18,7 +18,7 @@ from applications.config import (
 )
 from applications.resource import get_resource_manager
 from applications.search import HybridSearch
-from applications.utils.chat import ChatClassifier
+from applications.utils.chat import RAGChatAgent
 from applications.utils.mysql import Dataset, Contents, ContentChunks, ChatResult
 
 server_bp = Blueprint("api", __name__, url_prefix="/api")
@@ -322,13 +322,33 @@ async def query_search(
                     sort_by=sort_by,
                     milvus_size=milvus_size,
                 )
-                return response
             case "strategy":
                 return None
             case _:
                 return None
     except Exception as e:
         return None
+    if response is None:
+        return None
+    resource = get_resource_manager()
+    content_chunk_mapper = ContentChunks(resource.mysql_client)
+    res = []
+    for result in response["results"]:
+        content_chunks = await content_chunk_mapper.select_chunk_content(
+            doc_id=result["doc_id"], chunk_id=result["chunk_id"]
+        )
+        if content_chunks:
+            content_chunk = content_chunks[0]
+            res.append(
+                {
+                    "docId": content_chunk["doc_id"],
+                    "content": content_chunk["text"],
+                    "contentSummary": content_chunk["summary"],
+                    "score": result["score"],
+                    "datasetId": content_chunk["dataset_id"],
+                }
+            )
+    return res
 
 
 @server_bp.route("/query", methods=["GET"])
@@ -342,39 +362,13 @@ async def query():
         search_type=search_type,
     )
     resource = get_resource_manager()
-    content_chunk_mapper = ContentChunks(resource.mysql_client)
     dataset_mapper = Dataset(resource.mysql_client)
-    res = []
-    for result in query_results["results"]:
-        content_chunks = await content_chunk_mapper.select_chunk_content(
-            doc_id=result["doc_id"], chunk_id=result["chunk_id"]
-        )
-        if not content_chunks:
-            return jsonify(
-                {"status_code": 500, "detail": "content_chunk not found", "data": {}}
-            )
-        content_chunk = content_chunks[0]
-        datasets = await dataset_mapper.select_dataset_by_id(
-            content_chunk["dataset_id"]
-        )
-        if not datasets:
-            return jsonify(
-                {"status_code": 500, "detail": "dataset not found", "data": {}}
-            )
-        dataset = datasets[0]
-        dataset_name = None
-        if dataset:
-            dataset_name = dataset["name"]
-        res.append(
-            {
-                "docId": content_chunk["doc_id"],
-                "content": content_chunk["text"],
-                "contentSummary": content_chunk["summary"],
-                "score": result["score"],
-                "datasetName": dataset_name,
-            }
-        )
-    data = {"results": res}
+    for result in query_results:
+        datasets = await dataset_mapper.select_dataset_by_id(result["datasetId"])
+        if datasets:
+            dataset_name = datasets[0]["name"]
+            result["datasetName"] = dataset_name
+    data = {"results": query_results}
     return jsonify({"status_code": 200, "detail": "success", "data": data})
 
 
@@ -390,43 +384,20 @@ async def chat():
         search_type=search_type,
     )
     resource = get_resource_manager()
-    content_chunk_mapper = ContentChunks(resource.mysql_client)
-    dataset_mapper = Dataset(resource.mysql_client)
     chat_result_mapper = ChatResult(resource.mysql_client)
-    res = []
-    for result in query_results["results"]:
-        content_chunks = await content_chunk_mapper.select_chunk_content(
-            doc_id=result["doc_id"], chunk_id=result["chunk_id"]
-        )
-        if not content_chunks:
-            return jsonify(
-                {"status_code": 500, "detail": "content_chunk not found", "data": {}}
-            )
-        content_chunk = content_chunks[0]
-        datasets = await dataset_mapper.select_dataset_by_id(
-            content_chunk["dataset_id"]
-        )
-        if not datasets:
-            return jsonify(
-                {"status_code": 500, "detail": "dataset not found", "data": {}}
-            )
-        dataset = datasets[0]
-        dataset_name = None
-        if dataset:
-            dataset_name = dataset["name"]
-        res.append(
-            {
-                "docId": content_chunk["doc_id"],
-                "content": content_chunk["text"],
-                "contentSummary": content_chunk["summary"],
-                "score": result["score"],
-                "datasetName": dataset_name,
-            }
-        )
-
-    chat_classifier = ChatClassifier()
-    chat_res = await chat_classifier.chat_with_deepseek(query_text, res)
-    data = {"results": res, "chat_res": chat_res["summary"]}
+    resource = get_resource_manager()
+    dataset_mapper = Dataset(resource.mysql_client)
+    for result in query_results:
+        datasets = await dataset_mapper.select_dataset_by_id(result["datasetId"])
+        if datasets:
+            dataset_name = datasets[0]["name"]
+            result["datasetName"] = dataset_name
+
+    rag_chat_agent = RAGChatAgent()
+    chat_res = await rag_chat_agent.chat_with_deepseek(query_text, query_results)
+    deepseek_search = await rag_chat_agent.search_with_deepseek(query_text)
+    select = await rag_chat_agent.select_with_deepseek(chat_res, deepseek_search)
+    data = {"results": query_results, "chat_res": select["result"]}
     await chat_result_mapper.insert_chat_result(
         query_text,
         dataset_id_strs,
@@ -434,6 +405,9 @@ async def chat():
         chat_res["summary"],
         chat_res["relevance_score"],
         chat_res["status"],
+        deepseek_search["answer"],
+        deepseek_search["source"],
+        deepseek_search["status"],
     )
     return jsonify({"status_code": 200, "detail": "success", "data": data})
 
@@ -485,65 +459,6 @@ async def chunk_list():
     )
 
 
-@server_bp.route("/chat/detail", methods=["POST"])
-async def chat_detail():
-    body = await request.get_json()
-    query_text = body.get("query")
-    dataset_id_strs = "11,12"
-    dataset_ids = dataset_id_strs.split(",")
-    search_type = "hybrid"
-    query_results = await query_search(
-        query_text=query_text,
-        filters={"dataset_id": dataset_ids},
-        search_type=search_type,
-    )
-    resource = get_resource_manager()
-    content_chunk_mapper = ContentChunks(resource.mysql_client)
-    contents_mapper = Contents(resource.mysql_client)
-    chat_result_mapper = ChatResult(resource.mysql_client)
-    res = []
-    for result in query_results["results"]:
-        content_chunks = await content_chunk_mapper.select_chunk_content(
-            doc_id=result["doc_id"], chunk_id=result["chunk_id"]
-        )
-        contents = await contents_mapper.select_content_by_doc_id(result["doc_id"])
-        if not content_chunks:
-            return jsonify(
-                {"status_code": 500, "detail": "content_chunk not found", "data": {}}
-            )
-        if not contents:
-            return jsonify(
-                {"status_code": 500, "detail": "contents not found", "data": {}}
-            )
-        content_chunk = content_chunks[0]
-        content = contents[0]
-        res.append(
-            {
-                "contentChunk": content_chunk["text"],
-                "contentSummary": content_chunk["summary"],
-                "content": content["text"],
-                "score": result["score"],
-            }
-        )
-
-    chat_classifier = ChatClassifier()
-    chat_res = await chat_classifier.chat_with_deepseek(query_text, res)
-    data = {
-        "result": chat_res["summary"],
-        "status": chat_res["status"],
-        "metaData": res,
-    }
-    await chat_result_mapper.insert_chat_result(
-        query_text,
-        dataset_id_strs,
-        json.dumps(data, ensure_ascii=False),
-        chat_res["summary"],
-        chat_res["relevance_score"],
-        chat_res["status"],
-    )
-    return jsonify({"status_code": 200, "detail": "success", "data": data})
-
-
 @server_bp.route("/auto_rechunk", methods=["GET"])
 async def auto_rechunk():
     resource = get_resource_manager()