#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 第一步分析脚本 v2 基于过滤后的 how 解构结果,分析哪些点最有可能是创作者的第一步(创作起点), 以及第一步的前序节点(第一步是怎么来的)。 v2 新增功能: - 分析第一步的前序节点(已有人设 或 外部触发) - 提供搜索关键词(用于后续验证) - 严格约束:前序节点只能是已匹配的人设节点,搜索关键词只能来自节点名称 输入:intermediate/filtered_results/ 中的过滤结果 输出:第一步分析结果(带前序节点信息) """ import asyncio import json from pathlib import Path from typing import Dict, List import sys # 添加项目根目录到路径 project_root = Path(__file__).parent.parent.parent sys.path.insert(0, str(project_root)) from agents import Agent, Runner, ModelSettings, trace from agents.tracing.create import custom_span from lib.client import get_model from lib.my_trace import set_trace_smith as set_trace from script.data_processing.path_config import PathConfig # 模型配置 MODEL_NAME = "google/gemini-3-pro-preview" # MODEL_NAME = 'anthropic/claude-sonnet-4.5' agent = Agent( name="First Step Analyzer V2", model=get_model(MODEL_NAME), model_settings=ModelSettings( temperature=0.0, max_tokens=65536, ), tools=[], ) def extract_points_from_filtered_result(how_result: Dict) -> List[Dict]: """ 从过滤后的 how 解构结果中提取所有点的信息 Args: how_result: how解构结果 Returns: 点信息列表,不包含原始分类标签 """ points = [] for point_type in ["灵感点", "关键点", "目的点"]: point_list_key = f"{point_type}列表" point_list = how_result.get(point_list_key, []) for point in point_list: point_name = point.get("名称", "") point_desc = point.get("描述", "") # 检查是否有匹配到人设特征 has_match = False matched_feature = None similarity = None # 遍历 how 步骤列表中的特征 for step in point.get("how步骤列表", []): for feature in step.get("特征列表", []): match_results = feature.get("匹配结果", []) if match_results: # 如果有匹配结果(top1) has_match = True match = match_results[0] matched_feature = match.get("人设特征名称", "") similarity = match.get("匹配结果", {}).get("相似度", 0) break if has_match: break point_info = { "名称": point_name, "描述": point_desc, "是否匹配到已有人设": has_match } if has_match: point_info["匹配的人设特征"] = matched_feature point_info["相似度"] = similarity points.append(point_info) return points def build_prompt(points: List[Dict]) -> str: """ 构建分析 prompt (v2版本:增加前序节点分析) Args: points: 点信息列表 Returns: prompt 文本 """ # 构建点的描述文本 points_text = [] matched_points_names = [] # 收集已匹配的点名称 for i, point in enumerate(points, 1): text = f"{i}. {point['名称']}\n {point['描述']}" if point['是否匹配到已有人设']: text += f"\n [已匹配] 匹配到人设特征: {point['匹配的人设特征']} (相似度: {point['相似度']:.2f})" matched_points_names.append(point['名称']) else: text += "\n [未匹配] 未匹配到已有人设特征" points_text.append(text) points_section = "\n\n".join(points_text) matched_points_section = "\n".join([f"- {name}" for name in matched_points_names]) return f''' 以下是一个内容创作的解构结果。这些点已经被分析和分类,但这个分类是分析维度,不代表真实的创作顺序。 请判断:在这些点中,哪些最有可能是创作者的"第一步"(创作起点),以及第一步的前序节点(第一步是怎么来的)。 ## 判断标准 **起点特征**: - 最先触发创作、不依赖其他点的节点 - 可能是外部事件、时事热点、商业需求等 - 起点可能有多个 **前序节点规则**: - 每个第一步只有**一个**直接前序节点 - 客观分析哪个前序节点最有可能引发第一步,给出前序概率 - 前序节点只能是以下两种之一: 1. **已匹配的人设节点**:从下面的"已匹配的点"列表中选择 2. **外部触发**:纯外部事件触发 - 选择前序概率最高的那个,不要预设倾向 **搜索关键词规则**(重要!): - 如果前序是外部触发,必须提供搜索关键词 - 搜索关键词**只能**从点的名称中提取,不能推导、不能扩展、不能添加任何额外的词 ## 已匹配的点(可作为前序节点) {matched_points_section} ## 待分析的点 {points_section} ## 输出要求 以 JSON 格式输出: {{ "推理过程": "详细说明判断逻辑...", "第一步候选": [ {{ "点名称": "...", "第一步概率": 0.95, "推理依据": "...", "来源分析": "外部触发/人设延伸/商业驱动/其他", "前序节点": {{ "类型": "已有人设" 或 "外部触发", "人设节点名称": "..." 或 null, // 如果类型是"已有人设",必须从上面的"已匹配的点"中选择 "匹配的人设特征": "..." 或 null, // 如果有人设节点,填写其匹配的特征 "相似度": 0.84 或 null, // 人设节点与特征的匹配相似度 "前序概率": 0.75, // 前序节点引发第一步的概率(0-1之间) "搜索关键词": ["关键词1", "关键词2"] 或 null, // 只在"外部触发"时提供,且只能从点名称中提取 "推理": "说明为什么这个前序节点可能引发第一步" }} }} ] }} 注意: 1. 只输出最有可能是第一步的点(通常1-3个) 2. 按第一步概率降序排列 3. 客观分析前序节点,选择前序概率最高的(无论是已匹配人设还是外部触发) 4. 搜索关键词只能从点名称中提取,不能推导 5. 前序节点不需要能完全推导出第一步,只要可能引发创作者关注即可 '''.strip() async def analyze_post(post_data: Dict) -> Dict: """ 分析单个帖子 Args: post_data: 帖子数据(包含 how解构结果) Returns: 分析结果 """ post_id = post_data.get("帖子id", "") how_result = post_data.get("how解构结果", {}) # 提取所有点的信息 points = extract_points_from_filtered_result(how_result) if not points: return { "帖子id": post_id, "模型": MODEL_NAME, "输入": {"点列表": []}, "输出": None, "错误": "没有可分析的点" } # 构建 prompt prompt = build_prompt(points) # 使用 custom_span 标识单个帖子的分析流程 with custom_span( name=f"分析第一步 - 帖子 {post_id}", data={ "帖子id": post_id, "点数量": len(points), "模型": MODEL_NAME } ): # 调用 agent result = await Runner.run(agent, input=prompt) output = result.final_output # 解析 JSON try: if "```json" in output: json_start = output.find("```json") + 7 json_end = output.find("```", json_start) json_str = output[json_start:json_end].strip() elif "{" in output and "}" in output: json_start = output.find("{") json_end = output.rfind("}") + 1 json_str = output[json_start:json_end] else: json_str = output analysis_result = json.loads(json_str) return { "帖子id": post_id, "模型": MODEL_NAME, "输入": { "点列表": points, "prompt": prompt }, "输出": analysis_result } except Exception as e: return { "帖子id": post_id, "模型": MODEL_NAME, "输入": { "点列表": points, "prompt": prompt }, "输出": None, "错误": str(e), "原始输出": output } async def main(current_time: str = None, log_url: str = None): """主函数 Args: current_time: 当前时间戳(从外部传入) log_url: 日志链接(从外部传入) """ # 使用路径配置 config = PathConfig() # 确保输出目录存在 config.ensure_dirs() # 获取路径 input_dir = config.intermediate_dir / "filtered_results" output_dir = config.intermediate_dir / "first_step_analysis_v2" # 确保输出目录存在 output_dir.mkdir(parents=True, exist_ok=True) print(f"账号: {config.account_name}") print(f"输入目录: {input_dir}") print(f"输出目录: {output_dir}") print(f"使用模型: {MODEL_NAME}") if log_url: print(f"Trace URL: {log_url}") print() # 读取所有过滤后的文件 input_files = list(input_dir.glob("*_filtered.json")) if not input_files: print(f"错误: 在 {input_dir} 中没有找到任何 *_filtered.json 文件") return print(f"找到 {len(input_files)} 个文件待分析\n") # 批量分析 results = [] for i, input_file in enumerate(input_files, 1): print(f"[{i}/{len(input_files)}] 分析文件: {input_file.name}") # 读取文件 with open(input_file, "r", encoding="utf-8") as f: post_data = json.load(f) # 分析 result = await analyze_post(post_data) results.append(result) # 立即保存单个帖子的结果 post_id = result.get("帖子id", "unknown") single_output_file = output_dir / f"{post_id}_first_step.json" single_result = { "元数据": { "current_time": current_time, "log_url": log_url, "model": MODEL_NAME }, "帖子id": post_id, "分析结果": result } with open(single_output_file, "w", encoding="utf-8") as f: json.dump(single_result, f, ensure_ascii=False, indent=2) # 显示结果 output = result.get("输出", {}) if output: first_steps = output.get("第一步候选", []) print(f" 第一步候选:") for step in first_steps: print(f" - {step.get('点名称', 'N/A')} (概率: {step.get('第一步概率', 0):.2f})") print(f" ✓ 已保存: {single_output_file.name}") else: print(f" 分析失败: {result.get('错误', 'N/A')}") print() print(f"✓ 所有分析完成,结果已保存到: {output_dir}") if log_url: print(f"Trace: {log_url}") # 打印汇总 print("\n" + "=" * 80) print("分析汇总") print("=" * 80) for result in results: post_id = result["帖子id"] output = result.get("输出", {}) if output: first_steps = output.get("第一步候选", []) print(f"\n帖子 {post_id}:") for step in first_steps: print(f" - {step.get('点名称', 'N/A')} ({step.get('来源分析', 'N/A')}, 概率: {step.get('第一步概率', 0):.2f})") else: print(f"\n帖子 {post_id}: 分析失败") if __name__ == "__main__": # 设置 trace current_time, log_url = set_trace() # 使用 trace 上下文包裹整个执行流程 with trace("第一步分析"): asyncio.run(main(current_time, log_url))