main.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
import os
import signal
import time
import warnings
from multiprocessing import Process, cpu_count
from typing import Dict

from core.utils.log.logger_manager import LoggerManager
from scheduler.process_manager import split_topics, start_worker_process
from spiders.spider_registry import SPIDER_CLASS_MAP
  9. # 禁用 urllib3 InsecureRequestWarning 警告
  10. warnings.filterwarnings('ignore', message='Unverified HTTPS request')
  11. # 全局关闭标志
  12. shutdown_flag = False
  13. def handle_signal(sig, frame):
  14. """信号处理函数"""
  15. global shutdown_flag
  16. logger = LoggerManager.get_logger()
  17. logger.warning(f"[主进程] 接收到信号 {sig},开始优雅关闭...")
  18. shutdown_flag = True
  19. def main():
  20. global shutdown_flag
  21. logger = LoggerManager.get_logger()
  22. aliyun_log = LoggerManager.get_aliyun_logger()
  23. # 注册信号处理器 - 关键新增
  24. signal.signal(signal.SIGTERM, handle_signal) # 处理 kill
  25. signal.signal(signal.SIGINT, handle_signal) # 处理 Ctrl+C
  26. signal.signal(signal.SIGHUP, handle_signal) # 处理终端断开
  27. logger.info(f"[主进程] 启动,PID={os.getpid()}") # 记录PID便于管理
  28. topic_list = list(SPIDER_CLASS_MAP.keys())
  29. logger.info(f"[主进程] 监听 Topics: {topic_list}")
  30. max_processes = cpu_count()
  31. num_processes = min(len(topic_list), max_processes)
  32. # topic均分给进程
  33. topic_groups = split_topics(topic_list, num_processes)
  34. logger.info(f"[主进程] CPU 核心数: {max_processes},启动进程数: {num_processes}")
  35. process_map: Dict[int, Process] = {}
  36. for group_id, topic_group in enumerate(topic_groups):
  37. start_worker_process(group_id, topic_group, process_map)
  38. # 重启状态跟踪
  39. restart_count = {group_id: 0 for group_id in range(len(topic_groups))}
  40. last_restart_time = {group_id: 0 for group_id in range(len(topic_groups))}
  41. # 主进程持续监控子进程状态
  42. try:
  43. while not shutdown_flag: # 检查关闭标志
  44. time.sleep(5)
  45. for group_id, p in list(process_map.items()):
  46. if shutdown_flag: # 如果正在关闭,跳过检查
  47. break
  48. if not p.is_alive():
  49. current_time = time.time()
  50. # 重启限制逻辑 - 防止无限重启
  51. if restart_count[group_id] > 5 and (current_time - last_restart_time[group_id]) < 60:
  52. logger.error(f"[监控] 进程 {p.name} 频繁崩溃,停止重启!")
  53. aliyun_log.logging(
  54. code="1502",
  55. message="进程频繁崩溃已停止重启",
  56. data={
  57. "pid": p.pid,
  58. "exitcode": p.exitcode,
  59. "restart_count": restart_count[group_id],
  60. "group_id": group_id,
  61. "topics": topic_groups[group_id]
  62. }
  63. )
  64. continue
  65. logger.warning(f"[监控] 进程 {p.name} PID={p.pid} 已崩溃,正在重启...")
  66. aliyun_log.logging(
  67. code="1501",
  68. message=f"[主进程监控] 子进程 {p.name} 崩溃重启",
  69. data={
  70. "pid": p.pid,
  71. "exitcode": p.exitcode,
  72. "group_id": group_id,
  73. "topics": topic_groups[group_id],
  74. "restart_count": restart_count[group_id] + 1
  75. }
  76. )
  77. # 更新重启状态
  78. restart_count[group_id] += 1
  79. last_restart_time[group_id] = current_time
  80. time.sleep(2)
  81. start_worker_process(group_id, topic_groups[group_id], process_map)
  82. except KeyboardInterrupt:
  83. logger.warning("[主进程] 接收到退出信号,终止所有子进程...")
  84. shutdown_flag = True
  85. # 优雅终止所有子进程 - 统一处理关闭
  86. logger.info("[主进程] 终止所有子进程...")
  87. for p in process_map.values():
  88. if p.is_alive():
  89. p.terminate() # 先尝试正常终止
  90. # 等待子进程退出
  91. for p in process_map.values():
  92. p.join(timeout=5) # 最多等待5秒
  93. if p.is_alive():
  94. logger.warning(f"进程 {p.pid} 未正常退出,强制终止")
  95. p.kill() # 强制终止
  96. p.join()
  97. logger.info("[主进程] 所有子进程已终止,退出")
  98. if __name__ == '__main__':
  99. import os
  100. main()