yangxiaohui de2cd57022 feat: 添加语义相似度分析模块 1 týždeň pred
..
README_async_utils.md dd33bd5b57 add 3 týždňov pred
README_semantic_similarity.md de2cd57022 feat: 添加语义相似度分析模块 1 týždeň pred
async_utils.py dd33bd5b57 add 3 týždňov pred
batch_match_analyzer.py 6dbe0e5d13 feat: 添加批量匹配分析器和可视化工具v2 2 týždňov pred
client.py dd33bd5b57 add 3 týždňov pred
data_loader.py 929d439ca4 add 3 týždňov pred
match_analyzer.py 25ad27c8e5 feat: 优化Step4匹配逻辑和增强批处理功能 2 týždňov pred
my_trace.py dd33bd5b57 add 3 týždňov pred
relation_analyzer.py 6dbe0e5d13 feat: 添加批量匹配分析器和可视化工具v2 2 týždňov pred
semantic_similarity.py de2cd57022 feat: 添加语义相似度分析模块 1 týždeň pred
structured_logger.py dd33bd5b57 add 3 týždňov pred
utils.py dd33bd5b57 add 3 týždňov pred

README_async_utils.md

异步并发处理工具

文件说明

lib/async_utils.py - 提供通用的异步任务并发执行功能

功能列表

1. process_tasks_with_semaphore

基本的并发处理函数,使用信号量控制并发数量。

参数

  • tasks: 任务列表
  • process_func: 处理单个任务的异步函数,签名为 async def func(task, index) -> result
  • max_concurrent: 最大并发数(默认: 3)
  • show_progress: 是否显示进度信息(默认: True)

使用示例

from lib.async_utils import process_tasks_with_semaphore

# 定义处理单个任务的函数
async def process_one_task(task: dict, index: int) -> dict:
    # 你的处理逻辑
    result = await some_async_operation(task)
    return result

# 准备任务列表
tasks = [task1, task2, task3, ...]

# 并发处理所有任务
results = await process_tasks_with_semaphore(
    tasks,
    process_one_task,
    max_concurrent=3,
    show_progress=True
)

2. process_tasks_with_semaphore_retry

支持重试的并发处理函数,适用于不稳定的网络请求。

参数

  • tasks: 任务列表
  • process_func: 处理单个任务的异步函数
  • max_concurrent: 最大并发数(默认: 3)
  • max_retries: 最大重试次数(默认: 3)
  • show_progress: 是否显示进度信息(默认: True)

使用示例

from lib.async_utils import process_tasks_with_semaphore_retry

# 定义可能失败的异步任务
async def unstable_task(task: dict, index: int) -> dict:
    # 可能会抛出异常的操作
    result = await api_call(task)
    return result

# 并发处理,失败时自动重试
results = await process_tasks_with_semaphore_retry(
    tasks,
    unstable_task,
    max_concurrent=3,
    max_retries=3,
    show_progress=True
)

在 match_inspiration_to_persona.py 中的使用

# 1. 导入工具
from lib.async_utils import process_tasks_with_semaphore

# 2. 定义处理函数
async def process_match_task_with_error_handling(task: dict, index: int) -> dict:
    try:
        result = await match_single_task(task)
        return result
    except Exception as e:
        # 错误处理逻辑
        return error_result

# 3. 并发处理任务
results = await process_tasks_with_semaphore(
    test_tasks,
    process_match_task_with_error_handling,
    max_concurrent=3,
    show_progress=True
)

特点

  1. 通用性: 可用于任何需要并发处理的异步任务
  2. 并发控制: 使用信号量控制并发数量,避免资源耗尽
  3. 顺序保证: 返回结果与输入任务的顺序一致
  4. 进度显示: 可选的进度显示功能
  5. 重试支持: 第二个函数支持自动重试机制

适用场景

  • API 批量请求
  • 文件批量处理
  • 数据库批量操作
  • LLM 批量推理
  • 任何需要并发控制的异步操作