import asyncio
import json
import time
import traceback
from multiprocessing import Process, cpu_count
from typing import Dict, List, Optional

from application.config.common.log.logger_manager import LoggerManager
from application.functions.async_mysql_service import AsyncMysqlService
from application.functions.rocketmq_consumer import AsyncRocketMQConsumer
from application.spiders.spider_registry import get_spider_class, SPIDER_CLASS_MAP
from utils.trace_utils import generate_trace_id

# ------------------------------- Per-topic coroutine consumers -------------------------------

# AsyncMysqlService instance shared by all coroutines within one worker process.
mysql_service: Optional[AsyncMysqlService] = None


async def async_handle_topic(topic: str):
    """
    Consumption logic for a single topic, running in its own coroutine:
    - pull messages from MQ;
    - run the matching spider based on the message payload;
    - query configuration through the async database service;
    - log the outcome and acknowledge the message.
    """
    logger = LoggerManager.get_logger(topic, "worker")
    aliyun_logger = LoggerManager.get_aliyun_logger(topic, "worker")

    # Each topic gets its own consumer instance.
    consumer = AsyncRocketMQConsumer(topic_name=topic, group_id=topic)

    async def handle_single_message(message):
        trace_id = generate_trace_id()
        try:
            payload = json.loads(message.message_body)
            platform = payload["platform"]
            mode = payload["mode"]
            task_id = payload["id"]

            user_list = await mysql_service.get_user_list(task_id)
            rule_dict = await mysql_service.get_rule_dict(task_id)

            CrawlerClass = get_spider_class(topic)
            crawler = CrawlerClass(
                rule_dict=rule_dict,
                user_list=user_list,
                trace_id=trace_id,
            )
            await crawler.run()

            # Acknowledge only after run() has completed successfully.
            await consumer.ack_message(message.receipt_handle)
            aliyun_logger.logging(code="1000", message="Task completed and message acknowledged", trace_id=trace_id)
        except Exception as e:
            aliyun_logger.logging(
                code="9001",
                message=f"Failed to process message: {e}\n{traceback.format_exc()}",
                trace_id=trace_id,
                data=message.message_body,
            )

    # Start the consumption loop.
    await consumer.run_forever(handle_single_message)


async def run_all_topics(topics: List[str]):
    """
    Start coroutine listeners for all topics assigned to this process
    and initialize the process-wide AsyncMysqlService instance.
    """
    global mysql_service
    mysql_service = AsyncMysqlService()
    await mysql_service.init()  # initialize the connection pool

    tasks = [asyncio.create_task(async_handle_topic(topic)) for topic in topics]
    await asyncio.gather(*tasks)


def handle_topic_group(topics: List[str]):
    """
    Subprocess entry point: run an asyncio event loop over this group of topics.
    """
    asyncio.run(run_all_topics(topics))


# ------------------------------- Main scheduler -------------------------------

def split_topics(topics: List[str], num_groups: int) -> List[List[str]]:
    """
    Split all topics evenly into num_groups groups, one group per subprocess.
    """
    return [topics[i::num_groups] for i in range(num_groups)]


def start_worker_process(group_id: int, topic_group: List[str], process_map: Dict[int, Process]):
    """
    Start one subprocess to handle a group of topics.
    """
    p = Process(target=handle_topic_group, args=(topic_group,), name=f"Worker-{group_id}")
    p.start()
    process_map[group_id] = p
    print(f"[Main] Started process PID={p.pid} for topics={topic_group}")


def main():
    """
    Main scheduling entry point:
    - collect all spider topics;
    - split them into groups by CPU core count;
    - start one subprocess per group;
    - monitor the subprocesses and restart any that die.
    """
    topic_list = list(SPIDER_CLASS_MAP.keys())
    print(f"[Main] Listening on topics: {topic_list}")

    num_cpus = cpu_count()
    # Drop empty groups so we never spawn idle workers when cores outnumber topics.
    topic_groups = [group for group in split_topics(topic_list, num_cpus) if group]
    print(f"[Main] CPU cores: {num_cpus}, processes to start: {len(topic_groups)}")

    process_map: Dict[int, Process] = {}
    for group_id, topic_group in enumerate(topic_groups):
        start_worker_process(group_id, topic_group, process_map)

    # The main process keeps monitoring subprocess health.
    try:
        while True:
            time.sleep(5)
            for group_id, p in list(process_map.items()):
                if not p.is_alive():
                    print(f"[Monitor] Process {p.name} PID={p.pid} has exited, restarting...")
                    time.sleep(2)
                    start_worker_process(group_id, topic_groups[group_id], process_map)
    except KeyboardInterrupt:
        print("[Main] Received shutdown signal, terminating all subprocesses...")
        for p in process_map.values():
            p.terminate()
        for p in process_map.values():
            p.join()


if __name__ == '__main__':
    main()