main.py

import time
from multiprocessing import Process, cpu_count
from typing import Dict

from scheduler.process_manager import split_topics, start_worker_process
from spiders.spider_registry import SPIDER_CLASS_MAP
from core.utils.log.logger_manager import LoggerManager


def main():
    """
    Main scheduler entry point:
    - collect all spider topics;
    - split them into groups based on the CPU core count;
    - launch one child process per group;
    - monitor the child processes and restart any that die.
    """
    logger = LoggerManager.get_logger()
    aliyun_log = LoggerManager.get_aliyun_logger()

    topic_list = list(SPIDER_CLASS_MAP.keys())
    logger.info(f"[Main] Listening on topics: {topic_list}")

    max_processes = cpu_count()
    num_processes = min(len(topic_list), max_processes)  # avoid launching more processes than topics
    # Distribute all topics evenly across the worker processes, e.g.
    # [[1, 5, 9, 13], [2, 6, 10, 14], [3, 7, 11, 15], [4, 8, 12]]
    topic_groups = split_topics(topic_list, num_processes)
    logger.info(f"[Main] CPU cores: {max_processes}, processes to start: {num_processes}")

    process_map: Dict[int, Process] = {}
    for group_id, topic_group in enumerate(topic_groups):
        start_worker_process(group_id, topic_group, process_map)

    # The main process keeps monitoring the workers and restarts crashed ones.
    try:
        while True:
            time.sleep(5)
            for group_id, p in list(process_map.items()):
                if not p.is_alive():
                    logger.warning(f"[Monitor] Process {p.name} PID={p.pid} crashed, restarting...")
                    time.sleep(2)
                    start_worker_process(group_id, topic_groups[group_id], process_map)
    except KeyboardInterrupt:
        logger.warning("[Main] Received exit signal, terminating all child processes...")
        for p in process_map.values():
            p.terminate()
        for p in process_map.values():
            p.join()


if __name__ == '__main__':
    main()
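
# ---------------------------------------------------------------------------
# Reference sketch (an assumption, not the actual implementation):
# split_topics is imported from scheduler.process_manager and its source is
# not shown here. Judging from the example grouping in the comment above
# ([[1, 5, 9, 13], [2, 6, 10, 14], [3, 7, 11, 15], [4, 8, 12]]), it deals
# topics out round-robin; a minimal equivalent could look like this:
#
#     def split_topics(topics, num_groups):
#         """Distribute topics across num_groups lists in round-robin order."""
#         return [topics[i::num_groups] for i in range(num_groups)]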
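#
# Similarly, start_worker_process is only known from its call sites here: it
# must spawn a worker for one topic group and register it in process_map so
# the monitor loop can find and restart it. A hypothetical minimal version
# (the worker entry point run_spiders is invented for illustration):
#
#     def start_worker_process(group_id, topic_group, process_map):
#         """Spawn a worker process for topic_group and track it by group_id."""
#         p = Process(target=run_spiders, args=(topic_group,),
#                     name=f"worker-{group_id}")
#         p.start()
#         process_map[group_id] = p
# ---------------------------------------------------------------------------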