""" 配置管理服务 统一管理环境配置和爬虫配置 """ import json from typing import Dict, Any, Optional from core.utils.spider_config import SpiderConfig from core.models.spiders_config_models import PlatformConfig class ConfigManager: """ 统一配置管理器 提供对环境配置和爬虫配置的统一访问接口 """ def __init__(self): # 延迟导入settings以避免循环导入 from config import settings self._env_settings = settings self._spider_config = SpiderConfig @property def env_settings(self): """ 获取环境配置 """ return self._env_settings def get_platform_config(self, platform_name: str) -> PlatformConfig: """ 获取平台爬虫配置 """ return self._spider_config.get_platform_config(platform_name) def list_platforms(self) -> list: """ 获取所有平台列表 """ return self._spider_config.list_all_platforms() def reload_spider_configs(self): """ 重新加载爬虫配置 """ self._spider_config.reload_config() def get_config_stats(self) -> Dict[str, Any]: """ 获取配置统计信息 """ stats = self._spider_config.get_config_stats() stats["env"] = self._env_settings.ENV return stats def validate_platform_config(self, platform_name: str) -> bool: """ 验证平台配置是否有效 """ try: self.get_platform_config(platform_name) return True except Exception: return False def export_configs(self) -> Dict[str, Any]: """ 导出所有配置信息(用于调试和监控) """ return { "env_settings": { "env": self._env_settings.ENV, "log_level": self._env_settings.LOG_LEVEL, "db_host": self._env_settings.DB_HOST, "rocketmq_endpoint": str(self._env_settings.ROCKETMQ_ENDPOINT), # 不包含敏感信息如密码、密钥等 }, "spider_configs": self.list_platforms(), "stats": self.get_config_stats() } def get_platform_configs_summary(self) -> Dict[str, Dict[str, Any]]: """ 获取所有平台配置摘要信息 """ platforms = self.list_platforms() summary = {} for platform in platforms: try: config = self.get_platform_config(platform) summary[platform] = { "platform": config.platform, "mode": config.mode, "method": config.method, "url": str(config.url), "loop_times": config.loop_times, } except Exception as e: summary[platform] = { "error": str(e) } return summary async def reload_configs_runtime(self): """ 运行时重新加载配置(支持不重启服务的情况下重新加载配置) 这个方法可以在接收到特定信号或API调用时被调用 """ try: # 重新加载爬虫配置 self.reload_spider_configs() return True except Exception as e: # 记录错误日志 print(f"运行时重新加载配置失败: {e}") return False # 全局配置管理器实例 config_manager = ConfigManager() def get_config_manager() -> ConfigManager: """ 获取配置管理器实例 """ return config_manager