"""Quart application setup plus a background MCP streamable-HTTP server.

Importing this module creates the Quart ``app``, initialises the shared
resource manager from project configuration, and registers the ``server_bp``
blueprint. When the app starts serving, an MCP server is launched on its own
uvicorn instance as a task on the running event loop.
"""

import asyncio
import contextlib
from typing import AsyncIterator, Optional

import anyio
import jieba
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
from quart import Quart
from starlette.applications import Starlette
from starlette.routing import Mount
from starlette.types import Receive, Send, Scope
from uvicorn import Config, Server

from applications.config import LOCAL_MODEL_CONFIG, DEFAULT_MODEL
from applications.config import ES_HOSTS, ES_PASSWORD, ELASTIC_SEARCH_INDEX
from applications.config import MILVUS_CONFIG
from applications.resource import init_resource_manager
from mcp_server.server import create_mcp_server, logger

app = Quart(__name__)

# Initialisation
MODEL_PATH = LOCAL_MODEL_CONFIG[DEFAULT_MODEL]
resource_manager = init_resource_manager(
    es_hosts=ES_HOSTS,
    es_index=ELASTIC_SEARCH_INDEX,
    es_password=ES_PASSWORD,
    milvus_config=MILVUS_CONFIG,
)

# Bind address and port of the embedded MCP server.
MCP_SERVER_HOST = "0.0.0.0"
MCP_SERVER_PORT = 8002

# Strong reference to the background MCP server task. The event loop only
# keeps weak references to tasks, so without this the task could be garbage
# collected while it is still running.
_mcp_server_task: Optional[asyncio.Task] = None


# Runs inside the already-running event loop (no second loop is created).
async def start_mcp_server(host: str, port: int, json_response: bool) -> None:
    """Build the MCP Starlette app and serve it with uvicorn until it exits.

    Args:
        host: Interface the uvicorn server binds to.
        port: TCP port the uvicorn server listens on.
        json_response: Forwarded to ``StreamableHTTPSessionManager``.

    Raises:
        Exception: Any error raised while building or serving the app is
            logged and then re-raised.
    """
    try:
        # Named ``mcp_app`` so it does not shadow the module-level Quart ``app``.
        mcp_app = create_mcp_server()
        session_manager = StreamableHTTPSessionManager(
            app=mcp_app,
            event_store=None,
            json_response=json_response,
            stateless=True,
        )
        logger.info("Session Manager created and ready.")

        async def handle_streamable_http(
            scope: Scope, receive: Receive, send: Send
        ) -> None:
            """ASGI entry point that delegates each request to the session manager."""
            try:
                # NOTE(review): the whole ASGI scope (headers included) is
                # logged at info level - confirm this is acceptable.
                logger.info(f"Started processing request: {scope}")
                await session_manager.handle_request(scope, receive, send)
                logger.info(f"Finished processing request: {scope}")
            except anyio.ClosedResourceError:
                logger.error("Stream closed unexpectedly during request.")
            except Exception as e:
                # Best effort: log and keep the server alive for other requests.
                logger.error(f"Unexpected error: {e}")

        # Application lifespan: keep the session manager running while serving.
        @contextlib.asynccontextmanager
        async def lifespan(starlette_app: Starlette) -> AsyncIterator[None]:
            async with session_manager.run():
                yield

        # Configure the Starlette application.
        # NOTE(review): debug=True is enabled here - confirm that this is
        # intended outside of development.
        starlette_app = Starlette(
            debug=True,
            routes=[Mount("/mcp", app=handle_streamable_http)],
            lifespan=lifespan,
        )

        config = Config(app=starlette_app, host=host, port=port)
        server = Server(config=config)
        await server.serve()
    except Exception as e:
        logger.error(f"Error in start_mcp_server: {e}")
        raise


async def _stop_mcp_server() -> None:
    """Cancel the background MCP server task, if any, and wait for it to end."""
    global _mcp_server_task
    task, _mcp_server_task = _mcp_server_task, None
    if task is None:
        return
    task.cancel()
    # A failure of the task was already logged inside start_mcp_server, so it
    # is only retrieved here and must not prevent the rest of the shutdown.
    with contextlib.suppress(asyncio.CancelledError, Exception):
        await task


@app.before_serving
async def startup():
    """Start shared resources, load jieba, and launch the MCP server task."""
    global _mcp_server_task
    await resource_manager.startup()
    print("Resource manager is ready.")
    jieba.initialize()
    print("Jieba dictionary loaded successfully")
    # Start the MCP server as a background task; keep the reference so the
    # task cannot be garbage collected and can be stopped on shutdown.
    _mcp_server_task = asyncio.create_task(
        start_mcp_server(MCP_SERVER_HOST, MCP_SERVER_PORT, json_response=True)
    )


@app.after_serving
async def shutdown():
    """Stop the MCP server task, then shut down the resource manager."""
    try:
        await _stop_mcp_server()
    finally:
        await resource_manager.shutdown()
        print("Resource manager is Down.")


# Register routes. The import sits after ``app`` and ``resource_manager`` are
# defined, as in the original file; presumably to avoid a circular import -
# confirm before moving it to the top.
from routes import server_bp

app.register_blueprint(server_bp)