123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566 |
- import time
- from multiprocessing import Process, cpu_count
- from typing import Dict
- from core.utils.log.logger_manager import LoggerManager
- from scheduler.process_manager import split_topics, start_worker_process
- from spiders.spider_registry import SPIDER_CLASS_MAP
def main():
    """
    Main scheduler entry point.

    - Collects every spider topic registered in ``SPIDER_CLASS_MAP``.
    - Splits the topics into at most ``cpu_count()`` groups.
    - Starts one worker process per group.
    - Polls the workers every 5 seconds and restarts any that is no longer
      alive, reporting each restart to the Aliyun logger.

    Blocks until a ``KeyboardInterrupt`` is received, at which point every
    tracked worker is terminated and joined. Returns immediately (after a
    warning) when no topics are registered.
    """
    logger = LoggerManager.get_logger()
    aliyun_log = LoggerManager.get_aliyun_logger()

    topic_list = list(SPIDER_CLASS_MAP.keys())
    logger.info(f"[主进程] 监听 Topics: {topic_list}")

    # With no topics the process count below would be 0: there is nothing to
    # split or supervise, so bail out instead of idling in the monitor loop.
    if not topic_list:
        logger.warning("[主进程] 未发现任何 Topic,主进程退出")
        return

    max_processes = cpu_count()
    # Never start more workers than there are topics or CPU cores.
    num_processes = min(len(topic_list), max_processes)

    # Distribute the topics across the workers. Per the original author's
    # note the result looks like
    # [[1, 5, 9, 13], [2, 6, 10, 14], [3, 7, 11, 15], [4, 8, 12]].
    topic_groups = split_topics(topic_list, num_processes)
    logger.info(f"[主进程] CPU 核心数: {max_processes},启动进程数: {num_processes}")

    # group_id -> worker process. NOTE(review): start_worker_process is
    # presumably responsible for storing the new Process here — confirm
    # against scheduler.process_manager.
    process_map: Dict[int, Process] = {}
    for group_id, topic_group in enumerate(topic_groups):
        start_worker_process(group_id, topic_group, process_map)

    # The main process keeps watching the workers until interrupted.
    try:
        while True:
            time.sleep(5)
            # Iterate over a snapshot: restarting a worker is expected to
            # update process_map while we are looping.
            for group_id, p in list(process_map.items()):
                if p.is_alive():
                    continue

                # Any exit (including exit code 0) is treated as a crash.
                logger.warning(f"[监控] 进程 {p.name} PID={p.pid} 已崩溃,正在重启...")
                # Report the restart to Aliyun logging.
                aliyun_log.logging(
                    code="1501",
                    message=f"[主进程监控] 子进程 {p.name} 崩溃重启",
                    data={
                        "pid": p.pid,
                        "exitcode": p.exitcode,
                        "group_id": group_id,
                        "topics": topic_groups[group_id],
                    },
                )
                # Short pause before restarting the same topic group.
                time.sleep(2)
                start_worker_process(group_id, topic_groups[group_id], process_map)
    except KeyboardInterrupt:
        logger.warning("[主进程] 接收到退出信号,终止所有子进程...")
        # Signal every worker first, then wait for each of them to exit.
        for p in process_map.values():
            p.terminate()
        for p in process_map.values():
            p.join()
# Run the scheduler only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
|