zhangliang 14 часов назад
Родитель
Сommit
15f1b4e2d8
1 измененных файлов с 73 добавлено и 18 удалено
  1. 73 18
      main.py

+ 73 - 18
main.py

@@ -1,4 +1,5 @@
 import time
+import signal  # 新增:导入signal模块
 from multiprocessing import Process, cpu_count
 from typing import Dict
 
@@ -6,27 +7,36 @@ from core.utils.log.logger_manager import LoggerManager
 from scheduler.process_manager import split_topics, start_worker_process
 from spiders.spider_registry import SPIDER_CLASS_MAP
 
+# 全局关闭标志
+shutdown_flag = False
 
 
+def handle_signal(sig, frame):
+    """信号处理函数"""
+    global shutdown_flag
+    logger = LoggerManager.get_logger()
+    logger.warning(f"[主进程] 接收到信号 {sig},开始优雅关闭...")
+    shutdown_flag = True
+
 
 def main():
+    global shutdown_flag
     logger = LoggerManager.get_logger()
     aliyun_log = LoggerManager.get_aliyun_logger()
-    """
-    主调度入口:
-    - 获取全部爬虫 topic;
-    - 按 CPU 核心数分组;
-    - 启动子进程运行;
-    - 监控子进程状态,自动恢复。
-    """
+
+    # 注册信号处理器 - 关键新增
+    signal.signal(signal.SIGTERM, handle_signal)  # 处理 kill
+    signal.signal(signal.SIGINT, handle_signal)  # 处理 Ctrl+C
+    signal.signal(signal.SIGHUP, handle_signal)  # 处理终端断开
+
+    logger.info(f"[主进程] 启动,PID={os.getpid()}")  # 记录PID便于管理
+
     topic_list = list(SPIDER_CLASS_MAP.keys())
     logger.info(f"[主进程] 监听 Topics: {topic_list}")
 
     max_processes = cpu_count()
-    num_processes = min(len(topic_list), max_processes)  # 避免过多进程
-    # 将所有topic平均分配给子进程
-    topic_groups = split_topics(topic_list,
-                                num_processes)  # [[1, 5, 9, 13], [2, 6, 10, 14], [3, 7, 11, 15], [4, 8, 12]]
+    num_processes = min(len(topic_list), max_processes)
+    topic_groups = split_topics(topic_list, num_processes)
     logger.info(f"[主进程] CPU 核心数: {max_processes},启动进程数: {num_processes}")
 
     process_map: Dict[int, Process] = {}
@@ -34,14 +44,37 @@ def main():
     for group_id, topic_group in enumerate(topic_groups):
         start_worker_process(group_id, topic_group, process_map)
 
+    # 重启状态跟踪 - 新增重启限制
+    restart_count = {group_id: 0 for group_id in range(len(topic_groups))}
+    last_restart_time = {group_id: 0 for group_id in range(len(topic_groups))}
+
     # 主进程持续监控子进程状态
     try:
-        while True:
+        while not shutdown_flag:  # 检查关闭标志
             time.sleep(5)
             for group_id, p in list(process_map.items()):
+                if shutdown_flag:  # 如果正在关闭,跳过检查
+                    break
+
                 if not p.is_alive():
+                    current_time = time.time()
+                    # 重启限制逻辑 - 防止无限重启
+                    if restart_count[group_id] > 5 and (current_time - last_restart_time[group_id]) < 60:
+                        logger.error(f"[监控] 进程 {p.name} 频繁崩溃,停止重启!")
+                        aliyun_log.logging(
+                            code="1502",
+                            message="进程频繁崩溃已停止重启",
+                            data={
+                                "pid": p.pid,
+                                "exitcode": p.exitcode,
+                                "restart_count": restart_count[group_id],
+                                "group_id": group_id,
+                                "topics": topic_groups[group_id]
+                            }
+                        )
+                        continue
+
                     logger.warning(f"[监控] 进程 {p.name} PID={p.pid} 已崩溃,正在重启...")
-                    # 上报阿里云日志
                     aliyun_log.logging(
                         code="1501",
                         message=f"[主进程监控] 子进程 {p.name} 崩溃重启",
@@ -49,18 +82,40 @@ def main():
                             "pid": p.pid,
                             "exitcode": p.exitcode,
                             "group_id": group_id,
-                            "topics": topic_groups[group_id]
+                            "topics": topic_groups[group_id],
+                            "restart_count": restart_count[group_id] + 1
                         }
                     )
+
+                    # 更新重启状态
+                    restart_count[group_id] += 1
+                    last_restart_time[group_id] = current_time
+
                     time.sleep(2)
                     start_worker_process(group_id, topic_groups[group_id], process_map)
+
     except KeyboardInterrupt:
         logger.warning("[主进程] 接收到退出信号,终止所有子进程...")
-        for p in process_map.values():
-            p.terminate()
-        for p in process_map.values():
+        shutdown_flag = True
+
+    # 优雅终止所有子进程 - 统一处理关闭
+    logger.info("[主进程] 终止所有子进程...")
+    for p in process_map.values():
+        if p.is_alive():
+            p.terminate()  # 先尝试正常终止
+
+    # 等待子进程退出
+    for p in process_map.values():
+        p.join(timeout=5)  # 最多等待5秒
+        if p.is_alive():
+            logger.warning(f"进程 {p.pid} 未正常退出,强制终止")
+            p.kill()  # 强制终止
             p.join()
 
+    logger.info("[主进程] 所有子进程已终止,退出")
+
 
 if __name__ == '__main__':
-    main()
+    import os
+
+    main()