import asyncio
import json
from typing import Any, Dict, List

import mcp.types as types
from mcp.server.lowlevel import Server

from applications.api.qwen import QwenClient
from applications.resource import get_resource_manager
from applications.utils.chat import RAGChatAgent
from applications.utils.mysql import ChatResult
from applications.utils.spider.study import study
from applications.utils.task.async_task import query_search, process_question

# Single source of truth for the tool name, shared by the dispatcher
# (call_tool) and the advertisement (list_tools) so they cannot drift apart.
_RAG_SEARCH_TOOL_NAME = "rag-search"


def create_mcp_server() -> Server:
    """Create and configure the MCP server.

    Registers two handlers on a low-level ``Server`` named
    ``"mcp-rag-server"``: a tool dispatcher and a tool listing. The only
    tool exposed is ``"rag-search"``.

    Returns:
        The configured ``Server`` instance.
    """
    app = Server("mcp-rag-server")

    @app.call_tool()
    async def call_tool(
        name: str, arguments: Dict[str, Any]
    ) -> List[types.TextContent]:
        """Dispatch a tool call and return its result as JSON text.

        Args:
            name: Name of the tool being invoked.
            arguments: Tool arguments; ``"query_text"`` is read for the
                RAG search tool.

        Returns:
            A single-element list holding the JSON-serialized search result.

        Raises:
            ValueError: If ``name`` is not a known tool.
            KeyError: If ``"query_text"`` is absent from ``arguments``.
        """
        # Guard clause: reject unknown tools before doing any work.
        if name != _RAG_SEARCH_TOOL_NAME:
            raise ValueError(f"Unknown tool: {name}")

        data = await rag_search(arguments["query_text"])
        # ensure_ascii=False keeps non-ASCII text readable in the output.
        result = json.dumps(data, ensure_ascii=False, indent=2)
        return [types.TextContent(type="text", text=result)]

    @app.list_tools()
    async def list_tools() -> List[types.Tool]:
        """Return the list of tools this server exposes."""
        return [
            types.Tool(
                name=_RAG_SEARCH_TOOL_NAME,
                title="RAG搜索",
                description="搜索内容并生成总结",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query_text": {
                            "type": "string",
                            "description": "用户输入的查询文本",
                        }
                    },
                    # Only query_text is mandatory.
                    "required": ["query_text"],
                    "additionalProperties": False,
                },
            ),
        ]

    return app


async def rag_search(query_text: str) -> List[Any]:
    """Run the RAG pipeline for a query and collect the results.

    Args:
        query_text: The user's query text.

    Returns:
        A list with one ``process_question`` result per processed question
        (currently exactly one, for ``query_text`` itself).
    """
    # NOTE: query splitting is currently disabled, so the query is processed
    # as one question. The list is kept so that splitting can be plugged back
    # in without changing the fan-out below.
    split_questions = [query_text]

    resource = get_resource_manager()
    qwen_client = QwenClient()
    # Construct the agent once; it is shared by every question task.
    rag_chat_agent = RAGChatAgent()

    # Process every question concurrently; gather preserves input order.
    tasks = [
        process_question(question, resource, qwen_client, rag_chat_agent)
        for question in split_questions
    ]
    return await asyncio.gather(*tasks)