#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 特征来源分析脚本 V2 基于过滤后的 how 解构结果,分析目标特征可能由哪些其他特征推导而来。 输入:intermediate/filtered_results/ 中的过滤结果 输出:特征来源分析结果 """ import asyncio import json from pathlib import Path from typing import Dict, List, Optional import sys # 添加项目根目录到路径 project_root = Path(__file__).parent.parent.parent sys.path.insert(0, str(project_root)) from agents import Agent, Runner, ModelSettings, trace from agents.tracing.create import custom_span from lib.client import get_model from lib.my_trace import set_trace_smith as set_trace from script.data_processing.path_config import PathConfig # 模型配置 MODEL_NAME = "google/gemini-3-pro-preview" # MODEL_NAME = 'anthropic/claude-sonnet-4.5' agent = Agent( name="Feature Origin Analyzer", model=get_model(MODEL_NAME), model_settings=ModelSettings( temperature=0.0, max_tokens=65536, ), tools=[], ) # ===== 数据提取函数 ===== def extract_post_info(how_result: Dict) -> Dict: """ 从 how 解构结果中提取帖子信息(灵感点、目的点、关键点列表) Args: how_result: how解构结果 Returns: 包含三类点列表的字典,每个点含名称、描述、特征列表 """ result = {} for point_type in ["灵感点", "目的点", "关键点"]: point_list_key = f"{point_type}列表" point_list = how_result.get(point_list_key, []) extracted_points = [] for point in point_list: # 提取特征名称列表 feature_names = [] for feature in point.get("特征列表", []): feature_name = feature.get("特征名称", "") if feature_name: feature_names.append(feature_name) extracted_points.append({ "名称": point.get("名称", ""), "描述": point.get("描述", ""), "特征列表": feature_names }) if extracted_points: result[point_list_key] = extracted_points return result def get_all_features(post_info: Dict) -> List[Dict]: """ 从帖子信息中提取所有特征(点+特征列表中的特征) Args: post_info: 帖子信息 Returns: 所有特征列表,包含名称和类型 """ features = [] for point_type in ["灵感点", "目的点", "关键点"]: point_list_key = f"{point_type}列表" for point in post_info.get(point_list_key, []): # 添加点本身作为特征 features.append({ "特征名称": point["名称"], "特征类型": point_type, "描述": point.get("描述", "") }) return features # ===== Prompt 构建 ===== def build_prompt(target_feature: str, post_info: Dict) -> str: """ 构建分析 prompt Args: target_feature: 目标关键特征名称 post_info: 帖子信息 Returns: prompt 文本 """ # 将帖子信息转为 JSON 格式 post_info_json = json.dumps(post_info, ensure_ascii=False, indent=4) return f'''你是一个内容创作逆向工程分析专家。你的任务是分析给定的特征是如何从其他特征中推理得出的。 请按照以下要求进行分析: ## 目标关键特征 {target_feature} ## 帖子信息 {post_info_json} ## 分析任务 将所有来源特征分为两类: ### 1. 单独推理 - 定义: 该特征单独存在时,可以独立推导出目标关键特征,无需其他特征辅助 ### 2. 组合推理 - 定义: 2个或更多特征必须同时存在才能有效推导出目标关键特征 ## 输出格式 使用JSON格式输出,结构如下: {{ "目标关键特征": "...", "推理类型分类": {{ "单独推理": [ {{ "排名": 1, "特征名称": "...", "特征类型": "灵感点/目的点/关键点", "可能性": 0.xx, "推理说明": "..." }} ], "组合推理": [ {{ "组合编号": 1, "组合成员": ["...", "..."], "成员类型": ["...", "..."], "可能性": 0.xx, "单独可能性": {{ "成员1": 0.xx, "成员2": 0.xx }}, "协同效应分析": {{ "单独平均值": 0.xx, "协同增益": 0.xx, "增益说明": "..." }}, "推理说明": "..." }} ] }} }} ## 注意事项 1. 可能性数值需要合理评估,范围在0-1之间 2. 单独推理按可能性从高到低排序 3. 组合推理必须包含2个或以上成员 4. 协同增益 = 组合可能性 - 单独平均值 5. 推理说明要清晰说明推导逻辑,避免空洞表述 6. 每个特征只能属于一种推理类型,不能既是单独推理又是组合推理的成员 7. 优先识别组合推理,剩余的特征作为单独推理 8. 一般先有实质,再有形式,如,先有角色,再有服化道;除非形式是关键特征 '''.strip() # ===== 主分析函数 ===== async def analyze_feature_origin( post_data: Dict, target_feature: str = None ) -> Dict: """ 分析单个帖子中目标特征的来源 Args: post_data: 帖子数据(包含 how解构结果) target_feature: 目标特征名称,如果为 None 则使用关键点的第一个 Returns: 分析结果 """ post_id = post_data.get("帖子id", "") how_result = post_data.get("how解构结果", {}) # 提取帖子信息 post_info = extract_post_info(how_result) if not post_info: return { "帖子id": post_id, "模型": MODEL_NAME, "输入": {"帖子信息": {}}, "输出": None, "错误": "没有可分析的点" } # 确定目标特征 if target_feature is None: # 默认使用关键点的第一个 key_points = post_info.get("关键点列表", []) if key_points: target_feature = key_points[0]["名称"] else: return { "帖子id": post_id, "模型": MODEL_NAME, "输入": {"帖子信息": post_info}, "输出": None, "错误": "没有找到关键点" } # 构建 prompt prompt = build_prompt(target_feature, post_info) # 使用 custom_span 标识分析流程 with custom_span( name=f"分析特征来源 - {target_feature}", data={ "帖子id": post_id, "目标特征": target_feature, "模型": MODEL_NAME } ): # 调用 agent result = await Runner.run(agent, input=prompt) output = result.final_output # 解析 JSON try: if "```json" in output: json_start = output.find("```json") + 7 json_end = output.find("```", json_start) json_str = output[json_start:json_end].strip() elif "{" in output and "}" in output: json_start = output.find("{") json_end = output.rfind("}") + 1 json_str = output[json_start:json_end] else: json_str = output analysis_result = json.loads(json_str) return { "帖子id": post_id, "目标特征": target_feature, "模型": MODEL_NAME, "输入": { "帖子信息": post_info, "prompt": prompt }, "输出": analysis_result } except Exception as e: return { "帖子id": post_id, "目标特征": target_feature, "模型": MODEL_NAME, "输入": { "帖子信息": post_info, "prompt": prompt }, "输出": None, "错误": str(e), "原始输出": output } # ===== 主函数 ===== async def main( post_id: str = None, target_feature: str = None, current_time: str = None, log_url: str = None ): """ 主函数 Args: post_id: 帖子ID,可选(默认使用第一个) target_feature: 目标特征名称,可选(默认使用关键点第一个) current_time: 当前时间戳(从外部传入) log_url: 日志链接(从外部传入) """ config = PathConfig() # 获取输入目录 input_dir = config.intermediate_dir / "filtered_results" output_dir = config.intermediate_dir / "feature_origin_analysis" output_dir.mkdir(parents=True, exist_ok=True) print(f"账号: {config.account_name}") print(f"输入目录: {input_dir}") print(f"输出目录: {output_dir}") print(f"使用模型: {MODEL_NAME}") if log_url: print(f"Trace URL: {log_url}") print() # 获取输入文件 input_files = sorted(input_dir.glob("*_filtered.json")) if not input_files: print(f"错误: 在 {input_dir} 中没有找到任何 *_filtered.json 文件") return # 选择帖子 if post_id: target_file = next( (f for f in input_files if post_id in f.name), None ) if not target_file: print(f"错误: 未找到帖子 {post_id}") return else: target_file = input_files[0] # 默认第一个 # 读取文件 with open(target_file, "r", encoding="utf-8") as f: post_data = json.load(f) actual_post_id = post_data.get("帖子id", "unknown") print(f"帖子ID: {actual_post_id}") print(f"目标特征: {target_feature or '(默认关键点第一个)'}") print() # 分析 result = await analyze_feature_origin(post_data, target_feature) # 显示结果 output = result.get("输出") if output: print("=" * 60) print("分析结果") print("=" * 60) print(f"\n目标关键特征: {output.get('目标关键特征', 'N/A')}\n") reasoning = output.get("推理类型分类", {}) # 显示单独推理 single = reasoning.get("单独推理", []) if single: print("【单独推理】") for item in single: print(f" #{item.get('排名', '-')} [{item.get('可能性', 0):.2f}] {item.get('特征名称', '')} ({item.get('特征类型', '')})") print(f" {item.get('推理说明', '')}") # 显示组合推理 combo = reasoning.get("组合推理", []) if combo: print("\n【组合推理】") for item in combo: members = " + ".join(item.get("组合成员", [])) prob = item.get("可能性", 0) synergy = item.get("协同效应分析", {}) gain = synergy.get("协同增益", 0) print(f" 组合{item.get('组合编号', '-')}: [{prob:.2f}] {members}") print(f" 协同增益: {gain:+.2f}") print(f" {item.get('推理说明', '')}") else: print(f"分析失败: {result.get('错误', 'N/A')}") # 保存结果 target_name = result.get("目标特征", "unknown") output_file = output_dir / f"{actual_post_id}_{target_name}_来源分析.json" save_data = { "元数据": { "current_time": current_time, "log_url": log_url, "model": MODEL_NAME }, **result } with open(output_file, "w", encoding="utf-8") as f: json.dump(save_data, f, ensure_ascii=False, indent=2) print(f"\n结果已保存到: {output_file}") if log_url: print(f"Trace: {log_url}") if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="分析特征来源 V2") parser.add_argument("--post-id", type=str, help="帖子ID") parser.add_argument("--target", type=str, help="目标特征名称") args = parser.parse_args() # 设置 trace current_time, log_url = set_trace() # 使用 trace 上下文包裹整个执行流程 with trace("特征来源分析V2"): asyncio.run(main( post_id=args.post_id, target_feature=args.target, current_time=current_time, log_url=log_url ))