vector_app.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. import asyncio
  2. import contextlib
  3. from typing import AsyncIterator
  4. import anyio
  5. import jieba
  6. from uvicorn import Config, Server
  7. from mcp.server.streamable_http_manager import StreamableHTTPSessionManager
  8. from quart import Quart
  9. from starlette.applications import Starlette
  10. from starlette.routing import Mount
  11. from starlette.types import Receive, Send, Scope
  12. from applications.config import LOCAL_MODEL_CONFIG, DEFAULT_MODEL
  13. from applications.config import ES_HOSTS, ES_PASSWORD, ELASTIC_SEARCH_INDEX
  14. from applications.config import MILVUS_CONFIG
  15. from applications.resource import init_resource_manager
  16. from mcp_server.server import create_mcp_server, logger
  17. app = Quart(__name__)
  18. # 初始化
  19. MODEL_PATH = LOCAL_MODEL_CONFIG[DEFAULT_MODEL]
  20. resource_manager = init_resource_manager(
  21. es_hosts=ES_HOSTS,
  22. es_index=ELASTIC_SEARCH_INDEX,
  23. es_password=ES_PASSWORD,
  24. milvus_config=MILVUS_CONFIG,
  25. )
  26. # 确保没有重复事件循环
  27. async def start_mcp_server(host: str, port: int, json_response: bool):
  28. try:
  29. # 创建应用
  30. app = create_mcp_server()
  31. session_manager = StreamableHTTPSessionManager(
  32. app=app,
  33. event_store=None,
  34. json_response=json_response,
  35. stateless=True,
  36. )
  37. logger.info("Session Manager created and ready.")
  38. async def handle_streamable_http(scope: Scope, receive: Receive, send: Send) -> None:
  39. try:
  40. logger.info(f"Started processing request: {scope}")
  41. await session_manager.handle_request(scope, receive, send)
  42. logger.info(f"Finished processing request: {scope}")
  43. except anyio.ClosedResourceError:
  44. logger.error("Stream closed unexpectedly during request.")
  45. except Exception as e:
  46. logger.error(f"Unexpected error: {e}")
  47. # 启动应用生命周期管理
  48. @contextlib.asynccontextmanager
  49. async def lifespan(starlette_app: Starlette) -> AsyncIterator[None]:
  50. async with session_manager.run():
  51. yield
  52. # 配置Starlette应用
  53. starlette_app = Starlette(
  54. debug=True,
  55. routes=[Mount("/mcp", app=handle_streamable_http)],
  56. lifespan=lifespan,
  57. )
  58. config = Config(app=starlette_app, host=host, port=port)
  59. server = Server(config=config)
  60. await server.serve()
  61. except Exception as e:
  62. logger.error(f"Error in start_mcp_server: {e}")
  63. raise
  64. @app.before_serving
  65. async def startup():
  66. await resource_manager.startup()
  67. print("Resource manager is ready.")
  68. jieba.initialize()
  69. print("Jieba dictionary loaded successfully")
  70. # 使用 asyncio.create_task 来异步启动 mcp_server
  71. asyncio.create_task(start_mcp_server("0.0.0.0", 8002, json_response=True))
  72. @app.after_serving
  73. async def shutdown():
  74. await resource_manager.shutdown()
  75. print("Resource manager is Down.")
  76. # 注册路由
  77. from routes import server_bp
  78. app.register_blueprint(server_bp)