123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- # spiders/advanced_factory.py
- from typing import Dict, List, Type, Optional
- from config.spider_config import SpiderConfig
- from core.di.container import container
- from spiders.basespider import BaseSpider
- from spiders.spider_registry import SPIDER_CLASS_MAP, get_spider_class
class AdvancedSpiderFactory:
    """Spider factory with automatic class resolution and dependency injection.

    Every entry point is a classmethod, so the factory works both through the
    class itself and through an instance.
    """

    # MQ topic -> spider key. Defined once at class level instead of being
    # rebuilt on every lookup.
    _TOPIC_TO_SPIDER_KEY: Dict[str, str] = {
        "bszf_recommend_prod": "benshanzhufurecommend",
        "ynfqmm_recommend_prod": "yuannifuqimanmanrecommend",
        "xng_author_prod": "xiaoniangaoauthor",
    }

    @classmethod
    def create_spider(cls, spider_key: str, rule_dict: Dict, user_list: List,
                      env: str = "prod", **kwargs) -> "BaseSpider":
        """Build a spider instance for ``spider_key``.

        Args:
            spider_key: Key used to look up the platform config and to pick
                the spider class.
            rule_dict: Passed to the spider constructor as ``rule_dict``.
            user_list: Passed to the spider constructor as ``user_list``.
            env: Passed to the spider constructor as ``env``.
            **kwargs: Extra constructor arguments. ``db_service`` and
                ``mq_producer`` given here replace the container defaults.

        Returns:
            The newly constructed spider.
        """
        # 1. Load the platform configuration.
        config = SpiderConfig.get_platform_config(spider_key)
        # 2. Decide which spider class to instantiate.
        spider_class = cls._resolve_spider_class(spider_key, config)
        # 3. Collect the injected services (caller overrides win).
        dependencies = cls._prepare_dependencies(config, **kwargs)
        # 4. Instantiate.
        return spider_class(
            rule_dict=rule_dict,
            user_list=user_list,
            env=env,
            **dependencies,
        )

    @classmethod
    def create_spider_by_topic(cls, topic: str, task_id: str, env: str = "prod",
                               **kwargs) -> Optional["BaseSpider"]:
        """Build a spider from an MQ topic and a task id (end-to-end flow).

        Args:
            topic: MQ topic name; translated to a spider key.
            task_id: Identifier handed to ``_get_task_config``.
            env: Forwarded to ``create_spider``.
            **kwargs: Forwarded unchanged to ``create_spider`` so callers can
                inject dependencies the same way as on the direct path.

        Returns:
            The spider, or ``None`` when the topic is not mapped or the task
            has no (or an empty) rule dict.

        Note:
            ``_get_task_config`` is currently a placeholder that returns an
            empty rule dict, so this method returns ``None`` until that lookup
            is implemented.
        """
        # 1. Map the topic to a spider key.
        spider_key = cls._map_topic_to_spider_key(topic)
        if not spider_key:
            return None
        # 2. Fetch the task configuration.
        rule_dict, user_list = cls._get_task_config(task_id)
        if not rule_dict:
            return None
        # 3. Build the spider.
        return cls.create_spider(spider_key, rule_dict, user_list, env, **kwargs)

    @classmethod
    def _resolve_spider_class(cls, spider_key: str, config) -> Type["BaseSpider"]:
        """Pick the spider class for ``spider_key``.

        Resolution order: the ``SPIDER_CLASS_MAP`` registry, then the naming
        convention (keys ending in ``recommend`` / ``author``), then
        ``BaseSpider`` as the fallback. ``config`` is accepted but not
        consulted by the current implementation.
        """
        # The registry takes precedence.
        if spider_key in SPIDER_CLASS_MAP:
            return SPIDER_CLASS_MAP[spider_key]
        # Otherwise infer from the key suffix; the spider modules are only
        # imported when this fallback path is actually taken.
        if spider_key.endswith('recommend'):
            from spiders.recommendspider import RecommendSpider
            return RecommendSpider
        if spider_key.endswith('author'):
            from spiders.authorspider import AuthorSpider
            return AuthorSpider
        # Nothing matched: use the base class.
        return BaseSpider

    @classmethod
    def _prepare_dependencies(cls, config, **kwargs):
        """Assemble the keyword arguments injected into the spider.

        ``db_service`` and ``mq_producer`` are taken from the DI container
        unless the caller supplied them in ``kwargs``. ``config.platform`` and
        ``config.mode`` are read only when ``db_service`` has to be built.

        Returns:
            A new dict holding the defaults merged with ``kwargs``; caller
            supplied values take precedence.
        """
        dependencies = {}
        # Database service.
        if 'db_service' not in kwargs:
            dependencies['db_service'] = container.db_service(
                platform=config.platform,
                mode=config.mode,
            )
        # MQ producer.
        if 'mq_producer' not in kwargs:
            dependencies['mq_producer'] = container.mq_producer()
        return {**dependencies, **kwargs}

    @classmethod
    def _map_topic_to_spider_key(cls, topic: str) -> Optional[str]:
        """Return the spider key mapped to ``topic``, or ``None`` if unmapped."""
        return cls._TOPIC_TO_SPIDER_KEY.get(topic)

    @classmethod
    def _get_task_config(cls, task_id: str):
        """Fetch the task configuration for ``task_id``.

        Returns:
            A ``(rule_dict, user_list)`` tuple. This is a placeholder that
            always returns ``({}, [])``.
        """
        # TODO: implement the database query; it must return
        # (rule_dict, user_list).
        return {}, []
# Module-level factory instance shared by importers of this module.
spider_factory = AdvancedSpiderFactory()
|