import time from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Callable, Sequence def process_tasks(tasks: Sequence[Callable[[], None]], max_workers: int = 10) -> None: """ 通用任务处理器,将任务分批并发执行。 :param tasks: 一个可迭代对象,每个元素是一个 callable(无需参数) :param max_workers: 最大并发数 """ total_tasks = len(tasks) task_counter = 0 with ThreadPoolExecutor(max_workers=max_workers) as executor: future_tasks = {} for task in tasks: task_counter += 1 print(f"提交任务: 第 {task_counter}/{total_tasks} 个任务") # 提交任务 future = executor.submit(task) future_tasks[future] = (task, task_counter) time.sleep(0.01) # 控制每批次提交的任务数 if len(future_tasks) == max_workers or task_counter == total_tasks: # 等待当前批次完成 for future in as_completed(future_tasks): task, counter = future_tasks[future] try: # 获取任务执行结果 future.result() print(f"任务完成: 第 {counter}/{total_tasks} 个任务") except Exception as exc: print(f"任务出错: 第 {counter}/{total_tasks} 个任务出错, {exc}") # 清空当前批次任务 future_tasks = {}