main.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. import time
  2. from multiprocessing import Process, cpu_count
  3. from typing import Dict
  4. from core.utils.log.logger_manager import LoggerManager
  5. from scheduler.process_manager import split_topics, start_worker_process
  6. from spiders.spider_registry import SPIDER_CLASS_MAP
  7. from core.utils import CODES
  8. def main():
  9. logger = LoggerManager.get_logger()
  10. aliyun_log = LoggerManager.get_aliyun_logger()
  11. """
  12. 主调度入口:
  13. - 获取全部爬虫 topic;
  14. - 按 CPU 核心数分组;
  15. - 启动子进程运行;
  16. - 监控子进程状态,自动恢复。
  17. """
  18. topic_list = list(SPIDER_CLASS_MAP.keys())
  19. logger.info(f"[主进程] 监听 Topics: {topic_list}")
  20. max_processes = cpu_count()
  21. num_processes = min(len(topic_list), max_processes) # 避免过多进程
  22. # 将所有topic平均分配给子进程
  23. topic_groups = split_topics(topic_list,
  24. num_processes) # [[1, 5, 9, 13], [2, 6, 10, 14], [3, 7, 11, 15], [4, 8, 12]]
  25. logger.info(f"[主进程] CPU 核心数: {max_processes},启动进程数: {num_processes}")
  26. process_map: Dict[int, Process] = {}
  27. for group_id, topic_group in enumerate(topic_groups):
  28. start_worker_process(group_id, topic_group, process_map)
  29. # 主进程持续监控子进程状态
  30. try:
  31. while True:
  32. time.sleep(5)
  33. for group_id, p in list(process_map.items()):
  34. if not p.is_alive():
  35. logger.warning(f"[监控] 进程 {p.name} PID={p.pid} 已崩溃,正在重启...")
  36. # 上报阿里云日志
  37. aliyun_log.logging(
  38. code="1501",
  39. message=f"[主进程监控] 子进程 {p.name} 崩溃重启",
  40. data={
  41. "pid": p.pid,
  42. "exitcode": p.exitcode,
  43. "group_id": group_id,
  44. "topics": topic_groups[group_id]
  45. }
  46. )
  47. time.sleep(2)
  48. start_worker_process(group_id, topic_groups[group_id], process_map)
  49. except KeyboardInterrupt:
  50. logger.warning("[主进程] 接收到退出信号,终止所有子进程...")
  51. for p in process_map.values():
  52. p.terminate()
  53. for p in process_map.values():
  54. p.join()
  55. if __name__ == '__main__':
  56. main()