#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Node origin analysis script, V4.

Uses a two-step approach:
1. Step 1 (filter): an LLM shortlists features that may be the origin of a
   target feature (single-feature or combined-feature reasoning).
2. Step 2 (evaluate): a second LLM call scores the likelihood of each
   shortlisted feature / feature combination.

Input:  post graph files in the ``post_graph`` intermediate directory.
Output: per-post analysis results and a derived "推导图谱" graph file.
"""
import asyncio
import json
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# Add the project root to sys.path so the project-local imports below resolve.
project_root = Path(__file__).parent.parent.parent
sys.path.insert(0, str(project_root))

from agents import Agent, Runner, ModelSettings, trace
from agents.tracing.create import custom_span
from lib.client import get_model
from lib.my_trace import set_trace_smith as set_trace
from script.data_processing.path_config import PathConfig

# Model configuration
MODEL_NAME = "google/gemini-3-pro-preview"
# MODEL_NAME = 'deepseek/deepseek-v3.2'
# MODEL_NAME = 'anthropic/claude-sonnet-4.5'

# Tag dimensions analysed by default (also the allowed values of --dims).
DEFAULT_DIMENSIONS = ("灵感点", "目的点", "关键点")

# Step 1: filter agent
filter_agent = Agent(
    name="Feature Filter",
    model=get_model(MODEL_NAME),
    model_settings=ModelSettings(
        temperature=0.0,
        max_tokens=16384,
    ),
    tools=[],
)

# Step 2: evaluation agent
evaluate_agent = Agent(
    name="Feature Evaluator",
    model=get_model(MODEL_NAME),
    model_settings=ModelSettings(
        temperature=0.0,
        max_tokens=32768,
    ),
    tools=[],
)


# ===== Data extraction =====

def get_post_graph_files(config: PathConfig) -> List[Path]:
    """Return every ``*_帖子图谱.json`` file under ``<intermediate_dir>/post_graph``, sorted."""
    post_graph_dir = config.intermediate_dir / "post_graph"
    return sorted(post_graph_dir.glob("*_帖子图谱.json"))


def load_post_graph(file_path: Path) -> Dict:
    """Load a post graph JSON file (UTF-8)."""
    with open(file_path, "r", encoding="utf-8") as f:
        return json.load(f)


def extract_tags_from_post_graph(post_graph: Dict) -> List[Dict]:
    """Collect the post-domain tag nodes of a post graph.

    Returns:
        One ``{"id", "name", "dimension"}`` dict per node whose ``type`` is
        "标签" and ``domain`` is "帖子", in ``post_graph["nodes"]`` order.
    """
    return [
        {
            "id": node_id,
            "name": node.get("name", ""),
            "dimension": node.get("dimension", ""),
        }
        for node_id, node in post_graph.get("nodes", {}).items()
        if node.get("type") == "标签" and node.get("domain") == "帖子"
    ]


def prepare_analyze_input(post_graph: Dict, target_name: str = None) -> Dict:
    """Build the LLM input: one target feature plus its candidate source features.

    Args:
        post_graph: A loaded post graph.
        target_name: Name of the target tag. When omitted, the first tag whose
            dimension is "关键点" is used.

    Returns:
        ``{"目标特征": {...}, "候选特征": [...]}``.

    Raises:
        ValueError: if the graph has no tags, the named target is missing, or
            no "关键点" tag exists for the default target.
    """
    tags = extract_tags_from_post_graph(post_graph)
    if not tags:
        raise ValueError("帖子图谱中没有找到标签节点")

    # Resolve the target node.
    if target_name:
        target_tag = next((t for t in tags if t["name"] == target_name), None)
        if not target_tag:
            raise ValueError(f"未找到目标节点: {target_name}")
    else:
        key_point_tags = [t for t in tags if t["dimension"] == "关键点"]
        if not key_point_tags:
            raise ValueError("没有找到关键点标签")
        target_tag = key_point_tags[0]

    # Candidates are all other tags; for 灵感点/目的点 targets the 关键点 tags
    # are excluded from the candidate set.
    excludes_key_points = target_tag["dimension"] in ["灵感点", "目的点"]
    candidate_tags = [
        t for t in tags
        if t["name"] != target_tag["name"]
        and not (excludes_key_points and t["dimension"] == "关键点")
    ]

    return {
        "目标特征": {
            "特征名称": target_tag["name"],
            "特征类型": target_tag["dimension"],
        },
        "候选特征": [
            {
                "特征名称": t["name"],
                "特征类型": t["dimension"],
            }
            for t in candidate_tags
        ],
    }


# ===== Prompt construction =====

def build_filter_prompt(input_data: Dict) -> str:
    """Build the step-1 (filter) prompt from ``prepare_analyze_input`` output."""
    target = input_data["目标特征"]
    candidates_section = "\n".join(
        f"- {c['特征名称']} ({c['特征类型']})" for c in input_data["候选特征"]
    )

    return f'''# 背景
推理一个小红书帖子选题前脑海中的点,在创作者脑中的因果顺序

# Task
请分析【输入数据】与【目标点】的关系,按以下两类筛选证据:
1. **单独推理**:哪个特征单凭自己就能有可能指向目标特征?
2. **组合推理**:哪几个特征必须结合在一起,才能指向目标特征?(缺一不可才算组合)

如果能独立推出则无需组合。

# 筛选原则
1. 实质推形式,而不是形式推实质
2. 因推果而不是果推因
3. 目的推理手段而不是手段推理目的
4. 只有当 A 是 B 的充分必要条件的时候,A 可以推理出 B

**本次分析的目标特征是:{target['特征名称']}({target['特征类型']})**

# 输入数据
{candidates_section}

# 输出格式
请严格按照以下 JSON 结构输出:
```json
{{
  "目标特征": "{target['特征名称']}",
  "预备分析列表": {{
    "单独推理": [
      {{
        "来源特征": "特征A",
        "来源特征类型": "灵感点/目的点/关键点",
        "初步理由": "简要说明为什么这个特征可能推导出目标"
      }}
    ],
    "组合推理": [
      {{
        "组合成员": ["特征B", "特征C"],
        "成员类型": ["目的点", "关键点"],
        "初步理由": "简要说明为什么这些特征需要组合才能推导出目标"
      }}
    ]
  }}
}}
```

注意:
- 单独推理的来源特征必须是输入数据中的原话
- 组合推理的成员数量通常为 2-3 个
- 如果某个特征完全无法推导出目标,不要勉强添加
'''.strip()


def build_evaluate_prompt(input_data: Dict, filter_result: Dict) -> str:
    """Build the step-2 (evaluate) prompt from the input and the parsed filter result."""
    target = input_data["目标特征"]
    # `or` guards: the LLM may emit explicit nulls instead of omitting keys.
    prep_list = filter_result.get("预备分析列表") or {}

    single_items = prep_list.get("单独推理") or []
    if single_items:
        single_text = "".join(
            f"- {item.get('来源特征', '')}({item.get('来源特征类型', '')})\n"
            for item in single_items
        )
    else:
        single_text = "(无)\n"

    combo_items = prep_list.get("组合推理") or []
    if combo_items:
        combo_text = "".join(
            f"- {' + '.join(item.get('组合成员') or [])}\n" for item in combo_items
        )
    else:
        combo_text = "(无)\n"

    return f'''# 背景
推理一个小红书帖子选题前的点,在创作者脑中的因果顺序

# Task
请判断以下筛选出的特征推理出【{target['特征名称']}】的可能性

## 待评估的单独推理特征:
{single_text}
## 待评估的组合推理特征:
{combo_text}
# 推理约束
1. 实质推形式,而不是形式推实质
2. 因推果而不是果推因
3. 目的推理手段而不是手段推理目的
4. 只有当 A 是 B 的充分必要条件的时候,A 可以推理出 B

# 评分标准
| 分数范围 | 等级 | 说明 |
|---------|------|------|
| 0.80 - 1.00 | 逻辑必然 | A 是 B 的充分必要条件,必然推导 |
| 0.50 - 0.79 | 高可能性 | A 高度倾向于推导出 B,但非唯一选择 |
| 0.20 - 0.49 | 创意偏好 | A 可以推导出 B,但其他选择同样可行 |
| 0.00 - 0.19 | 弱关联 | A 与 B 关联性很弱,不建议保留 |

# 输出格式
请严格按照以下 JSON 结构输出:
```json
{{
  "目标关键特征": "{target['特征名称']}",
  "推理分析": {{
    "单独推理": [
      {{
        "来源特征": "特征A",
        "来源特征类型": "灵感点/目的点/关键点",
        "可能性": 0.xx,
        "结论": "详细说明推导逻辑..."
      }}
    ],
    "组合推理": [
      {{
        "组合成员": ["特征B", "特征C"],
        "成员类型": ["目的点", "关键点"],
        "可能性": 0.xx,
        "结论": "详细说明组合推导逻辑..."
      }}
    ]
  }}
}}
```

注意:
- 如果某个特征经评估后可能性低于 0.2,可以标注但建议说明原因
- 结论要清晰说明推导逻辑,避免空洞表述
'''.strip()


# ===== Main analysis =====

def _load_post_graph_by_id(post_id: Optional[str], config: Optional[PathConfig]) -> Dict:
    """Locate and load a post graph file by (substring of) post id, or the first file if no id."""
    if config is None:
        config = PathConfig()

    post_graph_files = get_post_graph_files(config)
    if not post_graph_files:
        raise ValueError("没有找到帖子图谱文件")

    if post_id:
        target_file = next(
            (f for f in post_graph_files if post_id in f.name), None
        )
        if not target_file:
            raise ValueError(f"未找到帖子: {post_id}")
    else:
        target_file = post_graph_files[0]

    return load_post_graph(target_file)


async def analyze_node_origin(
    post_id: str = None,
    target_name: str = None,
    config: PathConfig = None,
    post_graph: Optional[Dict] = None,
) -> Dict:
    """Analyse which candidate nodes the target node may be derived from (two-step method).

    Args:
        post_id: Post id used to locate the graph file (substring match on the
            file name). Ignored when ``post_graph`` is given.
        target_name: Target tag name; see ``prepare_analyze_input`` for the default.
        config: Path configuration; created on demand when a file lookup is needed.
        post_graph: Already-loaded post graph. Passing it avoids re-reading the
            file and avoids the file-name lookup, which fails when the graph's
            meta postId does not appear in its file name.

    Returns:
        A result dict with keys 帖子id / 目标节点 / 模型 / 输入, plus 输出 and
        either 筛选结果, 错误 + raw LLM output, or 说明.

    Raises:
        ValueError: graph file or target node cannot be resolved.
        Exceptions raised by ``Runner.run`` propagate unchanged.
    """
    if post_graph is None:
        post_graph = _load_post_graph_by_id(post_id, config)
    actual_post_id = post_graph.get("meta", {}).get("postId", "unknown")

    input_data = prepare_analyze_input(post_graph, target_name)
    actual_target_name = input_data["目标特征"]["特征名称"]

    print(f"帖子ID: {actual_post_id}")
    print(f"目标特征: {actual_target_name}")
    print(f"候选特征数: {len(input_data['候选特征'])}")

    # Fields shared by every result shape below (key order preserved).
    header = {
        "帖子id": actual_post_id,
        "目标节点": actual_target_name,
        "模型": MODEL_NAME,
        "输入": input_data,
    }

    # ===== Step 1: filter =====
    filter_prompt = build_filter_prompt(input_data)
    with custom_span(
        name=f"Step1 筛选 - {actual_target_name}",
        data={"目标特征": actual_target_name},
    ):
        filter_result_raw = await Runner.run(filter_agent, input=filter_prompt)
        filter_output = filter_result_raw.final_output

    try:
        filter_result = parse_json_output(filter_output)
    except Exception as e:
        # Keep the raw model output so the failure can be inspected offline.
        return {
            **header,
            "输出": None,
            "错误": f"筛选步骤解析失败: {str(e)}",
            "原始输出_筛选": filter_output,
        }

    prep_list = filter_result.get("预备分析列表") or {}
    single_count = len(prep_list.get("单独推理") or [])
    combo_count = len(prep_list.get("组合推理") or [])

    if single_count == 0 and combo_count == 0:
        # Nothing to evaluate: skip step 2 and return an empty analysis.
        return {
            **header,
            "筛选结果": filter_result,
            "输出": {
                "目标关键特征": actual_target_name,
                "推理分析": {
                    "单独推理": [],
                    "组合推理": [],
                },
            },
            "说明": "筛选步骤未找到可推导的特征",
        }

    print(f"  筛选结果: 单独推理 {single_count} 个, 组合推理 {combo_count} 个")

    # ===== Step 2: evaluate =====
    evaluate_prompt = build_evaluate_prompt(input_data, filter_result)
    with custom_span(
        name=f"Step2 评估 - {actual_target_name}",
        data={"单独推理数": single_count, "组合推理数": combo_count},
    ):
        evaluate_result_raw = await Runner.run(evaluate_agent, input=evaluate_prompt)
        evaluate_output = evaluate_result_raw.final_output

    try:
        evaluate_result = parse_json_output(evaluate_output)
    except Exception as e:
        return {
            **header,
            "筛选结果": filter_result,
            "输出": None,
            "错误": f"评估步骤解析失败: {str(e)}",
            "原始输出_评估": evaluate_output,
        }

    return {
        **header,
        "筛选结果": filter_result,
        "输出": evaluate_result,
    }


def parse_json_output(output: str) -> Dict:
    """Extract and parse the JSON object from an LLM reply.

    Accepts a ```json fenced block (the closing fence is optional, so a reply
    cut off before it does not lose its last character), free text containing
    a ``{...}`` span, or bare JSON.

    Raises:
        ValueError: empty output, or the parsed value is not a JSON object.
        json.JSONDecodeError: the extracted text is not valid JSON.
    """
    if not output:
        raise ValueError("模型输出为空")

    fence = "```json"
    if fence in output:
        json_start = output.find(fence) + len(fence)
        json_end = output.find("```", json_start)
        if json_end == -1:
            # Unterminated fence: take everything to the end instead of
            # slicing with -1, which would drop the final character.
            json_end = len(output)
        json_str = output[json_start:json_end].strip()
    elif "{" in output and "}" in output:
        json_str = output[output.find("{"):output.rfind("}") + 1]
    else:
        json_str = output

    parsed = json.loads(json_str)
    if not isinstance(parsed, dict):
        # Callers use .get() on the result; fail here, inside their try-block.
        raise ValueError("模型输出不是 JSON 对象")
    return parsed


# ===== Graph construction =====

def _dimension_of(node_id: str) -> str:
    """Extract the dimension from a node id like 帖子:灵感点:标签:xxx -> 灵感点."""
    parts = node_id.split(":")
    return parts[1] if len(parts) > 1 else "关键点"


def build_origin_graph(all_results: List[Dict], post_id: str) -> Dict:
    """Convert per-target analysis results into a derivation graph.

    Args:
        all_results: Entries shaped like ``process_single_post`` output, i.e.
            with top-level 目标特征 / 推理分析 / 输入 keys.
        post_id: Post id recorded in the graph meta.

    Returns:
        ``{"meta": {...}, "nodes": {...}, "edges": {...}}``. Tag nodes come from
        each result's input; 推导 edges link sources (or combo nodes) to
        targets, and 组成 edges link combo members to their combo node.
    """
    nodes: Dict[str, Dict] = {}
    edges: Dict[str, Dict] = {}
    # Feature name -> node id. Used instead of the LLM-returned type names,
    # which may not match the real dimension. If a name occurs under several
    # dimensions, the last registration wins.
    name_to_node_id: Dict[str, str] = {}

    def register_tag(name: str, dimension: str) -> None:
        node_id = f"帖子:{dimension}:标签:{name}"
        if node_id not in nodes:
            nodes[node_id] = {
                "name": name,
                "type": "标签",
                "dimension": dimension,
                "domain": "帖子",
                "detail": {},
            }
        name_to_node_id[name] = node_id

    # Pass 1: tag nodes for every target and candidate.
    for result in all_results:
        # Entries for targets that failed outright carry no input.
        target_input = result.get("输入") or {}
        if not target_input:
            continue
        target_info = target_input.get("目标特征") or {}
        register_tag(
            target_info.get("特征名称", ""),
            target_info.get("特征类型", "关键点"),
        )
        for candidate in target_input.get("候选特征") or []:
            register_tag(
                candidate.get("特征名称", ""),
                candidate.get("特征类型", "关键点"),
            )

    # Pass 2: derivation edges.
    for result in all_results:
        target_node_id = name_to_node_id.get(result.get("目标特征", ""))
        if not target_node_id:
            continue

        # In the merged V4 format, 推理分析 is top-level (not under 输出).
        reasoning = result.get("推理分析") or {}

        # Single-feature derivations.
        for item in reasoning.get("单独推理") or []:
            source_node_id = name_to_node_id.get(item.get("来源特征", ""))
            if not source_node_id:
                continue
            edge_id = f"{source_node_id}|推导|{target_node_id}"
            edges[edge_id] = {
                "source": source_node_id,
                "target": target_node_id,
                "type": "推导",
                "score": item.get("可能性", 0),
                "detail": {
                    "推理类型": "单独推理",
                    "结论": item.get("结论", ""),
                },
            }

        # Combined derivations.
        for item in reasoning.get("组合推理") or []:
            members = item.get("组合成员") or []
            member_node_ids: List[Tuple[str, str]] = []
            for m in members:
                m_node_id = name_to_node_id.get(m)
                if not m_node_id:
                    break
                member_node_ids.append((m, m_node_id))
            else:
                # Every member resolved (loop did not break): build the combo.
                sorted_member_ids = sorted(member_node_ids, key=lambda x: x[0])
                combo_name = " + ".join(
                    f"{_dimension_of(m_node_id)}:{m_name}"
                    for m_name, m_node_id in sorted_member_ids
                )
                combo_node_id = f"帖子:组合:组合:{combo_name}"

                if combo_node_id not in nodes:
                    nodes[combo_node_id] = {
                        "name": combo_name,
                        "type": "组合",
                        "dimension": "组合",
                        "domain": "帖子",
                        "detail": {
                            "成员": [m for m, _ in sorted_member_ids],
                            "成员类型": [
                                _dimension_of(m_node_id)
                                for _, m_node_id in sorted_member_ids
                            ],
                        },
                    }

                # Combo node -> target.
                edge_id = f"{combo_node_id}|推导|{target_node_id}"
                edges[edge_id] = {
                    "source": combo_node_id,
                    "target": target_node_id,
                    "type": "推导",
                    "score": item.get("可能性", 0),
                    "detail": {
                        "推理类型": "组合推理",
                        "结论": item.get("结论", ""),
                    },
                }

                # Member -> combo node.
                for _, m_node_id in sorted_member_ids:
                    m_edge_id = f"{m_node_id}|组成|{combo_node_id}"
                    if m_edge_id not in edges:
                        edges[m_edge_id] = {
                            "source": m_node_id,
                            "target": combo_node_id,
                            "type": "组成",
                            "score": 1.0,
                            "detail": {},
                        }

    return {
        "meta": {
            "postId": post_id,
            "type": "推导图谱",
            "version": "v4",
            "stats": {
                "nodeCount": len(nodes),
                "edgeCount": len(edges),
            },
        },
        "nodes": nodes,
        "edges": edges,
    }


# ===== Helpers =====

def get_all_target_names(post_graph: Dict, dimensions: List[str] = None) -> List[str]:
    """Return names of all post tags whose dimension is in ``dimensions`` (default: all three)."""
    if dimensions is None:
        dimensions = list(DEFAULT_DIMENSIONS)
    tags = extract_tags_from_post_graph(post_graph)
    return [t["name"] for t in tags if t["dimension"] in dimensions]


def get_score_level(score: float) -> str:
    """Map a likelihood score to its level label (thresholds match the evaluate prompt)."""
    if score >= 0.80:
        return "逻辑必然"
    elif score >= 0.50:
        return "高可能性"
    elif score >= 0.20:
        return "创意偏好"
    else:
        return "弱关联"


def _coerce_score(value) -> float:
    """Best-effort float conversion of an LLM-provided 可能性 value (0.0 if unparseable)."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def display_result(result: Dict):
    """Print a short summary of one analysis result (top 5 single, top 3 combo items)."""
    output = result.get("输出")
    if output:
        print(f"\n目标关键特征: {output.get('目标关键特征', 'N/A')}")
        reasoning = output.get("推理分析") or {}

        single = reasoning.get("单独推理") or []
        if single:
            print("  【单独推理】")
            for item in single[:5]:
                # Coerce: the LLM may return the score as a string, which
                # would make the :.2f format raise.
                score = _coerce_score(item.get("可能性", 0))
                level = get_score_level(score)
                print(f"    [{score:.2f} {level}] {item.get('来源特征', '')}")

        combo = reasoning.get("组合推理") or []
        if combo:
            print("  【组合推理】")
            for item in combo[:3]:
                members = " + ".join(item.get("组合成员") or [])
                score = _coerce_score(item.get("可能性", 0))
                level = get_score_level(score)
                print(f"    [{score:.2f} {level}] {members}")
    else:
        error = result.get("错误", "")
        if error:
            print(f"  分析失败: {error}")
        else:
            print(f"  {result.get('说明', '无结果')}")


# ===== Single-post processing =====

async def process_single_post(
    post_file: Path,
    config: PathConfig,
    target_name: str = None,
    num_targets: int = 999,
    dimensions: List[str] = None,
):
    """Analyse target features of one post concurrently and write results + graph.

    Writes ``<postId>_来源分析_v4.json`` and ``<postId>_推导图谱_v4.json`` to
    ``<intermediate_dir>/node_origin_analysis``. A failure while analysing one
    target is recorded in that target's 错误 field instead of aborting the
    other concurrent targets.

    Returns:
        The post id read from the graph meta ("unknown" if absent).
    """
    if dimensions is None:
        dimensions = list(DEFAULT_DIMENSIONS)

    # One independent trace per post.
    current_time, log_url = set_trace()

    post_graph = load_post_graph(post_file)
    actual_post_id = post_graph.get("meta", {}).get("postId", "unknown")

    print(f"\n{'=' * 60}")
    print(f"帖子ID: {actual_post_id}")
    print(f"Trace URL: {log_url}")

    if target_name:
        target_names = [target_name]
    else:
        target_names = get_all_target_names(post_graph, dimensions)[:num_targets]

    print(f"待分析目标特征: {target_names}")
    print("-" * 60)

    output_dir = config.intermediate_dir / "node_origin_analysis"
    output_dir.mkdir(parents=True, exist_ok=True)

    total = len(target_names)

    with trace(f"节点来源分析 V4 - {actual_post_id}"):

        async def analyze_single(name: str, index: int) -> Dict:
            print(f"\n[{index}/{total}] 开始分析: {name}")
            try:
                # Pass the already-loaded graph: no re-read per target and no
                # dependence on the postId appearing in the file name.
                result = await analyze_node_origin(
                    post_id=actual_post_id,
                    target_name=name,
                    config=config,
                    post_graph=post_graph,
                )
            except Exception as e:
                # Without this, asyncio.gather would propagate the first
                # exception and the results of all sibling targets would be lost.
                result = {
                    "目标节点": name,
                    "输入": None,
                    "输出": None,
                    "错误": f"分析异常: {type(e).__name__}: {e}",
                }
            print(f"[{index}/{total}] 完成: {name}")
            display_result(result)

            output = result.get("输出")
            return {
                "目标特征": result.get("目标节点"),
                "筛选结果": result.get("筛选结果"),
                "推理分析": (output.get("推理分析") or {}) if output else {},
                "输入": result.get("输入"),
                "错误": result.get("错误"),
                "说明": result.get("说明"),
            }

        all_results = await asyncio.gather(
            *(analyze_single(name, i) for i, name in enumerate(target_names, 1))
        )

    # Merge all targets into a single analysis file.
    merged_output = {
        "元数据": {
            "current_time": current_time,
            "log_url": log_url,
            "model": MODEL_NAME,
            "version": "v4",
        },
        "帖子id": actual_post_id,
        "分析结果列表": all_results,
    }
    output_file = output_dir / f"{actual_post_id}_来源分析_v4.json"
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(merged_output, f, ensure_ascii=False, indent=2)

    # Derivation graph.
    graph_output = build_origin_graph(all_results, actual_post_id)
    graph_file = output_dir / f"{actual_post_id}_推导图谱_v4.json"
    with open(graph_file, "w", encoding="utf-8") as f:
        json.dump(graph_output, f, ensure_ascii=False, indent=2)

    print(f"\n完成! 共分析 {total} 个目标特征")
    print(f"分析结果: {output_file}")
    print(f"推导图谱: {graph_file}")
    print(f"Trace: {log_url}")

    return actual_post_id


# ===== Entry point =====

async def main(
    post_id: str = None,
    target_name: str = None,
    num_targets: int = 999,
    dimensions: List[str] = None,
    all_posts: bool = False,
):
    """Process one post (by id, or the first file) or all posts, sequentially."""
    if dimensions is None:
        dimensions = list(DEFAULT_DIMENSIONS)

    config = PathConfig()
    print(f"账号: {config.account_name}")
    print(f"使用模型: {MODEL_NAME}")
    print(f"分析维度: {dimensions}")
    print("版本: V4 (两步法: 筛选 + 评估)")

    post_graph_files = get_post_graph_files(config)
    if not post_graph_files:
        print("错误: 没有找到帖子图谱文件")
        return

    # Decide which posts to process.
    if post_id:
        target_file = next(
            (f for f in post_graph_files if post_id in f.name), None
        )
        if not target_file:
            print(f"错误: 未找到帖子 {post_id}")
            return
        files_to_process = [target_file]
    elif all_posts:
        files_to_process = post_graph_files
    else:
        files_to_process = [post_graph_files[0]]

    print(f"待处理帖子数: {len(files_to_process)}")

    # Posts are processed one at a time; targets within a post run concurrently.
    processed_posts = []
    for i, post_file in enumerate(files_to_process, 1):
        print(f"\n{'#' * 60}")
        print(f"# 处理帖子 {i}/{len(files_to_process)}")
        print(f"{'#' * 60}")

        post_id_result = await process_single_post(
            post_file=post_file,
            config=config,
            target_name=target_name,
            num_targets=num_targets,
            dimensions=dimensions,
        )
        processed_posts.append(post_id_result)

    print(f"\n{'#' * 60}")
    print(f"# 全部完成! 共处理 {len(processed_posts)} 个帖子")
    print(f"{'#' * 60}")


def rebuild_graph_from_file(analysis_file: Path) -> None:
    """Rebuild the derivation graph from an existing ``*_来源分析_v4.json`` file (no LLM calls)."""
    with open(analysis_file, "r", encoding="utf-8") as f:
        data = json.load(f)

    post_id = data.get("帖子id", "unknown")
    all_results = data.get("分析结果列表", [])

    print(f"从分析文件重建图谱: {analysis_file.name}")
    print(f"帖子ID: {post_id}")
    print(f"分析结果数: {len(all_results)}")

    graph_output = build_origin_graph(all_results, post_id)

    graph_file = analysis_file.parent / f"{post_id}_推导图谱_v4.json"
    with open(graph_file, "w", encoding="utf-8") as f:
        json.dump(graph_output, f, ensure_ascii=False, indent=2)

    print(f"图谱已保存: {graph_file}")
    print(f"节点数: {graph_output['meta']['stats']['nodeCount']}")
    print(f"边数: {graph_output['meta']['stats']['edgeCount']}")


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="分析节点来源 (V4 两步法)")
    parser.add_argument("--post-id", type=str, help="帖子ID(指定则只处理该帖子)")
    parser.add_argument("--target", type=str, help="目标节点名称(指定则只分析这一个特征)")
    parser.add_argument("--num", type=int, default=999, help="要分析的目标特征数量")
    parser.add_argument(
        "--dims",
        type=str,
        nargs="+",
        choices=list(DEFAULT_DIMENSIONS),
        help="指定要分析的维度(默认全部)",
    )
    parser.add_argument("--all-posts", action="store_true", help="处理所有帖子")
    parser.add_argument(
        "--rebuild-graph",
        type=str,
        metavar="FILE",
        help="从已有分析文件重建图谱(不重新分析)",
    )
    args = parser.parse_args()

    if args.rebuild_graph:
        # Only rebuild the graph from an existing analysis file.
        rebuild_graph_from_file(Path(args.rebuild_graph))
    else:
        # Default to all dimensions.
        dimensions = args.dims if args.dims else list(DEFAULT_DIMENSIONS)

        asyncio.run(main(
            post_id=args.post_id,
            target_name=args.target,
            num_targets=args.num,
            dimensions=dimensions,
            all_posts=args.all_posts,
        ))