# main.py
import asyncio
import os
import signal
import time
from datetime import datetime
from multiprocessing import Process, cpu_count
from typing import Dict

# Async scheduler and cron trigger from APScheduler.
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

from core.utils.log.logger_manager import LoggerManager
from scheduler.process_manager import split_topics, start_worker_process
from spiders.spider_registry import SPIDER_CLASS_MAP
from scripts.activity_calculator import ActivityCalculator

  14. # 全局关闭标志
  15. shutdown_flag = False
  16. def handle_signal(sig, frame):
  17. """信号处理函数"""
  18. global shutdown_flag
  19. logger = LoggerManager.get_logger()
  20. logger.warning(f"[主进程] 接收到信号 {sig},开始优雅关闭...")
  21. shutdown_flag = True
  22. async def run_activity_calculator_with_scheduler(shutdown_event):
  23. """使用 APScheduler 运行活跃度计算器 - 仅执行每日增量更新"""
  24. logger = LoggerManager.get_logger()
  25. logger.info("[活跃度计算器] AsyncIOScheduler 启动(仅增量更新)")
  26. scheduler = AsyncIOScheduler()
  27. # 添加每日凌晨0点的增量更新任务
  28. async def incremental_update_job():
  29. logger.info("[活跃度计算器] 开始执行每日增量更新")
  30. try:
  31. calc = ActivityCalculator(update_mode="incremental")
  32. try:
  33. await calc.initialize()
  34. result = await calc.calculate_and_update()
  35. logger.info(f"[活跃度计算器] 增量更新完成,处理了 {result} 个用户")
  36. finally:
  37. await calc.close()
  38. except Exception as e:
  39. logger.error(f"[活跃度计算器] 增量更新失败: {e}")
  40. scheduler.add_job(
  41. func=incremental_update_job,
  42. trigger=CronTrigger(hour=0, minute=0), # 每天凌晨0点执行
  43. id='incremental_update_daily',
  44. name='每日增量更新',
  45. misfire_grace_time=3600
  46. )
  47. # 启动调度器
  48. scheduler.start()
  49. logger.info("[活跃度计算器] AsyncIOScheduler 已启动并运行(仅增量更新)")
  50. # 等待关闭事件
  51. await shutdown_event.wait()
  52. # 关闭调度器
  53. scheduler.shutdown()
  54. logger.info("[活跃度计算器] AsyncIOScheduler 已关闭")
  55. async def main():
  56. logger = LoggerManager.get_logger()
  57. aliyun_log = LoggerManager.get_aliyun_logger()
  58. # 创建关闭事件
  59. shutdown_event = asyncio.Event()
  60. def handle_signal_async(sig, frame):
  61. """异步信号处理函数"""
  62. logger.warning(f"[主进程] 接收到信号 {sig},开始优雅关闭...")
  63. shutdown_event.set()
  64. # 注册信号处理器 - 关键新增
  65. signal.signal(signal.SIGTERM, handle_signal_async) # 处理 kill
  66. signal.signal(signal.SIGINT, handle_signal_async) # 处理 Ctrl+C
  67. signal.signal(signal.SIGHUP, handle_signal_async) # 处理终端断开
  68. logger.info(f"[主进程] 启动,PID={os.getpid()}") # 记录PID便于管理
  69. # 启动活跃度计算器调度器(仅增量更新)
  70. activity_scheduler_task = asyncio.create_task(run_activity_calculator_with_scheduler(shutdown_event))
  71. logger.info("[主进程] 活跃度计算器调度器已启动(仅增量更新)")
  72. topic_list = list(SPIDER_CLASS_MAP.keys())
  73. logger.info(f"[主进程] 监听 Topics: {topic_list}")
  74. max_processes = cpu_count()
  75. num_processes = min(len(topic_list), max_processes)
  76. # topic均分给进程
  77. topic_groups = split_topics(topic_list, num_processes)
  78. logger.info(f"[主进程] CPU 核心数: {max_processes},启动进程数: {num_processes}")
  79. process_map: Dict[int, Process] = {}
  80. for group_id, topic_group in enumerate(topic_groups):
  81. start_worker_process(group_id, topic_group, process_map)
  82. # 重启状态跟踪
  83. restart_count = {group_id: 0 for group_id in range(len(topic_groups))}
  84. last_restart_time = {group_id: 0 for group_id in range(len(topic_groups))}
  85. # 主进程持续监控子进程状态
  86. try:
  87. while not shutdown_event.is_set(): # 检查关闭标志
  88. await asyncio.sleep(5) # 使用异步sleep
  89. for group_id, p in list(process_map.items()):
  90. if shutdown_event.is_set(): # 如果正在关闭,跳过检查
  91. break
  92. if not p.is_alive():
  93. current_time = time.time()
  94. # 重启限制逻辑 - 防止无限重启
  95. if restart_count[group_id] > 5 and (current_time - last_restart_time[group_id]) < 60:
  96. logger.error(f"[监控] 进程 {p.name} 频繁崩溃,停止重启!")
  97. aliyun_log.logging(
  98. code="1502",
  99. message="进程频繁崩溃已停止重启",
  100. data={
  101. "pid": p.pid,
  102. "exitcode": p.exitcode,
  103. "restart_count": restart_count[group_id],
  104. "group_id": group_id,
  105. "topics": topic_groups[group_id]
  106. }
  107. )
  108. continue
  109. logger.warning(f"[监控] 进程 {p.name} PID={p.pid} 已崩溃,正在重启...")
  110. aliyun_log.logging(
  111. code="1501",
  112. message=f"[主进程监控] 子进程 {p.name} 崩溃重启",
  113. data={
  114. "pid": p.pid,
  115. "exitcode": p.exitcode,
  116. "group_id": group_id,
  117. "topics": topic_groups[group_id],
  118. "restart_count": restart_count[group_id] + 1
  119. }
  120. )
  121. # 更新重启状态
  122. restart_count[group_id] += 1
  123. last_restart_time[group_id] = current_time
  124. await asyncio.sleep(2) # 使用异步sleep
  125. start_worker_process(group_id, topic_groups[group_id], process_map)
  126. except KeyboardInterrupt:
  127. logger.warning("[主进程] 接收到退出信号,终止所有子进程...")
  128. shutdown_event.set()
  129. # 优雅终止所有子进程 - 统一处理关闭
  130. logger.info("[主进程] 终止所有子进程...")
  131. for p in process_map.values():
  132. if p.is_alive():
  133. p.terminate() # 先尝试正常终止
  134. # 等待子进程退出
  135. for p in process_map.values():
  136. p.join(timeout=5) # 最多等待5秒
  137. if p.is_alive():
  138. logger.warning(f"进程 {p.pid} 未正常退出,强制终止")
  139. p.kill() # 强制终止
  140. p.join()
  141. # 设置关闭事件并等待活动调度器完成
  142. shutdown_event.set()
  143. # 取消活动调度器任务
  144. if not activity_scheduler_task.done():
  145. activity_scheduler_task.cancel()
  146. try:
  147. await asyncio.wait_for(activity_scheduler_task, timeout=10.0)
  148. except asyncio.TimeoutError:
  149. logger.warning("[活跃度计算器] 调度器关闭超时")
  150. except asyncio.CancelledError:
  151. logger.info("[活跃度计算器] 调度器任务已被取消")
  152. logger.info("[主进程] 所有子进程已终止,退出")
  153. if __name__ == '__main__':
  154. import os
  155. asyncio.run(main())