server.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. import asyncio
  2. import json
  3. from typing import Any, Dict, List
  4. import mcp.types as types
  5. from mcp.server.lowlevel import Server
  6. from applications.resource import get_resource_manager
  7. from applications.utils.chat import RAGChatAgent
  8. from applications.utils.mysql import ChatResult
  9. from routes.buleprint import query_search
  10. def create_mcp_server() -> Server:
  11. """创建并配置MCP服务器"""
  12. app = Server("mcp-rag-server")
  13. @app.call_tool()
  14. async def call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
  15. """处理工具调用"""
  16. # ctx = app.request_context
  17. if name == "rag-search":
  18. data = await rag_search(arguments["query_text"])
  19. result = json.dumps(data, ensure_ascii=False, indent=2)
  20. else:
  21. raise ValueError(f"Unknown tool: {name}")
  22. return [types.TextContent(type="text", text=result)]
  23. @app.list_tools()
  24. async def list_tools() -> List[types.Tool]:
  25. return [
  26. types.Tool(
  27. name="rag-search",
  28. title = 'RAG搜索',
  29. description="搜索内容并生成总结",
  30. inputSchema={
  31. "type": "object",
  32. "properties": {
  33. "query_text": {
  34. "type": "string",
  35. "description": "用户输入的查询文本",
  36. }
  37. },
  38. "required": ["query_text"], # 只强制 query_text 必填
  39. "additionalProperties": False,
  40. },
  41. ),
  42. ]
  43. return app
  44. async def rag_search(query_text: str) :
  45. dataset_id_strs = "11,12"
  46. dataset_ids = dataset_id_strs.split(",")
  47. search_type = "hybrid"
  48. query_results = await query_search(
  49. query_text=query_text,
  50. filters={"dataset_id": dataset_ids},
  51. search_type=search_type,
  52. )
  53. resource = get_resource_manager()
  54. chat_result_mapper = ChatResult(resource.mysql_client)
  55. rag_chat_agent = RAGChatAgent()
  56. chat_res = await rag_chat_agent.chat_with_deepseek(query_text, query_results)
  57. deepseek_search = await rag_chat_agent.search_with_deepseek(query_text)
  58. select = await rag_chat_agent.select_with_deepseek(chat_res, deepseek_search)
  59. data = {
  60. "result": select["result"],
  61. "status": select["status"],
  62. "relevance_score": select["relevance_score"],
  63. }
  64. await chat_result_mapper.insert_chat_result(
  65. query_text,
  66. dataset_id_strs,
  67. json.dumps(query_results, ensure_ascii=False),
  68. chat_res["summary"],
  69. chat_res["relevance_score"],
  70. chat_res["status"],
  71. deepseek_search['answer'],
  72. deepseek_search['source'],
  73. deepseek_search['status']
  74. )
  75. return data