# 异步并发处理工具 ## 文件说明 `lib/async_utils.py` - 提供通用的异步任务并发执行功能 ## 功能列表 ### 1. `process_tasks_with_semaphore` 基本的并发处理函数,使用信号量控制并发数量。 #### 参数 - `tasks`: 任务列表 - `process_func`: 处理单个任务的异步函数,签名为 `async def func(task, index) -> result` - `max_concurrent`: 最大并发数(默认: 3) - `show_progress`: 是否显示进度信息(默认: True) #### 使用示例 ```python from lib.async_utils import process_tasks_with_semaphore # 定义处理单个任务的函数 async def process_one_task(task: dict, index: int) -> dict: # 你的处理逻辑 result = await some_async_operation(task) return result # 准备任务列表 tasks = [task1, task2, task3, ...] # 并发处理所有任务 results = await process_tasks_with_semaphore( tasks, process_one_task, max_concurrent=3, show_progress=True ) ``` ### 2. `process_tasks_with_semaphore_retry` 支持重试的并发处理函数,适用于不稳定的网络请求。 #### 参数 - `tasks`: 任务列表 - `process_func`: 处理单个任务的异步函数 - `max_concurrent`: 最大并发数(默认: 3) - `max_retries`: 最大重试次数(默认: 3) - `show_progress`: 是否显示进度信息(默认: True) #### 使用示例 ```python from lib.async_utils import process_tasks_with_semaphore_retry # 定义可能失败的异步任务 async def unstable_task(task: dict, index: int) -> dict: # 可能会抛出异常的操作 result = await api_call(task) return result # 并发处理,失败时自动重试 results = await process_tasks_with_semaphore_retry( tasks, unstable_task, max_concurrent=3, max_retries=3, show_progress=True ) ``` ## 在 match_inspiration_to_persona.py 中的使用 ```python # 1. 导入工具 from lib.async_utils import process_tasks_with_semaphore # 2. 定义处理函数 async def process_match_task_with_error_handling(task: dict, index: int) -> dict: try: result = await match_single_task(task) return result except Exception as e: # 错误处理逻辑 return error_result # 3. 并发处理任务 results = await process_tasks_with_semaphore( test_tasks, process_match_task_with_error_handling, max_concurrent=3, show_progress=True ) ``` ## 特点 1. **通用性**: 可用于任何需要并发处理的异步任务 2. **并发控制**: 使用信号量控制并发数量,避免资源耗尽 3. **顺序保证**: 返回结果与输入任务的顺序一致 4. **进度显示**: 可选的进度显示功能 5. **重试支持**: 第二个函数支持自动重试机制 ## 适用场景 - API 批量请求 - 文件批量处理 - 数据库批量操作 - LLM 批量推理 - 任何需要并发控制的异步操作