Bladeren bron

Merge branch 'feature/xueyiming/2025-09-24-mcp' of Server/rag_server into master

luojunhui 2 weken geleden
bovenliggende
commit
2652cb85e8
6 gewijzigde bestanden met toevoegingen van 202 en 4 verwijderingen
  1. 3 2
      Dockerfile
  2. 11 1
      docker-compose.yml
  3. 74 0
      mcp_app.py
  4. 0 0
      mcp_server/__init__.py
  5. 112 0
      mcp_server/server.py
  6. 2 1
      requirements.txt

+ 3 - 2
Dockerfile

@@ -27,7 +27,8 @@ RUN pip install --no-cache-dir -r requirements.txt -i https://mirrors.aliyun.com
 COPY . .
 
 # 暴露端口
-EXPOSE 8001
+EXPOSE 8001 8002
 
 # 启动命令
-CMD ["hypercorn", "vector_app:app", "--config", "config.toml"]
+#CMD python3 mcp_app.py && hypercorn vector_app:app --config config.toml
+

+ 11 - 1
docker-compose.yml

@@ -18,7 +18,17 @@ services:
   # 后端服务
   vector-app:
     build: .
+    command: hypercorn vector_app:app --config config.toml
     container_name: vector-app
     ports:
       - "8001:8001"
-    restart: always
+    restart: always
+
+  # mcp server
+  mcp-app:
+    build: .
+    command: python3 mcp_app.py
+    container_name: mcp-app
+    ports:
+      - "8002:8002"
+    restart: always

+ 74 - 0
mcp_app.py

@@ -0,0 +1,74 @@
+import contextlib
+import logging
+from collections.abc import AsyncIterator
+
+import click
+from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
+from starlette.applications import Starlette
+from starlette.routing import Mount
+from starlette.types import Receive, Scope, Send
+
+from applications.config import ES_HOSTS, ELASTIC_SEARCH_INDEX, ES_PASSWORD, MILVUS_CONFIG
+from applications.resource import init_resource_manager
+from mcp_server.server import create_mcp_server
+
# Logging: timestamped records at INFO level for the whole process.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


# Module-level resource manager (ES + Milvus config), shared with the ASGI
# lifespan in main() below, which awaits resource_manager.startup().
# NOTE(review): this runs at import time — confirm init_resource_manager()
# performs no blocking I/O at construction (startup() appears to be where
# connections are brought up).
resource_manager = init_resource_manager(
    es_hosts=ES_HOSTS,
    es_index=ELASTIC_SEARCH_INDEX,
    es_password=ES_PASSWORD,
    milvus_config=MILVUS_CONFIG,
)
+
+
@click.command()
@click.option("--port", default=8002, help="服务器监听端口")
@click.option("--host", default="0.0.0.0", help="服务器监听地址")
@click.option("--json-response", is_flag=True, help="使用JSON响应而不是SSE流")
def main(port: int, host: str, json_response: bool) -> int:
    """Start the MCP server over the Streamable HTTP transport.

    Builds the MCP tool server, wraps it in a stateless
    StreamableHTTPSessionManager, mounts the handler at /mcp on a
    Starlette app, and serves it with uvicorn (blocks until shutdown).
    """
    # Create the MCP server; tool registration lives in mcp_server.server.
    app = create_mcp_server()

    # Session manager for the Streamable HTTP transport.
    # stateless=True: no per-session state is kept between requests.
    # event_store=None: no event replay / resumability support.
    session_manager = StreamableHTTPSessionManager(
        app=app,
        event_store=None,
        json_response=json_response,
        stateless=True,
    )

    # ASGI callable forwarding every request under /mcp to the session manager.
    async def handle_streamable_http(scope: Scope, receive: Receive, send: Send) -> None:
        await session_manager.handle_request(scope, receive, send)

    # Application lifespan: bring up shared resources, then keep the
    # session manager running for the lifetime of the server.
    # NOTE(review): there is no resource_manager shutdown/cleanup after
    # `yield` — confirm whether ES/Milvus connections need explicit closing.
    @contextlib.asynccontextmanager
    async def lifespan(app: Starlette) -> AsyncIterator[None]:
        await resource_manager.startup()
        async with session_manager.run():
            yield

    # Build the ASGI application.
    # NOTE(review): debug=True exposes tracebacks in HTTP responses —
    # confirm this is intended outside of development.
    starlette_app = Starlette(
        debug=True,
        routes=[Mount("/mcp", app=handle_streamable_http)],
        lifespan=lifespan,
    )

    # Run the server; uvicorn is imported locally, as in the original.
    # uvicorn.run() blocks, so the return is only reached on shutdown.
    import uvicorn
    uvicorn.run(starlette_app, host=host, port=port)
    return 0


if __name__ == "__main__":
    main()

+ 0 - 0
mcp_server/__init__.py


+ 112 - 0
mcp_server/server.py

@@ -0,0 +1,112 @@
+import json
+from typing import Any, Dict, List
+
+import mcp.types as types
+from mcp.server.lowlevel import Server
+
+from applications.resource import get_resource_manager
+from applications.utils.chat import ChatClassifier
+from applications.utils.mysql import ContentChunks, Contents, ChatResult
+from routes.buleprint import query_search
+
+
def create_mcp_server() -> Server:
    """Create and configure the MCP server.

    Registers a single tool, ``rag-search``, which runs a RAG query via
    :func:`rag_search` and returns the resulting summary as JSON text.

    Returns:
        Server: a configured low-level MCP server instance.
    """
    app = Server("mcp-rag-server")

    @app.call_tool()
    async def call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
        """Dispatch a tool invocation by name.

        Raises:
            ValueError: if *name* does not match a registered tool.
        """
        if name == "rag-search":
            data = await rag_search(arguments["query_text"])
            result = json.dumps(data, ensure_ascii=False, indent=2)
        else:
            raise ValueError(f"Unknown tool: {name}")
        return [types.TextContent(type="text", text=result)]

    @app.list_tools()
    async def list_tools() -> List[types.Tool]:
        """Advertise the tools exposed by this server."""
        return [
            types.Tool(
                name="rag-search",
                title='RAG搜索',
                description="搜索内容并生成总结",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query_text": {
                            "type": "string",
                            "description": "用户输入的查询文本",
                        }
                    },
                    # Only query_text is mandatory.
                    "required": ["query_text"],
                    "additionalProperties": False,
                },
            ),
        ]

    return app
+
+
async def rag_search(
    query_text: str,
    dataset_id_strs: str = "11,12",
    search_type: str = "hybrid",
) -> Dict[str, Any]:
    """Run a RAG query: retrieve chunks, summarize them, persist the result.

    Args:
        query_text: the user's query text.
        dataset_id_strs: comma-separated dataset ids to search within
            (default preserves the previously hard-coded "11,12").
        search_type: retrieval strategy forwarded to query_search
            (default preserves the previously hard-coded "hybrid").

    Returns:
        On success, a dict with keys ``result``, ``status`` and
        ``relevance_score``; if a retrieved hit has no backing chunk or
        content row, an error dict ``{"status_code": 500, ...}`` instead.
    """
    dataset_ids = dataset_id_strs.split(",")

    query_results = await query_search(
        query_text=query_text,
        filters={"dataset_id": dataset_ids},
        search_type=search_type,
    )

    resource = get_resource_manager()
    content_chunk_mapper = ContentChunks(resource.mysql_client)
    contents_mapper = Contents(resource.mysql_client)
    chat_result_mapper = ChatResult(resource.mysql_client)

    res = []
    for result in query_results["results"]:
        content_chunks = await content_chunk_mapper.select_chunk_content(
            doc_id=result["doc_id"], chunk_id=result["chunk_id"]
        )
        contents = await contents_mapper.select_content_by_doc_id(result["doc_id"])
        # Abort the whole search if any hit's backing rows are missing.
        if not content_chunks:
            return {"status_code": 500, "detail": "content_chunk not found", "data": {}}
        if not contents:
            return {"status_code": 500, "detail": "contents not found", "data": {}}

        content_chunk = content_chunks[0]
        content = contents[0]
        res.append(
            {
                "contentChunk": content_chunk["text"],
                "contentSummary": content_chunk["summary"],
                "content": content["text"],
                "score": result["score"],
            }
        )

    # Summarize the retrieved chunks with the chat model.
    chat_classifier = ChatClassifier()
    chat_res = await chat_classifier.chat_with_deepseek(query_text, res)

    data = {
        "result": chat_res["summary"],
        "status": chat_res["status"],
        "relevance_score": chat_res["relevance_score"],
    }

    # Persist query, retrieved context and summary for later inspection.
    await chat_result_mapper.insert_chat_result(
        query_text,
        dataset_id_strs,
        json.dumps(res, ensure_ascii=False),
        chat_res["summary"],
        chat_res["relevance_score"],
        chat_res["status"],
    )

    return data
+
+
+
+

+ 2 - 1
requirements.txt

@@ -24,4 +24,5 @@ scikit-learn==1.7.2
 neo4j==5.28.2
 langchain==0.3.27
 langchain-core==0.3.76
-langchain-text-splitters==0.3.11
+langchain-text-splitters==0.3.11
+mcp==1.14.1