123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
import json
from typing import Any, Dict, List

import mcp.types as types
from mcp.server.lowlevel import Server

from applications.resource import get_resource_manager
from applications.utils.chat import ChatClassifier
from applications.utils.mysql import ChatResult, ContentChunks, Contents
# NOTE(review): "buleprint" looks like a typo for "blueprint", but the import
# path must match the actual module file — confirm before renaming.
from routes.buleprint import query_search
def create_mcp_server() -> Server:
    """Create and configure the MCP RAG server.

    Registers a single tool, ``rag-search``, which runs a retrieval-augmented
    search over the configured datasets and returns a generated summary.
    """
    app = Server("mcp-rag-server")

    @app.call_tool()
    async def call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Dispatch a tool invocation to its implementation."""
        # Guard clause: reject anything but the one tool we registered.
        if name != "rag-search":
            raise ValueError(f"Unknown tool: {name}")
        payload = await rag_search(arguments["query_text"])
        rendered = json.dumps(payload, ensure_ascii=False, indent=2)
        return [types.TextContent(type="text", text=rendered)]

    @app.list_tools()
    async def list_tools() -> List[types.Tool]:
        """Advertise the tools this server exposes."""
        rag_tool = types.Tool(
            name="rag-search",
            title='RAG搜索',
            description="搜索内容并生成总结",
            inputSchema={
                "type": "object",
                "properties": {
                    "query_text": {
                        "type": "string",
                        "description": "用户输入的查询文本",
                    }
                },
                # Only query_text is mandatory.
                "required": ["query_text"],
                "additionalProperties": False,
            },
        )
        return [rag_tool]

    return app
async def rag_search(query_text: str, dataset_id_strs: str = "11,12") -> Dict[str, Any]:
    """Run a hybrid RAG search, summarise the hits, and persist the round.

    Args:
        query_text: The user's free-text query.
        dataset_id_strs: Comma-separated dataset ids to search. Defaults to
            the previously hard-coded ``"11,12"`` so existing callers are
            unaffected.

    Returns:
        On success, a dict with ``result`` (generated summary), ``status``
        and ``relevance_score``. If a matched chunk or its parent content
        row is missing, a dict with ``status_code`` 500, a ``detail``
        message and an empty ``data`` is returned instead.
    """
    dataset_ids = dataset_id_strs.split(",")
    query_results = await query_search(
        query_text=query_text,
        filters={"dataset_id": dataset_ids},
        search_type="hybrid",
    )

    resource = get_resource_manager()
    content_chunk_mapper = ContentChunks(resource.mysql_client)
    contents_mapper = Contents(resource.mysql_client)
    chat_result_mapper = ChatResult(resource.mysql_client)

    res: List[Dict[str, Any]] = []
    for result in query_results["results"]:
        content_chunks = await content_chunk_mapper.select_chunk_content(
            doc_id=result["doc_id"], chunk_id=result["chunk_id"]
        )
        # Check the chunk first so a missing chunk doesn't cost a second,
        # wasted query for the parent content row.
        if not content_chunks:
            return {"status_code": 500, "detail": "content_chunk not found", "data": {}}

        contents = await contents_mapper.select_content_by_doc_id(result["doc_id"])
        if not contents:
            return {"status_code": 500, "detail": "contents not found", "data": {}}

        content_chunk = content_chunks[0]
        content = contents[0]
        res.append(
            {
                "contentChunk": content_chunk["text"],
                "contentSummary": content_chunk["summary"],
                "content": content["text"],
                "score": result["score"],
            }
        )

    chat_classifier = ChatClassifier()
    chat_res = await chat_classifier.chat_with_deepseek(query_text, res)
    data = {
        "result": chat_res["summary"],
        "status": chat_res["status"],
        "relevance_score": chat_res["relevance_score"],
    }
    # Persist the full Q&A round (query, sources, summary) for audit/replay.
    await chat_result_mapper.insert_chat_result(
        query_text,
        dataset_id_strs,
        json.dumps(res, ensure_ascii=False),
        chat_res["summary"],
        chat_res["relevance_score"],
        chat_res["status"],
    )
    return data
|