""" 主流程脚本:串联 Step1 和 Step2 执行完整的灵感分析流程: 1. Step1: 灵感与人设匹配(调用 step1 main,自动保存结果) 2. Step2: 增量词在人设中的匹配(调用 step2 main,自动保存结果) 3. 生成流程汇总文件 """ import os import sys import json import asyncio import random import argparse from agents import trace from lib.my_trace import set_trace_smith as set_trace from lib.data_loader import load_inspiration_list, select_inspiration from lib.utils import read_json # 导入 step1 和 step2 的 main 函数 import step1_inspiration_match import step2_incremental_match def find_step1_output(persona_dir: str, inspiration: str, max_tasks: int = None) -> str: """查找 step1 输出文件 Args: persona_dir: 人设目录 inspiration: 灵感点名称 max_tasks: 任务数限制 Returns: step1 文件路径 """ from pathlib import Path step1_dir = os.path.join(persona_dir, "how", "灵感点", inspiration) scope_prefix = f"top{max_tasks}" if max_tasks is not None else "all" step1_pattern = f"{scope_prefix}_step1_*.json" step1_files = list(Path(step1_dir).glob(step1_pattern)) if not step1_files: raise FileNotFoundError(f"找不到 step1 输出文件: {step1_dir}/{step1_pattern}") return str(step1_files[0]) def find_step2_output(persona_dir: str, inspiration: str, max_tasks: int = None) -> str: """查找 step2 输出文件 Args: persona_dir: 人设目录 inspiration: 灵感点名称 max_tasks: 任务数限制 Returns: step2 文件路径 """ from pathlib import Path step2_dir = os.path.join(persona_dir, "how", "灵感点", inspiration) scope_prefix = f"top{max_tasks}" if max_tasks is not None else "all" step2_pattern = f"{scope_prefix}_step2_*.json" step2_files = list(Path(step2_dir).glob(step2_pattern)) if not step2_files: raise FileNotFoundError(f"找不到 step2 输出文件: {step2_dir}/{step2_pattern}") return str(step2_files[0]) async def run_full_analysis( persona_dir: str, inspiration: str, max_tasks: int = None, force: bool = False, current_time: str = None, log_url: str = None ) -> dict: """执行完整的灵感分析流程(Step1 + Step2) Args: persona_dir: 人设目录路径 inspiration: 灵感点文本 max_tasks: step1 最大任务数(None 表示不限制) force: 是否强制重新执行(跳过文件存在检查) current_time: 当前时间戳 log_url: 日志链接 Returns: 包含文件路径和状态的字典 """ print(f"\n{'=' * 80}") print(f"开始完整分析流程: {inspiration}") print(f"{'=' * 80}\n") # ========== Step1: 灵感与人设匹配 ========== print(f"{'─' * 80}") print(f"Step1: 灵感与人设匹配") print(f"{'─' * 80}\n") # 临时修改 sys.argv 来传递参数给 step1 original_argv = sys.argv.copy() sys.argv = [ "step1_inspiration_match.py", persona_dir, inspiration, str(max_tasks) if max_tasks is not None else "all" ] try: # 调用 step1 的 main 函数(通过参数传递 force) await step1_inspiration_match.main(current_time, log_url, force=force) finally: # 恢复原始参数 sys.argv = original_argv # 查找 step1 输出文件 step1_file = find_step1_output(persona_dir, inspiration, max_tasks) print(f"✓ Step1 完成,结果文件: {step1_file}\n") # 读取 step1 结果 step1_data = read_json(step1_file) step1_results = step1_data.get("匹配结果列表", []) if not step1_results: print("⚠️ Step1 结果为空,终止流程") return { "step1_file": step1_file, "step2_file": None, "summary_file": None, "status": "step1_empty" } step1_top1 = step1_results[0] step1_score = step1_top1.get('匹配结果', {}).get('score', 0) step1_element = step1_top1.get("业务信息", {}).get("匹配要素", "") print(f"Top1 匹配要素: {step1_element}, score: {step1_score:.2f}") # ========== Step2: 增量词匹配 ========== print(f"\n{'─' * 80}") print(f"Step2: 增量词在人设中的匹配") print(f"{'─' * 80}\n") # 临时修改 sys.argv 来传递参数给 step2 sys.argv = [ "step2_incremental_match.py", persona_dir, inspiration ] try: # 调用 step2 的 main 函数(通过参数传递 force) await step2_incremental_match.main(current_time, log_url, force=force) finally: # 恢复原始参数 sys.argv = original_argv # 查找 step2 输出文件 step2_file = find_step2_output(persona_dir, inspiration, max_tasks) print(f"✓ Step2 完成,结果文件: {step2_file}\n") # 读取 step2 结果 step2_data = read_json(step2_file) step2_score = step2_data.get("匹配结果", {}).get("score", 0) step2_b_content = step2_data.get("输入信息", {}).get("B", "") step2_word_count = len(step2_b_content.split("\n")) if step2_b_content else 0 # ========== 保存流程汇总 ========== output_dir = os.path.join(persona_dir, "how", "灵感点", inspiration) scope_prefix = f"top{max_tasks}" if max_tasks is not None else "all" # 从 step1 文件名提取模型名称 step1_filename = os.path.basename(step1_file) model_short = step1_filename.split("_")[-1].replace(".json", "") summary_filename = f"{scope_prefix}_summary_完整流程_{model_short}.json" summary_file = os.path.join(output_dir, summary_filename) summary = { "元数据": { "current_time": current_time, "log_url": log_url, "流程": "Step1 + Step2 完整分析", "step1_model": step1_data.get("元数据", {}).get("model", ""), "step2_model": step2_data.get("元数据", {}).get("model", "") }, "灵感": inspiration, "文件路径": { "step1": step1_file, "step2": step2_file }, "关键指标": { "step1_top1_score": step1_score, "step1_top1_匹配要素": step1_element, "step2_增量词数量": step2_word_count, "step2_score": step2_score } } with open(summary_file, 'w', encoding='utf-8') as f: json.dump(summary, f, ensure_ascii=False, indent=2) print(f"{'=' * 80}") print(f"完整流程执行完成") print(f"{'=' * 80}") print(f"\n结果文件:") print(f" Step1: {step1_file}") print(f" Step2: {step2_file}") print(f" 汇总: {summary_file}\n") return { "step1_file": step1_file, "step2_file": step2_file, "summary_file": summary_file, "status": "success" } async def main(): """主函数""" # 解析命令行参数 parser = argparse.ArgumentParser( description="灵感分析主流程 (Step1 + Step2)", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 使用示例: # 处理第1个灵感 python run_inspiration_analysis.py --dir data/阿里多多酱/out/人设_1110 --count 1 # 随机处理5个灵感 python run_inspiration_analysis.py --count 5 --shuffle # 处理所有灵感,强制重新执行 python run_inspiration_analysis.py --count all --force # 处理前10个灵感,step1只处理前20个任务 python run_inspiration_analysis.py --count 10 --max-tasks 20 """ ) parser.add_argument( "--dir", default="data/阿里多多酱/out/人设_1110", help="人设目录路径 (默认: data/阿里多多酱/out/人设_1110)" ) parser.add_argument( "--count", default="1", help="处理的灵感数量,可以是数字或 'all' (默认: 1)" ) parser.add_argument( "--max-tasks", type=str, default="all", help="Step1 处理的最大任务数,可以是数字或 'all' (默认: all)" ) parser.add_argument( "--force", action="store_true", help="强制重新执行,覆盖已存在的文件" ) parser.add_argument( "--shuffle", action="store_true", help="随机选择灵感,而不是按顺序" ) args = parser.parse_args() persona_dir = args.dir force = args.force shuffle = args.shuffle # 处理 max_tasks max_tasks = None if args.max_tasks == "all" else int(args.max_tasks) print(f"{'=' * 80}") print(f"灵感分析主流程 (Step1 + Step2)") print(f"{'=' * 80}") print(f"人设目录: {persona_dir}") # 加载灵感列表 inspiration_list = load_inspiration_list(persona_dir) # 确定要处理的灵感数量 if args.count == "all": inspiration_count = len(inspiration_list) print(f"处理灵感: 全部 ({inspiration_count} 个)") else: inspiration_count = int(args.count) print(f"处理灵感: 前 {inspiration_count} 个") if max_tasks: print(f"Step1 任务数限制: {max_tasks}") if force: print(f"强制模式: 重新执行所有步骤") if shuffle: print(f"随机模式: 随机选择灵感") # 选择要处理的灵感列表 if shuffle: # 随机打乱灵感列表后选择 shuffled_list = inspiration_list.copy() random.shuffle(shuffled_list) inspirations_to_process = shuffled_list[:inspiration_count] else: # 按顺序选择前 N 个 inspirations_to_process = inspiration_list[:inspiration_count] print(f"\n将处理以下灵感:") for i, insp in enumerate(inspirations_to_process, 1): print(f" {i}. {insp}") # 批量执行流程 results = [] for i, inspiration in enumerate(inspirations_to_process, 1): print(f"\n{'#' * 80}") print(f"处理第 {i}/{len(inspirations_to_process)} 个灵感: {inspiration}") print(f"{'#' * 80}") # 为每个灵感创建独立的 trace insp_time, insp_log_url = set_trace() with trace(f"灵感分析: {inspiration}"): result = await run_full_analysis( persona_dir=persona_dir, inspiration=inspiration, max_tasks=max_tasks, force=force, current_time=insp_time, log_url=insp_log_url ) results.append(result) if insp_log_url: print(f"本次 Trace: {insp_log_url}") # 输出最终汇总 print(f"\n{'=' * 80}") print(f"批量处理完成") print(f"{'=' * 80}") success_count = sum(1 for r in results if r["status"] == "success") print(f"\n成功: {success_count}/{len(results)}") for i, (insp, result) in enumerate(zip(inspirations_to_process, results), 1): status_icon = "✓" if result["status"] == "success" else "✗" print(f" {status_icon} [{i}] {insp}") if __name__ == "__main__": # 主流程不设置 trace,由每个灵感独立设置 asyncio.run(main())