main.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. import time
  2. from multiprocessing import Process, cpu_count
  3. from typing import Dict
  4. from core.utils.log.logger_manager import LoggerManager
  5. from scheduler.process_manager import split_topics, start_worker_process
  6. from spiders.spider_registry import SPIDER_CLASS_MAP
  7. def main():
  8. logger = LoggerManager.get_logger()
  9. aliyun_log = LoggerManager.get_aliyun_logger()
  10. """
  11. 主调度入口:
  12. - 获取全部爬虫 topic;
  13. - 按 CPU 核心数分组;
  14. - 启动子进程运行;
  15. - 监控子进程状态,自动恢复。
  16. """
  17. topic_list = list(SPIDER_CLASS_MAP.keys())
  18. logger.info(f"[主进程] 监听 Topics: {topic_list}")
  19. max_processes = cpu_count()
  20. num_processes = min(len(topic_list), max_processes) # 避免过多进程
  21. # 将所有topic平均分配给子进程
  22. topic_groups = split_topics(topic_list,
  23. num_processes) # [[1, 5, 9, 13], [2, 6, 10, 14], [3, 7, 11, 15], [4, 8, 12]]
  24. logger.info(f"[主进程] CPU 核心数: {max_processes},启动进程数: {num_processes}")
  25. process_map: Dict[int, Process] = {}
  26. for group_id, topic_group in enumerate(topic_groups):
  27. start_worker_process(group_id, topic_group, process_map)
  28. # 主进程持续监控子进程状态
  29. try:
  30. while True:
  31. time.sleep(5)
  32. for group_id, p in list(process_map.items()):
  33. if not p.is_alive():
  34. logger.warning(f"[监控] 进程 {p.name} PID={p.pid} 已崩溃,正在重启...")
  35. # 上报阿里云日志
  36. aliyun_log.logging(
  37. code="1501",
  38. message=f"[主进程监控] 子进程 {p.name} 崩溃重启",
  39. data={
  40. "pid": p.pid,
  41. "exitcode": p.exitcode,
  42. "group_id": group_id,
  43. "topics": topic_groups[group_id]
  44. }
  45. )
  46. time.sleep(2)
  47. start_worker_process(group_id, topic_groups[group_id], process_map)
  48. except KeyboardInterrupt:
  49. logger.warning("[主进程] 接收到退出信号,终止所有子进程...")
  50. for p in process_map.values():
  51. p.terminate()
  52. for p in process_map.values():
  53. p.join()
  54. if __name__ == '__main__':
  55. main()