| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- import asyncio
- import json
- from typing import Any, Dict, List
- import mcp.types as types
- from mcp.server.lowlevel import Server
- from applications.resource import get_resource_manager
- from applications.utils.chat import RAGChatAgent
- from applications.utils.mysql import ChatResult
- from applications.api.qwen import QwenClient
- from applications.utils.spider.study import study
- from applications.utils.task.async_task import query_search, process_question
- def create_mcp_server() -> Server:
- """创建并配置MCP服务器"""
- app = Server("mcp-rag-server")
- @app.call_tool()
- async def call_tool(
- name: str, arguments: Dict[str, Any]
- ) -> List[types.TextContent]:
- """处理工具调用"""
- # ctx = app.request_context
- if name == "rag-search":
- data = await rag_search(arguments["query_text"])
- result = json.dumps(data, ensure_ascii=False, indent=2)
- else:
- raise ValueError(f"Unknown tool: {name}")
- return [types.TextContent(type="text", text=result)]
- @app.list_tools()
- async def list_tools() -> List[types.Tool]:
- return [
- types.Tool(
- name="rag-search",
- title="RAG搜索",
- description="搜索内容并生成总结",
- inputSchema={
- "type": "object",
- "properties": {
- "query_text": {
- "type": "string",
- "description": "用户输入的查询文本",
- }
- },
- "required": ["query_text"], # 只强制 query_text 必填
- "additionalProperties": False,
- },
- ),
- ]
- return app
- async def rag_search(query_text: str):
- rag_chat_agent = RAGChatAgent()
- split_questions = []
- # spilt_query = await rag_chat_agent.split_query(query_text)
- # split_questions = spilt_query["split_questions"]
- split_questions.append(query_text)
- resource = get_resource_manager()
- qwen_client = QwenClient()
- rag_chat_agent = RAGChatAgent()
- # 使用asyncio.gather并行处理每个问题
- tasks = [
- process_question(question, resource, qwen_client, rag_chat_agent)
- for question in split_questions
- ]
- # 等待所有任务完成并收集结果
- data_list = await asyncio.gather(*tasks)
- return data_list
|