#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Inspiration-point feature matching script v2 (batch-matching edition).

Extracts the features of each inspiration point from the deconstruction task
list and matches them against the persona's inspiration features.  Uses the
``batch_match_analyzer`` module so that one feature is scored against ALL
persona features in a single call, which keeps the scores mutually comparable.
"""

import asyncio
import json
import sys
from pathlib import Path
from typing import Dict, List

# Make the project root importable before pulling in project-local modules.
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

from agents import trace
from agents.tracing.create import custom_span
from lib.my_trace import set_trace
from lib.batch_match_analyzer import analyze_batch_match

# Global cap on concurrent LLM requests.
MAX_CONCURRENT_REQUESTS = 20
semaphore = None


def get_semaphore():
    """Return the process-wide semaphore, creating it lazily.

    Lazy creation ensures the semaphore is instantiated from inside the
    running event loop (``asyncio.run``) rather than at import time.
    """
    global semaphore
    if semaphore is None:
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
    return semaphore


async def match_feature_with_persona_batch(
    feature_name: str,
    persona_features: List[Dict],
    model_name: str = None
) -> List[Dict]:
    """Match one feature against the whole persona feature list in one call.

    Args:
        feature_name: Name of the feature to match.
        persona_features: List of persona feature dicts (each carries
            the key "特征名称").
        model_name: Model to use; ``None`` selects the analyzer's default.

    Returns:
        Match results in the legacy format, sorted by score descending.
    """
    sem = get_semaphore()
    async with sem:
        print(f" 批量匹配: {feature_name} <-> {len(persona_features)}个人设特征")

        # Collect the persona feature names for the batch analyzer.
        persona_names = [pf["特征名称"] for pf in persona_features]

        # One batched LLM call scores the feature against every persona name.
        batch_results = await analyze_batch_match(
            phrase_a=feature_name,
            phrase_b_list=persona_names,
            model_name=model_name
        )

        # Re-shape the analyzer output into the legacy result format.
        match_results = [
            {
                "人设特征名称": result["特征"],
                "匹配结果": {
                    "分数": result["分数"],
                    "说明": result["说明"]
                }
            }
            for result in batch_results
        ]

        # Highest score first.
        match_results.sort(key=lambda x: x["匹配结果"]["分数"], reverse=True)
        return match_results


async def match_single_feature(
    feature_name: str,
    persona_features: List[Dict],
    model_name: str = None
) -> Dict:
    """Match a single feature against all persona features.

    Args:
        feature_name: Name of the feature.
        persona_features: Persona feature list.
        model_name: Model to use (``None`` for the default).

    Returns:
        Dict with the feature name and its sorted match results.
    """
    print(f" 特征: {feature_name}")
    match_results = await match_feature_with_persona_batch(
        feature_name=feature_name,
        persona_features=persona_features,
        model_name=model_name
    )
    return {
        "特征名称": feature_name,
        "匹配结果": match_results
    }


async def process_single_inspiration_point(
    inspiration_point: Dict,
    persona_features: List[Dict],
    model_name: str = None
) -> Dict:
    """Run feature matching for one inspiration point (features in parallel).

    Args:
        inspiration_point: Inspiration point data (keys "名称", "特征列表").
        persona_features: Persona inspiration feature list.
        model_name: Model to use (``None`` for the default).

    Returns:
        A copy of the inspiration point extended with a "how步骤列表" entry.
    """
    point_name = inspiration_point.get("名称", "")
    feature_list = inspiration_point.get("特征列表", [])

    print(f" 处理灵感点: {point_name}")
    print(f" 特征数量: {len(feature_list)}")

    # Tag this inspiration point's processing in the trace.
    with custom_span(
        name=f"处理灵感点: {point_name}",
        data={
            "灵感点": point_name,
            "特征数量": len(feature_list),
            "人设特征数量": len(persona_features)
        }
    ):
        # Match every feature concurrently; each match is itself one
        # batched call covering all persona features.
        tasks = [
            match_single_feature(feature_name, persona_features, model_name)
            for feature_name in feature_list
        ]
        feature_match_results = await asyncio.gather(*tasks)

        how_step = {
            "步骤名称": "灵感特征批量匹配人设特征",
            "特征列表": list(feature_match_results)
        }

        # Return an updated copy; the input dict is left untouched.
        result = inspiration_point.copy()
        result["how步骤列表"] = [how_step]
        return result


async def process_single_task(
    task: Dict,
    task_index: int,
    total_tasks: int,
    persona_inspiration_features: List[Dict],
    model_name: str = None
) -> Dict:
    """Process one task: match every inspiration point it contains.

    Args:
        task: Task data (keys "帖子id", "what解构结果").
        task_index: 1-based task index (for progress output only).
        total_tasks: Total number of tasks (for progress output only).
        persona_inspiration_features: Persona inspiration feature list.
        model_name: Model to use (``None`` for the default).

    Returns:
        A copy of the task extended with a "how解构结果" entry.
    """
    post_id = task.get("帖子id", "")
    print(f"\n处理任务 [{task_index}/{total_tasks}]: {post_id}")

    what_result = task.get("what解构结果", {})
    inspiration_list = what_result.get("灵感点列表", [])
    print(f" 灵感点数量: {len(inspiration_list)}")

    # Process all inspiration points of this task concurrently.
    tasks = [
        process_single_inspiration_point(
            inspiration_point=inspiration_point,
            persona_features=persona_inspiration_features,
            model_name=model_name
        )
        for inspiration_point in inspiration_list
    ]
    updated_inspiration_list = await asyncio.gather(*tasks)

    how_result = {
        "灵感点列表": list(updated_inspiration_list)
    }

    updated_task = task.copy()
    updated_task["how解构结果"] = how_result
    return updated_task


async def process_task_list(
    task_list: List[Dict],
    persona_features_dict: Dict,
    model_name: str = None,
    current_time: str = None,
    log_url: str = None
) -> List[Dict]:
    """Process the whole deconstruction task list (tasks run concurrently).

    Args:
        task_list: Deconstruction task list.
        persona_features_dict: Persona feature dict (inspiration / purpose /
            key points; only the "灵感点" entry is used here).
        model_name: Model to use (``None`` for the default).
        current_time: Timestamp recorded in the trace span.
        log_url: Trace log URL recorded in the trace span.

    Returns:
        Task list with "how解构结果" attached to every task.
    """
    persona_inspiration_features = persona_features_dict.get("灵感点", [])
    print(f"人设灵感特征数量: {len(persona_inspiration_features)}")

    # Wrap the whole run in one trace span.
    with custom_span(
        name="批量匹配分析 v2 - 所有任务",
        data={
            "任务总数": len(task_list),
            "人设特征数量": len(persona_inspiration_features),
            "current_time": current_time,
            "log_url": log_url
        }
    ):
        # Fan out over all tasks; per-request concurrency is bounded by
        # the global semaphore, not here.
        tasks = [
            process_single_task(
                task=task,
                task_index=i,
                total_tasks=len(task_list),
                persona_inspiration_features=persona_inspiration_features,
                model_name=model_name
            )
            for i, task in enumerate(task_list, 1)
        ]
        updated_task_list = await asyncio.gather(*tasks)
        return list(updated_task_list)


async def main(current_time: str = None, log_url: str = None):
    """Entry point: load inputs, run the matching, write per-task outputs.

    Args:
        current_time: Timestamp supplied by the caller (stored in metadata).
        log_url: Trace log URL supplied by the caller (stored in metadata).
    """
    # Resolve input/output paths relative to this script.
    script_dir = Path(__file__).parent
    project_root = script_dir.parent.parent
    data_dir = project_root / "data" / "data_1118"
    task_list_file = data_dir / "当前帖子_解构任务列表.json"
    persona_features_file = data_dir / "特征名称_帖子来源.json"
    output_dir = data_dir / "当前帖子_how解构结果_v2"

    output_dir.mkdir(parents=True, exist_ok=True)

    # Imported here so the model name reflects the runtime configuration.
    from lib.client import MODEL_NAME
    # Sanitize the model name for use in output file names.
    model_name_short = MODEL_NAME.replace("google/", "").replace("/", "_")

    print(f"读取解构任务列表: {task_list_file}")
    with open(task_list_file, "r", encoding="utf-8") as f:
        task_list_data = json.load(f)

    print(f"读取人设特征: {persona_features_file}")
    with open(persona_features_file, "r", encoding="utf-8") as f:
        persona_features_data = json.load(f)

    task_list = task_list_data.get("解构任务列表", [])
    print(f"\n总任务数: {len(task_list)}")
    print(f"使用模型: {MODEL_NAME}\n")

    updated_task_list = await process_task_list(
        task_list=task_list,
        persona_features_dict=persona_features_data,
        model_name=None,  # use the default model
        current_time=current_time,
        log_url=log_url
    )

    # Write one output file per task.
    print(f"\n保存结果到: {output_dir}")
    for task in updated_task_list:
        post_id = task.get("帖子id", "unknown")
        output_file = output_dir / f"{post_id}_how_v2_{model_name_short}.json"

        # Attach run metadata to every saved task.
        task["元数据"] = {
            "current_time": current_time,
            "log_url": log_url,
            "version": "v2_batch",
            "model": MODEL_NAME
        }

        print(f" 保存: {output_file.name}")
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(task, f, ensure_ascii=False, indent=4)

    print("\n完成!")

    # Summary statistics over all processed tasks.
    total_inspiration_points = sum(
        len(task["how解构结果"]["灵感点列表"])
        for task in updated_task_list
    )
    total_features = sum(
        len(point["特征列表"])
        for task in updated_task_list
        for point in task["how解构结果"]["灵感点列表"]
    )

    print("\n统计:")
    print(f" 处理的帖子数: {len(updated_task_list)}")
    print(f" 处理的灵感点数: {total_inspiration_points}")
    print(f" 处理的特征数: {total_features}")

    if log_url:
        print(f"\nTrace: {log_url}\n")


if __name__ == "__main__":
    # Set up tracing, then run the whole pipeline inside one trace context.
    current_time, log_url = set_trace()
    with trace("灵感特征批量匹配 v2"):
        asyncio.run(main(current_time, log_url))