from pymilvus import connections, CollectionSchema, Collection from neo4j import AsyncGraphDatabase, AsyncDriver from applications.config import NEO4j_CONFIG from applications.utils.mysql import DatabaseManager from applications.utils.milvus.field import fields, mode_fields from applications.utils.elastic_search import AsyncElasticSearchClient class ResourceManager: def __init__(self, es_index, es_hosts, es_password, milvus_config): self.es_index = es_index self.es_hosts = es_hosts self.es_password = es_password self.milvus_config = milvus_config self.es_client: AsyncElasticSearchClient | None = None self.milvus_client: Collection | None = None self.mysql_client: DatabaseManager | None = None self.graph_client: AsyncDriver | None = None async def load_milvus(self): connections.connect("default", **self.milvus_config) schema = CollectionSchema( mode_fields, description="标准模式向量空间" ) self.milvus_client = Collection(name="standard_mode_embeddings", schema=schema) # create index vector_index_params = { "index_type": "IVF_FLAT", "metric_type": "COSINE", "params": {"M": 16, "efConstruction": 200}, } self.milvus_client.create_index("mode_vector", vector_index_params) self.milvus_client.load() async def startup(self): # 初始化 Elasticsearch # self.es_client = AsyncElasticSearchClient( # index_name=self.es_index, hosts=self.es_hosts, password=self.es_password # ) # if await self.es_client.es.ping(): # print("✅ Elasticsearch connected") # else: # print("❌ Elasticsearch connection failed") # 初始化 MySQL self.mysql_client = DatabaseManager() await self.mysql_client.init_pools() print("✅ MySQL connected") # 初始化 milvus await self.load_milvus() print("✅ Milvus loaded") # uri: str = NEO4j_CONFIG["url"] # auth: tuple = NEO4j_CONFIG["user"], NEO4j_CONFIG["password"] # self.graph_client = AsyncGraphDatabase.driver(uri=uri, auth=auth) # print("✅ NEO4j loaded") async def shutdown(self): # 关闭 Elasticsearch if self.es_client: await self.es_client.close() print("Elasticsearch closed") # 关闭 Milvus connections.disconnect("default") print("Milvus closed") # 关闭 MySQL if self.mysql_client: await self.mysql_client.close_pools() print("Mysql closed") # await self.graph_client.close() # print("Graph closed") _resource_manager: ResourceManager | None = None def init_resource_manager(es_index, es_hosts, es_password, milvus_config): global _resource_manager if _resource_manager is None: _resource_manager = ResourceManager( es_index, es_hosts, es_password, milvus_config ) return _resource_manager def get_resource_manager() -> ResourceManager: return _resource_manager