""" 基于灵感匹配分析结果,批量执行 Step3 生成新灵感 从灵感匹配分析.json中筛选符合条件的灵感(step1 score 在指定范围内), 然后对每个灵感的 top1 匹配结果执行 step3,生成新的灵感点 """ import os import sys import json import asyncio import argparse from agents import trace from lib.my_trace import set_trace_smith as set_trace from lib.data_loader import load_persona_data import step3_generate_inspirations def filter_inspirations_by_score( analysis_file: str, min_score: float = 0.5, max_score: float = 0.8 ) -> list: """从分析文件中筛选符合条件的灵感 Args: analysis_file: 灵感匹配分析.json 文件路径 min_score: step1 score 最小值(含) max_score: step1 score 最大值(含) Returns: 筛选后的灵感列表 """ with open(analysis_file, 'r', encoding='utf-8') as f: analysis_data = json.load(f) results = analysis_data.get("排序结果", []) filtered = [] for item in results: step1_score = item["step1"]["匹配结果"].get("score", 0) if min_score <= step1_score <= max_score: filtered.append(item["灵感"]) return filtered async def run_step3_for_inspiration( persona_dir: str, inspiration: str, persona_data: dict, force: bool = False ) -> dict: """为单个灵感执行 step3 Args: persona_dir: 人设目录 inspiration: 灵感名称 persona_data: 人设数据 force: 是否强制重新执行 Returns: 执行结果字典 """ print(f"\n{'=' * 80}") print(f"处理灵感: {inspiration}") print(f"{'=' * 80}\n") # 查找 step1 结果文件 model_name = "google/gemini-2.5-pro" step1_file = step3_generate_inspirations.find_step1_file( persona_dir, inspiration, model_name ) # 读取 step1 结果 with open(step1_file, 'r', encoding='utf-8') as f: step1_data = json.load(f) step1_results = step1_data.get("匹配结果列表", []) if not step1_results: print("❌ step1 结果为空") return { "灵感": inspiration, "status": "step1_empty", "output_file": None } # 获取 top1 step1_top1 = step1_results[0] # 构建输出文件路径 output_dir = os.path.join(persona_dir, "how", "灵感点", inspiration) model_name_short = model_name.replace("google/", "").replace("/", "_") step1_filename = os.path.basename(step1_file) step1_basename = os.path.splitext(step1_filename)[0] scope_prefix = step1_basename.split("_")[0] output_filename = f"{scope_prefix}_step3_top1_生成灵感_{model_name_short}.json" output_file = os.path.join(output_dir, output_filename) # 检查文件是否已存在 if not force and os.path.exists(output_file): print(f"✓ 输出文件已存在,跳过: {output_file}") return { "灵感": inspiration, "status": "skipped", "output_file": output_file } # 创建独立的 trace current_time, log_url = set_trace() try: with trace(f"Step3: {inspiration}"): # 执行 step3 output = await step3_generate_inspirations.process_step3_generate_inspirations( step1_top1=step1_top1, persona_data=persona_data, current_time=current_time, log_url=log_url ) # 添加元数据 output["元数据"]["step1_匹配索引"] = 1 # 保存结果 os.makedirs(output_dir, exist_ok=True) with open(output_file, 'w', encoding='utf-8') as f: json.dump(output, f, ensure_ascii=False, indent=2) # 输出预览 inspirations = output.get("灵感点列表", []) print(f"✓ 生成了 {len(inspirations)} 个灵感点") if log_url: print(f" Trace: {log_url}") return { "灵感": inspiration, "status": "success", "output_file": output_file, "生成数量": len(inspirations) } except Exception as e: print(f"❌ 执行失败: {e}") return { "灵感": inspiration, "status": "error", "output_file": None, "error": str(e) } async def main(): """主函数""" parser = argparse.ArgumentParser( description="基于灵感匹配分析结果,批量执行 Step3", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 使用示例: # 测试:只处理第1个符合条件的灵感 python run_step3_from_analysis.py --count 1 # 使用默认参数(step1 score 在 [0.5, 0.8] 区间) python run_step3_from_analysis.py # 指定 score 范围 python run_step3_from_analysis.py --min-score 0.6 --max-score 0.9 # 强制重新执行,处理前3个 python run_step3_from_analysis.py --force --count 3 # 指定人设目录 python run_step3_from_analysis.py --dir data/阿里多多酱/out/人设_1110 """ ) parser.add_argument( "--dir", default="data/阿里多多酱/out/人设_1110", help="人设目录路径 (默认: data/阿里多多酱/out/人设_1110)" ) parser.add_argument( "--analysis-file", default=None, help="灵感匹配分析文件路径 (默认: {dir}/how/灵感匹配分析.json)" ) parser.add_argument( "--min-score", type=float, default=0.5, help="step1 score 最小值(含)(默认: 0.5)" ) parser.add_argument( "--max-score", type=float, default=0.8, help="step1 score 最大值(含)(默认: 0.8)" ) parser.add_argument( "--force", action="store_true", help="强制重新执行,覆盖已存在的文件" ) parser.add_argument( "--count", type=int, default=1, help="处理的灵感数量限制(默认: 1)" ) args = parser.parse_args() persona_dir = args.dir min_score = args.min_score max_score = args.max_score force = args.force count_limit = args.count # 确定分析文件路径 if args.analysis_file: analysis_file = args.analysis_file else: analysis_file = os.path.join(persona_dir, "how", "灵感匹配分析.json") print(f"{'=' * 80}") print(f"基于灵感匹配分析,批量执行 Step3") print(f"{'=' * 80}") print(f"人设目录: {persona_dir}") print(f"分析文件: {analysis_file}") print(f"Score 范围: [{min_score}, {max_score}]") if count_limit: print(f"数量限制: 处理前 {count_limit} 个") if force: print(f"强制模式: 重新执行所有步骤") print() # 检查分析文件是否存在 if not os.path.exists(analysis_file): print(f"❌ 分析文件不存在: {analysis_file}") print(f"请先运行 analyze_inspiration_results.py 生成分析文件") sys.exit(1) # 筛选灵感 filtered_inspirations = filter_inspirations_by_score( analysis_file, min_score, max_score ) if not filtered_inspirations: print(f"❌ 没有找到符合条件的灵感(step1 score 在 [{min_score}, {max_score}] 范围内)") sys.exit(0) # 应用数量限制 if count_limit and count_limit < len(filtered_inspirations): filtered_inspirations = filtered_inspirations[:count_limit] print(f"找到 {len(filtered_inspirations)} 个符合条件的灵感(已应用数量限制):\n") else: print(f"找到 {len(filtered_inspirations)} 个符合条件的灵感:\n") for i, insp in enumerate(filtered_inspirations, 1): print(f" {i}. {insp}") print() # 加载人设数据(只需要加载一次) persona_data = load_persona_data(persona_dir) # 批量执行 step3 results = [] for i, inspiration in enumerate(filtered_inspirations, 1): print(f"\n{'#' * 80}") print(f"处理第 {i}/{len(filtered_inspirations)} 个") print(f"{'#' * 80}") result = await run_step3_for_inspiration( persona_dir=persona_dir, inspiration=inspiration, persona_data=persona_data, force=force ) results.append(result) # 输出最终汇总 print(f"\n{'=' * 80}") print(f"批量处理完成") print(f"{'=' * 80}\n") success_count = sum(1 for r in results if r["status"] == "success") skipped_count = sum(1 for r in results if r["status"] == "skipped") error_count = sum(1 for r in results if r["status"] == "error") print(f"统计:") print(f" 总数: {len(results)}") print(f" 成功: {success_count}") print(f" 跳过: {skipped_count}") print(f" 失败: {error_count}") print(f"\n详细结果:") for i, result in enumerate(results, 1): status_icon = { "success": "✓", "skipped": "○", "error": "✗", "step1_empty": "⚠" }.get(result["status"], "?") status_text = { "success": f"成功,生成 {result.get('生成数量', 0)} 个", "skipped": "已存在", "error": f"失败: {result.get('error', '')}", "step1_empty": "step1 结果为空" }.get(result["status"], result["status"]) print(f" {status_icon} [{i}] {result['灵感']} - {status_text}") if __name__ == "__main__": asyncio.run(main())