""" 异步并发处理工具模块 提供通用的异步任务并发执行功能 """ import asyncio from typing import List, Callable, Any, Awaitable async def process_tasks_with_semaphore( tasks: List[Any], process_func: Callable[[Any, int], Awaitable[Any]], max_concurrent: int = 3, show_progress: bool = True ) -> List[Any]: """使用信号量控制并发数量处理任务 Args: tasks: 任务列表 process_func: 处理单个任务的异步函数,签名为 async def func(task, index) -> result max_concurrent: 最大并发数 show_progress: 是否显示进度信息 Returns: 结果列表(保持原始顺序) Example: async def process_one(task, index): result = await some_async_operation(task) return result tasks = [task1, task2, task3] results = await process_tasks_with_semaphore(tasks, process_one, max_concurrent=3) """ semaphore = asyncio.Semaphore(max_concurrent) async def process_with_semaphore(task: Any, index: int): """包装处理函数,添加信号量控制""" async with semaphore: result = await process_func(task, index) if show_progress: print(f"[{index + 1}/{len(tasks)}] 任务完成") return result # 并发处理所有任务 results = await asyncio.gather( *[process_with_semaphore(task, i) for i, task in enumerate(tasks)] ) return results async def process_tasks_with_semaphore_retry( tasks: List[Any], process_func: Callable[[Any, int], Awaitable[Any]], max_concurrent: int = 3, max_retries: int = 3, show_progress: bool = True ) -> List[Any]: """使用信号量控制并发数量处理任务(支持重试) Args: tasks: 任务列表 process_func: 处理单个任务的异步函数,签名为 async def func(task, index) -> result max_concurrent: 最大并发数 max_retries: 最大重试次数 show_progress: 是否显示进度信息 Returns: 结果列表(保持原始顺序) """ semaphore = asyncio.Semaphore(max_concurrent) async def process_with_semaphore_and_retry(task: Any, index: int): """包装处理函数,添加信号量控制和重试逻辑""" async with semaphore: for attempt in range(max_retries): try: result = await process_func(task, index) if show_progress: print(f"[{index + 1}/{len(tasks)}] 任务完成") return result except Exception as e: if attempt < max_retries - 1: if show_progress: print(f"[{index + 1}/{len(tasks)}] 重试 {attempt + 1}/{max_retries - 1}: {e}") await asyncio.sleep(1) # 重试前等待1秒 else: if show_progress: print(f"[{index + 1}/{len(tasks)}] 失败(已重试 {max_retries} 次): {e}") raise # 并发处理所有任务 results = await asyncio.gather( *[process_with_semaphore_and_retry(task, i) for i, task in enumerate(tasks)], return_exceptions=True # 返回异常而不是抛出 ) return results