import multiprocessing
from typing import List, Dict

from core.utils.log.logger_manager import LoggerManager

# Module-wide loggers: the regular application logger and the Aliyun log sink.
logger = LoggerManager.get_logger()
aliyun_log = LoggerManager.get_aliyun_logger()


def split_topics(topics: List[str], num_groups: int) -> List[List[str]]:
    """Deal all topics round-robin into ``num_groups`` groups for the worker processes.

    Group ``k`` receives the topics at indexes ``k``, ``k + num_groups``,
    ``k + 2 * num_groups`` and so on, so group sizes differ by at most one.

    Args:
        topics: Topic names to distribute.
        num_groups: Number of groups to produce.

    Returns:
        A list holding ``num_groups`` lists of topics. Groups are empty when
        there are fewer topics than groups, and the result itself is an empty
        list when ``num_groups`` is less than 1.
    """
    group_offsets = range(num_groups)
    # A strided slice starting at each offset picks every num_groups-th topic.
    return [topics[start::num_groups] for start in group_offsets]


def start_worker_process(group_id: int, topic_group: List[str], process_map: Dict[int, multiprocessing.Process]):
    """Spawn one child process that handles a single group of topics.

    The started process is stored in ``process_map`` under ``group_id``
    (replacing any entry already stored under that key), and the launch is
    reported to both the application logger and the Aliyun logger.

    Args:
        group_id: Identifier of the group; used in the process name
            (``Worker-<group_id>``) and as the key in ``process_map``.
        topic_group: Topics passed to ``handle_topic_group`` in the child.
        process_map: Mapping of group id to process; mutated in place.
    """
    # Imported at call time rather than at module level.
    # NOTE(review): presumably to avoid a circular import - confirm.
    from scheduler.async_consumer import handle_topic_group

    worker = multiprocessing.Process(
        name=f"Worker-{group_id}",
        target=handle_topic_group,
        args=(topic_group,),
    )
    worker.start()
    process_map[group_id] = worker

    # The PID is only available once the process has been started.
    logger.info(f"[主进程] 启动进程 PID={worker.pid} 处理 topics={topic_group}")
    aliyun_log.logging(
        code="1500",
        message=f"[主进程] 启动进程 PID={worker.pid}",
        data=f"处理 topics={topic_group}",
    )