resource_manager.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. from pymilvus import connections, CollectionSchema, Collection
  2. from applications.utils.mysql import DatabaseManager
  3. from applications.utils.milvus.field import fields
  4. from applications.utils.elastic_search import AsyncElasticSearchClient
  class ResourceManager:
      """Hold the Elasticsearch, Milvus and MySQL clients used by the application.

      The three ``*_client`` attributes are ``None`` after construction; they are
      populated by :meth:`startup` and released by :meth:`shutdown`.
      """

      # Name of the Milvus collection opened by ``load_milvus``.
      _COLLECTION_NAME = "chunk_multi_embeddings_v2"
      # Vector fields of that collection that each need an index before loading.
      _VECTOR_FIELDS = ("vector_text", "vector_summary", "vector_questions")

      def __init__(self, es_index, es_hosts, es_password, milvus_config):
          """Store connection settings; no connection is opened here.

          Args:
              es_index: Passed as ``index_name`` to ``AsyncElasticSearchClient``.
              es_hosts: Passed as ``hosts`` to ``AsyncElasticSearchClient``.
              es_password: Passed as ``password`` to ``AsyncElasticSearchClient``.
              milvus_config: Mapping expanded as keyword arguments into
                  ``connections.connect("default", ...)``.
          """
          self.es_index = es_index
          self.es_hosts = es_hosts
          self.es_password = es_password
          self.milvus_config = milvus_config
          self.es_client: AsyncElasticSearchClient | None = None
          self.milvus_client: Collection | None = None
          self.mysql_client: DatabaseManager | None = None

      @staticmethod
      def _vector_index_params():
          """Return a fresh index definition for the vector fields.

          ``IVF_FLAT`` is built from ``nlist``. The previous ``M`` /
          ``efConstruction`` values are HNSW build parameters and do not apply
          to this index type, so they were replaced rather than kept.
          """
          return {
              "index_type": "IVF_FLAT",
              "metric_type": "COSINE",
              "params": {"nlist": 128},
          }

      async def load_milvus(self):
          """Connect to Milvus, open the collection, index its vectors and load it.

          None of the pymilvus calls below are awaited, so they execute inline
          in the calling coroutine.
          NOTE(review): if these calls block on network I/O they will stall the
          event loop for their duration — confirm this only runs at startup.
          """
          connections.connect("default", **self.milvus_config)
          schema = CollectionSchema(
              fields, description="Chunk multi-vector embeddings with metadata"
          )
          collection = Collection(name=self._COLLECTION_NAME, schema=schema)
          self.milvus_client = collection

          # Only build indexes that are missing: re-issuing create_index with
          # parameters that differ from an index built by an earlier version of
          # this code could conflict with the existing index.
          already_indexed = {index.field_name for index in collection.indexes}
          for field_name in self._VECTOR_FIELDS:
              if field_name not in already_indexed:
                  collection.create_index(field_name, self._vector_index_params())

          collection.load()

      async def startup(self):
          """Create the Elasticsearch, MySQL and Milvus clients, in that order.

          A failed Elasticsearch ping is only reported on stdout; startup then
          continues with MySQL and Milvus.
          """
          # Initialise Elasticsearch.
          self.es_client = AsyncElasticSearchClient(
              index_name=self.es_index, hosts=self.es_hosts, password=self.es_password
          )
          if await self.es_client.es.ping():
              print("✅ Elasticsearch connected")
          else:
              print("❌ Elasticsearch connection failed")

          # Initialise MySQL.
          self.mysql_client = DatabaseManager()
          await self.mysql_client.init_pools()
          print("✅ MySQL connected")

          # Initialise Milvus.
          await self.load_milvus()
          print("✅ Milvus loaded")

      async def shutdown(self):
          """Release Elasticsearch, Milvus and MySQL, in that order.

          Each release runs under its own ``try``/``finally`` so that an error
          while closing one resource does not leave the remaining ones open.
          The error still propagates to the caller once all releases have been
          attempted.
          """
          try:
              # Close Elasticsearch (only if startup created a client).
              if self.es_client:
                  await self.es_client.close()
                  print("Elasticsearch closed")
          finally:
              try:
                  # Close Milvus.
                  connections.disconnect("default")
                  print("Milvus closed")
              finally:
                  # Close MySQL (only if startup created the pool manager).
                  if self.mysql_client:
                      await self.mysql_client.close_pools()
                      print("Mysql closed")
  # Module-level ResourceManager singleton; stays None until
  # init_resource_manager() creates the instance.
  _resource_manager: ResourceManager | None = None
  def init_resource_manager(es_index, es_hosts, es_password, milvus_config):
      """Create the module-level ``ResourceManager`` on first call and return it.

      When an instance already exists it is returned unchanged and the
      arguments of this call are not used.
      """
      global _resource_manager
      # Guard clause: reuse the instance built by an earlier call.
      if _resource_manager is not None:
          return _resource_manager
      _resource_manager = ResourceManager(
          es_index=es_index,
          es_hosts=es_hosts,
          es_password=es_password,
          milvus_config=milvus_config,
      )
      return _resource_manager
  def get_resource_manager() -> ResourceManager | None:
      """Return the module-level ``ResourceManager`` singleton.

      The value is ``None`` until ``init_resource_manager`` has created the
      instance, because ``_resource_manager`` starts out as ``None``.
      """
      return _resource_manager