xueyiming 2 tygodni temu
rodzic
commit
8d7bcd50ad
4 zmienionych plików z 85 dodań i 73 usunięć
  1. 1 0
      Dockerfile
  2. 74 0
      mcp_app.py
  3. 10 17
      mcp_server/server.py
  4. 0 56
      vector_app.py

+ 1 - 0
Dockerfile

@@ -31,3 +31,4 @@ EXPOSE 8001 8002
 
 # 启动命令
 CMD ["hypercorn", "vector_app:app", "--config", "config.toml"]
+CMD ["python3", "mcp_app.py"]

+ 74 - 0
mcp_app.py

@@ -0,0 +1,74 @@
+import contextlib
+import logging
+from collections.abc import AsyncIterator
+
+import click
+from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
+from starlette.applications import Starlette
+from starlette.routing import Mount
+from starlette.types import Receive, Scope, Send
+
+from applications.config import ES_HOSTS, ELASTIC_SEARCH_INDEX, ES_PASSWORD, MILVUS_CONFIG
+from applications.resource import init_resource_manager
+from mcp_server.server import create_mcp_server
+
+# 配置日志
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+)
+logger = logging.getLogger(__name__)
+
+
+# 初始化资源管理器
+resource_manager = init_resource_manager(
+    es_hosts=ES_HOSTS,
+    es_index=ELASTIC_SEARCH_INDEX,
+    es_password=ES_PASSWORD,
+    milvus_config=MILVUS_CONFIG,
+)
+
+
+@click.command()
+@click.option("--port", default=8002, help="服务器监听端口")
+@click.option("--host", default="0.0.0.0", help="服务器监听地址")
+@click.option("--json-response", is_flag=True, help="使用JSON响应而不是SSE流")
+def main(port: int, host: str, json_response: bool) -> int:
+    """启动MCP电灯开关服务器"""
+    # 创建MCP服务器
+    app = create_mcp_server()
+
+    # 创建会话管理器
+    session_manager = StreamableHTTPSessionManager(
+        app=app,
+        event_store=None,
+        json_response=json_response,
+        stateless=True,
+    )
+
+    # 处理Streamable HTTP请求
+    async def handle_streamable_http(scope: Scope, receive: Receive, send: Send) -> None:
+        await session_manager.handle_request(scope, receive, send)
+
+    # 定义生命周期管理
+    @contextlib.asynccontextmanager
+    async def lifespan(app: Starlette) -> AsyncIterator[None]:
+        await resource_manager.startup()
+        async with session_manager.run():
+            yield
+
+    # 创建ASGI应用
+    starlette_app = Starlette(
+        debug=True,
+        routes=[Mount("/mcp", app=handle_streamable_http)],
+        lifespan=lifespan,
+    )
+
+    # 启动服务器
+    import uvicorn
+    uvicorn.run(starlette_app, host=host, port=port)
+    return 0
+
+
+if __name__ == "__main__":
+    main()

+ 10 - 17
mcp_server/server.py

@@ -1,30 +1,23 @@
 import json
-import logging
 from typing import Any, Dict, List
 
 import mcp.types as types
 from mcp.server.lowlevel import Server
 
-from applications.config import ES_HOSTS, ELASTIC_SEARCH_INDEX, ES_PASSWORD, MILVUS_CONFIG
-from applications.resource import get_resource_manager, init_resource_manager
+from applications.resource import get_resource_manager
 from applications.utils.chat import ChatClassifier
 from applications.utils.mysql import ContentChunks, Contents, ChatResult
 from routes.buleprint import query_search
+import json
+from typing import Any, Dict, List
+
+import mcp.types as types
+from mcp.server.lowlevel import Server
 
-# 配置日志
-logging.basicConfig(
-    level=logging.INFO,
-    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
-)
-logger = logging.getLogger(__name__)
-
-# 初始化资源管理器
-resource_manager = init_resource_manager(
-    es_hosts=ES_HOSTS,
-    es_index=ELASTIC_SEARCH_INDEX,
-    es_password=ES_PASSWORD,
-    milvus_config=MILVUS_CONFIG,
-)
+from applications.resource import get_resource_manager
+from applications.utils.chat import ChatClassifier
+from applications.utils.mysql import ContentChunks, Contents, ChatResult
+from routes.buleprint import query_search
 
 
 def create_mcp_server() -> Server:

+ 0 - 56
vector_app.py

@@ -1,21 +1,10 @@
-import asyncio
-import contextlib
-from typing import AsyncIterator
-
-import anyio
 import jieba
-from uvicorn import Config, Server
-from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
 from quart import Quart
-from starlette.applications import Starlette
-from starlette.routing import Mount
-from starlette.types import Receive, Send, Scope
 
 from applications.config import LOCAL_MODEL_CONFIG, DEFAULT_MODEL
 from applications.config import ES_HOSTS, ES_PASSWORD, ELASTIC_SEARCH_INDEX
 from applications.config import MILVUS_CONFIG
 from applications.resource import init_resource_manager
-from mcp_server.server import create_mcp_server, logger
 
 app = Quart(__name__)
 
@@ -30,57 +19,12 @@ resource_manager = init_resource_manager(
 )
 
 
-# 确保没有重复事件循环
-async def start_mcp_server(host: str, port: int, json_response: bool):
-    try:
-        # 创建应用
-        app = create_mcp_server()
-        session_manager = StreamableHTTPSessionManager(
-            app=app,
-            event_store=None,
-            json_response=json_response,
-            stateless=True,
-        )
-        logger.info("Session Manager created and ready.")
-
-        async def handle_streamable_http(scope: Scope, receive: Receive, send: Send) -> None:
-            try:
-                logger.info(f"Started processing request: {scope}")
-                await session_manager.handle_request(scope, receive, send)
-                logger.info(f"Finished processing request: {scope}")
-            except anyio.ClosedResourceError:
-                logger.error("Stream closed unexpectedly during request.")
-            except Exception as e:
-                logger.error(f"Unexpected error: {e}")
-
-        # 启动应用生命周期管理
-        @contextlib.asynccontextmanager
-        async def lifespan(starlette_app: Starlette) -> AsyncIterator[None]:
-            async with session_manager.run():
-                yield
-
-        # 配置Starlette应用
-        starlette_app = Starlette(
-            debug=True,
-            routes=[Mount("/mcp", app=handle_streamable_http)],
-            lifespan=lifespan,
-        )
-        config = Config(app=starlette_app, host=host, port=port)
-        server = Server(config=config)
-        await server.serve()
-
-    except Exception as e:
-        logger.error(f"Error in start_mcp_server: {e}")
-        raise
-
 @app.before_serving
 async def startup():
     await resource_manager.startup()
     print("Resource manager is ready.")
     jieba.initialize()
     print("Jieba dictionary loaded successfully")
-    # 使用 asyncio.create_task 来异步启动 mcp_server
-    asyncio.create_task(start_mcp_server("0.0.0.0", 8002, json_response=True))
 
 
 @app.after_serving