async_utils.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. """
  2. 异步并发处理工具模块
  3. 提供通用的异步任务并发执行功能
  4. """
  5. import asyncio
  6. from typing import List, Callable, Any, Awaitable
  7. async def process_tasks_with_semaphore(
  8. tasks: List[Any],
  9. process_func: Callable[[Any, int], Awaitable[Any]],
  10. max_concurrent: int = 3,
  11. show_progress: bool = True
  12. ) -> List[Any]:
  13. """使用信号量控制并发数量处理任务
  14. Args:
  15. tasks: 任务列表
  16. process_func: 处理单个任务的异步函数,签名为 async def func(task, index) -> result
  17. max_concurrent: 最大并发数
  18. show_progress: 是否显示进度信息
  19. Returns:
  20. 结果列表(保持原始顺序)
  21. Example:
  22. async def process_one(task, index):
  23. result = await some_async_operation(task)
  24. return result
  25. tasks = [task1, task2, task3]
  26. results = await process_tasks_with_semaphore(tasks, process_one, max_concurrent=3)
  27. """
  28. semaphore = asyncio.Semaphore(max_concurrent)
  29. async def process_with_semaphore(task: Any, index: int):
  30. """包装处理函数,添加信号量控制"""
  31. async with semaphore:
  32. result = await process_func(task, index)
  33. if show_progress:
  34. print(f"[{index + 1}/{len(tasks)}] 任务完成")
  35. return result
  36. # 并发处理所有任务
  37. results = await asyncio.gather(
  38. *[process_with_semaphore(task, i) for i, task in enumerate(tasks)]
  39. )
  40. return results
  41. async def process_tasks_with_semaphore_retry(
  42. tasks: List[Any],
  43. process_func: Callable[[Any, int], Awaitable[Any]],
  44. max_concurrent: int = 3,
  45. max_retries: int = 3,
  46. show_progress: bool = True
  47. ) -> List[Any]:
  48. """使用信号量控制并发数量处理任务(支持重试)
  49. Args:
  50. tasks: 任务列表
  51. process_func: 处理单个任务的异步函数,签名为 async def func(task, index) -> result
  52. max_concurrent: 最大并发数
  53. max_retries: 最大重试次数
  54. show_progress: 是否显示进度信息
  55. Returns:
  56. 结果列表(保持原始顺序)
  57. """
  58. semaphore = asyncio.Semaphore(max_concurrent)
  59. async def process_with_semaphore_and_retry(task: Any, index: int):
  60. """包装处理函数,添加信号量控制和重试逻辑"""
  61. async with semaphore:
  62. for attempt in range(max_retries):
  63. try:
  64. result = await process_func(task, index)
  65. if show_progress:
  66. print(f"[{index + 1}/{len(tasks)}] 任务完成")
  67. return result
  68. except Exception as e:
  69. if attempt < max_retries - 1:
  70. if show_progress:
  71. print(f"[{index + 1}/{len(tasks)}] 重试 {attempt + 1}/{max_retries - 1}: {e}")
  72. await asyncio.sleep(1) # 重试前等待1秒
  73. else:
  74. if show_progress:
  75. print(f"[{index + 1}/{len(tasks)}] 失败(已重试 {max_retries} 次): {e}")
  76. raise
  77. # 并发处理所有任务
  78. results = await asyncio.gather(
  79. *[process_with_semaphore_and_retry(task, i) for i, task in enumerate(tasks)],
  80. return_exceptions=True # 返回异常而不是抛出
  81. )
  82. return results