"""Main supervisor process.

Splits the registered spider topics into groups, starts one worker process per
group through ``start_worker_process``, then polls the workers and restarts any
that have died until a termination signal is received.
"""

import os
import signal
import time
from multiprocessing import Process, cpu_count
from typing import Dict

from core.utils.log.logger_manager import LoggerManager
from scheduler.process_manager import split_topics, start_worker_process
from spiders.spider_registry import SPIDER_CLASS_MAP

# Global shutdown flag: set by handle_signal() and polled by the monitor loop.
shutdown_flag = False


def handle_signal(sig, frame):
    """Signal handler: log the received signal and request a shutdown.

    Args:
        sig: Number of the signal that was delivered.
        frame: Current stack frame (unused; required by the handler signature).
    """
    global shutdown_flag
    logger = LoggerManager.get_logger()
    logger.warning(f"[主进程] 接收到信号 {sig},开始优雅关闭...")
    shutdown_flag = True


def _terminate_children(process_map, logger):
    """Terminate every process in ``process_map``, escalating to kill on timeout."""
    logger.info("[主进程] 终止所有子进程...")

    # First pass: ask every live child to stop.
    for p in process_map.values():
        if p.is_alive():
            p.terminate()

    # Second pass: wait up to 5 seconds per child, then force-kill stragglers.
    for p in process_map.values():
        p.join(timeout=5)
        if p.is_alive():
            logger.warning(f"进程 {p.pid} 未正常退出,强制终止")
            p.kill()
            p.join()

    logger.info("[主进程] 所有子进程已终止,退出")


def main():
    """Start the worker processes and supervise them until shutdown.

    Installs ``handle_signal`` for SIGTERM, SIGINT and (where the platform
    defines it) SIGHUP, starts one worker per topic group, then checks every
    5 seconds whether a worker has died and restarts it. On exit - whether by
    signal, ``KeyboardInterrupt`` or an unexpected exception - all workers in
    the process map are terminated.
    """
    global shutdown_flag
    logger = LoggerManager.get_logger()
    aliyun_log = LoggerManager.get_aliyun_logger()

    # Register signal handlers: SIGTERM (kill), SIGINT (Ctrl+C), SIGHUP
    # (terminal hang-up). SIGHUP is not defined on every platform (e.g.
    # Windows), so signals missing from the signal module are skipped.
    for signal_name in ("SIGTERM", "SIGINT", "SIGHUP"):
        signum = getattr(signal, signal_name, None)
        if signum is not None:
            signal.signal(signum, handle_signal)

    # Log the PID to make the process easier to manage.
    logger.info(f"[主进程] 启动,PID={os.getpid()}")

    topic_list = list(SPIDER_CLASS_MAP.keys())
    logger.info(f"[主进程] 监听 Topics: {topic_list}")

    # Never start more processes than there are CPU cores or topics.
    max_processes = cpu_count()
    num_processes = min(len(topic_list), max_processes)
    topic_groups = split_topics(topic_list, num_processes)
    logger.info(f"[主进程] CPU 核心数: {max_processes},启动进程数: {num_processes}")

    process_map: Dict[int, Process] = {}

    # Per-group restart bookkeeping used to throttle crash loops.
    restart_count = {group_id: 0 for group_id in range(len(topic_groups))}
    last_restart_time = {group_id: 0 for group_id in range(len(topic_groups))}

    try:
        for group_id, topic_group in enumerate(topic_groups):
            start_worker_process(group_id, topic_group, process_map)

        # Main process keeps monitoring the state of the child processes.
        while not shutdown_flag:
            time.sleep(5)

            # Iterate over a snapshot: process_map is handed to
            # start_worker_process during the loop.
            for group_id, p in list(process_map.items()):
                if shutdown_flag:
                    # Shutting down: skip the remaining health checks.
                    break
                if p.is_alive():
                    continue

                current_time = time.time()

                # Restart throttle: after more than 5 restarts, hold off while
                # the previous restart is less than 60 seconds old.
                # NOTE(review): the counter is never reset and restarts resume
                # once 60 seconds have passed, so this is a cooldown rather
                # than a permanent stop - confirm that is the intent.
                if restart_count[group_id] > 5 and (current_time - last_restart_time[group_id]) < 60:
                    logger.error(f"[监控] 进程 {p.name} 频繁崩溃,停止重启!")
                    aliyun_log.logging(
                        code="1502",
                        message="进程频繁崩溃已停止重启",
                        data={
                            "pid": p.pid,
                            "exitcode": p.exitcode,
                            "restart_count": restart_count[group_id],
                            "group_id": group_id,
                            "topics": topic_groups[group_id]
                        }
                    )
                    continue

                logger.warning(f"[监控] 进程 {p.name} PID={p.pid} 已崩溃,正在重启...")
                aliyun_log.logging(
                    code="1501",
                    message=f"[主进程监控] 子进程 {p.name} 崩溃重启",
                    data={
                        "pid": p.pid,
                        "exitcode": p.exitcode,
                        "group_id": group_id,
                        "topics": topic_groups[group_id],
                        "restart_count": restart_count[group_id] + 1
                    }
                )

                # Update the restart bookkeeping.
                restart_count[group_id] += 1
                last_restart_time[group_id] = current_time

                time.sleep(2)
                if shutdown_flag:
                    # A signal arrived during the pause; do not spawn a worker
                    # that would be terminated straight away.
                    break
                start_worker_process(group_id, topic_groups[group_id], process_map)
    except KeyboardInterrupt:
        logger.warning("[主进程] 接收到退出信号,终止所有子进程...")
        shutdown_flag = True
    finally:
        # Single shutdown path: also runs when the loop fails unexpectedly, so
        # workers are not left running without a supervisor.
        _terminate_children(process_map, logger)


if __name__ == '__main__':
    main()