# spiders/advanced_factory.py
from typing import Dict, List, Type, Optional

from config.spider_config import SpiderConfig
from core.di.container import container
from spiders.basespider import BaseSpider
from spiders.spider_registry import SPIDER_CLASS_MAP, get_spider_class


class AdvancedSpiderFactory:
    """Spider factory with automatic class resolution and dependency injection.

    All entry points are classmethods, so the factory can be used either
    through the class itself or through the module-level ``spider_factory``
    instance defined at the bottom of this file.
    """

    # MQ topic name -> spider configuration key.
    _TOPIC_TO_SPIDER_KEY = {
        "bszf_recommend_prod": "benshanzhufurecommend",
        "ynfqmm_recommend_prod": "yuannifuqimanmanrecommend",
        "xng_author_prod": "xiaoniangaoauthor",
    }

    @classmethod
    def create_spider(cls, spider_key: str, rule_dict: Dict, user_list: List,
                      env: str = "prod", **kwargs) -> BaseSpider:
        """Build a spider instance for ``spider_key``.

        Args:
            spider_key: Key used both to look up the platform config and to
                pick the spider class.
            rule_dict: Forwarded to the spider constructor as ``rule_dict``.
            user_list: Forwarded to the spider constructor as ``user_list``.
            env: Forwarded to the spider constructor as ``env``.
            **kwargs: Extra constructor keyword arguments. Supplying
                ``db_service`` and/or ``mq_producer`` here overrides the
                container-provided defaults.

        Returns:
            An instance of the resolved spider class.
        """
        # Step 1: load the platform configuration for this key.
        platform_config = SpiderConfig.get_platform_config(spider_key)

        # Step 2: decide which spider class to instantiate.
        spider_cls = cls._resolve_spider_class(spider_key, platform_config)

        # Step 3: gather the injected services (caller overrides win).
        injected = cls._prepare_dependencies(platform_config, **kwargs)

        # Step 4: construct the spider.
        return spider_cls(
            rule_dict=rule_dict,
            user_list=user_list,
            env=env,
            **injected
        )

    @classmethod
    def create_spider_by_topic(cls, topic: str, task_id: str,
                               env: str = "prod") -> Optional[BaseSpider]:
        """Create a spider from an MQ topic and a task id (full pipeline).

        Returns:
            The spider instance, or ``None`` when the topic has no mapping
            or the task configuration yields a falsy ``rule_dict``.
        """
        # Translate the topic into a spider key; unknown topics yield None.
        spider_key = cls._map_topic_to_spider_key(topic)
        if not spider_key:
            return None

        # Fetch the task configuration; a falsy rule dict aborts creation.
        rule_dict, user_list = cls._get_task_config(task_id)
        if not rule_dict:
            return None

        # Delegate the actual construction to create_spider.
        return cls.create_spider(spider_key, rule_dict, user_list, env)

    @classmethod
    def _resolve_spider_class(cls, spider_key: str, config) -> Type[BaseSpider]:
        """Pick the spider class for ``spider_key``.

        Resolution order: explicit entry in ``SPIDER_CLASS_MAP``, then the
        key-suffix naming convention (``recommend`` / ``author``), and
        finally ``BaseSpider``. ``config`` is accepted but not consulted.
        """
        # An explicit registry entry always takes precedence.
        if spider_key in SPIDER_CLASS_MAP:
            return SPIDER_CLASS_MAP[spider_key]

        # Otherwise infer from the key suffix; the concrete spider modules
        # are imported only when the matching branch is taken.
        if spider_key.endswith('recommend'):
            from spiders.recommendspider import RecommendSpider
            return RecommendSpider

        if spider_key.endswith('author'):
            from spiders.authorspider import AuthorSpider
            return AuthorSpider

        # No match: fall back to the base class.
        return BaseSpider

    @classmethod
    def _prepare_dependencies(cls, config, **kwargs):
        """Assemble the keyword arguments injected into the spider.

        A ``db_service`` (built from ``config.platform`` and ``config.mode``)
        and an ``mq_producer`` are obtained from the DI container unless the
        caller already supplied them in ``kwargs``. All caller-supplied
        kwargs are passed through.
        """
        defaults = {}

        # Database service: only ask the container when not overridden.
        if "db_service" not in kwargs:
            defaults["db_service"] = container.db_service(
                platform=config.platform,
                mode=config.mode
            )

        # MQ producer: same rule.
        if "mq_producer" not in kwargs:
            defaults["mq_producer"] = container.mq_producer()

        # Container defaults first, then the caller's kwargs.
        defaults.update(kwargs)
        return defaults

    @classmethod
    def _map_topic_to_spider_key(cls, topic: str) -> Optional[str]:
        """Return the spider key for an MQ topic, or ``None`` if unmapped."""
        return cls._TOPIC_TO_SPIDER_KEY.get(topic)

    @classmethod
    def _get_task_config(cls, task_id: str):
        """Fetch ``(rule_dict, user_list)`` for a task from the database.

        Placeholder: the database query is not implemented yet, so this
        always returns an empty dict and an empty list. Because
        ``create_spider_by_topic`` treats a falsy ``rule_dict`` as "no
        task", that method currently returns ``None`` for every mapped topic.
        """
        # TODO: implement the database lookup for task_id.
        return {}, []


# Module-level factory instance.
spider_factory = AdvancedSpiderFactory()