#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 第一步分析脚本 基于过滤后的 how 解构结果,分析哪些点最有可能是创作者的第一步(创作起点)。 输入:intermediate/filtered_results/ 中的过滤结果 输出:第一步分析结果 """ import asyncio import json from pathlib import Path from typing import Dict, List import sys # 添加项目根目录到路径 project_root = Path(__file__).parent.parent.parent sys.path.insert(0, str(project_root)) from agents import Agent, Runner, ModelSettings, trace from agents.tracing.create import custom_span from lib.client import get_model from lib.my_trace import set_trace_smith as set_trace from script.data_processing.path_config import PathConfig # 模型配置 MODEL_NAME = "google/gemini-3-pro-preview" # MODEL_NAME = 'anthropic/claude-sonnet-4.5' agent = Agent( name="First Step Analyzer", model=get_model(MODEL_NAME), model_settings=ModelSettings( temperature=0.0, max_tokens=65536, ), tools=[], ) def extract_points_from_filtered_result(how_result: Dict) -> List[Dict]: """ 从过滤后的 how 解构结果中提取所有点的信息 Args: how_result: how解构结果 Returns: 点信息列表,不包含原始分类标签 """ points = [] for point_type in ["灵感点", "关键点", "目的点"]: point_list_key = f"{point_type}列表" point_list = how_result.get(point_list_key, []) for point in point_list: point_name = point.get("名称", "") point_desc = point.get("描述", "") # 检查是否有匹配到人设特征 has_match = False matched_feature = None similarity = None # 遍历 how 步骤列表中的特征 for step in point.get("how步骤列表", []): for feature in step.get("特征列表", []): match_results = feature.get("匹配结果", []) if match_results: # 如果有匹配结果(top1) has_match = True match = match_results[0] matched_feature = match.get("人设特征名称", "") similarity = match.get("匹配结果", {}).get("相似度", 0) break if has_match: break point_info = { "名称": point_name, "描述": point_desc, "是否匹配到已有人设": has_match } if has_match: point_info["匹配的人设特征"] = matched_feature point_info["相似度"] = similarity points.append(point_info) return points def build_prompt(points: List[Dict]) -> str: """ 构建分析 prompt Args: points: 点信息列表 Returns: prompt 文本 """ # 构建点的描述文本 points_text = [] for i, point in enumerate(points, 1): text = f"{i}. {point['名称']}\n {point['描述']}" if point['是否匹配到已有人设']: text += f"\n [已匹配] 匹配到人设特征: {point['匹配的人设特征']} (相似度: {point['相似度']:.2f})" else: text += "\n [未匹配] 未匹配到已有人设特征" points_text.append(text) points_section = "\n\n".join(points_text) return f''' 以下是一个内容创作的解构结果。这些点已经被分析和分类,但这个分类是分析维度,不代表真实的创作顺序。 请判断:在这些点中,哪些最有可能是创作者的"第一步"(创作起点)? ## 判断标准 **起点特征**: - 最先触发创作、不依赖其他点的节点 - 可能是外部事件、时事热点、商业需求等 - 起点可能有多个 **参考信息**: - **已匹配到人设的点**:来源于创作者已有的人设/风格/习惯 - **未匹配的点**:可能来自外部触发、人设推导、或新尝试 ## 待分析的点 {points_section} ## 输出要求 以 JSON 格式输出: {{ "推理过程": "详细说明判断逻辑...", "第一步候选": [ {{ "点名称": "...", "第一步概率": 0.95, // 0-1之间的数值 "推理依据": "...", "来源分析": "外部触发/人设延伸/商业驱动/其他" }} ] }} 注意: 1. 只输出最有可能是第一步的点(通常1-3个) 2. 按第一步概率降序排列 3. 不要被点的呈现顺序影响判断 '''.strip() async def analyze_post(post_data: Dict) -> Dict: """ 分析单个帖子 Args: post_data: 帖子数据(包含 how解构结果) Returns: 分析结果 """ post_id = post_data.get("帖子id", "") how_result = post_data.get("how解构结果", {}) # 提取所有点的信息 points = extract_points_from_filtered_result(how_result) if not points: return { "帖子id": post_id, "模型": MODEL_NAME, "输入": {"点列表": []}, "输出": None, "错误": "没有可分析的点" } # 构建 prompt prompt = build_prompt(points) # 使用 custom_span 标识单个帖子的分析流程 with custom_span( name=f"分析第一步 - 帖子 {post_id}", data={ "帖子id": post_id, "点数量": len(points), "模型": MODEL_NAME } ): # 调用 agent result = await Runner.run(agent, input=prompt) output = result.final_output # 解析 JSON try: if "```json" in output: json_start = output.find("```json") + 7 json_end = output.find("```", json_start) json_str = output[json_start:json_end].strip() elif "{" in output and "}" in output: json_start = output.find("{") json_end = output.rfind("}") + 1 json_str = output[json_start:json_end] else: json_str = output analysis_result = json.loads(json_str) return { "帖子id": post_id, "模型": MODEL_NAME, "输入": { "点列表": points, "prompt": prompt }, "输出": analysis_result } except Exception as e: return { "帖子id": post_id, "模型": MODEL_NAME, "输入": { "点列表": points, "prompt": prompt }, "输出": None, "错误": str(e), "原始输出": output } async def main(current_time: str = None, log_url: str = None): """主函数 Args: current_time: 当前时间戳(从外部传入) log_url: 日志链接(从外部传入) """ # 使用路径配置 config = PathConfig() # 确保输出目录存在 config.ensure_dirs() # 获取路径 input_dir = config.intermediate_dir / "filtered_results" output_dir = config.intermediate_dir / "first_step_analysis" # 确保输出目录存在 output_dir.mkdir(parents=True, exist_ok=True) print(f"账号: {config.account_name}") print(f"输入目录: {input_dir}") print(f"输出目录: {output_dir}") print(f"使用模型: {MODEL_NAME}") if log_url: print(f"Trace URL: {log_url}") print() # 读取所有过滤后的文件 input_files = list(input_dir.glob("*_filtered.json")) if not input_files: print(f"错误: 在 {input_dir} 中没有找到任何 *_filtered.json 文件") return print(f"找到 {len(input_files)} 个文件待分析\n") # 批量分析 results = [] for i, input_file in enumerate(input_files, 1): print(f"[{i}/{len(input_files)}] 分析文件: {input_file.name}") # 读取文件 with open(input_file, "r", encoding="utf-8") as f: post_data = json.load(f) # 分析 result = await analyze_post(post_data) results.append(result) # 立即保存单个帖子的结果 post_id = result.get("帖子id", "unknown") single_output_file = output_dir / f"{post_id}_first_step.json" single_result = { "元数据": { "current_time": current_time, "log_url": log_url, "model": MODEL_NAME }, "帖子id": post_id, "分析结果": result } with open(single_output_file, "w", encoding="utf-8") as f: json.dump(single_result, f, ensure_ascii=False, indent=2) # 显示结果 output = result.get("输出", {}) if output: first_steps = output.get("第一步候选", []) print(f" 第一步候选:") for step in first_steps: print(f" - {step.get('点名称', 'N/A')} (概率: {step.get('第一步概率', 0):.2f})") print(f" ✓ 已保存: {single_output_file.name}") else: print(f" 分析失败: {result.get('错误', 'N/A')}") print() print(f"✓ 所有分析完成,结果已保存到: {output_dir}") if log_url: print(f"Trace: {log_url}") # 打印汇总 print("\n" + "=" * 80) print("分析汇总") print("=" * 80) for result in results: post_id = result["帖子id"] output = result.get("输出", {}) if output: first_steps = output.get("第一步候选", []) print(f"\n帖子 {post_id}:") for step in first_steps: print(f" - {step.get('点名称', 'N/A')} ({step.get('来源分析', 'N/A')}, 概率: {step.get('第一步概率', 0):.2f})") else: print(f"\n帖子 {post_id}: 分析失败") if __name__ == "__main__": # 设置 trace current_time, log_url = set_trace() # 使用 trace 上下文包裹整个执行流程 with trace("第一步分析"): asyncio.run(main(current_time, log_url))