#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 节点来源分析脚本 给定一个目标节点,推断它可能由哪些候选节点推导而来。 输入:post_graph 目录中的帖子图谱文件 输出:节点来源分析结果 """ import asyncio import json from pathlib import Path from typing import Dict, List, Optional, TypedDict import sys # 添加项目根目录到路径 project_root = Path(__file__).parent.parent.parent sys.path.insert(0, str(project_root)) from agents import Agent, Runner, ModelSettings, trace from agents.tracing.create import custom_span from lib.client import get_model from lib.my_trace import set_trace_smith as set_trace from script.data_processing.path_config import PathConfig # 模型配置 MODEL_NAME = "google/gemini-3-pro-preview" # MODEL_NAME = 'deepseek/deepseek-v3.2' # MODEL_NAME = 'anthropic/claude-sonnet-4.5' agent = Agent( name="Node Origin Analyzer", model=get_model(MODEL_NAME), model_settings=ModelSettings( temperature=0.0, max_tokens=65536, ), tools=[], ) # ===== 类型定义 ===== class NodeInfo(TypedDict): 名称: str 描述: str class EdgeInfo(TypedDict): from_node: str to_node: str 关系: Optional[str] 概率: Optional[float] class AnalyzeInput(TypedDict): 目标节点: NodeInfo 候选节点: List[NodeInfo] 边关系: List[EdgeInfo] class OriginPossibility(TypedDict): 来源节点: List[str] 概率: float 推理依据: str class AnalyzeOutput(TypedDict): 推理过程: str 来源可能性: List[OriginPossibility] # ===== 数据提取函数 ===== def get_post_graph_files(config: PathConfig) -> List[Path]: """获取所有帖子图谱文件""" post_graph_dir = config.intermediate_dir / "post_graph" return sorted(post_graph_dir.glob("*_帖子图谱.json")) def load_post_graph(file_path: Path) -> Dict: """加载帖子图谱""" with open(file_path, "r", encoding="utf-8") as f: return json.load(f) def extract_tags_from_post_graph(post_graph: Dict) -> List[Dict]: """ 从帖子图谱中提取标签节点 筛选条件:type === "标签" 且 domain === "帖子" Returns: 标签节点列表 """ tags = [] for node_id, node in post_graph.get("nodes", {}).items(): if node.get("type") == "标签" and node.get("domain") == "帖子": tags.append({ "id": node_id, "name": node.get("name", ""), "dimension": node.get("dimension", ""), "description": node.get("detail", {}).get("description", ""), "pointNames": node.get("detail", {}).get("pointNames", []), }) return tags def prepare_analyze_input( post_graph: Dict, target_name: str = None ) -> AnalyzeInput: """ 准备分析输入数据 Args: post_graph: 帖子图谱数据 target_name: 目标节点名称,如果为 None 则使用关键点标签的第一个 Returns: AnalyzeInput 数据结构 """ # 提取所有标签节点 tags = extract_tags_from_post_graph(post_graph) if not tags: raise ValueError("帖子图谱中没有找到标签节点") # 确定目标节点 if target_name: target_tag = next((t for t in tags if t["name"] == target_name), None) if not target_tag: raise ValueError(f"未找到目标节点: {target_name}") else: # 默认使用关键点标签的第一个 key_point_tags = [t for t in tags if t["dimension"] == "关键点"] if not key_point_tags: raise ValueError("没有找到关键点标签") target_tag = key_point_tags[0] # 候选节点(排除目标节点) candidate_tags = [t for t in tags if t["name"] != target_tag["name"]] # 构建输入(包含特征类型信息) return { "目标特征": { "特征名称": target_tag["name"], "特征类型": target_tag["dimension"] }, "候选特征": [ { "特征名称": t["name"], "特征类型": t["dimension"] } for t in candidate_tags ], "边关系": [] # 暂时为空 } # ===== Prompt 构建 ===== def build_prompt(input_data: Dict, edges: List[EdgeInfo] = None) -> str: """ 构建分析 prompt Args: input_data: 分析输入数据(包含目标节点和候选节点,都带维度信息) edges: 边关系列表 Returns: prompt 文本 """ target = input_data["目标特征"] candidates = input_data["候选特征"] edges = edges or [] # 构建候选特征列表 candidates_text = [] for c in candidates: candidates_text.append(f"- {c['特征名称']} ({c['特征类型']})") candidates_section = "\n".join(candidates_text) # 构建边关系文本 if edges: edges_text = [] for e in edges: edge_str = f"- {e['from_node']} → {e['to_node']}" if e.get("关系"): edge_str += f":{e['关系']}" if e.get("概率") is not None: edge_str += f"(概率: {e['概率']:.2f})" edges_text.append(edge_str) edges_section = "\n".join(edges_text) else: edges_section = "(暂无已知关系)" return f'''你是一个内容创作逆向工程分析专家。你的任务是分析给定的目标特征可能由哪些候选特征推导而来。 ## 目标关键特征 {target['特征名称']} ({target['特征类型']}) ## 候选特征 {candidates_section} ## 已知特征关系(仅供参考) {edges_section} ## 第一步:固有资产判定 首先判断目标特征是否为**固有资产/前提条件**: **固有资产的特征**: - 账号的基础设定(如:萌宠账号的"猫咪主角"、美食博主的"厨艺技能") - 创作者本身拥有的资源(如:有猫、会画画、有专业知识) - 不是针对某个内容"选择"的,而是账号/创作者的固有属性 **判定方法**: 问自己:这个特征是"创作者选择加入的"还是"创作者本来就有的"? - 如果是"本来就有的" → 固有资产,不需要从其他特征推导 - 如果是"选择加入的" → 可推导特征,继续分析 **重要**:如果目标特征是固有资产,应该返回空的来源列表,并说明原因。 ## 第二步:因果方向检验(仅当目标特征非固有资产时) 在判断"候选特征 A 是否能推导出目标特征 T"之前,必须进行因果方向检验: 1. **正向概率 P(A→T)**:假设 A 存在,推导出 T 的概率 2. **反向概率 P(T→A)**:假设 T 存在,推导出 A 的概率 **判定规则**: - 只有当 P(A→T) > P(T→A) 时,A 才能作为 T 的来源特征 - 如果 P(T→A) >= P(A→T),说明 A 更可能是 T 的结果/表现形式 **警惕"利用关系"伪装成"因果关系"**: - 错误:因为要"提供情绪价值",所以选择了"猫咪主角" - 正确:因为已有"猫咪主角"(固有资产),所以用它来"提供情绪价值" - 区别:"提供情绪价值"是对猫咪的利用方式,不是选择猫咪的原因 ## 输出格式 使用JSON格式输出,结构如下: {{ "目标关键特征": "...", "固有资产判定": {{ "是否固有资产": true/false, "判定理由": "..." }}, "推理类型分类": {{ "单独推理": [ {{ "排名": 1, "特征名称": "...", "特征类型": "灵感点/目的点/关键点", "正向概率": 0.xx, "反向概率": 0.xx, "可能性": 0.xx, "推理说明": "..." }} ], "组合推理": [ {{ "组合编号": 1, "组合成员": ["...", "..."], "成员类型": ["...", "..."], "正向概率": 0.xx, "反向概率": 0.xx, "可能性": 0.xx, "单独可能性": {{ "成员1": 0.xx, "成员2": 0.xx }}, "协同效应分析": {{ "单独平均值": 0.xx, "协同增益": 0.xx, "增益说明": "..." }}, "推理说明": "..." }} ], "排除特征": [ {{ "特征名称": "...", "特征类型": "...", "正向概率": 0.xx, "反向概率": 0.xx, "排除原因": "..." }} ] }} }} **注意**:如果目标特征是固有资产,"单独推理"和"组合推理"应为空数组,所有候选特征都应放入"排除特征"。 ## 注意事项 1. **固有资产优先判定**:先判断目标特征是否为固有资产 2. **警惕利用关系**:目的点对关键点的"利用"不等于"推导" 3. 可能性数值需要合理评估,范围在0-1之间 4. 单独推理按可能性从高到低排序 5. 组合推理必须包含2个或以上成员 6. 推理说明要清晰说明推导逻辑 '''.strip() # ===== 主分析函数 ===== async def analyze_node_origin( post_id: str = None, target_name: str = None, config: PathConfig = None ) -> Dict: """ 分析目标节点可能由哪些候选节点推导而来 Args: post_id: 帖子ID,默认使用第一个帖子 target_name: 目标节点名称,默认使用关键点标签的第一个 config: 路径配置,如果为 None 则创建默认配置 Returns: 分析结果 """ if config is None: config = PathConfig() # 获取帖子图谱文件 post_graph_files = get_post_graph_files(config) if not post_graph_files: raise ValueError("没有找到帖子图谱文件") # 选择帖子 if post_id: target_file = next( (f for f in post_graph_files if post_id in f.name), None ) if not target_file: raise ValueError(f"未找到帖子: {post_id}") else: target_file = post_graph_files[0] # 默认第一个 # 加载帖子图谱 post_graph = load_post_graph(target_file) actual_post_id = post_graph.get("meta", {}).get("postId", "unknown") # 准备输入数据 input_data = prepare_analyze_input(post_graph, target_name) actual_target_name = input_data["目标特征"]["特征名称"] # 构建 prompt prompt = build_prompt(input_data, input_data.get("边关系", [])) print(f"帖子ID: {actual_post_id}") print(f"目标特征: {actual_target_name}") print(f"候选特征数: {len(input_data['候选特征'])}") print() # 调试:打印 prompt(取消注释以启用) # print("=" * 40) # print("Prompt 预览:") # print(prompt[:2000]) # print("...") # print("=" * 40) # 使用 custom_span 标识分析流程 with custom_span( name=f"分析特征来源 - {actual_target_name}", data={ "帖子id": actual_post_id, "目标特征": actual_target_name, "候选特征数": len(input_data["候选特征"]), "模型": MODEL_NAME } ): # 调用 agent result = await Runner.run(agent, input=prompt) output = result.final_output # 解析 JSON try: if "```json" in output: json_start = output.find("```json") + 7 json_end = output.find("```", json_start) json_str = output[json_start:json_end].strip() elif "{" in output and "}" in output: json_start = output.find("{") json_end = output.rfind("}") + 1 json_str = output[json_start:json_end] else: json_str = output analysis_result = json.loads(json_str) return { "帖子id": actual_post_id, "目标节点": actual_target_name, "模型": MODEL_NAME, "输入": input_data, "输出": analysis_result } except Exception as e: return { "帖子id": actual_post_id, "目标节点": actual_target_name, "模型": MODEL_NAME, "输入": input_data, "输出": None, "错误": str(e), "原始输出": output } # ===== 图谱构建函数 ===== def build_origin_graph(all_results: List[Dict], post_id: str) -> Dict: """ 将分析结果转换为图谱格式 Args: all_results: 所有目标特征的分析结果 post_id: 帖子ID Returns: 图谱数据,包含 nodes 和 edges """ nodes = {} edges = {} # 从输入收集所有特征节点(不添加额外信息) for result in all_results: target_input = result.get("输入", {}) # 添加目标节点 target_info = target_input.get("目标特征", {}) target_name = target_info.get("特征名称", "") target_type = target_info.get("特征类型", "关键点") node_id = f"帖子:{target_type}:标签:{target_name}" if node_id not in nodes: nodes[node_id] = { "name": target_name, "type": "标签", "dimension": target_type, "domain": "帖子", "detail": {} } # 添加候选特征节点 for candidate in target_input.get("候选特征", []): c_name = candidate.get("特征名称", "") c_type = candidate.get("特征类型", "关键点") c_node_id = f"帖子:{c_type}:标签:{c_name}" if c_node_id not in nodes: nodes[c_node_id] = { "name": c_name, "type": "标签", "dimension": c_type, "domain": "帖子", "detail": {} } # 构建推导边 for result in all_results: target_name = result.get("目标特征", "") target_input = result.get("输入", {}) target_info = target_input.get("目标特征", {}) target_type = target_info.get("特征类型", "关键点") target_node_id = f"帖子:{target_type}:标签:{target_name}" reasoning = result.get("推理类型分类", {}) # 单独推理的边 for item in reasoning.get("单独推理", []): source_name = item.get("特征名称", "") source_type = item.get("特征类型", "关键点") source_node_id = f"帖子:{source_type}:标签:{source_name}" probability = item.get("可能性", 0) edge_id = f"{source_node_id}|推导|{target_node_id}" edges[edge_id] = { "source": source_node_id, "target": target_node_id, "type": "推导", "score": probability, "detail": { "推理类型": "单独推理", "正向概率": item.get("正向概率", 0), "反向概率": item.get("反向概率", 0), "推理说明": item.get("推理说明", "") } } # 组合推理的边(用虚拟节点表示组合) for item in reasoning.get("组合推理", []): members = item.get("组合成员", []) member_types = item.get("成员类型", []) probability = item.get("可能性", 0) # 创建组合虚拟节点(排序成员以保证唯一性) # 将成员和类型配对后排序 member_pairs = list(zip(members, member_types)) if len(member_types) == len(members) else [(m, "关键点") for m in members] sorted_pairs = sorted(member_pairs, key=lambda x: x[0]) sorted_members = [p[0] for p in sorted_pairs] sorted_types = [p[1] for p in sorted_pairs] # 组合名称和ID包含类型信息 combo_parts = [f"{sorted_types[i]}:{m}" for i, m in enumerate(sorted_members)] combo_name = " + ".join(combo_parts) combo_node_id = f"帖子:组合:组合:{combo_name}" if combo_node_id not in nodes: nodes[combo_node_id] = { "name": combo_name, "type": "组合", "dimension": "组合", "domain": "帖子", "detail": { "成员": sorted_members, "成员类型": sorted_types } } # 组合节点到目标的边 edge_id = f"{combo_node_id}|推导|{target_node_id}" edges[edge_id] = { "source": combo_node_id, "target": target_node_id, "type": "推导", "score": probability, "detail": { "推理类型": "组合推理", "正向概率": item.get("正向概率", 0), "反向概率": item.get("反向概率", 0), "协同增益": item.get("协同效应分析", {}).get("协同增益", 0), "推理说明": item.get("推理说明", "") } } # 成员到组合节点的边 for i, member in enumerate(sorted_members): m_type = sorted_types[i] m_node_id = f"帖子:{m_type}:标签:{member}" m_edge_id = f"{m_node_id}|组成|{combo_node_id}" if m_edge_id not in edges: edges[m_edge_id] = { "source": m_node_id, "target": combo_node_id, "type": "组成", "score": 1.0, "detail": {} } return { "meta": { "postId": post_id, "type": "推导图谱", "stats": { "nodeCount": len(nodes), "edgeCount": len(edges) } }, "nodes": nodes, "edges": edges } # ===== 辅助函数 ===== def get_all_target_names(post_graph: Dict) -> List[str]: """获取所有可作为目标的特征名称(关键点标签)""" tags = extract_tags_from_post_graph(post_graph) # 返回所有关键点标签的名称 return [t["name"] for t in tags if t["dimension"] == "关键点"] def display_result(result: Dict): """显示单个分析结果""" output = result.get("输出") if output: print(f"\n目标关键特征: {output.get('目标关键特征', 'N/A')}") # 固有资产判定 asset_check = output.get("固有资产判定", {}) if asset_check.get("是否固有资产"): print(f" → 固有资产: {asset_check.get('判定理由', '')[:60]}...") else: reasoning = output.get("推理类型分类", {}) # 显示单独推理 single = reasoning.get("单独推理", []) if single: print(" 【单独推理】") for item in single[:3]: # 只显示前3个 print(f" [{item.get('可能性', 0):.2f}] {item.get('特征名称', '')}") # 显示组合推理 combo = reasoning.get("组合推理", []) if combo: print(" 【组合推理】") for item in combo[:2]: # 只显示前2个 members = " + ".join(item.get("组合成员", [])) print(f" [{item.get('可能性', 0):.2f}] {members}") else: print(f" 分析失败: {result.get('错误', 'N/A')}") # ===== 主函数 ===== async def main( post_id: str = None, target_name: str = None, num_targets: int = 1, current_time: str = None, log_url: str = None ): """ 主函数 Args: post_id: 帖子ID,可选 target_name: 目标节点名称,可选(如果指定则只分析这一个) num_targets: 要分析的目标特征数量(当 target_name 为空时生效) current_time: 当前时间戳(从外部传入) log_url: 日志链接(从外部传入) """ config = PathConfig() print(f"账号: {config.account_name}") print(f"使用模型: {MODEL_NAME}") if log_url: print(f"Trace URL: {log_url}") print() # 获取帖子图谱文件 post_graph_files = get_post_graph_files(config) if not post_graph_files: print("错误: 没有找到帖子图谱文件") return # 选择帖子 if post_id: target_file = next( (f for f in post_graph_files if post_id in f.name), None ) if not target_file: print(f"错误: 未找到帖子 {post_id}") return else: target_file = post_graph_files[0] # 加载帖子图谱 post_graph = load_post_graph(target_file) actual_post_id = post_graph.get("meta", {}).get("postId", "unknown") print(f"帖子ID: {actual_post_id}") # 确定要分析的目标特征列表 if target_name: target_names = [target_name] else: all_targets = get_all_target_names(post_graph) target_names = all_targets[:num_targets] print(f"待分析目标特征: {target_names}") print("=" * 60) # 输出目录 output_dir = config.intermediate_dir / "node_origin_analysis" output_dir.mkdir(parents=True, exist_ok=True) # 并发分析所有目标特征 async def analyze_single(name: str, index: int): print(f"\n[{index}/{len(target_names)}] 开始分析: {name}") result = await analyze_node_origin( post_id=post_id, target_name=name, config=config ) print(f"[{index}/{len(target_names)}] 完成: {name}") display_result(result) return { "目标特征": result.get("目标节点"), "固有资产判定": result.get("输出", {}).get("固有资产判定", {}), "推理类型分类": result.get("输出", {}).get("推理类型分类", {}), "输入": result.get("输入"), "错误": result.get("错误") } # 创建并发任务 tasks = [ analyze_single(name, i) for i, name in enumerate(target_names, 1) ] # 并发执行 all_results = await asyncio.gather(*tasks) # 合并保存到一个文件 merged_output = { "元数据": { "current_time": current_time, "log_url": log_url, "model": MODEL_NAME }, "帖子id": actual_post_id, "分析结果列表": all_results } output_file = output_dir / f"{actual_post_id}_来源分析.json" with open(output_file, "w", encoding="utf-8") as f: json.dump(merged_output, f, ensure_ascii=False, indent=2) # 生成推导关系图谱 graph_output = build_origin_graph(all_results, actual_post_id) graph_file = output_dir / f"{actual_post_id}_推导图谱.json" with open(graph_file, "w", encoding="utf-8") as f: json.dump(graph_output, f, ensure_ascii=False, indent=2) print("\n" + "=" * 60) print(f"完成! 共分析 {len(target_names)} 个目标特征") print(f"分析结果: {output_file}") print(f"推导图谱: {graph_file}") if log_url: print(f"Trace: {log_url}") if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="分析节点来源") parser.add_argument("--post-id", type=str, help="帖子ID") parser.add_argument("--target", type=str, help="目标节点名称(指定则只分析这一个)") parser.add_argument("--num", type=int, default=1, help="要分析的目标特征数量(默认1)") parser.add_argument("--all", action="store_true", help="分析所有关键点") args = parser.parse_args() # 如果指定了 --all,则设置 num 为一个很大的数 if args.all: args.num = 999 # 设置 trace current_time, log_url = set_trace() # 使用 trace 上下文包裹整个执行流程 with trace("节点来源分析"): asyncio.run(main( post_id=args.post_id, target_name=args.target, num_targets=args.num, current_time=current_time, log_url=log_url ))