import asyncio
import json
from typing import Any, Dict, List

import mcp.types as types
from mcp.server.lowlevel import Server

from applications.resource import get_resource_manager
from applications.utils.chat import RAGChatAgent
from applications.utils.mysql import ChatResult
from applications.utils.spider.study import study
from routes.buleprint import query_search


def create_mcp_server() -> Server:
    """Create and configure the MCP server."""
    app = Server("mcp-rag-server")

    @app.call_tool()
    async def call_tool(
        name: str, arguments: Dict[str, Any]
    ) -> List[types.TextContent]:
        """Handle tool invocations."""
        # ctx = app.request_context
        if name == "rag-search":
            data = await rag_search(arguments["query_text"])
            result = json.dumps(data, ensure_ascii=False, indent=2)
        else:
            raise ValueError(f"Unknown tool: {name}")

        return [types.TextContent(type="text", text=result)]

    @app.list_tools()
    async def list_tools() -> List[types.Tool]:
        return [
            types.Tool(
                name="rag-search",
                title="RAG search",
                description="Search content and generate a summary",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query_text": {
                            "type": "string",
                            "description": "The user's query text",
                        }
                    },
                    "required": ["query_text"],  # only query_text is required
                    "additionalProperties": False,
                },
            ),
        ]

    return app


async def rag_search(query_text: str) -> Dict[str, Any]:
    dataset_id_strs = "11,12"
    dataset_ids = dataset_id_strs.split(",")
    search_type = "hybrid"

    # Hybrid retrieval over the configured datasets
    query_results = await query_search(
        query_text=query_text,
        filters={"dataset_id": dataset_ids},
        search_type=search_type,
    )

    resource = get_resource_manager()
    chat_result_mapper = ChatResult(resource.mysql_client)
    rag_chat_agent = RAGChatAgent()

    # Summarize the retrieved chunks with the chat agent
    chat_result = await rag_chat_agent.chat_with_deepseek(query_text, query_results)

    # When the RAG answer is unusable (status == 0), schedule a study task for the query
    study_task_id = None
    if chat_result["status"] == 0:
        study_task_id = study(query_text)["task_id"]

    # Fall back to a plain LLM search and let the agent decide which answer to keep
    llm_search_result = await rag_chat_agent.llm_search(query_text)
    decision = await rag_chat_agent.make_decision(chat_result, llm_search_result)

    data = {
        "result": decision["result"],
        "status": decision["status"],
        "relevance_score": decision["relevance_score"],
    }

    # Persist the full interaction for later analysis
    await chat_result_mapper.insert_chat_result(
        query_text,
        dataset_id_strs,
        json.dumps(query_results, ensure_ascii=False),
        chat_result["summary"],
        chat_result["relevance_score"],
        chat_result["status"],
        llm_search_result["answer"],
        llm_search_result["source"],
        llm_search_result["status"],
        decision["result"],
        study_task_id,
    )
    return data
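

# A minimal entry-point sketch, assuming the server is exposed over the stdio
# transport from the official MCP Python SDK (mcp.server.stdio.stdio_server);
# the actual deployment may wire create_mcp_server() to a different transport.
if __name__ == "__main__":
    from mcp.server.stdio import stdio_server

    async def _serve() -> None:
        server = create_mcp_server()
        # stdio_server() yields a (read_stream, write_stream) pair that the
        # low-level Server.run() loop consumes until the client disconnects.
        async with stdio_server() as (read_stream, write_stream):
            await server.run(
                read_stream,
                write_stream,
                server.create_initialization_options(),
            )

    asyncio.run(_serve())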