| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- import time
- import signal # 新增:导入signal模块
- from multiprocessing import Process, cpu_count
- from typing import Dict
- from datetime import datetime
- import asyncio
- from core.utils.log.logger_manager import LoggerManager
- from scheduler.process_manager import split_topics, start_worker_process
- from spiders.spider_registry import SPIDER_CLASS_MAP
- from scripts.activity_calculator import ActivityCalculator
- # 从 APScheduler 导入异步调度器
- from apscheduler.schedulers.asyncio import AsyncIOScheduler
- from apscheduler.triggers.cron import CronTrigger
# Module-level shutdown flag: starts False and is flipped to True by handle_signal().
shutdown_flag = False


def handle_signal(sig, frame):
    """Signal callback with the ``signal.signal`` signature.

    Logs the received signal number and sets the module-level ``shutdown_flag``.
    ``main()`` in this module installs its own handler instead, so this is
    presumably kept for external callers — verify before removing.

    Args:
        sig: the signal number delivered by the interpreter.
        frame: the interrupted stack frame (unused).
    """
    global shutdown_flag
    LoggerManager.get_logger().warning(f"[主进程] 接收到信号 {sig},开始优雅关闭...")
    shutdown_flag = True
async def run_activity_calculator_with_scheduler(shutdown_event):
    """Run the daily incremental activity update until ``shutdown_event`` is set.

    Registers one APScheduler cron job (every day at 00:00, in the scheduler's
    timezone) that builds ``ActivityCalculator(update_mode="incremental")``,
    awaits ``initialize()`` and ``calculate_and_update()``, and always awaits
    ``close()``. A failing run is logged and swallowed so later runs still fire.

    Args:
        shutdown_event: ``asyncio.Event``. Once it is set, the scheduler is shut
            down and this coroutine returns. The scheduler is also shut down
            if this coroutine is cancelled while waiting.
    """
    logger = LoggerManager.get_logger()
    logger.info("[活跃度计算器] AsyncIOScheduler 启动(仅增量更新)")

    scheduler = AsyncIOScheduler()

    async def incremental_update_job():
        """Perform one incremental activity update and log its outcome."""
        logger.info("[活跃度计算器] 开始执行每日增量更新")
        try:
            calc = ActivityCalculator(update_mode="incremental")
            try:
                await calc.initialize()
                result = await calc.calculate_and_update()
                logger.info(f"[活跃度计算器] 增量更新完成,处理了 {result} 个用户")
            finally:
                # Release the calculator's resources even if initialize/update raised.
                await calc.close()
        except Exception as e:
            # Best-effort: a failed run must not kill the scheduler.
            logger.error(f"[活跃度计算器] 增量更新失败: {e}")

    scheduler.add_job(
        func=incremental_update_job,
        trigger=CronTrigger(hour=0, minute=0),  # every day at 00:00
        id='incremental_update_daily',
        name='每日增量更新',
        # Still run the job if it fires up to one hour after its scheduled time.
        misfire_grace_time=3600,
    )

    scheduler.start()
    logger.info("[活跃度计算器] AsyncIOScheduler 已启动并运行(仅增量更新)")

    try:
        await shutdown_event.wait()
    finally:
        # Runs on a normal shutdown AND when this task is cancelled (e.g. by a
        # caller's timeout); otherwise a cancelled task would leave the
        # scheduler running with its job still registered.
        scheduler.shutdown()
        logger.info("[活跃度计算器] AsyncIOScheduler 已关闭")
def _install_shutdown_signal_handlers(loop, on_signal):
    """Route SIGTERM, SIGINT and (where it exists) SIGHUP to ``on_signal(signum)``.

    ``loop.add_signal_handler`` runs the callback as an ordinary event-loop
    callback, so it may safely touch asyncio objects such as ``asyncio.Event``.
    Loops that do not support it (it raises NotImplementedError, e.g. on
    Windows) fall back to ``signal.signal`` and hop onto the loop with
    ``call_soon_threadsafe``.
    """
    signums = [signal.SIGTERM, signal.SIGINT]  # kill / Ctrl+C
    sighup = getattr(signal, "SIGHUP", None)  # terminal hang-up; not defined on Windows
    if sighup is not None:
        signums.append(sighup)
    for signum in signums:
        try:
            # Pass a plain int so the log line shows the number, as signal.signal handlers receive.
            loop.add_signal_handler(signum, on_signal, int(signum))
        except NotImplementedError:
            signal.signal(
                signum,
                lambda received, _frame: loop.call_soon_threadsafe(on_signal, received),
            )


def _terminate_workers(process_map, logger):
    """Stop every worker: terminate() all live ones, then kill() any still alive after a 5 s join."""
    for proc in process_map.values():
        if proc.is_alive():
            proc.terminate()  # ask politely first
    for proc in process_map.values():
        proc.join(timeout=5)  # wait at most 5 s per process
        if proc.is_alive():
            logger.warning(f"进程 {proc.pid} 未正常退出,强制终止")
            proc.kill()
            proc.join()


async def _await_scheduler_task(task, logger):
    """Give the (already signalled) scheduler task up to 10 s to shut its scheduler down."""
    try:
        await asyncio.wait_for(task, timeout=10.0)
    except asyncio.TimeoutError:
        # wait_for cancels the task on timeout; the task's own cleanup still runs.
        logger.warning("[活跃度计算器] 调度器关闭超时")
    except asyncio.CancelledError:
        logger.info("[活跃度计算器] 调度器任务已被取消")
    except Exception as e:
        # Don't let a crashed scheduler task abort the rest of the shutdown sequence.
        logger.error(f"[活跃度计算器] 调度器任务异常退出: {e}")


async def main():
    """Supervise the spider worker processes until a shutdown signal arrives.

    - Starts the activity-calculator scheduler as a background task.
    - Splits every topic in ``SPIDER_CLASS_MAP`` across
      ``min(len(topics), cpu_count())`` worker processes.
    - Polls the workers every 5 s and restarts any that have died.
    - Crash-loop guard: after more than 5 restarts, a worker that dies
      within 60 s of its last restart is not restarted and an alert (code
      1502) is sent once. This is a cooldown, not a permanent stop: once
      60 s have passed since the last restart it is restarted again.
    - On SIGTERM/SIGINT/SIGHUP, terminates the workers (escalating to
      ``kill`` after 5 s each), then waits up to 10 s for the scheduler
      task to exit.
    """
    import os  # local import: the module header does not import os

    logger = LoggerManager.get_logger()
    aliyun_log = LoggerManager.get_aliyun_logger()
    # Set by the signal handlers; every loop below (and the scheduler task) watches it.
    shutdown_event = asyncio.Event()

    def request_shutdown(sig):
        """Log the received signal and request a graceful shutdown."""
        logger.warning(f"[主进程] 接收到信号 {sig},开始优雅关闭...")
        shutdown_event.set()

    _install_shutdown_signal_handlers(asyncio.get_running_loop(), request_shutdown)
    logger.info(f"[主进程] 启动,PID={os.getpid()}")  # log the PID so operators can signal it

    # Start the activity-calculator scheduler (incremental updates only).
    activity_scheduler_task = asyncio.create_task(
        run_activity_calculator_with_scheduler(shutdown_event)
    )
    logger.info("[主进程] 活跃度计算器调度器已启动(仅增量更新)")

    topic_list = list(SPIDER_CLASS_MAP.keys())
    logger.info(f"[主进程] 监听 Topics: {topic_list}")
    max_processes = cpu_count()
    num_processes = min(len(topic_list), max_processes)
    # Distribute topics evenly across the worker processes.
    topic_groups = split_topics(topic_list, num_processes)
    logger.info(f"[主进程] CPU 核心数: {max_processes},启动进程数: {num_processes}")

    process_map: Dict[int, Process] = {}
    for group_id, topic_group in enumerate(topic_groups):
        start_worker_process(group_id, topic_group, process_map)

    # Restart bookkeeping, per worker group.
    restart_count = {group_id: 0 for group_id in range(len(topic_groups))}
    last_restart_time = {group_id: 0 for group_id in range(len(topic_groups))}
    # Groups whose 1502 "stopped restarting" alert was already sent for the current
    # crash; the dead worker is re-checked every poll, so without this the alert
    # would repeat every 5 s until the cooldown expires.
    crash_loop_alerted = set()

    try:
        while not shutdown_event.is_set():
            await asyncio.sleep(5)
            for group_id, p in list(process_map.items()):
                if shutdown_event.is_set():
                    break
                if p.is_alive():
                    continue

                current_time = time.time()
                # Crash-loop guard: prevent endless restart loops.
                if restart_count[group_id] > 5 and (current_time - last_restart_time[group_id]) < 60:
                    if group_id not in crash_loop_alerted:
                        crash_loop_alerted.add(group_id)
                        logger.error(f"[监控] 进程 {p.name} 频繁崩溃,停止重启!")
                        aliyun_log.logging(
                            code="1502",
                            message="进程频繁崩溃已停止重启",
                            data={
                                "pid": p.pid,
                                "exitcode": p.exitcode,
                                "restart_count": restart_count[group_id],
                                "group_id": group_id,
                                "topics": topic_groups[group_id],
                            },
                        )
                    continue

                crash_loop_alerted.discard(group_id)
                logger.warning(f"[监控] 进程 {p.name} PID={p.pid} 已崩溃,正在重启...")
                aliyun_log.logging(
                    code="1501",
                    message=f"[主进程监控] 子进程 {p.name} 崩溃重启",
                    data={
                        "pid": p.pid,
                        "exitcode": p.exitcode,
                        "group_id": group_id,
                        "topics": topic_groups[group_id],
                        "restart_count": restart_count[group_id] + 1,
                    },
                )
                restart_count[group_id] += 1
                last_restart_time[group_id] = current_time
                await asyncio.sleep(2)
                start_worker_process(group_id, topic_groups[group_id], process_map)
    except KeyboardInterrupt:
        logger.warning("[主进程] 接收到退出信号,终止所有子进程...")

    # Unified shutdown path: make sure the scheduler task sees the event too.
    shutdown_event.set()
    logger.info("[主进程] 终止所有子进程...")
    _terminate_workers(process_map, logger)

    # Await (rather than cancel) the scheduler task so it can shut its scheduler down cleanly.
    await _await_scheduler_task(activity_scheduler_task, logger)
    logger.info("[主进程] 所有子进程已终止,退出")
if __name__ == "__main__":
    # Bind `os` as a module global before main() runs: main() logs os.getpid(),
    # and the module header does not import os.
    import os

    asyncio.run(main())
|