1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- import asyncio
- import contextlib
- from typing import AsyncIterator
- import anyio
- import jieba
- from uvicorn import Config, Server
- from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
- from quart import Quart
- from starlette.applications import Starlette
- from starlette.routing import Mount
- from starlette.types import Receive, Send, Scope
- from applications.config import LOCAL_MODEL_CONFIG, DEFAULT_MODEL
- from applications.config import ES_HOSTS, ES_PASSWORD, ELASTIC_SEARCH_INDEX
- from applications.config import MILVUS_CONFIG
- from applications.resource import init_resource_manager
- from mcp_server.server import create_mcp_server, logger
# Quart application object; lifecycle hooks and the route blueprint are
# attached to it further down in this module.
app = Quart(__name__)

# Initialization: look up the configured entry for the default local model and
# build the shared resource manager from the Elasticsearch / Milvus settings.
MODEL_PATH = LOCAL_MODEL_CONFIG[DEFAULT_MODEL]

resource_manager = init_resource_manager(
    es_hosts=ES_HOSTS,
    es_index=ELASTIC_SEARCH_INDEX,
    es_password=ES_PASSWORD,
    milvus_config=MILVUS_CONFIG,
)
# The server is awaited directly on the caller's running event loop
# (``await server.serve()``), so no second event loop is created.
async def start_mcp_server(host: str, port: int, json_response: bool) -> None:
    """Build the MCP streamable-HTTP ASGI app and serve it with uvicorn.

    The MCP server returned by ``create_mcp_server()`` is wrapped in a
    stateless ``StreamableHTTPSessionManager`` and mounted under ``/mcp`` in a
    Starlette application, which is then served until uvicorn returns.

    Args:
        host: Address passed to the uvicorn ``Config``.
        port: Port passed to the uvicorn ``Config``.
        json_response: Forwarded unchanged to ``StreamableHTTPSessionManager``.

    Raises:
        Exception: Any error raised while building or serving the app is
            logged with its traceback and re-raised.
    """
    try:
        # Create the MCP application. Named ``mcp_app`` so it does not shadow
        # the module-level Quart ``app``.
        mcp_app = create_mcp_server()
        session_manager = StreamableHTTPSessionManager(
            app=mcp_app,
            event_store=None,
            json_response=json_response,
            stateless=True,
        )
        logger.info("Session Manager created and ready.")

        async def handle_streamable_http(
            scope: Scope, receive: Receive, send: Send
        ) -> None:
            """ASGI callable that hands each request to the session manager."""
            # Log only method and path: the raw ASGI scope also carries the
            # request headers (which may include credentials) and is very
            # verbose at INFO level.
            request_line = (
                f"{scope.get('method', scope.get('type'))} {scope.get('path')}"
            )
            try:
                logger.info("Started processing request: %s", request_line)
                await session_manager.handle_request(scope, receive, send)
                logger.info("Finished processing request: %s", request_line)
            except anyio.ClosedResourceError:
                logger.error("Stream closed unexpectedly during request.")
            except Exception as e:
                # Deliberately best-effort: one failing request must not take
                # the server down, but keep the traceback for diagnosis.
                logger.exception("Unexpected error: %s", e)

        # Application lifespan: keep the session manager running for as long
        # as the Starlette app is up.
        @contextlib.asynccontextmanager
        async def lifespan(_starlette_app: Starlette) -> AsyncIterator[None]:
            async with session_manager.run():
                yield

        # Configure the Starlette application.
        # NOTE(review): debug=True is kept as in the original; confirm it is
        # intended outside development.
        starlette_app = Starlette(
            debug=True,
            routes=[Mount("/mcp", app=handle_streamable_http)],
            lifespan=lifespan,
        )
        config = Config(app=starlette_app, host=host, port=port)
        server = Server(config=config)
        # NOTE(review): uvicorn may install its own signal handlers while
        # serving; confirm this does not interfere with the Quart host process.
        await server.serve()
    except Exception as e:
        logger.exception("Error in start_mcp_server: %s", e)
        raise
# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so a task whose handle is dropped can be
# garbage-collected before it finishes.
_background_tasks = set()


@app.before_serving
async def startup():
    """Prepare shared resources, then launch the MCP server in the background.

    Starts the resource manager, initializes jieba, and schedules
    ``start_mcp_server`` on the running event loop so it runs alongside the
    Quart app instead of blocking startup.
    """
    await resource_manager.startup()
    print("Resource manager is ready.")
    jieba.initialize()
    print("Jieba dictionary loaded successfully")
    # Start the MCP server asynchronously via asyncio.create_task, keeping a
    # reference to the task until it completes.
    mcp_task = asyncio.create_task(
        start_mcp_server("0.0.0.0", 8002, json_response=True)
    )
    _background_tasks.add(mcp_task)
    mcp_task.add_done_callback(_background_tasks.discard)
@app.after_serving
async def shutdown():
    """Shut down the shared resource manager once the app stops serving."""
    await resource_manager.shutdown()
    print("Resource manager is Down.")
# Register routes.
# NOTE(review): imported here rather than at the top of the module —
# presumably to avoid a circular import with `routes`; confirm before moving.
from routes import server_bp

app.register_blueprint(server_bp)
|