12345678910111213141516171819202122232425262728293031 |
- import multiprocessing
- from typing import List, Dict
- from core.utils.log.logger_manager import LoggerManager
- logger = LoggerManager.get_logger()
- aliyun_log = LoggerManager.get_aliyun_logger()
- def split_topics(topics: List[str], num_groups: int) -> List[List[str]]:
- """将所有 topic 平均划分为 num_groups 组,用于分配给子进程。"""
- return [topics[i::num_groups] for i in range(num_groups)]
- def start_worker_process(group_id: int, topic_group: List[str], process_map: Dict[int, multiprocessing.Process]):
- """启动一个子进程处理一组 topic。"""
- from scheduler.async_consumer import handle_topic_group
- p = multiprocessing.Process(
- target=handle_topic_group,
- args=(topic_group,),
- name=f"Worker-{group_id}"
- )
- p.start()
- process_map[group_id] = p
- logger.info(f"[主进程] 启动进程 PID={p.pid} 处理 topics={topic_group}")
- aliyun_log.logging(
- code="1500",
- message=f"[主进程] 启动进程 PID={p.pid}",
- data=f"处理 topics={topic_group}"
- )
|