# process_manager.py

import multiprocessing
from typing import List, Dict

from core.utils.log.logger_manager import LoggerManager
  4. logger = LoggerManager.get_logger()
  5. aliyun_log = LoggerManager.get_aliyun_logger()
  6. def split_topics(topics: List[str], num_groups: int) -> List[List[str]]:
  7. """将所有 topic 平均划分为 num_groups 组,用于分配给子进程。"""
  8. return [topics[i::num_groups] for i in range(num_groups)]
  9. def start_worker_process(group_id: int, topic_group: List[str], process_map: Dict[int, multiprocessing.Process]):
  10. """启动一个子进程处理一组 topic。"""
  11. from scheduler.async_consumer import handle_topic_group
  12. p = multiprocessing.Process(
  13. target=handle_topic_group,
  14. args=(topic_group,),
  15. name=f"Worker-{group_id}"
  16. )
  17. p.start()
  18. process_map[group_id] = p
  19. logger.info(f"[主进程] 启动进程 PID={p.pid} 处理 topics={topic_group}")