thread_util.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. import time
  2. from concurrent.futures import ThreadPoolExecutor, as_completed
  3. from typing import Callable, Sequence
  4. def process_tasks(tasks: Sequence[Callable[[], None]], max_workers: int = 10) -> None:
  5. """
  6. 通用任务处理器,将任务分批并发执行。
  7. :param tasks: 一个可迭代对象,每个元素是一个 callable(无需参数)
  8. :param max_workers: 最大并发数
  9. """
  10. total_tasks = len(tasks)
  11. task_counter = 0
  12. with ThreadPoolExecutor(max_workers=max_workers) as executor:
  13. future_tasks = {}
  14. for task in tasks:
  15. task_counter += 1
  16. print(f"提交任务: 第 {task_counter}/{total_tasks} 个任务")
  17. # 提交任务
  18. future = executor.submit(task)
  19. future_tasks[future] = (task, task_counter)
  20. time.sleep(0.01)
  21. # 控制每批次提交的任务数
  22. if len(future_tasks) == max_workers or task_counter == total_tasks:
  23. # 等待当前批次完成
  24. for future in as_completed(future_tasks):
  25. task, counter = future_tasks[future]
  26. try:
  27. # 获取任务执行结果
  28. future.result()
  29. print(f"任务完成: 第 {counter}/{total_tasks} 个任务")
  30. except Exception as exc:
  31. print(f"任务出错: 第 {counter}/{total_tasks} 个任务出错, {exc}")
  32. # 清空当前批次任务
  33. future_tasks = {}