import asyncio
import os
import signal  # needed to register the graceful-shutdown handlers
import time
from datetime import datetime
from multiprocessing import Process, cpu_count
from typing import Dict

from core.utils.log.logger_manager import LoggerManager
from scheduler.process_manager import split_topics, start_worker_process
from spiders.spider_registry import SPIDER_CLASS_MAP
from scripts.activity_calculator import ActivityCalculator

# Async scheduler and cron trigger from APScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

# Module-level shutdown flag, set by ``handle_signal``.
# NOTE(review): ``main`` uses its own asyncio.Event instead of this flag;
# both names are kept in case other modules import them.
shutdown_flag = False


def handle_signal(sig, frame):
    """Synchronous signal handler: log the signal and raise ``shutdown_flag``.

    Args:
        sig: Number of the received signal.
        frame: Current stack frame supplied by the ``signal`` module (unused).
    """
    global shutdown_flag
    logger = LoggerManager.get_logger()
    logger.warning(f"[主进程] 接收到信号 {sig},开始优雅关闭...")
    shutdown_flag = True


async def run_activity_calculator_with_scheduler(shutdown_event: asyncio.Event):
    """Run the activity calculator on an AsyncIOScheduler until shutdown.

    Registers a single cron job (every day at 00:00) that performs an
    incremental update, starts the scheduler, then blocks until
    ``shutdown_event`` is set. The scheduler is shut down on the way out,
    including when this coroutine is cancelled.

    Args:
        shutdown_event: Event whose setting requests the scheduler to stop.
    """
    logger = LoggerManager.get_logger()
    logger.info("[活跃度计算器] AsyncIOScheduler 启动(仅增量更新)")

    scheduler = AsyncIOScheduler()

    async def incremental_update_job():
        """Run one incremental update; log failures instead of raising."""
        logger.info("[活跃度计算器] 开始执行每日增量更新")
        try:
            calc = ActivityCalculator(update_mode="incremental")
            try:
                await calc.initialize()
                result = await calc.calculate_and_update()
                logger.info(f"[活跃度计算器] 增量更新完成,处理了 {result} 个用户")
            finally:
                # Always release the calculator's resources, even on failure.
                await calc.close()
        except Exception as e:
            # Boundary of a scheduled job: swallow and log so that one bad
            # run does not prevent the next scheduled run.
            logger.error(f"[活跃度计算器] 增量更新失败: {e}")

    scheduler.add_job(
        func=incremental_update_job,
        trigger=CronTrigger(hour=0, minute=0),  # every day at midnight
        id='incremental_update_daily',
        name='每日增量更新',
        misfire_grace_time=3600,
    )

    scheduler.start()
    logger.info("[活跃度计算器] AsyncIOScheduler 已启动并运行(仅增量更新)")

    try:
        await shutdown_event.wait()
    finally:
        # Runs on normal shutdown AND on task cancellation, so the scheduler
        # is never left running behind a cancelled task.
        scheduler.shutdown()
        logger.info("[活跃度计算器] AsyncIOScheduler 已关闭")


def _terminate_workers(process_map: Dict[int, Process], logger) -> None:
    """Terminate all live workers, escalating to kill after a 5 s grace period."""
    workers = list(process_map.values())

    # First pass: request termination from every live worker.
    for p in workers:
        if p.is_alive():
            p.terminate()

    # Second pass: give each worker up to 5 s, then force-kill stragglers.
    for p in workers:
        p.join(timeout=5)
        if p.is_alive():
            logger.warning(f"进程 {p.pid} 未正常退出,强制终止")
            p.kill()
            p.join()


async def main():
    """Supervise the spider worker processes and the activity scheduler.

    Splits the topics in ``SPIDER_CLASS_MAP`` across at most ``cpu_count()``
    worker processes, restarts workers that die, and on a termination
    signal stops the workers and the activity-calculator scheduler.
    """
    logger = LoggerManager.get_logger()
    aliyun_log = LoggerManager.get_aliyun_logger()

    # Set by the signal handler to request a graceful shutdown.
    shutdown_event = asyncio.Event()

    def handle_signal_async(sig, frame):
        """Signal handler: log the signal and request shutdown."""
        logger.warning(f"[主进程] 接收到信号 {sig},开始优雅关闭...")
        shutdown_event.set()

    # SIGTERM: kill, SIGINT: Ctrl+C, SIGHUP: terminal hang-up.
    # Some platforms (e.g. Windows) do not define every signal, so skip
    # the ones that are missing instead of failing with AttributeError.
    for sig_name in ("SIGTERM", "SIGINT", "SIGHUP"):
        signum = getattr(signal, sig_name, None)
        if signum is not None:
            signal.signal(signum, handle_signal_async)

    logger.info(f"[主进程] 启动,PID={os.getpid()}")  # PID logged for ops

    # Start the activity-calculator scheduler (incremental updates only).
    activity_scheduler_task = asyncio.create_task(
        run_activity_calculator_with_scheduler(shutdown_event)
    )
    logger.info("[主进程] 活跃度计算器调度器已启动(仅增量更新)")

    topic_list = list(SPIDER_CLASS_MAP.keys())
    logger.info(f"[主进程] 监听 Topics: {topic_list}")

    max_processes = cpu_count()
    # NOTE(review): this is 0 when no spiders are registered; confirm that
    # split_topics tolerates a zero process count.
    num_processes = min(len(topic_list), max_processes)

    # Distribute the topics evenly across the worker processes.
    topic_groups = split_topics(topic_list, num_processes)
    logger.info(f"[主进程] CPU 核心数: {max_processes},启动进程数: {num_processes}")

    process_map: Dict[int, Process] = {}
    for group_id, topic_group in enumerate(topic_groups):
        start_worker_process(group_id, topic_group, process_map)

    # Per-group restart bookkeeping.
    restart_count = {group_id: 0 for group_id in range(len(topic_groups))}
    last_restart_time = {group_id: 0 for group_id in range(len(topic_groups))}

    # Monitor the workers until shutdown is requested.
    try:
        while not shutdown_event.is_set():
            await asyncio.sleep(5)

            # Iterate over a snapshot: restarting a worker mutates process_map.
            for group_id, p in list(process_map.items()):
                if shutdown_event.is_set():
                    break  # shutting down: stop checking workers
                if p.is_alive():
                    continue

                current_time = time.time()

                # Restart limit: after more than 5 restarts, refuse to
                # restart again within 60 s of the previous restart.
                # NOTE(review): the count never resets and this alert is
                # re-emitted on every poll during the window — confirm
                # that is intended.
                if (restart_count[group_id] > 5
                        and (current_time - last_restart_time[group_id]) < 60):
                    logger.error(f"[监控] 进程 {p.name} 频繁崩溃,停止重启!")
                    aliyun_log.logging(
                        code="1502",
                        message="进程频繁崩溃已停止重启",
                        data={
                            "pid": p.pid,
                            "exitcode": p.exitcode,
                            "restart_count": restart_count[group_id],
                            "group_id": group_id,
                            "topics": topic_groups[group_id],
                        },
                    )
                    continue

                logger.warning(f"[监控] 进程 {p.name} PID={p.pid} 已崩溃,正在重启...")
                aliyun_log.logging(
                    code="1501",
                    message=f"[主进程监控] 子进程 {p.name} 崩溃重启",
                    data={
                        "pid": p.pid,
                        "exitcode": p.exitcode,
                        "group_id": group_id,
                        "topics": topic_groups[group_id],
                        "restart_count": restart_count[group_id] + 1,
                    },
                )

                restart_count[group_id] += 1
                last_restart_time[group_id] = current_time

                await asyncio.sleep(2)  # brief pause before restarting
                if shutdown_event.is_set():
                    # Shutdown arrived during the pause: do not spawn a
                    # worker that would be terminated straight away.
                    break
                start_worker_process(group_id, topic_groups[group_id], process_map)

    except KeyboardInterrupt:
        logger.warning("[主进程] 接收到退出信号,终止所有子进程...")
        shutdown_event.set()

    # Single shutdown path for all workers.
    logger.info("[主进程] 终止所有子进程...")
    _terminate_workers(process_map, logger)

    # Ask the scheduler task to stop and wait for it to finish. wait_for
    # cancels the task itself if it has not finished within the timeout.
    shutdown_event.set()
    try:
        await asyncio.wait_for(activity_scheduler_task, timeout=10.0)
    except asyncio.TimeoutError:
        logger.warning("[活跃度计算器] 调度器关闭超时")
    except asyncio.CancelledError:
        logger.info("[活跃度计算器] 调度器任务已被取消")
    except Exception as e:
        # The scheduler task failed earlier; surface it instead of leaving
        # the exception unretrieved.
        logger.error(f"[活跃度计算器] 调度器异常退出: {e}")

    logger.info("[主进程] 所有子进程已终止,退出")


if __name__ == '__main__':
    asyncio.run(main())