123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- from pymilvus import connections, CollectionSchema, Collection
- from neo4j import AsyncGraphDatabase
- from applications.config import NEO4j_CONFIG
- from applications.utils.mysql import DatabaseManager
- from applications.utils.milvus.field import fields
- from applications.utils.elastic_search import AsyncElasticSearchClient
class ResourceManager:
    """Own the lifecycle of the backing-store clients used by the application.

    The manager keeps one handle each for Elasticsearch, Milvus, MySQL and
    Neo4j. Every handle is ``None`` after construction; :meth:`startup`
    creates them and :meth:`shutdown` releases whichever ones exist.
    """

    def __init__(self, es_index, es_hosts, es_password, milvus_config):
        """Store connection settings; no connection is opened here.

        Args:
            es_index: Index name handed to ``AsyncElasticSearchClient``.
            es_hosts: Hosts handed to ``AsyncElasticSearchClient``.
            es_password: Password handed to ``AsyncElasticSearchClient``.
            milvus_config: Keyword arguments forwarded to
                ``connections.connect`` for the ``"default"`` alias.
        """
        self.es_index = es_index
        self.es_hosts = es_hosts
        self.es_password = es_password
        self.milvus_config = milvus_config

        # Client handles, populated by startup().
        self.es_client: AsyncElasticSearchClient | None = None
        self.milvus_client: Collection | None = None
        self.mysql_client: DatabaseManager | None = None
        # Neo4j driver returned by AsyncGraphDatabase.driver().
        self.graph_client = None

    async def load_milvus(self):
        """Connect to Milvus, index the three vector fields and load the collection.

        Opens the ``"default"`` connection alias, binds
        ``chunk_multi_embeddings_v2`` with the schema built from ``fields``,
        creates an index on ``vector_text``, ``vector_summary`` and
        ``vector_questions``, then loads the collection.

        The pymilvus calls used here are synchronous even though this method
        is declared ``async``.
        """
        connections.connect("default", **self.milvus_config)

        schema = CollectionSchema(
            fields, description="Chunk multi-vector embeddings with metadata"
        )
        self.milvus_client = Collection(name="chunk_multi_embeddings_v2", schema=schema)

        # Create the vector indexes.
        # NOTE(review): "M" / "efConstruction" are HNSW build parameters, while
        # IVF_FLAT is documented as taking "nlist". Left unchanged on purpose:
        # altering the index definition could clash with indexes that already
        # exist on a deployed collection. Confirm which index type is intended.
        vector_index_params = {
            "index_type": "IVF_FLAT",
            "metric_type": "COSINE",
            "params": {"M": 16, "efConstruction": 200},
        }
        self.milvus_client.create_index("vector_text", vector_index_params)
        self.milvus_client.create_index("vector_summary", vector_index_params)
        self.milvus_client.create_index("vector_questions", vector_index_params)

        self.milvus_client.load()

    async def startup(self):
        """Create every client in order: Elasticsearch, MySQL, Milvus, Neo4j.

        A failed Elasticsearch ping is only reported on stdout; startup
        continues with the remaining services.
        """
        # Initialise Elasticsearch.
        self.es_client = AsyncElasticSearchClient(
            index_name=self.es_index, hosts=self.es_hosts, password=self.es_password
        )
        if await self.es_client.es.ping():
            print("✅ Elasticsearch connected")
        else:
            print("❌ Elasticsearch connection failed")

        # Initialise MySQL.
        self.mysql_client = DatabaseManager()
        await self.mysql_client.init_pools()
        print("✅ MySQL connected")

        # Initialise Milvus.
        await self.load_milvus()
        print("✅ Milvus loaded")

        # Initialise the Neo4j driver from the project configuration.
        uri: str = NEO4j_CONFIG["url"]
        auth: tuple = NEO4j_CONFIG["user"], NEO4j_CONFIG["password"]
        self.graph_client = AsyncGraphDatabase.driver(uri=uri, auth=auth)

    async def shutdown(self):
        """Release every client that was created.

        Safe to call when :meth:`startup` never ran or stopped part-way:
        handles that are still ``None`` are skipped.
        """
        # Close Elasticsearch.
        if self.es_client:
            await self.es_client.close()
            print("Elasticsearch closed")

        # Close Milvus.
        connections.disconnect("default")
        print("Milvus closed")

        # Close MySQL.
        if self.mysql_client:
            await self.mysql_client.close_pools()
            print("Mysql closed")

        # Close Neo4j. The driver is the last thing startup() creates, so it
        # is still None whenever startup did not run to completion; guard it
        # the same way as the other clients instead of raising AttributeError.
        if self.graph_client is not None:
            await self.graph_client.close()
# Process-wide ResourceManager instance; stays None until
# init_resource_manager() has been called.
_resource_manager: ResourceManager | None = None


def init_resource_manager(es_index, es_hosts, es_password, milvus_config):
    """Create the module-level ResourceManager on first call and return it.

    Later calls return the instance that already exists; the arguments
    passed on those calls are not used.

    Args:
        es_index: Forwarded to ``ResourceManager``.
        es_hosts: Forwarded to ``ResourceManager``.
        es_password: Forwarded to ``ResourceManager``.
        milvus_config: Forwarded to ``ResourceManager``.

    Returns:
        The module-level ``ResourceManager`` instance.
    """
    global _resource_manager

    # Already initialised: hand back the existing instance untouched.
    if _resource_manager is not None:
        return _resource_manager

    _resource_manager = ResourceManager(
        es_index, es_hosts, es_password, milvus_config
    )
    return _resource_manager
def get_resource_manager() -> ResourceManager:
    """Return the module-level ResourceManager.

    The value is whatever ``_resource_manager`` currently holds, which is
    ``None`` until ``init_resource_manager`` has been called.
    """
    manager = _resource_manager
    return manager
|