container.py 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. # core/di/container.py
  2. from typing import Dict, Any, Type
  3. from dependency_injector import containers, providers
  4. from config.base import settings
  5. from services.async_mysql_service import AsyncMysqlService
  6. from services.async_mq_producer import AsyncMQProducer
  7. from services.async_mq_consumer import AsyncRocketMQConsumer
  8. class ServiceContainer(containers.DeclarativeContainer):
  9. """服务容器:统一管理所有服务的生命周期"""
  10. # 配置
  11. config = providers.Configuration()
  12. # 数据库服务工厂
  13. db_service = providers.Factory(
  14. AsyncMysqlService,
  15. platform=config.platform,
  16. mode=config.mode
  17. )
  18. # MQ 生产者工厂
  19. mq_producer = providers.Singleton(
  20. AsyncMQProducer,
  21. topic_name="topic_crawler_etl_prod_v2",
  22. platform=config.platform,
  23. mode=config.mode
  24. )
  25. # MQ 消费者工厂
  26. mq_consumer = providers.Factory(
  27. AsyncRocketMQConsumer,
  28. topic_name=config.topic_name,
  29. group_id=config.group_id
  30. )
  31. # 全局容器实例
  32. container = ServiceContainer()