| 1234567891011121314151617181920212223242526272829303132333435363738394041 |
- import time
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from typing import Callable, Sequence
- def process_tasks(tasks: Sequence[Callable[[], None]], max_workers: int = 10) -> None:
- """
- 通用任务处理器,将任务分批并发执行。
- :param tasks: 一个可迭代对象,每个元素是一个 callable(无需参数)
- :param max_workers: 最大并发数
- """
- total_tasks = len(tasks)
- task_counter = 0
- with ThreadPoolExecutor(max_workers=max_workers) as executor:
- future_tasks = {}
- for task in tasks:
- task_counter += 1
- print(f"提交任务: 第 {task_counter}/{total_tasks} 个任务")
- # 提交任务
- future = executor.submit(task)
- future_tasks[future] = (task, task_counter)
- time.sleep(0.01)
- # 控制每批次提交的任务数
- if len(future_tasks) == max_workers or task_counter == total_tasks:
- # 等待当前批次完成
- for future in as_completed(future_tasks):
- task, counter = future_tasks[future]
- try:
- # 获取任务执行结果
- future.result()
- print(f"任务完成: 第 {counter}/{total_tasks} 个任务")
- except Exception as exc:
- print(f"任务出错: 第 {counter}/{total_tasks} 个任务出错, {exc}")
- # 清空当前批次任务
- future_tasks = {}
|