import time
from multiprocessing import Process, cpu_count
from typing import Dict

from scheduler.process_manager import split_topics, start_worker_process
from spiders.spider_registry import SPIDER_CLASS_MAP
from core.utils.log.logger_manager import LoggerManager


def main():
    """
    Main scheduling entry point:
    - Collect all spider topics;
    - Split them into groups based on the CPU core count;
    - Start a worker subprocess for each group;
    - Monitor subprocess health and restart crashed workers automatically.
    """
    logger = LoggerManager.get_logger()
    aliyun_log = LoggerManager.get_aliyun_logger()

    topic_list = list(SPIDER_CLASS_MAP.keys())
    logger.info(f"[Main] Listening on topics: {topic_list}")

    max_processes = cpu_count()
    num_processes = min(len(topic_list), max_processes)  # avoid spawning more processes than needed

    # Distribute all topics evenly across the worker processes,
    # e.g. [[1, 5, 9, 13], [2, 6, 10, 14], [3, 7, 11, 15], [4, 8, 12]]
    topic_groups = split_topics(topic_list, num_processes)
    logger.info(f"[Main] CPU cores: {max_processes}, worker processes to start: {num_processes}")

    process_map: Dict[int, Process] = {}

    for group_id, topic_group in enumerate(topic_groups):
        start_worker_process(group_id, topic_group, process_map)

    # The main process keeps monitoring worker status and restarts any dead worker.
    try:
        while True:
            time.sleep(5)
            for group_id, p in list(process_map.items()):
                if not p.is_alive():
                    logger.warning(f"[Monitor] Process {p.name} PID={p.pid} crashed, restarting...")
                    time.sleep(2)
                    start_worker_process(group_id, topic_groups[group_id], process_map)
    except KeyboardInterrupt:
        logger.warning("[Main] Received exit signal, terminating all worker processes...")
        for p in process_map.values():
            p.terminate()
        for p in process_map.values():
            p.join()


if __name__ == '__main__':
    main()