||
- """
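- Main pipeline script: chains Step1, the Xiaohongshu search, Step4 and Step2.
- Runs the inspiration-analysis flow:
- 1. Step1: match the inspiration against the persona (calls the step1 main, which saves its own result)
- 2. Step1.5: search Xiaohongshu using the Top1 matched element (uses search_xiaohongshu)
- 3. Step4 (only with --search-and-match): match the search results against the inspiration (calls the step4 main)
- 4. Step2 (only with --enable-step2): match the incremental words against the persona (calls the step2 main, which saves its own result)
- 5. Write the pipeline summary file (full-analysis mode only)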
- """
- import os
- import sys
- import json
- import asyncio
- import random
- import argparse
- from agents import trace
- from lib.my_trace import set_trace_smith as set_trace
- from lib.data_loader import load_inspiration_list, select_inspiration
- from lib.utils import read_json
- # 导入 step1, step2 和 step4 的 main 函数
- import step1_inspiration_match
- import step2_incremental_match
- import step4_search_result_match
- # 导入搜索功能
- from script.search import search_xiaohongshu
def find_step1_output(persona_dir: str, inspiration: str, max_tasks: int = None) -> str:
    """Locate the Step1 result file of an inspiration.

    Searches ``<persona_dir>/how/灵感点/<inspiration>`` for files named
    ``<scope>_step1_*.json``, where ``<scope>`` is ``top<max_tasks>`` when
    ``max_tasks`` is given and ``all`` otherwise.

    Args:
        persona_dir: Persona directory.
        inspiration: Inspiration name (used as a sub-directory name).
        max_tasks: Task limit that selects the file-name prefix; ``None``
            selects the ``all_`` prefix.

    Returns:
        Path of the matching file as a string. When several files match,
        the lexicographically smallest path is returned so that repeated
        calls always select the same file.

    Raises:
        FileNotFoundError: If no file matches the pattern.
    """
    from pathlib import Path

    step1_dir = os.path.join(persona_dir, "how", "灵感点", inspiration)
    scope_prefix = f"top{max_tasks}" if max_tasks is not None else "all"
    step1_pattern = f"{scope_prefix}_step1_*.json"

    # Path.glob yields entries in arbitrary order; sort so the file picked
    # does not depend on directory-listing order.
    step1_files = sorted(Path(step1_dir).glob(step1_pattern))
    if not step1_files:
        raise FileNotFoundError(f"找不到 step1 输出文件: {step1_dir}/{step1_pattern}")
    return str(step1_files[0])
def find_step2_output(persona_dir: str, inspiration: str, max_tasks: int = None) -> str:
    """Locate the Step2 result file of an inspiration.

    Searches ``<persona_dir>/how/灵感点/<inspiration>`` for files named
    ``<scope>_step2_*.json``, where ``<scope>`` is ``top<max_tasks>`` when
    ``max_tasks`` is given and ``all`` otherwise.

    Args:
        persona_dir: Persona directory.
        inspiration: Inspiration name (used as a sub-directory name).
        max_tasks: Task limit that selects the file-name prefix; ``None``
            selects the ``all_`` prefix.

    Returns:
        Path of the matching file as a string. When several files match,
        the lexicographically smallest path is returned so that repeated
        calls always select the same file.

    Raises:
        FileNotFoundError: If no file matches the pattern.
    """
    from pathlib import Path

    step2_dir = os.path.join(persona_dir, "how", "灵感点", inspiration)
    scope_prefix = f"top{max_tasks}" if max_tasks is not None else "all"
    step2_pattern = f"{scope_prefix}_step2_*.json"

    # Path.glob yields entries in arbitrary order; sort so the file picked
    # does not depend on directory-listing order.
    step2_files = sorted(Path(step2_dir).glob(step2_pattern))
    if not step2_files:
        raise FileNotFoundError(f"找不到 step2 输出文件: {step2_dir}/{step2_pattern}")
    return str(step2_files[0])
def get_inspiration_score(persona_dir: str, inspiration: str, max_tasks: int = None) -> float:
    """Return the Step1 Top1 score of an inspiration.

    Args:
        persona_dir: Persona directory.
        inspiration: Inspiration name.
        max_tasks: Task limit used to locate the Step1 result file.

    Returns:
        The ``score`` of the first entry of ``匹配结果列表`` in the Step1
        result file; 0 when that list is empty or the entry carries no
        (or a null) score; -1 when the result file is missing or cannot be
        read/parsed.
    """
    try:
        step1_file = find_step1_output(persona_dir, inspiration, max_tasks)
        step1_data = read_json(step1_file)
        results = step1_data.get("匹配结果列表", [])
        if not results:
            return 0
        score = results[0].get('匹配结果', {}).get('score', 0)
    except Exception:
        # Best effort: a missing or malformed result file means "no result".
        # FileNotFoundError is already covered by Exception.
        return -1

    # An explicit null score would otherwise propagate as None and break the
    # numeric comparisons done by callers; treat it like a missing score.
    return 0 if score is None else score
def sort_inspirations_by_score(
    persona_dir: str,
    inspiration_list: list,
    max_tasks: int = None
) -> list:
    """Order inspirations by their existing Step1 Top1 score, best first.

    Args:
        persona_dir: Persona directory.
        inspiration_list: Inspirations to rank.
        max_tasks: Task limit used to locate each Step1 result file.

    Returns:
        The inspirations sorted by score in descending order. Entries whose
        score is negative (no usable Step1 result) come last; the sort is
        stable, so ties keep their original relative order.
    """
    divider = '─' * 80
    print(f"\n{divider}")
    print("正在读取现有 Step1 结果文件...")
    print(divider)

    # Look up the score of every inspiration, in input order.
    scored = []
    for name in inspiration_list:
        value = get_inspiration_score(persona_dir, name, max_tasks)
        scored.append({"inspiration": name, "score": value, "has_result": value >= 0})

    found = len([entry for entry in scored if entry["has_result"]])
    print(f"找到 {found}/{len(inspiration_list)} 个灵感的 Step1 result".replace("result", "结果"))

    # Entries with a result first (by score, descending), the rest afterwards.
    ranked = list(scored)
    ranked.sort(key=lambda entry: (entry["has_result"], entry["score"]), reverse=True)

    # Preview at most the first ten entries.
    print("\n排序后的灵感列表(前10个):")
    for position, entry in enumerate(ranked[:10], 1):
        if entry['has_result']:
            label = f"score={entry['score']:.2f}"
        else:
            label = "无结果"
        print(f" {position}. [{label}] {entry['inspiration']}")

    remaining = len(ranked) - 10
    if remaining > 0:
        print(f" ... 还有 {remaining} 个")

    return [entry["inspiration"] for entry in ranked]
async def run_full_analysis(
    persona_dir: str,
    inspiration: str,
    max_tasks: int = None,
    force: bool = False,
    current_time: str = None,
    log_url: str = None,
    enable_step2: bool = False,
    search_only: bool = False,
    search_and_match: bool = False
) -> dict:
    """Run the analysis pipeline (Step1 + search + Step4 + Step2) for one inspiration.

    Stages:
      * Step1 - run ``step1_inspiration_match.main`` (full mode), or reuse an
        existing Step1 result file (both search modes).
      * Step1.5 - search Xiaohongshu with the Top1 matched element and save
        the raw result under ``<inspiration dir>/search``.
      * Step4 - only with ``search_and_match`` and a saved search result:
        run ``step4_search_result_match.main`` unless a Step4 file already
        exists and ``force`` is false.
      * Step2 - only in full mode with ``enable_step2``.
      * Summary - written in full mode only.

    The step modules receive their positional arguments through ``sys.argv``,
    which is swapped temporarily and always restored afterwards.

    Args:
        persona_dir: Persona directory path.
        inspiration: Inspiration text (also used as a directory name).
        max_tasks: Step1 task limit (``None`` means unlimited).
        force: Re-run steps even if their output already exists.
        current_time: Timestamp forwarded to the step mains and the summary.
        log_url: Log/trace link forwarded to the step mains and the summary.
        enable_step2: Whether to run Step2 (default ``False``).
        search_only: Only search, based on an existing Step1 result.
        search_and_match: Search and run Step4, based on an existing Step1
            result.

    Returns:
        A dict with the keys ``step1_file``, ``search_file``, ``step4_file``,
        ``step2_file``, ``summary_file`` and ``status``. ``status`` is
        ``"success"``, ``"step1_not_found"`` (search modes without an
        existing Step1 file) or ``"step1_empty"`` (Step1 produced no match).
    """
    from pathlib import Path

    # Both search modes reuse an existing Step1 result and skip Step1, Step2
    # and the summary file.
    reuse_step1 = search_only or search_and_match
    scope_prefix = f"top{max_tasks}" if max_tasks is not None else "all"
    inspiration_dir = os.path.join(persona_dir, "how", "灵感点", inspiration)

    print(f"\n{'=' * 80}")
    mode_desc = "仅搜索" if search_only else ("搜索并匹配" if search_and_match else "完整分析")
    print(f"开始{mode_desc}流程: {inspiration}")
    print(f"{'=' * 80}\n")

    # Remember the real command line so it can be restored after each step.
    original_argv = sys.argv.copy()

    # ========== Step1: match inspiration against persona ==========
    if not reuse_step1:
        print(f"{'─' * 80}")
        print(f"Step1: 灵感与人设匹配")
        print(f"{'─' * 80}\n")

        # Pass the step's positional arguments through sys.argv.
        sys.argv = [
            "step1_inspiration_match.py",
            persona_dir,
            inspiration,
            str(max_tasks) if max_tasks is not None else "all"
        ]
        try:
            await step1_inspiration_match.main(current_time, log_url, force=force)
        finally:
            sys.argv = original_argv

        step1_file = find_step1_output(persona_dir, inspiration, max_tasks)
        print(f"✓ Step1 完成,结果文件: {step1_file}\n")
    else:
        print(f"{'─' * 80}")
        mode_label = "搜索并匹配模式" if search_and_match else "仅搜索模式"
        print(f"Step1: 跳过({mode_label})")
        print(f"{'─' * 80}\n")

        try:
            step1_file = find_step1_output(persona_dir, inspiration, max_tasks)
            print(f"✓ 找到已有 Step1 结果: {step1_file}\n")
        except FileNotFoundError as e:
            print(f"⚠️ {e}")
            return {
                "step1_file": None,
                "search_file": None,
                "step4_file": None,
                "step2_file": None,
                "summary_file": None,
                "status": "step1_not_found"
            }

    step1_data = read_json(step1_file)
    step1_results = step1_data.get("匹配结果列表", [])
    if not step1_results:
        print("⚠️ Step1 结果为空,终止流程")
        # Same key set as every other return path, so callers can index the
        # result without checking the status first.
        return {
            "step1_file": step1_file,
            "search_file": None,
            "step4_file": None,
            "step2_file": None,
            "summary_file": None,
            "status": "step1_empty"
        }

    step1_top1 = step1_results[0]
    step1_score = step1_top1.get('匹配结果', {}).get('score', 0)
    step1_element = step1_top1.get("业务信息", {}).get("匹配要素", "")
    print(f"Top1 匹配要素: {step1_element}, score: {step1_score:.2f}")

    # ========== Step1.5: Xiaohongshu search ==========
    print(f"\n{'─' * 80}")
    print(f"Step1.5: 基于 Top1 匹配要素进行小红书搜索")
    print(f"{'─' * 80}\n")

    search_keyword = step1_element
    print(f"搜索关键词: {search_keyword}")

    search_file = None
    search_notes_count = 0
    try:
        search_result = search_xiaohongshu(search_keyword)
        search_notes_count = len(search_result.get('notes', []))
        print(f"✓ 搜索完成,找到 {search_notes_count} 条笔记")

        search_dir = os.path.join(inspiration_dir, "search")
        os.makedirs(search_dir, exist_ok=True)
        # Keep the keyword short and strip path separators / colons so it is
        # usable inside a file name.
        safe_keyword = search_keyword[:20].replace('/', '_').replace('\\', '_').replace(':', '_')
        search_filename = f"{scope_prefix}_search_{safe_keyword}.json"
        search_file = os.path.join(search_dir, search_filename)
        with open(search_file, 'w', encoding='utf-8') as f:
            json.dump(search_result, f, ensure_ascii=False, indent=2)
        print(f"✓ 搜索结果已保存: {search_file}\n")
    except Exception as e:
        # Best effort: a failed search (or failed save) must not abort the
        # remaining stages; it is reported and recorded as "no search file".
        print(f"⚠️ 搜索失败: {e}")
        search_file = None
        search_notes_count = 0

    # ========== Step4: match search results ==========
    step4_file = None
    step4_high_score_count = None
    step4_top1_score = None
    if search_and_match and search_file:
        step4_dir = os.path.join(inspiration_dir, "search")
        step4_pattern = f"{scope_prefix}_step4_*.json"
        # Sorted so the chosen file does not depend on directory order.
        step4_files = sorted(Path(step4_dir).glob(step4_pattern)) if os.path.exists(step4_dir) else []

        if step4_files and not force:
            print(f"\n{'─' * 80}")
            print(f"Step4: 已跳过(结果文件已存在)")
            print(f"{'─' * 80}\n")
            step4_file = str(step4_files[0])
            print(f"✓ 找到已有 Step4 结果: {step4_file}\n")
        else:
            print(f"\n{'─' * 80}")
            print(f"Step4: 搜索结果与灵感匹配")
            print(f"{'─' * 80}\n")

            sys.argv = [
                "step4_search_result_match.py",
                persona_dir,
                inspiration
            ]
            if max_tasks is not None:
                sys.argv.append(str(max_tasks))
            try:
                await step4_search_result_match.main(current_time, log_url, force=force)
            finally:
                sys.argv = original_argv

            step4_files = sorted(Path(step4_dir).glob(step4_pattern))
            if step4_files:
                step4_file = str(step4_files[0])
                print(f"✓ Step4 完成,结果文件: {step4_file}\n")

        if step4_file:
            step4_high_score_count, step4_top1_score = _read_step4_metrics(step4_file)
    elif search_and_match:
        print(f"\n{'─' * 80}")
        print(f"Step4: 已跳过(搜索失败)")
        print(f"{'─' * 80}\n")

    # ========== Step2: incremental-word matching ==========
    step2_file = None
    step2_score = None
    step2_word_count = None
    step2_data = None
    if enable_step2 and not reuse_step1:
        print(f"\n{'─' * 80}")
        print(f"Step2: 增量词在人设中的匹配")
        print(f"{'─' * 80}\n")

        sys.argv = [
            "step2_incremental_match.py",
            persona_dir,
            inspiration
        ]
        try:
            await step2_incremental_match.main(current_time, log_url, force=force)
        finally:
            sys.argv = original_argv

        step2_file = find_step2_output(persona_dir, inspiration, max_tasks)
        print(f"✓ Step2 完成,结果文件: {step2_file}\n")

        step2_data = read_json(step2_file)
        step2_score = step2_data.get("匹配结果", {}).get("score", 0)
        step2_b_content = step2_data.get("输入信息", {}).get("B", "")
        # One incremental word per line of the "B" input.
        step2_word_count = len(step2_b_content.split("\n")) if step2_b_content else 0
    elif not reuse_step1:
        print(f"\n{'─' * 80}")
        print(f"Step2: 已跳过(使用 --enable-step2 启用)")
        print(f"{'─' * 80}\n")

    # ========== Summary (full mode only) ==========
    summary_file = None
    if not reuse_step1:
        # The model name is the last "_"-separated part of the Step1 file name.
        step1_filename = os.path.basename(step1_file)
        model_short = step1_filename.split("_")[-1].replace(".json", "")
        summary_filename = f"{scope_prefix}_summary_完整流程_{model_short}.json"
        summary_file = os.path.join(inspiration_dir, summary_filename)

        workflow = "Step1 + 搜索"
        if enable_step2:
            workflow += " + Step2"

        summary = {
            "元数据": {
                "current_time": current_time,
                "log_url": log_url,
                "流程": workflow,
                "step1_model": step1_data.get("元数据", {}).get("model", ""),
                "step2_model": step2_data.get("元数据", {}).get("model", "") if step2_data is not None else None
            },
            "灵感": inspiration,
            "文件路径": {
                "step1": step1_file,
                "search": search_file,
                "step2": step2_file
            },
            "关键指标": {
                "step1_top1_score": step1_score,
                "step1_top1_匹配要素": step1_element,
                "search_keyword": search_keyword,
                "search_notes_count": search_notes_count,
                "step2_增量词数量": step2_word_count,
                "step2_score": step2_score
            }
        }
        with open(summary_file, 'w', encoding='utf-8') as f:
            json.dump(summary, f, ensure_ascii=False, indent=2)

    # ========== Final report ==========
    print(f"{'=' * 80}")
    mode_desc = "仅搜索" if search_only else ("搜索并匹配" if search_and_match else "完整流程")
    print(f"{mode_desc}执行完成")
    print(f"{'=' * 80}")
    print(f"\n结果文件:")
    if not reuse_step1:
        print(f" Step1: {step1_file}")
    if search_file:
        print(f" 搜索: {search_file}")
    if step4_file:
        print(f" Step4: {step4_file}")
        if step4_high_score_count is not None:
            print(f" (高匹配: {step4_high_score_count} 个, Top1 score: {step4_top1_score:.2f})")
    if enable_step2 and step2_file:
        print(f" Step2: {step2_file}")
    if summary_file:
        print(f" 汇总: {summary_file}")
    print()

    return {
        # In the search modes the Step1 file was only reused, not produced.
        "step1_file": step1_file if not reuse_step1 else None,
        "search_file": search_file,
        "step4_file": step4_file,
        "step2_file": step2_file,
        "summary_file": summary_file,
        "status": "success"
    }


def _read_step4_metrics(step4_file: str) -> tuple:
    """Return ``(high_score_count, top1_score)`` read from a Step4 result file.

    ``high_score_count`` counts entries of ``匹配结果列表`` whose score is at
    least 0.7; ``top1_score`` is the score of the first entry, or 0 when the
    list is empty.
    """
    step4_results = read_json(step4_file).get("匹配结果列表", [])
    high_score_count = sum(
        1 for r in step4_results if r.get("匹配结果", {}).get("score", 0) >= 0.7
    )
    top1_score = step4_results[0].get("匹配结果", {}).get("score", 0) if step4_results else 0
    return high_score_count, top1_score
async def main():
    """Command-line entry point: parse options and process inspirations in batch.

    Loads the inspiration list of the persona directory, selects the ones to
    process (in order, shuffled, or sorted by existing Step1 score), runs
    ``run_full_analysis`` for each of them and prints a final tally.

    Both search modes (``--search-only`` / ``--search-and-match``) force
    score ordering and disable Step2. A trace is created per inspiration,
    except in ``--search-only`` mode.
    """
    parser = argparse.ArgumentParser(
        description="灵感分析主流程 (Step1 + 搜索 + Step2)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
使用示例:
  # 处理第1个灵感(Step1 + 搜索,默认不执行 Step2)
  python run_inspiration_analysis.py --dir data/阿里多多酱/out/人设_1110 --count 1
  # 启用 Step2 完整流程(Step1 + 搜索 + Step2)
  python run_inspiration_analysis.py --count 1 --enable-step2
  # 随机处理5个灵感
  python run_inspiration_analysis.py --count 5 --shuffle
  # 按 Step1 分数排序,处理前10个高分灵感
  python run_inspiration_analysis.py --count 10 --sort-by-score
  # 仅搜索模式:基于已有 Step1 结果,按分数降序搜索前10个
  python run_inspiration_analysis.py --search-only --count 10
  # 搜索并匹配模式:基于已有 Step1 结果,执行搜索和 Step4 匹配
  python run_inspiration_analysis.py --search-and-match --count 10
  # 处理所有灵感,强制重新执行
  python run_inspiration_analysis.py --count all --force
  # 处理前10个灵感,step1只处理前20个任务
  python run_inspiration_analysis.py --count 10 --max-tasks 20
"""
    )
    parser.add_argument(
        "--dir",
        default="data/阿里多多酱/out/人设_1110",
        help="人设目录路径 (默认: data/阿里多多酱/out/人设_1110)"
    )
    parser.add_argument(
        "--count",
        default="1",
        help="处理的灵感数量,可以是数字或 'all' (默认: 1)"
    )
    parser.add_argument(
        "--max-tasks",
        type=str,
        default="all",
        help="Step1 处理的最大任务数,可以是数字或 'all' (默认: all)"
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="强制重新执行,覆盖已存在的文件"
    )
    parser.add_argument(
        "--shuffle",
        action="store_true",
        help="随机选择灵感,而不是按顺序"
    )
    parser.add_argument(
        "--sort-by-score",
        action="store_true",
        help="根据 Step1 结果分数排序(降序),优先处理高分灵感"
    )
    parser.add_argument(
        "--enable-step2",
        action="store_true",
        help="启用 Step2 增量词匹配(默认关闭)"
    )
    parser.add_argument(
        "--search-only",
        action="store_true",
        help="仅执行搜索(跳过 Step1 和 Step2,基于已有 Step1 结果,自动按分数降序)"
    )
    parser.add_argument(
        "--search-and-match",
        action="store_true",
        help="搜索并匹配模式(跳过 Step1 和 Step2,执行搜索和 Step4 匹配,自动按分数降序)"
    )
    args = parser.parse_args()

    def parse_limit(raw: str, flag: str):
        """Turn 'all' into None and a numeric string into a non-negative int."""
        if raw == "all":
            return None
        try:
            value = int(raw)
        except ValueError:
            # parser.error prints usage and exits instead of a raw traceback.
            parser.error(f"{flag} 必须是数字或 'all',当前值: {raw!r}")
        if value < 0:
            # A negative limit would silently turn into "all but the last N"
            # when used as a slice bound.
            parser.error(f"{flag} 不能为负数,当前值: {raw!r}")
        return value

    persona_dir = args.dir
    force = args.force
    shuffle = args.shuffle
    sort_by_score = args.sort_by_score
    enable_step2 = args.enable_step2
    search_only = args.search_only
    search_and_match = args.search_and_match

    # The two search modes are mutually exclusive.
    if search_only and search_and_match:
        print("❌ 错误: --search-only 和 --search-and-match 不能同时使用")
        sys.exit(1)

    # Search modes always process by descending Step1 score and never run Step2.
    if search_only or search_and_match:
        sort_by_score = True
        enable_step2 = False
        if shuffle:
            # Name the flag that actually triggered the override.
            mode_flag = "--search-only" if search_only else "--search-and-match"
            print(f"⚠️ 警告: {mode_flag} 模式会自动按分数排序,忽略 --shuffle 参数")
            shuffle = False

    max_tasks = parse_limit(args.max_tasks, "--max-tasks")
    count_limit = parse_limit(args.count, "--count")

    # Banner title reflects the selected mode.
    if search_only:
        workflow_name = "仅搜索"
    elif search_and_match:
        workflow_name = "搜索并匹配"
    else:
        workflow_name = "Step1 + 搜索"
        if enable_step2:
            workflow_name += " + Step2"

    print(f"{'=' * 80}")
    print(f"灵感分析主流程 ({workflow_name})")
    print(f"{'=' * 80}")
    print(f"人设目录: {persona_dir}")

    inspiration_list = load_inspiration_list(persona_dir)

    if count_limit is None:
        inspiration_count = len(inspiration_list)
        print(f"处理灵感: 全部 ({inspiration_count} 个)")
    else:
        inspiration_count = count_limit
        print(f"处理灵感: 前 {inspiration_count} 个")

    # "is not None" so that an explicit limit of 0 is still reported.
    if max_tasks is not None:
        print(f"Step1 任务数限制: {max_tasks}")

    if search_only:
        print(f"搜索模式: 仅搜索(跳过 Step1 和 Step2)")
        print(f"分数排序: 根据已有 Step1 结果按分数降序处理")
    elif search_and_match:
        print(f"搜索模式: 搜索并匹配(跳过 Step1 和 Step2,执行搜索和 Step4 匹配)")
        if force:
            print(f"强制模式: 重新执行 Step4 匹配")
        print(f"分数排序: 根据已有 Step1 结果按分数降序处理")
    else:
        if force:
            print(f"强制模式: 重新执行所有步骤")
        if shuffle:
            print(f"随机模式: 随机选择灵感")
        if sort_by_score:
            print(f"分数排序: 根据 Step1 结果按分数降序处理")
        if enable_step2:
            print(f"Step2: 启用增量词匹配")
        else:
            print(f"Step2: 已关闭(使用 --enable-step2 启用)")

    # Choose which inspirations to process; score ordering wins over shuffle.
    if sort_by_score:
        candidates = sort_inspirations_by_score(persona_dir, inspiration_list, max_tasks)
    elif shuffle:
        candidates = inspiration_list.copy()
        random.shuffle(candidates)
    else:
        candidates = inspiration_list
    inspirations_to_process = candidates[:inspiration_count]

    print(f"\n将处理以下灵感:")
    for i, insp in enumerate(inspirations_to_process, 1):
        print(f" {i}. {insp}")

    # Process the selected inspirations one after another.
    results = []
    for i, inspiration in enumerate(inspirations_to_process, 1):
        print(f"\n{'#' * 80}")
        print(f"处理第 {i}/{len(inspirations_to_process)} 个灵感: {inspiration}")
        print(f"{'#' * 80}")

        common_kwargs = dict(
            persona_dir=persona_dir,
            inspiration=inspiration,
            max_tasks=max_tasks,
            force=force,
            enable_step2=enable_step2,
            search_only=search_only,
            search_and_match=search_and_match,
        )

        if search_only:
            # search_only creates no trace; search_and_match still needs one.
            result = await run_full_analysis(
                current_time=None,
                log_url=None,
                **common_kwargs
            )
        else:
            # Every inspiration gets its own trace.
            insp_time, insp_log_url = set_trace()
            with trace(f"灵感分析: {inspiration}"):
                result = await run_full_analysis(
                    current_time=insp_time,
                    log_url=insp_log_url,
                    **common_kwargs
                )
            if insp_log_url:
                print(f"本次 Trace: {insp_log_url}")

        results.append(result)

    # Final tally.
    print(f"\n{'=' * 80}")
    print(f"批量处理完成")
    print(f"{'=' * 80}")
    success_count = sum(1 for r in results if r["status"] == "success")
    print(f"\n成功: {success_count}/{len(results)}")
    for i, (insp, result) in enumerate(zip(inspirations_to_process, results), 1):
        status_icon = "✓" if result["status"] == "success" else "✗"
        print(f" {status_icon} [{i}] {insp}")
if __name__ == "__main__":
    # No trace is set up at this level; main() sets one up per inspiration.
    asyncio.run(main())
|