Change the method to asynchronous background processing

xueyiming 1 day ago
parent commit 8ae26b7f5d
4 changed files with 203 additions and 158 deletions
  1. applications/utils/task/__init__.py (+0 -0)
  2. applications/utils/task/async_task.py (+197 -0)
  3. mcp_server/server.py (+1 -1)
  4. routes/blueprint.py (+5 -157)

+ 0 - 0
applications/utils/task/__init__.py


+ 197 - 0
applications/utils/task/async_task.py

@@ -0,0 +1,197 @@
+import json
+import os
+import uuid
+
+from applications.api import get_basic_embedding
+from applications.api.qwen import QwenClient
+from applications.async_task import ChunkBooksTask
+from applications.config import BASE_MILVUS_SEARCH_PARAMS, DEFAULT_MODEL
+from applications.resource import get_resource_manager
+from applications.search import HybridSearch
+from applications.utils.mysql import Books, ChatResult, ContentChunks
+from applications.utils.oss.oss_client import OSSClient
+from applications.utils.pdf.book_extract import book_extract
+from applications.utils.spider.study import study
+
+async def handle_books():
+    try:
+        # Get the resource manager, database mapper, and OSS client
+        resource = get_resource_manager()
+        books_mapper = Books(resource.mysql_client)
+        oss_client = OSSClient()
+
+        # Fetch the list of books awaiting processing
+        books = await books_mapper.select_init_books()
+
+        for book in books:
+            book_id = book.get("book_id")
+            # Look up the book's extraction status
+            extract_status = (await books_mapper.select_book_extract_status(book_id))[0].get("extract_status")
+
+            if extract_status == 0:
+                # Mark the extraction status as in progress
+                await books_mapper.update_book_extract_status(book_id, 1)
+                book_path = os.path.join("/tmp", book_id)
+
+                if not os.path.exists(book_path):
+                    oss_path = f"rag/pdfs/{book_id}"
+                    try:
+                        # Download the book file from OSS
+                        await oss_client.download_file(oss_path, book_path)
+                    except Exception:
+                        continue  # Skip this book if the download fails
+
+                try:
+                    # Extract the book's content
+                    res = await book_extract(book_path, book_id)
+                    if res:
+                        content_list = res.get("results", {}).get(book_id, {}).get("content_list", [])
+                        if content_list:
+                            # Persist the extraction result
+                            await books_mapper.update_book_extract_result(book_id, content_list)
+
+                        # Create a document ID
+                        doc_id = f"doc-{uuid.uuid4()}"
+                        chunk_task = ChunkBooksTask(doc_id=doc_id, resource=resource)
+
+                        # Build and run the chunking task
+                        body = {"book_id": book_id}
+                        await chunk_task.deal(body)  # executes the chunking task asynchronously
+
+                except Exception:
+                    continue  # Skip this book if extraction fails
+
+    except Exception as e:
+        # Catch any top-level failure in the batch
+        print(f"Failed to process books, error: {e}")
+
+
+async def process_question(question, query_text, rag_chat_agent):
+    try:
+        dataset_id_strs = "11,12"
+        dataset_ids = dataset_id_strs.split(",")
+        search_type = "hybrid"
+
+        # Run the retrieval query
+        query_results = await query_search(
+            query_text=question,
+            filters={"dataset_id": dataset_ids},
+            search_type=search_type,
+        )
+
+        resource = get_resource_manager()
+        chat_result_mapper = ChatResult(resource.mysql_client)
+
+        # Chat with DeepSeek asynchronously
+        chat_result = await rag_chat_agent.chat_with_deepseek(question, query_results)
+
+        # Decide whether a study task needs to run
+        study_task_id = None
+        if chat_result["status"] == 0:
+            study_task_id = study(question)["task_id"]
+
+        qwen_client = QwenClient()
+        llm_search = qwen_client.search_and_chat(
+            user_prompt=question
+        )
+        decision = await rag_chat_agent.make_decision(
+            question, chat_result, llm_search
+        )
+
+        # Build the response payload
+        data = {
+            "query": question,
+            "result": decision["result"],
+            "status": decision["status"],
+            "relevance_score": decision["relevance_score"],
+            # "used_tools": decision["used_tools"],
+        }
+
+        # Insert the result into the database
+        await chat_result_mapper.insert_chat_result(
+            question,
+            dataset_id_strs,
+            json.dumps(query_results, ensure_ascii=False),
+            chat_result["summary"],
+            chat_result["relevance_score"],
+            chat_result["status"],
+            llm_search["content"],
+            json.dumps(llm_search["search_results"], ensure_ascii=False),
+            1,
+            decision["result"],
+            study_task_id,
+        )
+        return data
+    except Exception as e:
+        print(f"Error processing question: {question}. Error: {str(e)}")
+        return {"query": question, "error": str(e)}
+
+
+async def query_search(
+        query_text,
+        filters=None,
+        search_type="",
+        anns_field="vector_text",
+        search_params=BASE_MILVUS_SEARCH_PARAMS,
+        _source=False,
+        es_size=10000,
+        sort_by=None,
+        milvus_size=20,
+        limit=10,
+):
+    if filters is None:
+        filters = {}
+    query_vector = await get_basic_embedding(text=query_text, model=DEFAULT_MODEL)
+    resource = get_resource_manager()
+    search_engine = HybridSearch(
+        milvus_pool=resource.milvus_client,
+        es_pool=resource.es_client,
+        graph_pool=resource.graph_client,
+    )
+    try:
+        match search_type:
+            case "base":
+                response = await search_engine.base_vector_search(
+                    query_vec=query_vector,
+                    anns_field=anns_field,
+                    search_params=search_params,
+                    limit=limit,
+                )
+                return response
+            case "hybrid":
+                response = await search_engine.hybrid_search(
+                    filters=filters,
+                    query_vec=query_vector,
+                    anns_field=anns_field,
+                    search_params=search_params,
+                    es_size=es_size,
+                    sort_by=sort_by,
+                    milvus_size=milvus_size,
+                )
+            case "strategy":
+                return None
+            case _:
+                return None
+    except Exception:
+        return None  # swallow search errors; the caller receives None
+    if response is None:
+        return None
+    resource = get_resource_manager()
+    content_chunk_mapper = ContentChunks(resource.mysql_client)
+    res = []
+    for result in response["results"]:
+        content_chunks = await content_chunk_mapper.select_chunk_content(
+            doc_id=result["doc_id"], chunk_id=result["chunk_id"]
+        )
+        if content_chunks:
+            content_chunk = content_chunks[0]
+            res.append(
+                {
+                    "docId": content_chunk["doc_id"],
+                    "content": content_chunk["text"],
+                    "contentSummary": content_chunk["summary"],
+                    "score": result["score"],
+                    "datasetId": content_chunk["dataset_id"],
+                }
+            )
+    return res[:limit]
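
Note: a minimal usage sketch of the relocated query_search helper, assuming an initialized resource manager and a running event loop; the coroutine name and argument values below are illustrative, not part of this commit.

    import asyncio
    from applications.utils.task.async_task import query_search

    async def demo():
        # Hybrid search over datasets 11 and 12; returns at most `limit`
        # chunk dicts (docId, content, contentSummary, score, datasetId),
        # or None if the underlying search raises.
        results = await query_search(
            query_text="example question",
            filters={"dataset_id": ["11", "12"]},
            search_type="hybrid",
            limit=5,
        )
        print(results)

    asyncio.run(demo())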

+ 1 - 1
mcp_server/server.py

@@ -10,7 +10,7 @@ from applications.utils.chat import RAGChatAgent
 from applications.utils.mysql import ChatResult
 from applications.api.qwen import QwenClient
 from applications.utils.spider.study import study
-from routes.blueprint import query_search
+from applications.utils.task.async_task import query_search
 
 
 def create_mcp_server() -> Server:

+ 5 - 157
routes/blueprint.py

@@ -23,8 +23,7 @@ from applications.utils.chat import RAGChatAgent
 from applications.utils.mysql import Dataset, Contents, ContentChunks, ChatResult, Books
 from applications.api.qwen import QwenClient
 from applications.utils.oss.oss_client import OSSClient
-from applications.utils.pdf.book_extract import book_extract
-from applications.utils.spider.study import study
+from applications.utils.task.async_task import handle_books, process_question, query_search
 
 server_bp = Blueprint("api", __name__, url_prefix="/api")
 server_bp = cors(server_bp, allow_origin="*")
@@ -315,76 +314,6 @@ async def content_list():
     )
 
 
-async def query_search(
-    query_text,
-    filters=None,
-    search_type="",
-    anns_field="vector_text",
-    search_params=BASE_MILVUS_SEARCH_PARAMS,
-    _source=False,
-    es_size=10000,
-    sort_by=None,
-    milvus_size=20,
-    limit=10,
-):
-    if filters is None:
-        filters = {}
-    query_vector = await get_basic_embedding(text=query_text, model=DEFAULT_MODEL)
-    resource = get_resource_manager()
-    search_engine = HybridSearch(
-        milvus_pool=resource.milvus_client,
-        es_pool=resource.es_client,
-        graph_pool=resource.graph_client,
-    )
-    try:
-        match search_type:
-            case "base":
-                response = await search_engine.base_vector_search(
-                    query_vec=query_vector,
-                    anns_field=anns_field,
-                    search_params=search_params,
-                    limit=limit,
-                )
-                return response
-            case "hybrid":
-                response = await search_engine.hybrid_search(
-                    filters=filters,
-                    query_vec=query_vector,
-                    anns_field=anns_field,
-                    search_params=search_params,
-                    es_size=es_size,
-                    sort_by=sort_by,
-                    milvus_size=milvus_size,
-                )
-            case "strategy":
-                return None
-            case _:
-                return None
-    except Exception as e:
-        return None
-    if response is None:
-        return None
-    resource = get_resource_manager()
-    content_chunk_mapper = ContentChunks(resource.mysql_client)
-    res = []
-    for result in response["results"]:
-        content_chunks = await content_chunk_mapper.select_chunk_content(
-            doc_id=result["doc_id"], chunk_id=result["chunk_id"]
-        )
-        if content_chunks:
-            content_chunk = content_chunks[0]
-            res.append(
-                {
-                    "docId": content_chunk["doc_id"],
-                    "content": content_chunk["text"],
-                    "contentSummary": content_chunk["summary"],
-                    "score": result["score"],
-                    "datasetId": content_chunk["dataset_id"],
-                }
-            )
-    return res[:limit]
-
-
 @server_bp.route("/query", methods=["GET"])
 async def query():
     query_text = request.args.get("query")
@@ -552,66 +481,6 @@ async def rag_search():
     return jsonify({"status_code": 200, "detail": "success", "data": data_list})
 
 
-async def process_question(question, query_text, rag_chat_agent):
-    try:
-        dataset_id_strs = "11,12"
-        dataset_ids = dataset_id_strs.split(",")
-        search_type = "hybrid"
-
-        # Run the retrieval query
-        query_results = await query_search(
-            query_text=question,
-            filters={"dataset_id": dataset_ids},
-            search_type=search_type,
-        )
-
-        resource = get_resource_manager()
-        chat_result_mapper = ChatResult(resource.mysql_client)
-
-        # Chat with DeepSeek asynchronously
-        chat_result = await rag_chat_agent.chat_with_deepseek(question, query_results)
-
-        # # Decide whether a study task needs to run
-        study_task_id = None
-        if chat_result["status"] == 0:
-            study_task_id = study(question)["task_id"]
-
-        qwen_client = QwenClient()
-        llm_search = qwen_client.search_and_chat(
-            user_prompt=question
-        )
-        decision = await rag_chat_agent.make_decision(
-            question, chat_result, llm_search
-        )
-
-        # Build the response payload
-        data = {
-            "query": question,
-            "result": decision["result"],
-            "status": decision["status"],
-            "relevance_score": decision["relevance_score"],
-            # "used_tools": decision["used_tools"],
-        }
-
-        # Insert the result into the database
-        await chat_result_mapper.insert_chat_result(
-            question,
-            dataset_id_strs,
-            json.dumps(query_results, ensure_ascii=False),
-            chat_result["summary"],
-            chat_result["relevance_score"],
-            chat_result["status"],
-            llm_search["content"],
-            json.dumps(llm_search["search_results"], ensure_ascii=False),
-            1,
-            decision["result"],
-            study_task_id,
-        )
-        return data
-    except Exception as e:
-        print(f"Error processing question: {question}. Error: {str(e)}")
-        return {"query": question, "error": str(e)}
-
 
 @server_bp.route("/chat/history", methods=["GET"])
 async def chat_history():
@@ -695,28 +564,7 @@ async def upload_pdf():
 
 @server_bp.route("/process/book", methods=["GET"])
 async def process_book():
-    resource = get_resource_manager()
-    books_mapper = Books(resource.mysql_client)
-    oss_client = OSSClient()
-    books = await books_mapper.select_init_books()
-    for book in books:
-        extract_status = books_mapper.select_book_extract_status(book.get("book_id"))[
-            0
-        ]["extract_status"]
-        if extract_status == 0:
-            await books_mapper.update_book_extract_status(book.get("book_id"), 1)
-            book_id = book.get("book_id")
-            book_path = os.path.join("/tmp", book.get("book_id"))
-            if not os.path.exists(book_path):
-                oss_path = f"rag/pdfs/{book_id}"
-                oss_client.download_file(oss_path, book_path)
-            res = await book_extract(book_path, book_id)
-            if res:
-                await books_mapper.update_book_extract_result(
-                    book_id, res.get("results").get(book_id).get("content_list")
-                )
-                doc_id = f"doc-{uuid.uuid4()}"
-                chunk_task = ChunkBooksTask(doc_id=doc_id, resource=resource)
-                body = {"book_id": book_id}
-                await chunk_task.deal(body)
-    return jsonify({"status": "success"})
+    # Spawn an async task to process the books in the background
+    asyncio.create_task(handle_books())
+    # Return an immediate response
+    return jsonify({"status": "success", "message": "Task submitted for background processing"}), 200