Procházet zdrojové kódy

增加AI搜索,选择更好的答案返回

xueyiming před 1 týdnem
rodič
revize
43a3b49ee3
3 změněné soubory, 127 přidání a 155 odebrání
  1. 78 0
      applications/utils/chat/chat_classifier.py
  2. 11 33
      mcp_server/server.py
  3. 38 122
      routes/buleprint.py

+ 78 - 0
applications/utils/chat/chat_classifier.py

@@ -63,3 +63,81 @@ class ChatClassifier:
             model="DeepSeek-V3", prompt=prompt, output_type="json"
         )
         return response
+
+    @staticmethod
+    def create_query_prompt(question: str) -> str:
+        """
+        Build the prompt that asks the AI model to look up an answer.
+
+        Args:
+            question: The question text to embed in the prompt.
+
+        Returns:
+            The formatted prompt string. It instructs the model to reply with
+            a JSON object holding "question", "answer", "source" and "status".
+        """
+        # The prompt text is sent verbatim to the model; "{{" and "}}" are the
+        # f-string escapes for the literal braces of the JSON example.
+        return f"""
+    你是一个智能助手,能够帮助我查询任何问题并返回答案。你的任务是接收到我给定的问题后,通过网络查询相关的信息,并以 JSON 格式返回查询结果。
+
+    问题:{question}
+
+    请查询并返回该问题的答案。返回的 JSON 应该包含以下字段:
+    - "question": 问题的原始文本。
+    - "answer": 问题的答案内容。
+    - "source": 如果有来源,提供信息来源链接或描述。
+    - "status": 表示查询是否成功,"success" 或 "failure"。
+
+    如果无法找到答案,返回 "status": "failure",并且 "answer" 字段应为 "No answer found"。
+
+    例子:
+    {{
+        "question": "什么是量子计算?",
+        "answer": "量子计算是一种基于量子力学原理的计算模型。",
+        "source": "https://example.com/quantum-computing",
+        "status": "success"
+    }}
+    """
+
+    async def search_with_deepseek(self, query):
+        """
+        Ask DeepSeek-V3 to answer `query` using the lookup prompt.
+
+        Args:
+            query: The question passed to `create_query_prompt`.
+
+        Returns:
+            Whatever `fetch_deepseek_completion` returns for that prompt when
+            called with output_type="json".
+        """
+        return await fetch_deepseek_completion(
+            model="DeepSeek-V3",
+            prompt=self.create_query_prompt(query),
+            output_type="json",
+        )
+
+    @staticmethod
+    def select_prompt(chat_res, search_res):
+        """
+        Build the prompt asking the model to pick or merge the better answer.
+
+        Args:
+            chat_res: Mapping with the RAG answer; the keys read are "query",
+                "summary", "relevance_score" and "status" (1 means the
+                question could be answered). May be None.
+            search_res: Mapping with the AI search answer; the keys read are
+                "question", "answer", "source" and "status". May be None.
+
+        Returns:
+            The formatted prompt string. It instructs the model to reply with
+            a JSON object holding "result", "relevance_score" and "status".
+        """
+        # Both inputs are model-generated JSON, so a field can be absent (the
+        # search prompt itself makes "source" optional). Read every field with
+        # a default instead of indexing, so a partial reply cannot raise
+        # KeyError, and a missing (None) result cannot raise TypeError.
+        chat_res = chat_res or {}
+        search_res = search_res or {}
+
+        rag_query = chat_res.get("query", "")
+        rag_summary = chat_res.get("summary", "")
+        rag_score = chat_res.get("relevance_score", "")
+        rag_status = "可以回答" if chat_res.get("status") == 1 else "无法回答"
+
+        # Defaults mirror the failure vocabulary used by create_query_prompt.
+        search_question = search_res.get("question", "")
+        search_answer = search_res.get("answer", "No answer found")
+        search_source = search_res.get("source", "")
+        search_status = search_res.get("status", "failure")
+
+        prompt = f"""
+        这是一个问题的回答任务,以下是两个来源的结果:
+
+        1. **RAG 搜索回答**:
+        问题: {rag_query}
+        总结: {rag_summary}
+        相关度评分: {rag_score}
+        状态: {rag_status}
+
+        2. **AI 搜索结果**:
+        问题: {search_question}
+        答案: {search_answer}
+        来源: {search_source}
+        查询状态: {search_status}
+
+        基于这两个结果,请你综合判断并生成一个更好的答案,如果可能的话。你可以选择结合 `chat_res` 和 `search_res`,或者选择其中更合适的一个进行回答。如果没有足够的信息可以回答,请用你自己已有的知识回答。
+        
+         请返回以下格式的 JSON 结果:
+        {{
+            "result": "<最终的答案>",
+            "relevance_score": <0到1之间的小数,表示总结与问题的相关度>,
+            "status": <1代表回答的好,0代表回答的不好>
+        }}
+        """
+
+        return prompt
+
+    async def select_with_deepseek(self, chat_res, search_res):
+        """
+        Ask DeepSeek-V3 to choose or merge the RAG and AI search answers.
+
+        Args:
+            chat_res: RAG answer, forwarded to `select_prompt`.
+            search_res: AI search answer, forwarded to `select_prompt`.
+
+        Returns:
+            Whatever `fetch_deepseek_completion` returns for that prompt when
+            called with output_type="json".
+        """
+        return await fetch_deepseek_completion(
+            model="DeepSeek-V3",
+            prompt=self.select_prompt(chat_res, search_res),
+            output_type="json",
+        )

+ 11 - 33
mcp_server/server.py

@@ -1,3 +1,4 @@
+import asyncio
 import json
 from typing import Any, Dict, List
 
@@ -6,7 +7,7 @@ from mcp.server.lowlevel import Server
 
 from applications.resource import get_resource_manager
 from applications.utils.chat import ChatClassifier
-from applications.utils.mysql import ContentChunks, Contents, ChatResult
+from applications.utils.mysql import ChatResult
 from routes.buleprint import query_search
 
 
@@ -61,45 +62,20 @@ async def rag_search(query_text: str) :
     )
 
     resource = get_resource_manager()
-    content_chunk_mapper = ContentChunks(resource.mysql_client)
-    contents_mapper = Contents(resource.mysql_client)
     chat_result_mapper = ChatResult(resource.mysql_client)
-
-    res = []
-    for result in query_results["results"]:
-        content_chunks = await content_chunk_mapper.select_chunk_content(
-            doc_id=result["doc_id"], chunk_id=result["chunk_id"]
-        )
-        contents = await contents_mapper.select_content_by_doc_id(result["doc_id"])
-        if not content_chunks:
-            return {"status_code": 500, "detail": "content_chunk not found", "data": {}}
-        if not contents:
-            return {"status_code": 500, "detail": "contents not found", "data": {}}
-
-        content_chunk = content_chunks[0]
-        content = contents[0]
-        res.append(
-            {
-                "contentChunk": content_chunk["text"],
-                "contentSummary": content_chunk["summary"],
-                "content": content["text"],
-                "score": result["score"],
-            }
-        )
-
     chat_classifier = ChatClassifier()
-    chat_res = await chat_classifier.chat_with_deepseek(query_text, res)
-
+    chat_res = await chat_classifier.chat_with_deepseek(query_text, query_results)
+    deepseek_search = await chat_classifier.search_with_deepseek(query_text)
+    select = await chat_classifier.select_with_deepseek(chat_res, deepseek_search)
     data = {
-        "result": chat_res["summary"],
-        "status": chat_res["status"],
-        "relevance_score": chat_res["relevance_score"],
+        "result": select["result"],
+        "status": select["status"],
+        "relevance_score": select["relevance_score"],
     }
-
     await chat_result_mapper.insert_chat_result(
         query_text,
         dataset_id_strs,
-        json.dumps(res, ensure_ascii=False),
+        json.dumps(query_results, ensure_ascii=False),
         chat_res["summary"],
         chat_res["relevance_score"],
         chat_res["status"],
@@ -110,3 +86,5 @@ async def rag_search(query_text: str) :
 
 
 
+
+

+ 38 - 122
routes/buleprint.py

@@ -322,13 +322,33 @@ async def query_search(
                     sort_by=sort_by,
                     milvus_size=milvus_size,
                 )
-                return response
             case "strategy":
                 return None
             case _:
                 return None
     except Exception as e:
         return None
+    if response is None:
+        return None
+    resource = get_resource_manager()
+    content_chunk_mapper = ContentChunks(resource.mysql_client)
+    res = []
+    for result in response["results"]:
+        content_chunks = await content_chunk_mapper.select_chunk_content(
+            doc_id=result["doc_id"], chunk_id=result["chunk_id"]
+        )
+        if content_chunks:
+            content_chunk = content_chunks[0]
+            res.append(
+                {
+                    "docId": content_chunk["doc_id"],
+                    "content": content_chunk["text"],
+                    "contentSummary": content_chunk["summary"],
+                    "score": result["score"],
+                    "datasetId": content_chunk["dataset_id"],
+                }
+            )
+    return res
 
 
 @server_bp.route("/query", methods=["GET"])
@@ -342,39 +362,15 @@ async def query():
         search_type=search_type,
     )
     resource = get_resource_manager()
-    content_chunk_mapper = ContentChunks(resource.mysql_client)
     dataset_mapper = Dataset(resource.mysql_client)
-    res = []
-    for result in query_results["results"]:
-        content_chunks = await content_chunk_mapper.select_chunk_content(
-            doc_id=result["doc_id"], chunk_id=result["chunk_id"]
-        )
-        if not content_chunks:
-            return jsonify(
-                {"status_code": 500, "detail": "content_chunk not found", "data": {}}
-            )
-        content_chunk = content_chunks[0]
+    for result in query_results:
         datasets = await dataset_mapper.select_dataset_by_id(
-            content_chunk["dataset_id"]
+            result["datasetId"]
         )
-        if not datasets:
-            return jsonify(
-                {"status_code": 500, "detail": "dataset not found", "data": {}}
-            )
-        dataset = datasets[0]
-        dataset_name = None
-        if dataset:
-            dataset_name = dataset["name"]
-        res.append(
-            {
-                "docId": content_chunk["doc_id"],
-                "content": content_chunk["text"],
-                "contentSummary": content_chunk["summary"],
-                "score": result["score"],
-                "datasetName": dataset_name,
-            }
-        )
-    data = {"results": res}
+        if datasets:
+            dataset_name = datasets[0]["name"]
+            result["datasetName"] = dataset_name
+    data = {"results": query_results}
     return jsonify({"status_code": 200, "detail": "success", "data": data})
 
 
@@ -390,43 +386,22 @@ async def chat():
         search_type=search_type,
     )
     resource = get_resource_manager()
-    content_chunk_mapper = ContentChunks(resource.mysql_client)
-    dataset_mapper = Dataset(resource.mysql_client)
     chat_result_mapper = ChatResult(resource.mysql_client)
-    res = []
-    for result in query_results["results"]:
-        content_chunks = await content_chunk_mapper.select_chunk_content(
-            doc_id=result["doc_id"], chunk_id=result["chunk_id"]
-        )
-        if not content_chunks:
-            return jsonify(
-                {"status_code": 500, "detail": "content_chunk not found", "data": {}}
-            )
-        content_chunk = content_chunks[0]
+    resource = get_resource_manager()
+    dataset_mapper = Dataset(resource.mysql_client)
+    for result in query_results:
         datasets = await dataset_mapper.select_dataset_by_id(
-            content_chunk["dataset_id"]
-        )
-        if not datasets:
-            return jsonify(
-                {"status_code": 500, "detail": "dataset not found", "data": {}}
-            )
-        dataset = datasets[0]
-        dataset_name = None
-        if dataset:
-            dataset_name = dataset["name"]
-        res.append(
-            {
-                "docId": content_chunk["doc_id"],
-                "content": content_chunk["text"],
-                "contentSummary": content_chunk["summary"],
-                "score": result["score"],
-                "datasetName": dataset_name,
-            }
+            result["datasetId"]
         )
+        if datasets:
+            dataset_name = datasets[0]["name"]
+            result["datasetName"] = dataset_name
 
     chat_classifier = ChatClassifier()
-    chat_res = await chat_classifier.chat_with_deepseek(query_text, res)
-    data = {"results": res, "chat_res": chat_res["summary"]}
+    chat_res = await chat_classifier.chat_with_deepseek(query_text, query_results)
+    deepseek_search = await chat_classifier.search_with_deepseek(query_text)
+    select = await chat_classifier.select_with_deepseek(chat_res, deepseek_search)
+    data = {"results": query_results, "chat_res": select["result"]}
     await chat_result_mapper.insert_chat_result(
         query_text,
         dataset_id_strs,
@@ -485,65 +460,6 @@ async def chunk_list():
     )
 
 
-@server_bp.route("/chat/detail", methods=["POST"])
-async def chat_detail():
-    body = await request.get_json()
-    query_text = body.get("query")
-    dataset_id_strs = "11,12"
-    dataset_ids = dataset_id_strs.split(",")
-    search_type = "hybrid"
-    query_results = await query_search(
-        query_text=query_text,
-        filters={"dataset_id": dataset_ids},
-        search_type=search_type,
-    )
-    resource = get_resource_manager()
-    content_chunk_mapper = ContentChunks(resource.mysql_client)
-    contents_mapper = Contents(resource.mysql_client)
-    chat_result_mapper = ChatResult(resource.mysql_client)
-    res = []
-    for result in query_results["results"]:
-        content_chunks = await content_chunk_mapper.select_chunk_content(
-            doc_id=result["doc_id"], chunk_id=result["chunk_id"]
-        )
-        contents = await contents_mapper.select_content_by_doc_id(result["doc_id"])
-        if not content_chunks:
-            return jsonify(
-                {"status_code": 500, "detail": "content_chunk not found", "data": {}}
-            )
-        if not contents:
-            return jsonify(
-                {"status_code": 500, "detail": "contents not found", "data": {}}
-            )
-        content_chunk = content_chunks[0]
-        content = contents[0]
-        res.append(
-            {
-                "contentChunk": content_chunk["text"],
-                "contentSummary": content_chunk["summary"],
-                "content": content["text"],
-                "score": result["score"],
-            }
-        )
-
-    chat_classifier = ChatClassifier()
-    chat_res = await chat_classifier.chat_with_deepseek(query_text, res)
-    data = {
-        "result": chat_res["summary"],
-        "status": chat_res["status"],
-        "metaData": res,
-    }
-    await chat_result_mapper.insert_chat_result(
-        query_text,
-        dataset_id_strs,
-        json.dumps(data, ensure_ascii=False),
-        chat_res["summary"],
-        chat_res["relevance_score"],
-        chat_res["status"],
-    )
-    return jsonify({"status_code": 200, "detail": "success", "data": data})
-
-
 @server_bp.route("/auto_rechunk", methods=["GET"])
 async def auto_rechunk():
     resource = get_resource_manager()