|
|
há 2 semanas atrás | |
|---|---|---|
| .. | ||
| README_async_utils.md | há 3 semanas atrás | |
| async_utils.py | há 3 semanas atrás | |
| client.py | há 3 semanas atrás | |
| data_loader.py | há 3 semanas atrás | |
| match_analyzer.py | há 2 semanas atrás | |
| my_trace.py | há 3 semanas atrás | |
| relation_analyzer.py | há 2 semanas atrás | |
| structured_logger.py | há 3 semanas atrás | |
| utils.py | há 3 semanas atrás | |
lib/async_utils.py - 提供通用的异步任务并发执行功能
process_tasks_with_semaphore基本的并发处理函数,使用信号量控制并发数量。
tasks: 任务列表process_func: 处理单个任务的异步函数,签名为 async def func(task, index) -> resultmax_concurrent: 最大并发数(默认: 3)show_progress: 是否显示进度信息(默认: True)from lib.async_utils import process_tasks_with_semaphore
# 定义处理单个任务的函数
async def process_one_task(task: dict, index: int) -> dict:
# 你的处理逻辑
result = await some_async_operation(task)
return result
# 准备任务列表
tasks = [task1, task2, task3, ...]
# 并发处理所有任务
results = await process_tasks_with_semaphore(
tasks,
process_one_task,
max_concurrent=3,
show_progress=True
)
process_tasks_with_semaphore_retry支持重试的并发处理函数,适用于不稳定的网络请求。
tasks: 任务列表process_func: 处理单个任务的异步函数max_concurrent: 最大并发数(默认: 3)max_retries: 最大重试次数(默认: 3)show_progress: 是否显示进度信息(默认: True)from lib.async_utils import process_tasks_with_semaphore_retry
# 定义可能失败的异步任务
async def unstable_task(task: dict, index: int) -> dict:
# 可能会抛出异常的操作
result = await api_call(task)
return result
# 并发处理,失败时自动重试
results = await process_tasks_with_semaphore_retry(
tasks,
unstable_task,
max_concurrent=3,
max_retries=3,
show_progress=True
)
# 1. 导入工具
from lib.async_utils import process_tasks_with_semaphore
# 2. 定义处理函数
async def process_match_task_with_error_handling(task: dict, index: int) -> dict:
try:
result = await match_single_task(task)
return result
except Exception as e:
# 错误处理逻辑
return error_result
# 3. 并发处理任务
results = await process_tasks_with_semaphore(
test_tasks,
process_match_task_with_error_handling,
max_concurrent=3,
show_progress=True
)