""" 主流程脚本:串联 Step1、搜索和 Step2 执行完整的灵感分析流程: 1. Step1: 灵感与人设匹配(调用 step1 main,自动保存结果) 2. Step1.5: 基于 Top1 匹配要素进行小红书搜索(使用 search_xiaohongshu) 3. Step2: 增量词在人设中的匹配(调用 step2 main,自动保存结果) 4. 生成流程汇总文件 """ import os import sys import json import asyncio import random import argparse from agents import trace from lib.my_trace import set_trace_smith as set_trace from lib.data_loader import load_inspiration_list, select_inspiration from lib.utils import read_json # 导入 step1, step2 和 step4 的 main 函数 import step1_inspiration_match import step2_incremental_match import step4_search_result_match # 导入搜索功能 from script.search import search_xiaohongshu def find_step1_output(persona_dir: str, inspiration: str, max_tasks: int = None) -> str: """查找 step1 输出文件 Args: persona_dir: 人设目录 inspiration: 灵感点名称 max_tasks: 任务数限制 Returns: step1 文件路径 """ from pathlib import Path step1_dir = os.path.join(persona_dir, "how", "灵感点", inspiration) scope_prefix = f"top{max_tasks}" if max_tasks is not None else "all" step1_pattern = f"{scope_prefix}_step1_*.json" step1_files = list(Path(step1_dir).glob(step1_pattern)) if not step1_files: raise FileNotFoundError(f"找不到 step1 输出文件: {step1_dir}/{step1_pattern}") return str(step1_files[0]) def find_step2_output(persona_dir: str, inspiration: str, max_tasks: int = None) -> str: """查找 step2 输出文件 Args: persona_dir: 人设目录 inspiration: 灵感点名称 max_tasks: 任务数限制 Returns: step2 文件路径 """ from pathlib import Path step2_dir = os.path.join(persona_dir, "how", "灵感点", inspiration) scope_prefix = f"top{max_tasks}" if max_tasks is not None else "all" step2_pattern = f"{scope_prefix}_step2_*.json" step2_files = list(Path(step2_dir).glob(step2_pattern)) if not step2_files: raise FileNotFoundError(f"找不到 step2 输出文件: {step2_dir}/{step2_pattern}") return str(step2_files[0]) def get_inspiration_score(persona_dir: str, inspiration: str, max_tasks: int = None) -> float: """获取灵感的 Step1 Top1 分数 Args: persona_dir: 人设目录 inspiration: 灵感点名称 max_tasks: 任务数限制 Returns: Step1 Top1 的 score,如果文件不存在返回 -1 """ try: step1_file = find_step1_output(persona_dir, inspiration, max_tasks) step1_data = read_json(step1_file) results = step1_data.get("匹配结果列表", []) if results: return results[0].get('匹配结果', {}).get('score', 0) return 0 except (FileNotFoundError, Exception): return -1 def sort_inspirations_by_score( persona_dir: str, inspiration_list: list, max_tasks: int = None ) -> list: """根据 Step1 结果分数对灵感列表排序 Args: persona_dir: 人设目录 inspiration_list: 灵感列表 max_tasks: 任务数限制 Returns: 排序后的灵感列表(按分数降序) """ print(f"\n{'─' * 80}") print(f"正在读取现有 Step1 结果文件...") print(f"{'─' * 80}") inspiration_scores = [] for inspiration in inspiration_list: score = get_inspiration_score(persona_dir, inspiration, max_tasks) inspiration_scores.append({ "inspiration": inspiration, "score": score, "has_result": score >= 0 }) # 统计 has_result_count = sum(1 for item in inspiration_scores if item["has_result"]) print(f"找到 {has_result_count}/{len(inspiration_list)} 个灵感的 Step1 结果") # 排序:有结果的按分数降序,无结果的放最后(保持原顺序) sorted_items = sorted( inspiration_scores, key=lambda x: (x["has_result"], x["score"]), reverse=True ) # 显示排序结果(前10个) print(f"\n排序后的灵感列表(前10个):") for i, item in enumerate(sorted_items[:10], 1): status = f"score={item['score']:.2f}" if item['has_result'] else "无结果" print(f" {i}. [{status}] {item['inspiration']}") if len(sorted_items) > 10: print(f" ... 还有 {len(sorted_items) - 10} 个") return [item["inspiration"] for item in sorted_items] async def run_full_analysis( persona_dir: str, inspiration: str, max_tasks: int = None, force: bool = False, current_time: str = None, log_url: str = None, enable_step2: bool = False, search_only: bool = False, search_and_match: bool = False ) -> dict: """执行完整的灵感分析流程(Step1 + 搜索 + Step4匹配 + Step2) Args: persona_dir: 人设目录路径 inspiration: 灵感点文本 max_tasks: step1 最大任务数(None 表示不限制) force: 是否强制重新执行(跳过文件存在检查) current_time: 当前时间戳 log_url: 日志链接 enable_step2: 是否执行 Step2(默认 False) search_only: 是否只执行搜索(跳过 Step1 和 Step2,默认 False) search_and_match: 是否搜索并匹配模式(跳过 Step1 和 Step2,执行搜索和 Step4,默认 False) Returns: 包含文件路径和状态的字典 """ print(f"\n{'=' * 80}") mode_desc = "仅搜索" if search_only else ("搜索并匹配" if search_and_match else "完整分析") print(f"开始{mode_desc}流程: {inspiration}") print(f"{'=' * 80}\n") # 保存原始 sys.argv original_argv = sys.argv.copy() # ========== Step1: 灵感与人设匹配 ========== if not search_only and not search_and_match: print(f"{'─' * 80}") print(f"Step1: 灵感与人设匹配") print(f"{'─' * 80}\n") # 临时修改 sys.argv 来传递参数给 step1 sys.argv = [ "step1_inspiration_match.py", persona_dir, inspiration, str(max_tasks) if max_tasks is not None else "all" ] try: # 调用 step1 的 main 函数(通过参数传递 force) await step1_inspiration_match.main(current_time, log_url, force=force) finally: # 恢复原始参数 sys.argv = original_argv # 查找 step1 输出文件 step1_file = find_step1_output(persona_dir, inspiration, max_tasks) print(f"✓ Step1 完成,结果文件: {step1_file}\n") else: print(f"{'─' * 80}") mode_label = "搜索并匹配模式" if search_and_match else "仅搜索模式" print(f"Step1: 跳过({mode_label})") print(f"{'─' * 80}\n") # 查找已有的 step1 输出文件 try: step1_file = find_step1_output(persona_dir, inspiration, max_tasks) print(f"✓ 找到已有 Step1 结果: {step1_file}\n") except FileNotFoundError as e: print(f"⚠️ {e}") return { "step1_file": None, "search_file": None, "step4_file": None, "step2_file": None, "summary_file": None, "status": "step1_not_found" } # 读取 step1 结果 step1_data = read_json(step1_file) step1_results = step1_data.get("匹配结果列表", []) if not step1_results: print("⚠️ Step1 结果为空,终止流程") return { "step1_file": step1_file, "step2_file": None, "summary_file": None, "status": "step1_empty" } step1_top1 = step1_results[0] step1_score = step1_top1.get('匹配结果', {}).get('score', 0) step1_element = step1_top1.get("业务信息", {}).get("匹配要素", "") print(f"Top1 匹配要素: {step1_element}, score: {step1_score:.2f}") # ========== Step1.5: 小红书搜索 ========== print(f"\n{'─' * 80}") print(f"Step1.5: 基于 Top1 匹配要素进行小红书搜索") print(f"{'─' * 80}\n") search_keyword = step1_element print(f"搜索关键词: {search_keyword}") # 执行搜索 try: search_result = search_xiaohongshu(search_keyword) search_notes_count = len(search_result.get('notes', [])) print(f"✓ 搜索完成,找到 {search_notes_count} 条笔记") # 保存搜索结果 search_dir = os.path.join(persona_dir, "how", "灵感点", inspiration, "search") os.makedirs(search_dir, exist_ok=True) scope_prefix = f"top{max_tasks}" if max_tasks is not None else "all" # 清理文件名中的非法字符 safe_keyword = search_keyword[:20].replace('/', '_').replace('\\', '_').replace(':', '_') search_filename = f"{scope_prefix}_search_{safe_keyword}.json" search_file = os.path.join(search_dir, search_filename) with open(search_file, 'w', encoding='utf-8') as f: json.dump(search_result, f, ensure_ascii=False, indent=2) print(f"✓ 搜索结果已保存: {search_file}\n") except Exception as e: print(f"⚠️ 搜索失败: {e}") search_file = None search_notes_count = 0 # ========== Step4: 搜索结果匹配 ========== step4_file = None step4_high_score_count = None step4_top1_score = None if search_and_match and 'search_file' in locals() and search_file: from pathlib import Path # 检查 step4 输出文件是否已存在 step4_dir = os.path.join(persona_dir, "how", "灵感点", inspiration, "search") scope_prefix = f"top{max_tasks}" if max_tasks is not None else "all" step4_pattern = f"{scope_prefix}_step4_*.json" step4_files = list(Path(step4_dir).glob(step4_pattern)) if os.path.exists(step4_dir) else [] step4_exists = len(step4_files) > 0 if step4_exists and not force: print(f"\n{'─' * 80}") print(f"Step4: 已跳过(结果文件已存在)") print(f"{'─' * 80}\n") step4_file = str(step4_files[0]) print(f"✓ 找到已有 Step4 结果: {step4_file}\n") # 读取已有结果 step4_data = read_json(step4_file) step4_results = step4_data.get("匹配结果列表", []) step4_high_score_count = sum(1 for r in step4_results if r.get("匹配结果", {}).get("score", 0) >= 0.7) step4_top1_score = step4_results[0].get("匹配结果", {}).get("score", 0) if step4_results else 0 else: print(f"\n{'─' * 80}") print(f"Step4: 搜索结果与灵感匹配") print(f"{'─' * 80}\n") # 临时修改 sys.argv 来传递参数给 step4 sys.argv = [ "step4_search_result_match.py", persona_dir, inspiration ] if max_tasks is not None: sys.argv.append(str(max_tasks)) try: # 调用 step4 的 main 函数 await step4_search_result_match.main(current_time, log_url, force=force) finally: # 恢复原始参数 sys.argv = original_argv # 查找 step4 输出文件 step4_files = list(Path(step4_dir).glob(step4_pattern)) if step4_files: step4_file = str(step4_files[0]) print(f"✓ Step4 完成,结果文件: {step4_file}\n") # 读取 step4 结果 step4_data = read_json(step4_file) step4_results = step4_data.get("匹配结果列表", []) step4_high_score_count = sum(1 for r in step4_results if r.get("匹配结果", {}).get("score", 0) >= 0.7) step4_top1_score = step4_results[0].get("匹配结果", {}).get("score", 0) if step4_results else 0 elif search_and_match: print(f"\n{'─' * 80}") print(f"Step4: 已跳过(搜索失败)") print(f"{'─' * 80}\n") # ========== Step2: 增量词匹配 ========== step2_file = None step2_score = None step2_word_count = None if enable_step2 and not search_only and not search_and_match: print(f"\n{'─' * 80}") print(f"Step2: 增量词在人设中的匹配") print(f"{'─' * 80}\n") # 临时修改 sys.argv 来传递参数给 step2 sys.argv = [ "step2_incremental_match.py", persona_dir, inspiration ] try: # 调用 step2 的 main 函数(通过参数传递 force) await step2_incremental_match.main(current_time, log_url, force=force) finally: # 恢复原始参数 sys.argv = original_argv # 查找 step2 输出文件 step2_file = find_step2_output(persona_dir, inspiration, max_tasks) print(f"✓ Step2 完成,结果文件: {step2_file}\n") # 读取 step2 结果 step2_data = read_json(step2_file) step2_score = step2_data.get("匹配结果", {}).get("score", 0) step2_b_content = step2_data.get("输入信息", {}).get("B", "") step2_word_count = len(step2_b_content.split("\n")) if step2_b_content else 0 elif not search_only and not search_and_match: print(f"\n{'─' * 80}") print(f"Step2: 已跳过(使用 --enable-step2 启用)") print(f"{'─' * 80}\n") # ========== 保存流程汇总 ========== # search_only 和 search_and_match 模式不保存汇总文件 if not search_only and not search_and_match: output_dir = os.path.join(persona_dir, "how", "灵感点", inspiration) scope_prefix = f"top{max_tasks}" if max_tasks is not None else "all" # 从 step1 文件名提取模型名称 step1_filename = os.path.basename(step1_file) model_short = step1_filename.split("_")[-1].replace(".json", "") summary_filename = f"{scope_prefix}_summary_完整流程_{model_short}.json" summary_file = os.path.join(output_dir, summary_filename) # 构建流程描述 workflow = "Step1 + 搜索" if enable_step2: workflow += " + Step2" summary = { "元数据": { "current_time": current_time, "log_url": log_url, "流程": workflow, "step1_model": step1_data.get("元数据", {}).get("model", ""), "step2_model": step2_data.get("元数据", {}).get("model", "") if enable_step2 and 'step2_data' in locals() else None }, "灵感": inspiration, "文件路径": { "step1": step1_file, "search": search_file if 'search_file' in locals() else None, "step2": step2_file }, "关键指标": { "step1_top1_score": step1_score, "step1_top1_匹配要素": step1_element, "search_keyword": search_keyword if 'search_keyword' in locals() else None, "search_notes_count": search_notes_count if 'search_notes_count' in locals() else 0, "step2_增量词数量": step2_word_count, "step2_score": step2_score } } with open(summary_file, 'w', encoding='utf-8') as f: json.dump(summary, f, ensure_ascii=False, indent=2) else: summary_file = None print(f"{'=' * 80}") mode_desc = "仅搜索" if search_only else ("搜索并匹配" if search_and_match else "完整流程") print(f"{mode_desc}执行完成") print(f"{'=' * 80}") print(f"\n结果文件:") if not search_only and not search_and_match: print(f" Step1: {step1_file}") if 'search_file' in locals() and search_file: print(f" 搜索: {search_file}") if step4_file: print(f" Step4: {step4_file}") if step4_high_score_count is not None: print(f" (高匹配: {step4_high_score_count} 个, Top1 score: {step4_top1_score:.2f})") if enable_step2 and step2_file: print(f" Step2: {step2_file}") if summary_file: print(f" 汇总: {summary_file}") print() return { "step1_file": step1_file if not search_only and not search_and_match else None, "search_file": search_file if 'search_file' in locals() else None, "step4_file": step4_file, "step2_file": step2_file, "summary_file": summary_file, "status": "success" } async def main(): """主函数""" # 解析命令行参数 parser = argparse.ArgumentParser( description="灵感分析主流程 (Step1 + 搜索 + Step2)", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 使用示例: # 处理第1个灵感(Step1 + 搜索,默认不执行 Step2) python run_inspiration_analysis.py --dir data/阿里多多酱/out/人设_1110 --count 1 # 启用 Step2 完整流程(Step1 + 搜索 + Step2) python run_inspiration_analysis.py --count 1 --enable-step2 # 随机处理5个灵感 python run_inspiration_analysis.py --count 5 --shuffle # 按 Step1 分数排序,处理前10个高分灵感 python run_inspiration_analysis.py --count 10 --sort-by-score # 仅搜索模式:基于已有 Step1 结果,按分数降序搜索前10个 python run_inspiration_analysis.py --search-only --count 10 # 搜索并匹配模式:基于已有 Step1 结果,执行搜索和 Step4 匹配 python run_inspiration_analysis.py --search-and-match --count 10 # 处理所有灵感,强制重新执行 python run_inspiration_analysis.py --count all --force # 处理前10个灵感,step1只处理前20个任务 python run_inspiration_analysis.py --count 10 --max-tasks 20 """ ) parser.add_argument( "--dir", default="data/阿里多多酱/out/人设_1110", help="人设目录路径 (默认: data/阿里多多酱/out/人设_1110)" ) parser.add_argument( "--count", default="1", help="处理的灵感数量,可以是数字或 'all' (默认: 1)" ) parser.add_argument( "--max-tasks", type=str, default="all", help="Step1 处理的最大任务数,可以是数字或 'all' (默认: all)" ) parser.add_argument( "--force", action="store_true", help="强制重新执行,覆盖已存在的文件" ) parser.add_argument( "--shuffle", action="store_true", help="随机选择灵感,而不是按顺序" ) parser.add_argument( "--sort-by-score", action="store_true", help="根据 Step1 结果分数排序(降序),优先处理高分灵感" ) parser.add_argument( "--enable-step2", action="store_true", help="启用 Step2 增量词匹配(默认关闭)" ) parser.add_argument( "--search-only", action="store_true", help="仅执行搜索(跳过 Step1 和 Step2,基于已有 Step1 结果,自动按分数降序)" ) parser.add_argument( "--search-and-match", action="store_true", help="搜索并匹配模式(跳过 Step1 和 Step2,执行搜索和 Step4 匹配,自动按分数降序)" ) args = parser.parse_args() persona_dir = args.dir force = args.force shuffle = args.shuffle sort_by_score = args.sort_by_score enable_step2 = args.enable_step2 search_only = args.search_only search_and_match = args.search_and_match # 互斥检查 if search_only and search_and_match: print("❌ 错误: --search-only 和 --search-and-match 不能同时使用") sys.exit(1) # search_only 和 search_and_match 模式自动启用分数排序 if search_only or search_and_match: sort_by_score = True enable_step2 = False # 搜索模式下强制禁用 step2 if shuffle: print("⚠️ 警告: --search-only 模式会自动按分数排序,忽略 --shuffle 参数") shuffle = False # 处理 max_tasks max_tasks = None if args.max_tasks == "all" else int(args.max_tasks) # 动态流程名称 if search_only: workflow_name = "仅搜索" else: workflow_name = "Step1 + 搜索" if enable_step2: workflow_name += " + Step2" print(f"{'=' * 80}") print(f"灵感分析主流程 ({workflow_name})") print(f"{'=' * 80}") print(f"人设目录: {persona_dir}") # 加载灵感列表 inspiration_list = load_inspiration_list(persona_dir) # 确定要处理的灵感数量 if args.count == "all": inspiration_count = len(inspiration_list) print(f"处理灵感: 全部 ({inspiration_count} 个)") else: inspiration_count = int(args.count) print(f"处理灵感: 前 {inspiration_count} 个") if max_tasks: print(f"Step1 任务数限制: {max_tasks}") if search_only: print(f"搜索模式: 仅搜索(跳过 Step1 和 Step2)") print(f"分数排序: 根据已有 Step1 结果按分数降序处理") else: if force: print(f"强制模式: 重新执行所有步骤") if shuffle: print(f"随机模式: 随机选择灵感") if sort_by_score: print(f"分数排序: 根据 Step1 结果按分数降序处理") if enable_step2: print(f"Step2: 启用增量词匹配") else: print(f"Step2: 已关闭(使用 --enable-step2 启用)") # 选择要处理的灵感列表 if sort_by_score: # 根据 Step1 结果分数排序 sorted_list = sort_inspirations_by_score(persona_dir, inspiration_list, max_tasks) inspirations_to_process = sorted_list[:inspiration_count] elif shuffle: # 随机打乱灵感列表后选择 shuffled_list = inspiration_list.copy() random.shuffle(shuffled_list) inspirations_to_process = shuffled_list[:inspiration_count] else: # 按顺序选择前 N 个 inspirations_to_process = inspiration_list[:inspiration_count] print(f"\n将处理以下灵感:") for i, insp in enumerate(inspirations_to_process, 1): print(f" {i}. {insp}") # 批量执行流程 results = [] for i, inspiration in enumerate(inspirations_to_process, 1): print(f"\n{'#' * 80}") print(f"处理第 {i}/{len(inspirations_to_process)} 个灵感: {inspiration}") print(f"{'#' * 80}") # search_only 和 search_and_match 模式不创建 trace if search_only or search_and_match: result = await run_full_analysis( persona_dir=persona_dir, inspiration=inspiration, max_tasks=max_tasks, force=force, current_time=None, log_url=None, enable_step2=enable_step2, search_only=search_only, search_and_match=search_and_match ) else: # 为每个灵感创建独立的 trace insp_time, insp_log_url = set_trace() with trace(f"灵感分析: {inspiration}"): result = await run_full_analysis( persona_dir=persona_dir, inspiration=inspiration, max_tasks=max_tasks, force=force, current_time=insp_time, log_url=insp_log_url, enable_step2=enable_step2, search_only=search_only, search_and_match=search_and_match ) if insp_log_url: print(f"本次 Trace: {insp_log_url}") results.append(result) # 输出最终汇总 print(f"\n{'=' * 80}") print(f"批量处理完成") print(f"{'=' * 80}") success_count = sum(1 for r in results if r["status"] == "success") print(f"\n成功: {success_count}/{len(results)}") for i, (insp, result) in enumerate(zip(inspirations_to_process, results), 1): status_icon = "✓" if result["status"] == "success" else "✗" print(f" {status_icon} [{i}] {insp}") if __name__ == "__main__": # 主流程不设置 trace,由每个灵感独立设置 asyncio.run(main())