import json
from typing import Any, Dict, List

import mcp.types as types
from mcp.server.lowlevel import Server

from applications.resource import get_resource_manager
from applications.utils.chat import ChatClassifier
from applications.utils.mysql import ContentChunks, Contents, ChatResult
# NOTE(review): "buleprint" looks like a misspelling of "blueprint"; it is kept
# as-is because it must match the real module path — confirm before renaming.
from routes.buleprint import query_search


def create_mcp_server() -> Server:
    """Create and configure the MCP server.

    Registers two handlers on a low-level ``Server`` named
    ``"mcp-rag-server"``: one that lists the available tools and one that
    dispatches tool calls.

    Returns:
        The configured ``Server`` instance.
    """
    app = Server("mcp-rag-server")

    @app.call_tool()
    async def call_tool(
        name: str, arguments: Dict[str, Any]
    ) -> List[types.TextContent]:
        """Handle a tool call and return its result as JSON text.

        Args:
            name: Name of the tool being invoked; only ``"rag-search"`` is
                supported.
            arguments: Tool arguments; ``"query_text"`` is required.

        Returns:
            A single-element list holding the JSON-encoded result of
            ``rag_search``.

        Raises:
            ValueError: If the tool name is unknown or ``query_text`` is
                missing from ``arguments``.
        """
        if name != "rag-search":
            raise ValueError(f"Unknown tool: {name}")

        # Fail with an explicit message instead of a bare KeyError, matching
        # the ValueError style used for unknown tools above.
        if "query_text" not in arguments:
            raise ValueError("Missing required argument: query_text")

        data = await rag_search(arguments["query_text"])
        # ensure_ascii=False keeps non-ASCII text readable in the output.
        result = json.dumps(data, ensure_ascii=False, indent=2)
        return [types.TextContent(type="text", text=result)]

    @app.list_tools()
    async def list_tools() -> List[types.Tool]:
        """Return the list of tools exposed by this server."""
        return [
            types.Tool(
                name="rag-search",
                title="RAG搜索",
                description="搜索内容并生成总结",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query_text": {
                            "type": "string",
                            "description": "用户输入的查询文本",
                        }
                    },
                    "required": ["query_text"],  # only query_text is mandatory
                    "additionalProperties": False,
                },
            ),
        ]

    return app


async def rag_search(
    query_text: str,
    dataset_id_strs: str = "11,12",
    search_type: str = "hybrid",
) -> Dict[str, Any]:
    """Search the datasets for ``query_text`` and summarize the hits.

    Runs ``query_search``, loads the chunk and parent document for every hit,
    passes the collected material to ``ChatClassifier.chat_with_deepseek``
    and stores the outcome through ``ChatResult.insert_chat_result``.

    Args:
        query_text: The user's query.
        dataset_id_strs: Comma-separated dataset ids to search. The raw
            string is also what gets stored with the chat result.
        search_type: Search mode forwarded to ``query_search``.

    Returns:
        On success, a dict with the keys ``"result"``, ``"status"`` and
        ``"relevance_score"`` taken from the chat response. If a hit's chunk
        or document cannot be loaded, a dict with ``"status_code": 500``, a
        ``"detail"`` message and an empty ``"data"`` dict is returned instead
        and nothing is stored.
    """
    dataset_ids = dataset_id_strs.split(",")
    query_results = await query_search(
        query_text=query_text,
        filters={"dataset_id": dataset_ids},
        search_type=search_type,
    )

    resource = get_resource_manager()
    content_chunk_mapper = ContentChunks(resource.mysql_client)
    contents_mapper = Contents(resource.mysql_client)
    chat_result_mapper = ChatResult(resource.mysql_client)

    res = []
    for result in query_results["results"]:
        content_chunks = await content_chunk_mapper.select_chunk_content(
            doc_id=result["doc_id"], chunk_id=result["chunk_id"]
        )
        # Bail out before querying the parent document: there is no point in
        # a second round-trip when the chunk itself is missing.
        if not content_chunks:
            return {"status_code": 500, "detail": "content_chunk not found", "data": {}}

        contents = await contents_mapper.select_content_by_doc_id(result["doc_id"])
        if not contents:
            return {"status_code": 500, "detail": "contents not found", "data": {}}

        # Only the first row of each lookup is used.
        content_chunk = content_chunks[0]
        content = contents[0]
        res.append(
            {
                "contentChunk": content_chunk["text"],
                "contentSummary": content_chunk["summary"],
                "content": content["text"],
                "score": result["score"],
            }
        )

    chat_classifier = ChatClassifier()
    chat_res = await chat_classifier.chat_with_deepseek(query_text, res)
    data = {
        "result": chat_res["summary"],
        "status": chat_res["status"],
        "relevance_score": chat_res["relevance_score"],
    }

    # Persist the query, the retrieved material and the model's verdict.
    await chat_result_mapper.insert_chat_result(
        query_text,
        dataset_id_strs,
        json.dumps(res, ensure_ascii=False),
        chat_res["summary"],
        chat_res["relevance_score"],
        chat_res["status"],
    )
    return data