import asyncio
import json
from typing import Any, Dict, List

import mcp.types as types
from mcp.server.lowlevel import Server

from applications.resource import get_resource_manager
from applications.utils.chat import RAGChatAgent
from applications.utils.mysql import ChatResult
from routes.buleprint import query_search


def create_mcp_server() -> Server:
    """Create and configure the MCP server.

    Builds a ``Server`` named ``"mcp-rag-server"`` and registers two
    handlers on it: one that executes tool calls and one that lists the
    available tools.

    Returns:
        The configured ``Server`` instance.
    """
    app = Server("mcp-rag-server")

    @app.call_tool()
    async def call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Handle a tool invocation.

        Args:
            name: Name of the tool being called; only ``"rag-search"`` is
                handled here.
            arguments: Tool arguments; ``arguments["query_text"]`` is read
                for ``"rag-search"``.

        Returns:
            A single-element list holding the tool result serialized as
            pretty-printed JSON text.

        Raises:
            ValueError: If ``name`` is not a known tool.
        """
        if name != "rag-search":
            raise ValueError(f"Unknown tool: {name}")

        data = await rag_search(arguments["query_text"])
        # ensure_ascii=False keeps non-ASCII text readable in the response.
        result = json.dumps(data, ensure_ascii=False, indent=2)
        return [types.TextContent(type="text", text=result)]

    @app.list_tools()
    async def list_tools() -> List[types.Tool]:
        """Return the tools exposed by this server (only ``rag-search``)."""
        return [
            types.Tool(
                name="rag-search",
                title="RAG搜索",
                description="搜索内容并生成总结",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query_text": {
                            "type": "string",
                            "description": "用户输入的查询文本",
                        }
                    },
                    # Only query_text is mandatory.
                    "required": ["query_text"],
                    "additionalProperties": False,
                },
            ),
        ]

    return app


async def rag_search(
    query_text: str,
    dataset_id_strs: str = "11,12",
    search_type: str = "hybrid",
) -> Dict[str, Any]:
    """Run the RAG search pipeline for a query and persist the outcome.

    Steps, in order: call ``query_search`` filtered by dataset id, pass the
    hits to ``RAGChatAgent.chat_with_deepseek``, call
    ``RAGChatAgent.search_with_deepseek`` on the raw query, hand both
    results to ``RAGChatAgent.select_with_deepseek``, then store a record
    through ``ChatResult.insert_chat_result``.

    Args:
        query_text: The user's query text.
        dataset_id_strs: Comma-separated dataset ids used as the
            ``dataset_id`` filter; also stored verbatim with the record.
            Defaults to the previously hard-coded ``"11,12"``.
        search_type: Search mode forwarded to ``query_search``. Defaults to
            the previously hard-coded ``"hybrid"``.

    Returns:
        A dict with the ``result``, ``status`` and ``relevance_score``
        entries taken from the selection step.
    """
    dataset_ids = dataset_id_strs.split(",")
    query_results = await query_search(
        query_text=query_text,
        filters={"dataset_id": dataset_ids},
        search_type=search_type,
    )

    resource = get_resource_manager()
    chat_result_mapper = ChatResult(resource.mysql_client)
    rag_chat_agent = RAGChatAgent()

    chat_res = await rag_chat_agent.chat_with_deepseek(query_text, query_results)
    deepseek_search = await rag_chat_agent.search_with_deepseek(query_text)
    select = await rag_chat_agent.select_with_deepseek(chat_res, deepseek_search)

    data = {
        "result": select["result"],
        "status": select["status"],
        "relevance_score": select["relevance_score"],
    }

    # Persist both intermediate answers; a failure here propagates to the
    # caller before the result is returned.
    await chat_result_mapper.insert_chat_result(
        query_text,
        dataset_id_strs,
        json.dumps(query_results, ensure_ascii=False),
        chat_res["summary"],
        chat_res["relevance_score"],
        chat_res["status"],
        deepseek_search["answer"],
        deepseek_search["source"],
        deepseek_search["status"],
    )
    return data