1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- import asyncio
- import json
- from typing import Any, Dict, List
- import mcp.types as types
- from mcp.server.lowlevel import Server
- from applications.resource import get_resource_manager
- from applications.utils.chat import RAGChatAgent
- from applications.utils.mysql import ChatResult
- from routes.buleprint import query_search
def create_mcp_server() -> Server:
    """Create the MCP server and register its tool handlers.

    Returns:
        The ``Server`` instance named "mcp-rag-server", with a tool-call
        handler and a tool-listing handler registered on it.
    """
    server = Server("mcp-rag-server")

    @server.call_tool()
    async def call_tool(
        name: str, arguments: Dict[str, Any]
    ) -> List[types.TextContent]:
        """Run the requested tool and return its output as JSON text.

        Raises:
            ValueError: If ``name`` is not "rag-search".
        """
        # Guard clause: "rag-search" is the only tool handled here.
        if name != "rag-search":
            raise ValueError(f"Unknown tool: {name}")

        payload = await rag_search(arguments["query_text"])
        # ensure_ascii=False keeps non-ASCII characters readable in the output.
        text = json.dumps(payload, ensure_ascii=False, indent=2)
        return [types.TextContent(type="text", text=text)]

    @server.list_tools()
    async def list_tools() -> List[types.Tool]:
        """Return the descriptors of the tools this server exposes."""
        query_text_property = {
            "type": "string",
            "description": "用户输入的查询文本",
        }
        input_schema = {
            "type": "object",
            "properties": {"query_text": query_text_property},
            # query_text is the only mandatory argument.
            "required": ["query_text"],
            "additionalProperties": False,
        }
        rag_search_tool = types.Tool(
            name="rag-search",
            title="RAG搜索",
            description="搜索内容并生成总结",
            inputSchema=input_schema,
        )
        return [rag_search_tool]

    return server
async def rag_search(
    query_text: str,
    *,
    dataset_id_strs: str = "11,12",
    search_type: str = "hybrid",
) -> Dict[str, Any]:
    """Search the datasets, build an answer, record the run and return it.

    The function calls ``query_search`` for ``query_text``, passes the hits to
    ``RAGChatAgent.chat_with_deepseek``, separately calls
    ``search_with_deepseek`` with the query alone, and then lets
    ``select_with_deepseek`` choose between the two outcomes. The intermediate
    results are stored through ``ChatResult.insert_chat_result`` before the
    selected answer is returned.

    Args:
        query_text: The user's query.
        dataset_id_strs: Comma-separated dataset ids. The string is split on
            "," (without trimming) to build the ``dataset_id`` search filter,
            and is stored unchanged alongside the results. Defaults to
            "11,12", the value that used to be hard-coded.
        search_type: Search mode forwarded to ``query_search``. Defaults to
            "hybrid", the value that used to be hard-coded.

    Returns:
        A dict with the keys "result", "status" and "relevance_score", copied
        from the value returned by ``select_with_deepseek``.
    """
    dataset_ids = dataset_id_strs.split(",")
    query_results = await query_search(
        query_text=query_text,
        filters={"dataset_id": dataset_ids},
        search_type=search_type,
    )

    # Acquire the persistence layer before the model calls, as the original
    # did, so a resource failure surfaces before any model request is made.
    resource = get_resource_manager()
    chat_result_mapper = ChatResult(resource.mysql_client)

    rag_chat_agent = RAGChatAgent()
    chat_res = await rag_chat_agent.chat_with_deepseek(query_text, query_results)
    deepseek_search = await rag_chat_agent.search_with_deepseek(query_text)
    select = await rag_chat_agent.select_with_deepseek(chat_res, deepseek_search)

    # Build the response before persisting so a malformed selection result
    # fails here and nothing is written for it.
    data = {
        "result": select["result"],
        "status": select["status"],
        "relevance_score": select["relevance_score"],
    }

    await chat_result_mapper.insert_chat_result(
        query_text,
        dataset_id_strs,
        json.dumps(query_results, ensure_ascii=False),
        chat_res["summary"],
        chat_res["relevance_score"],
        chat_res["status"],
        deepseek_search["answer"],
        deepseek_search["source"],
        deepseek_search["status"],
    )
    return data
|