from pymilvus import connections, CollectionSchema, Collection from neo4j import AsyncGraphDatabase, AsyncDriver from applications.config import NEO4j_CONFIG from applications.utils.mysql import DatabaseManager from applications.utils.milvus.field import fields from applications.utils.elastic_search import AsyncElasticSearchClient class ResourceManager: def __init__(self, es_index, es_hosts, es_password, milvus_config): self.es_index = es_index self.es_hosts = es_hosts self.es_password = es_password self.milvus_config = milvus_config self.es_client: AsyncElasticSearchClient | None = None self.milvus_client: Collection | None = None self.mysql_client: DatabaseManager | None = None self.graph_client: AsyncDriver | None = None async def load_milvus(self): connections.connect("default", **self.milvus_config) schema = CollectionSchema( fields, description="Chunk multi-vector embeddings with metadata" ) self.milvus_client = Collection(name="chunk_multi_embeddings_v2", schema=schema) # create index vector_index_params = { "index_type": "IVF_FLAT", "metric_type": "COSINE", "params": {"M": 16, "efConstruction": 200}, } self.milvus_client.create_index("vector_text", vector_index_params) self.milvus_client.create_index("vector_summary", vector_index_params) self.milvus_client.create_index("vector_questions", vector_index_params) self.milvus_client.load() async def startup(self): # 初始化 Elasticsearch self.es_client = AsyncElasticSearchClient( index_name=self.es_index, hosts=self.es_hosts, password=self.es_password ) if await self.es_client.es.ping(): print("✅ Elasticsearch connected") else: print("❌ Elasticsearch connection failed") # 初始化 MySQL self.mysql_client = DatabaseManager() await self.mysql_client.init_pools() print("✅ MySQL connected") # 初始化 milvus await self.load_milvus() print("✅ Milvus loaded") uri: str = NEO4j_CONFIG["url"] auth: tuple = NEO4j_CONFIG["user"], NEO4j_CONFIG["password"] self.graph_client = AsyncGraphDatabase.driver(uri=uri, auth=auth) print("✅ NEO4j loaded") async def shutdown(self): # 关闭 Elasticsearch if self.es_client: await self.es_client.close() print("Elasticsearch closed") # 关闭 Milvus connections.disconnect("default") print("Milvus closed") # 关闭 MySQL if self.mysql_client: await self.mysql_client.close_pools() print("Mysql closed") await self.graph_client.close() print("Graph closed") _resource_manager: ResourceManager | None = None def init_resource_manager(es_index, es_hosts, es_password, milvus_config): global _resource_manager if _resource_manager is None: _resource_manager = ResourceManager( es_index, es_hosts, es_password, milvus_config ) return _resource_manager def get_resource_manager() -> ResourceManager: return _resource_manager