""" 从灵感点文件夹直接读取 step1 结果,排序后批量执行 Step3 直接扫描 how/灵感点/ 目录下的所有灵感,读取其 step1 结果文件, 根据 score 排序并筛选,然后执行 step3 生成新灵感点 """ import os import sys import json import asyncio import argparse from pathlib import Path from typing import List, Tuple from agents import trace from lib.my_trace import set_trace_smith as set_trace from lib.data_loader import load_persona_data import step3_generate_inspirations def scan_inspirations_from_folder( inspirations_dir: str, model_name: str = "google/gemini-2.5-pro" ) -> List[Tuple[str, dict]]: """扫描灵感点文件夹,读取所有 step1 结果 Args: inspirations_dir: 灵感点目录路径 (如 data/.../how/灵感点) model_name: 模型名称 Returns: [(灵感名称, step1数据), ...] 列表 """ results = [] model_name_short = model_name.replace("google/", "").replace("/", "_") # 遍历所有灵感子目录 for inspiration_dir in Path(inspirations_dir).iterdir(): if not inspiration_dir.is_dir(): continue inspiration_name = inspiration_dir.name # 查找 step1 文件 step1_pattern = f"*_step1_*_{model_name_short}.json" step1_files = list(inspiration_dir.glob(step1_pattern)) if not step1_files: print(f"⚠️ 跳过 {inspiration_name}: 未找到 step1 文件") continue # 读取 step1 文件 step1_file = step1_files[0] try: with open(step1_file, 'r', encoding='utf-8') as f: step1_data = json.load(f) results.append((inspiration_name, step1_data)) except Exception as e: print(f"⚠️ 读取失败 {inspiration_name}: {e}") continue return results def filter_and_sort_inspirations( inspirations_data: List[Tuple[str, dict]], min_score: float = 0.5, max_score: float = 0.8 ) -> List[Tuple[str, float, dict]]: """筛选并排序灵感 Args: inspirations_data: [(灵感名称, step1数据), ...] min_score: 最小 score max_score: 最大 score Returns: [(灵感名称, score, step1数据), ...] 按 score 降序排列 """ filtered = [] for inspiration_name, step1_data in inspirations_data: # 获取匹配结果列表 match_results = step1_data.get("匹配结果列表", []) if not match_results: continue # 取 top1 的 score top1_result = match_results[0] match_result = top1_result.get("匹配结果", {}) score = match_result.get("score", 0) # 筛选 if min_score <= score <= max_score: filtered.append((inspiration_name, score, step1_data)) # 按 score 降序排序 filtered.sort(key=lambda x: x[1], reverse=True) return filtered async def run_step3_for_inspiration( persona_dir: str, inspiration: str, step1_data: dict, persona_data: dict, force: bool = False ) -> dict: """为单个灵感执行 step3 Args: persona_dir: 人设目录 inspiration: 灵感名称 step1_data: step1 完整数据 persona_data: 人设数据 force: 是否强制重新执行 Returns: 执行结果字典 """ print(f"\n{'=' * 80}") print(f"处理灵感: {inspiration}") print(f"{'=' * 80}\n") # 获取 step1 结果 step1_results = step1_data.get("匹配结果列表", []) if not step1_results: print("❌ step1 结果为空") return { "灵感": inspiration, "status": "step1_empty", "output_file": None } # 获取 top1 step1_top1 = step1_results[0] # 构建输出文件路径 model_name = "google/gemini-2.5-pro" output_dir = os.path.join(persona_dir, "how", "灵感点", inspiration) model_name_short = model_name.replace("google/", "").replace("/", "_") # 根据 step1 文件名确定前缀 step1_file_name = f"all_step1_灵感人设匹配_{model_name_short}.json" scope_prefix = "all" output_filename = f"{scope_prefix}_step3_top1_生成灵感_{model_name_short}.json" output_file = os.path.join(output_dir, output_filename) # 检查文件是否已存在 if not force and os.path.exists(output_file): print(f"✓ 输出文件已存在,跳过: {output_file}") return { "灵感": inspiration, "status": "skipped", "output_file": output_file } # 创建独立的 trace current_time, log_url = set_trace() try: with trace(f"Step3: {inspiration}"): # 执行 step3 output = await step3_generate_inspirations.process_step3_generate_inspirations( step1_top1=step1_top1, persona_data=persona_data, current_time=current_time, log_url=log_url ) # 添加元数据 output["元数据"]["step1_匹配索引"] = 1 # 保存结果 os.makedirs(output_dir, exist_ok=True) with open(output_file, 'w', encoding='utf-8') as f: json.dump(output, f, ensure_ascii=False, indent=2) # 输出预览 inspirations = output.get("灵感点列表", []) print(f"✓ 生成了 {len(inspirations)} 个灵感点") if log_url: print(f" Trace: {log_url}") return { "灵感": inspiration, "status": "success", "output_file": output_file, "生成数量": len(inspirations) } except Exception as e: print(f"❌ 执行失败: {e}") return { "灵感": inspiration, "status": "error", "output_file": None, "error": str(e) } async def main(): """主函数""" parser = argparse.ArgumentParser( description="从灵感点文件夹直接读取 step1,排序后批量执行 Step3", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 使用示例: # 测试:只处理第1个符合条件的灵感 python run_step3_from_folder.py --count 1 # 使用默认参数(step1 score 在 [0.5, 0.8] 区间) python run_step3_from_folder.py # 指定 score 范围 python run_step3_from_folder.py --min-score 0.6 --max-score 0.9 # 强制重新执行,处理前3个 python run_step3_from_folder.py --force --count 3 # 指定人设目录 python run_step3_from_folder.py --dir data/阿里多多酱/out/人设_1110 """ ) parser.add_argument( "--dir", default="data/阿里多多酱/out/人设_1110", help="人设目录路径 (默认: data/阿里多多酱/out/人设_1110)" ) parser.add_argument( "--min-score", type=float, default=0.5, help="step1 score 最小值(含)(默认: 0.5)" ) parser.add_argument( "--max-score", type=float, default=0.8, help="step1 score 最大值(含)(默认: 0.8)" ) parser.add_argument( "--force", action="store_true", help="强制重新执行,覆盖已存在的文件" ) parser.add_argument( "--count", type=int, default=None, help="处理的灵感数量限制(默认: 无限制)" ) args = parser.parse_args() persona_dir = args.dir min_score = args.min_score max_score = args.max_score force = args.force count_limit = args.count # 确定灵感点目录 inspirations_dir = os.path.join(persona_dir, "how", "灵感点") print(f"{'=' * 80}") print(f"从灵感点文件夹批量执行 Step3") print(f"{'=' * 80}") print(f"人设目录: {persona_dir}") print(f"灵感点目录: {inspirations_dir}") print(f"Score 范围: [{min_score}, {max_score}]") if count_limit: print(f"数量限制: 处理前 {count_limit} 个") if force: print(f"强制模式: 重新执行所有步骤") print() # 检查目录是否存在 if not os.path.exists(inspirations_dir): print(f"❌ 灵感点目录不存在: {inspirations_dir}") sys.exit(1) # 扫描灵感点文件夹 print("正在扫描灵感点文件夹...") inspirations_data = scan_inspirations_from_folder(inspirations_dir) print(f"找到 {len(inspirations_data)} 个灵感(有 step1 结果)\n") if not inspirations_data: print("❌ 没有找到任何有效的 step1 结果") sys.exit(0) # 筛选并排序 print("正在筛选和排序...") filtered_sorted = filter_and_sort_inspirations( inspirations_data, min_score, max_score ) if not filtered_sorted: print(f"❌ 没有找到符合条件的灵感(step1 score 在 [{min_score}, {max_score}] 范围内)") sys.exit(0) # 应用数量限制 if count_limit and count_limit < len(filtered_sorted): filtered_sorted = filtered_sorted[:count_limit] print(f"找到 {len(filtered_sorted)} 个符合条件的灵感(已应用数量限制):\n") else: print(f"找到 {len(filtered_sorted)} 个符合条件的灵感:\n") # 打印列表 for i, (insp_name, score, _) in enumerate(filtered_sorted, 1): print(f" {i}. {insp_name} (score: {score:.2f})") print() # 加载人设数据(只需要加载一次) persona_data = load_persona_data(persona_dir) # 批量执行 step3 results = [] for i, (inspiration_name, score, step1_data) in enumerate(filtered_sorted, 1): print(f"\n{'#' * 80}") print(f"处理第 {i}/{len(filtered_sorted)} 个 (score: {score:.2f})") print(f"{'#' * 80}") result = await run_step3_for_inspiration( persona_dir=persona_dir, inspiration=inspiration_name, step1_data=step1_data, persona_data=persona_data, force=force ) results.append(result) # 输出最终汇总 print(f"\n{'=' * 80}") print(f"批量处理完成") print(f"{'=' * 80}\n") success_count = sum(1 for r in results if r["status"] == "success") skipped_count = sum(1 for r in results if r["status"] == "skipped") error_count = sum(1 for r in results if r["status"] == "error") print(f"统计:") print(f" 总数: {len(results)}") print(f" 成功: {success_count}") print(f" 跳过: {skipped_count}") print(f" 失败: {error_count}") print(f"\n详细结果:") for i, result in enumerate(results, 1): status_icon = { "success": "✓", "skipped": "○", "error": "✗", "step1_empty": "⚠" }.get(result["status"], "?") status_text = { "success": f"成功,生成 {result.get('生成数量', 0)} 个", "skipped": "已存在", "error": f"失败: {result.get('error', '')}", "step1_empty": "step1 结果为空" }.get(result["status"], result["status"]) print(f" {status_icon} [{i}] {result['灵感']} - {status_text}") if __name__ == "__main__": asyncio.run(main())