Kaynağa Gözat

feat: 添加节点来源分析脚本 V3

采用"法庭取证式"两步验证法:
- 步骤1:列举强力竞品
- 步骤2:排他性检验(无排他性证据时可能性≤0.4)

主要改进:
- 新评分标准:逻辑必然/高适配性/创意偏好/弱关联
- 灵感点/目的点的候选集排除关键点
- 每个帖子独立生成 trace 和 log_url
- 默认分析所有维度(灵感点+目的点+关键点)
- 支持批量处理多个帖子(--all-posts)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
yangxiaohui 17 saat önce
ebeveyn
işleme
fee3f89a44
1 değiştirilmiş dosya ile 775 ekleme ve 0 silme
  1. 775 0
      script/data_processing/analyze_node_origin_v3.py

+ 775 - 0
script/data_processing/analyze_node_origin_v3.py

@@ -0,0 +1,775 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+节点来源分析脚本 V3
+
+采用"法庭取证式"思维,通过两步验证法(竞品列举 + 排他性检验)严格推导特征来源。
+
+核心改进:
+1. 两步验证法:先列举竞品,再做排他性检验
+2. 严格评分:无排他性证据时可能性不超过0.4
+3. 微观逻辑优先:组合推理寻找化学反应而非宏观目的
+
+输入:post_graph 目录中的帖子图谱文件
+输出:节点来源分析结果
+"""
+
+import asyncio
+import json
+from pathlib import Path
+from typing import Dict, List, Optional
+import sys
+
+# 添加项目根目录到路径
+project_root = Path(__file__).parent.parent.parent
+sys.path.insert(0, str(project_root))
+
+from agents import Agent, Runner, ModelSettings, trace
+from agents.tracing.create import custom_span
+from lib.client import get_model
+from lib.my_trace import set_trace_smith as set_trace
+from script.data_processing.path_config import PathConfig
+
+# 模型配置
+MODEL_NAME = "google/gemini-3-pro-preview"
+# MODEL_NAME = 'deepseek/deepseek-v3.2'
+# MODEL_NAME = 'anthropic/claude-sonnet-4.5'
+
+agent = Agent(
+    name="Node Origin Analyzer V3",
+    model=get_model(MODEL_NAME),
+    model_settings=ModelSettings(
+        temperature=0.0,
+        max_tokens=65536,
+    ),
+    tools=[],
+)
+
+
+# ===== 数据提取函数 =====
+
+def get_post_graph_files(config: PathConfig) -> List[Path]:
+    """获取所有帖子图谱文件"""
+    post_graph_dir = config.intermediate_dir / "post_graph"
+    return sorted(post_graph_dir.glob("*_帖子图谱.json"))
+
+
+def load_post_graph(file_path: Path) -> Dict:
+    """加载帖子图谱"""
+    with open(file_path, "r", encoding="utf-8") as f:
+        return json.load(f)
+
+
+def extract_tags_from_post_graph(post_graph: Dict) -> List[Dict]:
+    """
+    从帖子图谱中提取标签节点
+
+    筛选条件:type === "标签" 且 domain === "帖子"
+
+    Returns:
+        标签节点列表
+    """
+    tags = []
+    for node_id, node in post_graph.get("nodes", {}).items():
+        if node.get("type") == "标签" and node.get("domain") == "帖子":
+            tags.append({
+                "id": node_id,
+                "name": node.get("name", ""),
+                "dimension": node.get("dimension", ""),
+                "description": node.get("detail", {}).get("description", ""),
+                "pointNames": node.get("detail", {}).get("pointNames", []),
+            })
+    return tags
+
+
+def prepare_analyze_input(
+    post_graph: Dict,
+    target_name: str = None
+) -> Dict:
+    """
+    准备分析输入数据
+
+    Args:
+        post_graph: 帖子图谱数据
+        target_name: 目标节点名称,如果为 None 则使用关键点标签的第一个
+
+    Returns:
+        分析输入数据结构
+    """
+    # 提取所有标签节点
+    tags = extract_tags_from_post_graph(post_graph)
+
+    if not tags:
+        raise ValueError("帖子图谱中没有找到标签节点")
+
+    # 确定目标节点
+    if target_name:
+        target_tag = next((t for t in tags if t["name"] == target_name), None)
+        if not target_tag:
+            raise ValueError(f"未找到目标节点: {target_name}")
+    else:
+        # 默认使用关键点标签的第一个
+        key_point_tags = [t for t in tags if t["dimension"] == "关键点"]
+        if not key_point_tags:
+            raise ValueError("没有找到关键点标签")
+        target_tag = key_point_tags[0]
+
+    # 候选节点筛选逻辑:
+    # - 排除目标节点本身
+    # - 如果目标是灵感点或目的点,排除关键点(关键点由灵感点/目的点推导,不应反推)
+    target_dimension = target_tag["dimension"]
+    candidate_tags = []
+    for t in tags:
+        if t["name"] == target_tag["name"]:
+            continue  # 排除目标节点本身
+        if target_dimension in ["灵感点", "目的点"] and t["dimension"] == "关键点":
+            continue  # 灵感点/目的点的候选集排除关键点
+        candidate_tags.append(t)
+
+    # 构建输入(包含特征类型信息)
+    return {
+        "目标特征": {
+            "特征名称": target_tag["name"],
+            "特征类型": target_tag["dimension"]
+        },
+        "候选特征": [
+            {
+                "特征名称": t["name"],
+                "特征类型": t["dimension"]
+            }
+            for t in candidate_tags
+        ],
+        "边关系": []
+    }
+
+
+# ===== Prompt 构建 =====
+
+def build_prompt(input_data: Dict) -> str:
+    """
+    构建分析 prompt(V3 版本:法庭取证式两步验证法)
+
+    Args:
+        input_data: 分析输入数据(包含目标节点和候选节点,都带维度信息)
+
+    Returns:
+        prompt 文本
+    """
+    target = input_data["目标特征"]
+    candidates = input_data["候选特征"]
+
+    # 构建候选特征列表
+    candidates_text = []
+    for c in candidates:
+        candidates_text.append(f"- {c['特征名称']} ({c['特征类型']})")
+    candidates_section = "\n".join(candidates_text)
+
+    return f'''# Role
+你是一名严谨的内容逆向工程分析师,专门擅长拆解创意决策背后的逻辑链条。你的思维方式是"法庭取证式"的,只承认证据确凿的推导,坚决反对没有任何依据的"脑补"连接。
+
+# Task
+分析给定的【帖子特征列表】是如何推导出【目标特征】的。
+**本次分析的目标特征是:{target['特征名称']}**
+
+# 核心推理协议
+
+为了防止过度联想,你必须对每一个推理组合执行以下**两步验证法**。跳过步骤将视为分析失败。
+
+## 步骤 1:列举强力竞品
+不要默认必须选择目标({target['特征名称']})。基于【来源特征】的意图,思考还有什么其他形式能达到同样的效果?
+*   *要求*:必须列出至少 2 个**除了目标以外**的合理选项(例如:如果是为了"互动",竞品可以是抽奖、投票、话题挑战;如果是为了"搞笑",竞品可以是段子、四格漫画)。
+
+## 步骤 2:排他性检验
+这是最关键的一步。检查【来源特征】中是否有具体的细节,能够**从逻辑上杀死**步骤 1 中的竞品?
+*   *判定标准*:
+    *   如果有特征明确指向"{target['特征名称']}"的独有属性,则具有排他性。
+    *   如果仅仅是泛化的目的(如"为了搞笑"、"为了互动"),这些特征**无法排除**其他竞品。
+    *   **如果没有排他性证据,该组合的推导可能性严禁超过 0.4。**
+
+# 评分标准
+
+| 分数范围 | 等级 | 说明 |
+|---------|------|------|
+| 0.80 - 1.00 | 逻辑必然 | 存在无可辩驳的证据表明,必须采用目标形式,否则内容的核心功能或分发需求无法满足。 |
+| 0.50 - 0.79 | 高适配性 | 虽然没有绝对的强制性,但结合内容特性和市场/文化习惯,目标形式是最贴切、最有效的选择,其他形式会显得低效或别扭。 |
+| 0.20 - 0.49 | 创意偏好 | 目标形式是一个可行的、不错的创意选择,但其他形式也同样适用,甚至可能更优。决策更倾向于创意团队的偏好。 |
+| 0.00 - 0.19 | 弱关联 | 特征与目标形式之间缺乏有效的逻辑连接,关联性很弱或属于主观臆测。 |
+
+如果没有合适的选项,无需强行推理。
+
+# 组合推理特别规则
+微观逻辑优先:组合推理不应好高骛远。优先寻找微观的化学反应(例如 A+B 变成了 C),而不是宏观的目的(例如 A+B 为了引流)。组合数量通常小于等于 3 个。
+
+# 输入数据
+{candidates_section}
+
+# 输出格式 (JSON)
+1. 在 `单独推理` 中,`来源特征` 字段**严禁出现** "+"、"和"、"&" 等连接符,必须是输入中的原话。
+2. 如果你觉得两个特征必须在一起说才有意义,请直接跳过单独推理,将其放入 `组合推理`。
+
+请严格按照以下 JSON 结构输出,不要包含任何 Markdown 格式以外的废话:
+
+```json
+{{
+  "目标关键特征": "{target['特征名称']}",
+  "推理分析": {{
+    "单独推理": [
+      {{
+        "来源特征": "...",
+        "来源特征类型": "灵感点/目的点/关键点",
+        "1_替代方案竞品": ["...", "..."],
+        "2_排他性检验": "分析来源特征是否包含能排除上述竞品的证据。如果没有,请明确写出'无法排除竞品'。",
+        "可能性": 0.xx,
+        "结论": "..."
+      }}
+    ],
+    "组合推理": [
+      {{
+        "组合成员": ["...", "..."],
+        "成员类型": ["灵感点/目的点/关键点", "..."],
+        "1_替代方案竞品": ["...", "..."],
+        "2_排他性检验": "分析组合在一起后,是否产生了排除竞品的新逻辑?",
+        "可能性": 0.xx,
+        "结论": "..."
+      }}
+    ]
+  }}
+}}
+```
+'''.strip()
+
+
+# ===== 主分析函数 =====
+
+async def analyze_node_origin(
+    post_id: str = None,
+    target_name: str = None,
+    config: PathConfig = None
+) -> Dict:
+    """
+    分析目标节点可能由哪些候选节点推导而来
+
+    Args:
+        post_id: 帖子ID,默认使用第一个帖子
+        target_name: 目标节点名称,默认使用关键点标签的第一个
+        config: 路径配置,如果为 None 则创建默认配置
+
+    Returns:
+        分析结果
+    """
+    if config is None:
+        config = PathConfig()
+
+    # 获取帖子图谱文件
+    post_graph_files = get_post_graph_files(config)
+    if not post_graph_files:
+        raise ValueError("没有找到帖子图谱文件")
+
+    # 选择帖子
+    if post_id:
+        target_file = next(
+            (f for f in post_graph_files if post_id in f.name),
+            None
+        )
+        if not target_file:
+            raise ValueError(f"未找到帖子: {post_id}")
+    else:
+        target_file = post_graph_files[0]
+
+    # 加载帖子图谱
+    post_graph = load_post_graph(target_file)
+    actual_post_id = post_graph.get("meta", {}).get("postId", "unknown")
+
+    # 准备输入数据
+    input_data = prepare_analyze_input(post_graph, target_name)
+    actual_target_name = input_data["目标特征"]["特征名称"]
+
+    # 构建 prompt
+    prompt = build_prompt(input_data)
+
+    print(f"帖子ID: {actual_post_id}")
+    print(f"目标特征: {actual_target_name}")
+    print(f"候选特征数: {len(input_data['候选特征'])}")
+    print()
+
+    # 使用 custom_span 标识分析流程
+    with custom_span(
+        name=f"分析特征来源 V3 - {actual_target_name}",
+        data={
+            "帖子id": actual_post_id,
+            "目标特征": actual_target_name,
+            "候选特征数": len(input_data["候选特征"]),
+            "模型": MODEL_NAME
+        }
+    ):
+        # 调用 agent
+        result = await Runner.run(agent, input=prompt)
+        output = result.final_output
+
+    # 解析 JSON
+    try:
+        if "```json" in output:
+            json_start = output.find("```json") + 7
+            json_end = output.find("```", json_start)
+            json_str = output[json_start:json_end].strip()
+        elif "{" in output and "}" in output:
+            json_start = output.find("{")
+            json_end = output.rfind("}") + 1
+            json_str = output[json_start:json_end]
+        else:
+            json_str = output
+
+        analysis_result = json.loads(json_str)
+
+        return {
+            "帖子id": actual_post_id,
+            "目标节点": actual_target_name,
+            "模型": MODEL_NAME,
+            "输入": input_data,
+            "输出": analysis_result
+        }
+    except Exception as e:
+        return {
+            "帖子id": actual_post_id,
+            "目标节点": actual_target_name,
+            "模型": MODEL_NAME,
+            "输入": input_data,
+            "输出": None,
+            "错误": str(e),
+            "原始输出": output
+        }
+
+
+# ===== 图谱构建函数 =====
+
+def build_origin_graph(all_results: List[Dict], post_id: str) -> Dict:
+    """
+    将分析结果转换为图谱格式
+
+    Args:
+        all_results: 所有目标特征的分析结果
+        post_id: 帖子ID
+
+    Returns:
+        图谱数据,包含 nodes 和 edges
+    """
+    nodes = {}
+    edges = {}
+
+    # 从输入收集所有特征节点
+    for result in all_results:
+        target_input = result.get("输入", {})
+
+        # 添加目标节点
+        target_info = target_input.get("目标特征", {})
+        target_name = target_info.get("特征名称", "")
+        target_type = target_info.get("特征类型", "关键点")
+        node_id = f"帖子:{target_type}:标签:{target_name}"
+        if node_id not in nodes:
+            nodes[node_id] = {
+                "name": target_name,
+                "type": "标签",
+                "dimension": target_type,
+                "domain": "帖子",
+                "detail": {}
+            }
+
+        # 添加候选特征节点
+        for candidate in target_input.get("候选特征", []):
+            c_name = candidate.get("特征名称", "")
+            c_type = candidate.get("特征类型", "关键点")
+            c_node_id = f"帖子:{c_type}:标签:{c_name}"
+            if c_node_id not in nodes:
+                nodes[c_node_id] = {
+                    "name": c_name,
+                    "type": "标签",
+                    "dimension": c_type,
+                    "domain": "帖子",
+                    "detail": {}
+                }
+
+    # 构建推导边
+    for result in all_results:
+        target_name = result.get("目标特征", "")
+        target_input = result.get("输入", {})
+        target_info = target_input.get("目标特征", {})
+        target_type = target_info.get("特征类型", "关键点")
+        target_node_id = f"帖子:{target_type}:标签:{target_name}"
+
+        reasoning = result.get("推理分析", {})
+
+        # 单独推理的边
+        for item in reasoning.get("单独推理", []):
+            source_name = item.get("来源特征", "")
+            source_type = item.get("来源特征类型", "关键点")
+            source_node_id = f"帖子:{source_type}:标签:{source_name}"
+            probability = item.get("可能性", 0)
+
+            edge_id = f"{source_node_id}|推导|{target_node_id}"
+            edges[edge_id] = {
+                "source": source_node_id,
+                "target": target_node_id,
+                "type": "推导",
+                "score": probability,
+                "detail": {
+                    "推理类型": "单独推理",
+                    "替代方案竞品": item.get("1_替代方案竞品", []),
+                    "排他性检验": item.get("2_排他性检验", ""),
+                    "结论": item.get("结论", "")
+                }
+            }
+
+        # 组合推理的边(用虚拟节点表示组合)
+        for item in reasoning.get("组合推理", []):
+            members = item.get("组合成员", [])
+            member_types = item.get("成员类型", [])
+            probability = item.get("可能性", 0)
+
+            # 创建组合虚拟节点(排序成员以保证唯一性)
+            member_pairs = list(zip(members, member_types)) if len(member_types) == len(members) else [(m, "关键点") for m in members]
+            sorted_pairs = sorted(member_pairs, key=lambda x: x[0])
+            sorted_members = [p[0] for p in sorted_pairs]
+            sorted_types = [p[1] for p in sorted_pairs]
+
+            # 组合名称和ID包含类型信息
+            combo_parts = [f"{sorted_types[i]}:{m}" for i, m in enumerate(sorted_members)]
+            combo_name = " + ".join(combo_parts)
+            combo_node_id = f"帖子:组合:组合:{combo_name}"
+            if combo_node_id not in nodes:
+                nodes[combo_node_id] = {
+                    "name": combo_name,
+                    "type": "组合",
+                    "dimension": "组合",
+                    "domain": "帖子",
+                    "detail": {
+                        "成员": sorted_members,
+                        "成员类型": sorted_types
+                    }
+                }
+
+            # 组合节点到目标的边
+            edge_id = f"{combo_node_id}|推导|{target_node_id}"
+            edges[edge_id] = {
+                "source": combo_node_id,
+                "target": target_node_id,
+                "type": "推导",
+                "score": probability,
+                "detail": {
+                    "推理类型": "组合推理",
+                    "替代方案竞品": item.get("1_替代方案竞品", []),
+                    "排他性检验": item.get("2_排他性检验", ""),
+                    "结论": item.get("结论", "")
+                }
+            }
+
+            # 成员到组合节点的边
+            for i, member in enumerate(sorted_members):
+                m_type = sorted_types[i]
+                m_node_id = f"帖子:{m_type}:标签:{member}"
+                m_edge_id = f"{m_node_id}|组成|{combo_node_id}"
+                if m_edge_id not in edges:
+                    edges[m_edge_id] = {
+                        "source": m_node_id,
+                        "target": combo_node_id,
+                        "type": "组成",
+                        "score": 1.0,
+                        "detail": {}
+                    }
+
+    return {
+        "meta": {
+            "postId": post_id,
+            "type": "推导图谱",
+            "version": "v3",
+            "stats": {
+                "nodeCount": len(nodes),
+                "edgeCount": len(edges)
+            }
+        },
+        "nodes": nodes,
+        "edges": edges
+    }
+
+
+# ===== 辅助函数 =====
+
+def get_all_target_names(post_graph: Dict, dimensions: List[str] = None) -> List[str]:
+    """
+    获取所有可作为目标的特征名称
+
+    Args:
+        post_graph: 帖子图谱数据
+        dimensions: 要包含的维度列表,默认只包含关键点
+                   可选值: ["灵感点", "目的点", "关键点"]
+
+    Returns:
+        特征名称列表
+    """
+    if dimensions is None:
+        dimensions = ["关键点"]
+
+    tags = extract_tags_from_post_graph(post_graph)
+    return [t["name"] for t in tags if t["dimension"] in dimensions]
+
+
+def get_score_level(score: float) -> str:
+    """根据分数返回等级"""
+    if score >= 0.80:
+        return "逻辑必然"
+    elif score >= 0.50:
+        return "高适配性"
+    elif score >= 0.20:
+        return "创意偏好"
+    else:
+        return "弱关联"
+
+
+def display_result(result: Dict):
+    """显示单个分析结果"""
+    output = result.get("输出")
+    if output:
+        print(f"\n目标关键特征: {output.get('目标关键特征', 'N/A')}")
+
+        reasoning = output.get("推理分析", {})
+
+        # 显示单独推理
+        single = reasoning.get("单独推理", [])
+        if single:
+            print("  【单独推理】")
+            for item in single[:5]:
+                score = item.get("可能性", 0)
+                level = get_score_level(score)
+                print(f"    [{score:.2f} {level}] {item.get('来源特征', '')}")
+                exclusivity = item.get("2_排他性检验", "")
+                if len(exclusivity) > 60:
+                    exclusivity = exclusivity[:60] + "..."
+                print(f"      排他性: {exclusivity}")
+
+        # 显示组合推理
+        combo = reasoning.get("组合推理", [])
+        if combo:
+            print("  【组合推理】")
+            for item in combo[:3]:
+                members = " + ".join(item.get("组合成员", []))
+                score = item.get("可能性", 0)
+                level = get_score_level(score)
+                print(f"    [{score:.2f} {level}] {members}")
+                exclusivity = item.get("2_排他性检验", "")
+                if len(exclusivity) > 60:
+                    exclusivity = exclusivity[:60] + "..."
+                print(f"      排他性: {exclusivity}")
+    else:
+        print(f"  分析失败: {result.get('错误', 'N/A')}")
+
+
+# ===== 单帖子处理函数 =====
+
+async def process_single_post(
+    post_file: Path,
+    config: PathConfig,
+    target_name: str = None,
+    num_targets: int = 999,
+    dimensions: List[str] = None
+):
+    """
+    处理单个帖子
+
+    Args:
+        post_file: 帖子图谱文件路径
+        config: 路径配置
+        target_name: 目标节点名称,可选
+        num_targets: 要分析的目标特征数量
+        dimensions: 要分析的特征维度
+    """
+    if dimensions is None:
+        dimensions = ["关键点"]
+
+    # 为每个帖子生成独立的 trace
+    current_time, log_url = set_trace()
+
+    # 加载帖子图谱
+    post_graph = load_post_graph(post_file)
+    actual_post_id = post_graph.get("meta", {}).get("postId", "unknown")
+
+    print(f"\n{'=' * 60}")
+    print(f"帖子ID: {actual_post_id}")
+    print(f"Trace URL: {log_url}")
+
+    # 确定要分析的目标特征列表
+    if target_name:
+        target_names = [target_name]
+    else:
+        all_targets = get_all_target_names(post_graph, dimensions)
+        target_names = all_targets[:num_targets]
+
+    print(f"待分析目标特征: {target_names}")
+    print("-" * 60)
+
+    # 输出目录
+    output_dir = config.intermediate_dir / "node_origin_analysis"
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    # 使用 trace 上下文包裹单个帖子的分析
+    with trace(f"节点来源分析 V3 - {actual_post_id}"):
+        # 并发分析所有目标特征
+        async def analyze_single(name: str, index: int):
+            print(f"\n[{index}/{len(target_names)}] 开始分析: {name}")
+            result = await analyze_node_origin(
+                post_id=actual_post_id,
+                target_name=name,
+                config=config
+            )
+            print(f"[{index}/{len(target_names)}] 完成: {name}")
+            display_result(result)
+
+            # 提取输出中的推理分析(V3 格式)
+            output = result.get("输出", {})
+            return {
+                "目标特征": result.get("目标节点"),
+                "推理分析": output.get("推理分析", {}),
+                "输入": result.get("输入"),
+                "错误": result.get("错误")
+            }
+
+        # 创建并发任务
+        tasks = [
+            analyze_single(name, i)
+            for i, name in enumerate(target_names, 1)
+        ]
+
+        # 并发执行
+        all_results = await asyncio.gather(*tasks)
+
+    # 合并保存到一个文件
+    merged_output = {
+        "元数据": {
+            "current_time": current_time,
+            "log_url": log_url,
+            "model": MODEL_NAME,
+            "version": "v3"
+        },
+        "帖子id": actual_post_id,
+        "分析结果列表": all_results
+    }
+
+    output_file = output_dir / f"{actual_post_id}_来源分析_v3.json"
+    with open(output_file, "w", encoding="utf-8") as f:
+        json.dump(merged_output, f, ensure_ascii=False, indent=2)
+
+    # 生成推导关系图谱
+    graph_output = build_origin_graph(all_results, actual_post_id)
+    graph_file = output_dir / f"{actual_post_id}_推导图谱_v3.json"
+    with open(graph_file, "w", encoding="utf-8") as f:
+        json.dump(graph_output, f, ensure_ascii=False, indent=2)
+
+    print(f"\n完成! 共分析 {len(target_names)} 个目标特征")
+    print(f"分析结果: {output_file}")
+    print(f"推导图谱: {graph_file}")
+    print(f"Trace: {log_url}")
+
+    return actual_post_id
+
+
+# ===== 主函数 =====
+
+async def main(
+    post_id: str = None,
+    target_name: str = None,
+    num_targets: int = 999,
+    dimensions: List[str] = None,
+    all_posts: bool = False
+):
+    """
+    主函数
+
+    Args:
+        post_id: 帖子ID,可选(指定则只处理该帖子)
+        target_name: 目标节点名称,可选(如果指定则只分析这一个)
+        num_targets: 要分析的目标特征数量
+        dimensions: 要分析的特征维度,默认只关键点
+        all_posts: 是否处理所有帖子
+    """
+    if dimensions is None:
+        dimensions = ["关键点"]
+
+    config = PathConfig()
+
+    print(f"账号: {config.account_name}")
+    print(f"使用模型: {MODEL_NAME}")
+    print(f"分析维度: {dimensions}")
+    print(f"版本: V3 (法庭取证式两步验证法)")
+
+    # 获取帖子图谱文件
+    post_graph_files = get_post_graph_files(config)
+    if not post_graph_files:
+        print("错误: 没有找到帖子图谱文件")
+        return
+
+    # 确定要处理的帖子列表
+    if post_id:
+        # 指定了帖子ID
+        target_file = next(
+            (f for f in post_graph_files if post_id in f.name),
+            None
+        )
+        if not target_file:
+            print(f"错误: 未找到帖子 {post_id}")
+            return
+        files_to_process = [target_file]
+    elif all_posts:
+        # 处理所有帖子
+        files_to_process = post_graph_files
+    else:
+        # 默认只处理第一个帖子
+        files_to_process = [post_graph_files[0]]
+
+    print(f"待处理帖子数: {len(files_to_process)}")
+
+    # 逐个处理帖子(每个帖子独立的 trace)
+    processed_posts = []
+    for i, post_file in enumerate(files_to_process, 1):
+        print(f"\n{'#' * 60}")
+        print(f"# 处理帖子 {i}/{len(files_to_process)}")
+        print(f"{'#' * 60}")
+
+        post_id_result = await process_single_post(
+            post_file=post_file,
+            config=config,
+            target_name=target_name,
+            num_targets=num_targets,
+            dimensions=dimensions
+        )
+        processed_posts.append(post_id_result)
+
+    print(f"\n{'#' * 60}")
+    print(f"# 全部完成! 共处理 {len(processed_posts)} 个帖子")
+    print(f"{'#' * 60}")
+
+
+if __name__ == "__main__":
+    import argparse
+
+    parser = argparse.ArgumentParser(description="分析节点来源 (V3 法庭取证式)")
+    parser.add_argument("--post-id", type=str, help="帖子ID(指定则只处理该帖子)")
+    parser.add_argument("--target", type=str, help="目标节点名称(指定则只分析这一个特征)")
+    parser.add_argument("--num", type=int, default=999, help="要分析的目标特征数量")
+    parser.add_argument("--dims", type=str, nargs="+",
+                       choices=["灵感点", "目的点", "关键点"],
+                       help="指定要分析的维度(默认全部)")
+    parser.add_argument("--all-posts", action="store_true", help="处理所有帖子")
+    args = parser.parse_args()
+
+    # 确定维度(默认所有维度)
+    if args.dims:
+        dimensions = args.dims
+    else:
+        dimensions = ["灵感点", "目的点", "关键点"]
+
+    # 运行主函数(每个帖子内部会独立生成 trace)
+    asyncio.run(main(
+        post_id=args.post_id,
+        target_name=args.target,
+        num_targets=args.num,
+        dimensions=dimensions,
+        all_posts=args.all_posts
+    ))