| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- """
- 异步并发处理工具模块
- 提供通用的异步任务并发执行功能
- """
- import asyncio
- from typing import List, Callable, Any, Awaitable
- async def process_tasks_with_semaphore(
- tasks: List[Any],
- process_func: Callable[[Any, int], Awaitable[Any]],
- max_concurrent: int = 3,
- show_progress: bool = True
- ) -> List[Any]:
- """使用信号量控制并发数量处理任务
- Args:
- tasks: 任务列表
- process_func: 处理单个任务的异步函数,签名为 async def func(task, index) -> result
- max_concurrent: 最大并发数
- show_progress: 是否显示进度信息
- Returns:
- 结果列表(保持原始顺序)
- Example:
- async def process_one(task, index):
- result = await some_async_operation(task)
- return result
- tasks = [task1, task2, task3]
- results = await process_tasks_with_semaphore(tasks, process_one, max_concurrent=3)
- """
- semaphore = asyncio.Semaphore(max_concurrent)
- async def process_with_semaphore(task: Any, index: int):
- """包装处理函数,添加信号量控制"""
- async with semaphore:
- result = await process_func(task, index)
- if show_progress:
- print(f"[{index + 1}/{len(tasks)}] 任务完成")
- return result
- # 并发处理所有任务
- results = await asyncio.gather(
- *[process_with_semaphore(task, i) for i, task in enumerate(tasks)]
- )
- return results
- async def process_tasks_with_semaphore_retry(
- tasks: List[Any],
- process_func: Callable[[Any, int], Awaitable[Any]],
- max_concurrent: int = 3,
- max_retries: int = 3,
- show_progress: bool = True
- ) -> List[Any]:
- """使用信号量控制并发数量处理任务(支持重试)
- Args:
- tasks: 任务列表
- process_func: 处理单个任务的异步函数,签名为 async def func(task, index) -> result
- max_concurrent: 最大并发数
- max_retries: 最大重试次数
- show_progress: 是否显示进度信息
- Returns:
- 结果列表(保持原始顺序)
- """
- semaphore = asyncio.Semaphore(max_concurrent)
- async def process_with_semaphore_and_retry(task: Any, index: int):
- """包装处理函数,添加信号量控制和重试逻辑"""
- async with semaphore:
- for attempt in range(max_retries):
- try:
- result = await process_func(task, index)
- if show_progress:
- print(f"[{index + 1}/{len(tasks)}] 任务完成")
- return result
- except Exception as e:
- if attempt < max_retries - 1:
- if show_progress:
- print(f"[{index + 1}/{len(tasks)}] 重试 {attempt + 1}/{max_retries - 1}: {e}")
- await asyncio.sleep(1) # 重试前等待1秒
- else:
- if show_progress:
- print(f"[{index + 1}/{len(tasks)}] 失败(已重试 {max_retries} 次): {e}")
- raise
- # 并发处理所有任务
- results = await asyncio.gather(
- *[process_with_semaphore_and_retry(task, i) for i, task in enumerate(tasks)],
- return_exceptions=True # 返回异常而不是抛出
- )
- return results
|