Преглед изворни кода

feat: 添加节点来源分析脚本

- 支持分析目标特征可能由哪些候选特征推导而来
- 增加固有资产判定,区分账号基础设定和创作决策
- 增加因果方向检验,避免因果颠倒
- 支持并发分析多个目标特征
- 结果合并保存到单个文件

🤖 Generated with [Claude Code](https://claude.ai/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
yangxiaohui пре 1 дан
родитељ
комит
9a3fcafe6e

+ 593 - 0
script/data_processing/analyze_node_origin.py

@@ -0,0 +1,593 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+节点来源分析脚本
+
+给定一个目标节点,推断它可能由哪些候选节点推导而来。
+
+输入:post_graph 目录中的帖子图谱文件
+输出:节点来源分析结果
+"""
+
+import asyncio
+import json
+from pathlib import Path
+from typing import Dict, List, Optional, TypedDict
+import sys
+
+# 添加项目根目录到路径
+project_root = Path(__file__).parent.parent.parent
+sys.path.insert(0, str(project_root))
+
+from agents import Agent, Runner, ModelSettings, trace
+from agents.tracing.create import custom_span
+from lib.client import get_model
+from lib.my_trace import set_trace_smith as set_trace
+from script.data_processing.path_config import PathConfig
+
+# 模型配置
+MODEL_NAME = "google/gemini-3-pro-preview"
+# MODEL_NAME = 'deepseek/deepseek-v3.2'
+# MODEL_NAME = 'anthropic/claude-sonnet-4.5'
+
+agent = Agent(
+    name="Node Origin Analyzer",
+    model=get_model(MODEL_NAME),
+    model_settings=ModelSettings(
+        temperature=0.0,
+        max_tokens=65536,
+    ),
+    tools=[],
+)
+
+
+# ===== 类型定义 =====
+
+class NodeInfo(TypedDict):
+    名称: str
+    描述: str
+
+
+class EdgeInfo(TypedDict):
+    from_node: str
+    to_node: str
+    关系: Optional[str]
+    概率: Optional[float]
+
+
+class AnalyzeInput(TypedDict):
+    目标节点: NodeInfo
+    候选节点: List[NodeInfo]
+    边关系: List[EdgeInfo]
+
+
+class OriginPossibility(TypedDict):
+    来源节点: List[str]
+    概率: float
+    推理依据: str
+
+
+class AnalyzeOutput(TypedDict):
+    推理过程: str
+    来源可能性: List[OriginPossibility]
+
+
+# ===== 数据提取函数 =====
+
+def get_post_graph_files(config: PathConfig) -> List[Path]:
+    """获取所有帖子图谱文件"""
+    post_graph_dir = config.intermediate_dir / "post_graph"
+    return sorted(post_graph_dir.glob("*_帖子图谱.json"))
+
+
+def load_post_graph(file_path: Path) -> Dict:
+    """加载帖子图谱"""
+    with open(file_path, "r", encoding="utf-8") as f:
+        return json.load(f)
+
+
+def extract_tags_from_post_graph(post_graph: Dict) -> List[Dict]:
+    """
+    从帖子图谱中提取标签节点
+
+    筛选条件:type === "标签" 且 domain === "帖子"
+
+    Returns:
+        标签节点列表
+    """
+    tags = []
+    for node_id, node in post_graph.get("nodes", {}).items():
+        if node.get("type") == "标签" and node.get("domain") == "帖子":
+            tags.append({
+                "id": node_id,
+                "name": node.get("name", ""),
+                "dimension": node.get("dimension", ""),
+                "description": node.get("detail", {}).get("description", ""),
+                "pointNames": node.get("detail", {}).get("pointNames", []),
+            })
+    return tags
+
+
+def prepare_analyze_input(
+    post_graph: Dict,
+    target_name: str = None
+) -> AnalyzeInput:
+    """
+    准备分析输入数据
+
+    Args:
+        post_graph: 帖子图谱数据
+        target_name: 目标节点名称,如果为 None 则使用关键点标签的第一个
+
+    Returns:
+        AnalyzeInput 数据结构
+    """
+    # 提取所有标签节点
+    tags = extract_tags_from_post_graph(post_graph)
+
+    if not tags:
+        raise ValueError("帖子图谱中没有找到标签节点")
+
+    # 确定目标节点
+    if target_name:
+        target_tag = next((t for t in tags if t["name"] == target_name), None)
+        if not target_tag:
+            raise ValueError(f"未找到目标节点: {target_name}")
+    else:
+        # 默认使用关键点标签的第一个
+        key_point_tags = [t for t in tags if t["dimension"] == "关键点"]
+        if not key_point_tags:
+            raise ValueError("没有找到关键点标签")
+        target_tag = key_point_tags[0]
+
+    # 候选节点(排除目标节点)
+    candidate_tags = [t for t in tags if t["name"] != target_tag["name"]]
+
+    # 构建输入(包含特征类型信息)
+    return {
+        "目标特征": {
+            "特征名称": target_tag["name"],
+            "特征类型": target_tag["dimension"]
+        },
+        "候选特征": [
+            {
+                "特征名称": t["name"],
+                "特征类型": t["dimension"]
+            }
+            for t in candidate_tags
+        ],
+        "边关系": []  # 暂时为空
+    }
+
+
+# ===== Prompt 构建 =====
+
+def build_prompt(input_data: Dict, edges: List[EdgeInfo] = None) -> str:
+    """
+    构建分析 prompt
+
+    Args:
+        input_data: 分析输入数据(包含目标节点和候选节点,都带维度信息)
+        edges: 边关系列表
+
+    Returns:
+        prompt 文本
+    """
+    target = input_data["目标特征"]
+    candidates = input_data["候选特征"]
+    edges = edges or []
+
+    # 构建候选特征列表
+    candidates_text = []
+    for c in candidates:
+        candidates_text.append(f"- {c['特征名称']} ({c['特征类型']})")
+    candidates_section = "\n".join(candidates_text)
+
+    # 构建边关系文本
+    if edges:
+        edges_text = []
+        for e in edges:
+            edge_str = f"- {e['from_node']} → {e['to_node']}"
+            if e.get("关系"):
+                edge_str += f":{e['关系']}"
+            if e.get("概率") is not None:
+                edge_str += f"(概率: {e['概率']:.2f})"
+            edges_text.append(edge_str)
+        edges_section = "\n".join(edges_text)
+    else:
+        edges_section = "(暂无已知关系)"
+
+    return f'''你是一个内容创作逆向工程分析专家。你的任务是分析给定的目标特征可能由哪些候选特征推导而来。
+
+## 目标关键特征
+{target['特征名称']} ({target['特征类型']})
+
+## 候选特征
+{candidates_section}
+
+## 已知特征关系(仅供参考)
+{edges_section}
+
+## 第一步:固有资产判定
+
+首先判断目标特征是否为**固有资产/前提条件**:
+
+**固有资产的特征**:
+- 账号的基础设定(如:萌宠账号的"猫咪主角"、美食博主的"厨艺技能")
+- 创作者本身拥有的资源(如:有猫、会画画、有专业知识)
+- 不是针对某个内容"选择"的,而是账号/创作者的固有属性
+
+**判定方法**:
+问自己:这个特征是"创作者选择加入的"还是"创作者本来就有的"?
+- 如果是"本来就有的" → 固有资产,不需要从其他特征推导
+- 如果是"选择加入的" → 可推导特征,继续分析
+
+**重要**:如果目标特征是固有资产,应该返回空的来源列表,并说明原因。
+
+## 第二步:因果方向检验(仅当目标特征非固有资产时)
+
+在判断"候选特征 A 是否能推导出目标特征 T"之前,必须进行因果方向检验:
+
+1. **正向概率 P(A→T)**:假设 A 存在,推导出 T 的概率
+2. **反向概率 P(T→A)**:假设 T 存在,推导出 A 的概率
+
+**判定规则**:
+- 只有当 P(A→T) > P(T→A) 时,A 才能作为 T 的来源特征
+- 如果 P(T→A) >= P(A→T),说明 A 更可能是 T 的结果/表现形式
+
+**警惕"利用关系"伪装成"因果关系"**:
+- 错误:因为要"提供情绪价值",所以选择了"猫咪主角"
+- 正确:因为已有"猫咪主角"(固有资产),所以用它来"提供情绪价值"
+- 区别:"提供情绪价值"是对猫咪的利用方式,不是选择猫咪的原因
+
+## 输出格式
+使用JSON格式输出,结构如下:
+{{
+  "目标关键特征": "...",
+  "固有资产判定": {{
+    "是否固有资产": true/false,
+    "判定理由": "..."
+  }},
+  "推理类型分类": {{
+    "单独推理": [
+      {{
+        "排名": 1,
+        "特征名称": "...",
+        "特征类型": "灵感点/目的点/关键点",
+        "正向概率": 0.xx,
+        "反向概率": 0.xx,
+        "可能性": 0.xx,
+        "推理说明": "..."
+      }}
+    ],
+    "组合推理": [
+      {{
+        "组合编号": 1,
+        "组合成员": ["...", "..."],
+        "成员类型": ["...", "..."],
+        "正向概率": 0.xx,
+        "反向概率": 0.xx,
+        "可能性": 0.xx,
+        "单独可能性": {{
+          "成员1": 0.xx,
+          "成员2": 0.xx
+        }},
+        "协同效应分析": {{
+          "单独平均值": 0.xx,
+          "协同增益": 0.xx,
+          "增益说明": "..."
+        }},
+        "推理说明": "..."
+      }}
+    ],
+    "排除特征": [
+      {{
+        "特征名称": "...",
+        "特征类型": "...",
+        "正向概率": 0.xx,
+        "反向概率": 0.xx,
+        "排除原因": "..."
+      }}
+    ]
+  }}
+}}
+
+**注意**:如果目标特征是固有资产,"单独推理"和"组合推理"应为空数组,所有候选特征都应放入"排除特征"。
+
+## 注意事项
+1. **固有资产优先判定**:先判断目标特征是否为固有资产
+2. **警惕利用关系**:目的点对关键点的"利用"不等于"推导"
+3. 可能性数值需要合理评估,范围在0-1之间
+4. 单独推理按可能性从高到低排序
+5. 组合推理必须包含2个或以上成员
+6. 推理说明要清晰说明推导逻辑
+'''.strip()
+
+
+# ===== 主分析函数 =====
+
+async def analyze_node_origin(
+    post_id: str = None,
+    target_name: str = None,
+    config: PathConfig = None
+) -> Dict:
+    """
+    分析目标节点可能由哪些候选节点推导而来
+
+    Args:
+        post_id: 帖子ID,默认使用第一个帖子
+        target_name: 目标节点名称,默认使用关键点标签的第一个
+        config: 路径配置,如果为 None 则创建默认配置
+
+    Returns:
+        分析结果
+    """
+    if config is None:
+        config = PathConfig()
+
+    # 获取帖子图谱文件
+    post_graph_files = get_post_graph_files(config)
+    if not post_graph_files:
+        raise ValueError("没有找到帖子图谱文件")
+
+    # 选择帖子
+    if post_id:
+        target_file = next(
+            (f for f in post_graph_files if post_id in f.name),
+            None
+        )
+        if not target_file:
+            raise ValueError(f"未找到帖子: {post_id}")
+    else:
+        target_file = post_graph_files[0]  # 默认第一个
+
+    # 加载帖子图谱
+    post_graph = load_post_graph(target_file)
+    actual_post_id = post_graph.get("meta", {}).get("postId", "unknown")
+
+    # 准备输入数据
+    input_data = prepare_analyze_input(post_graph, target_name)
+    actual_target_name = input_data["目标特征"]["特征名称"]
+
+    # 构建 prompt
+    prompt = build_prompt(input_data, input_data.get("边关系", []))
+
+    print(f"帖子ID: {actual_post_id}")
+    print(f"目标特征: {actual_target_name}")
+    print(f"候选特征数: {len(input_data['候选特征'])}")
+    print()
+
+    # 调试:打印 prompt(取消注释以启用)
+    # print("=" * 40)
+    # print("Prompt 预览:")
+    # print(prompt[:2000])
+    # print("...")
+    # print("=" * 40)
+
+    # 使用 custom_span 标识分析流程
+    with custom_span(
+        name=f"分析特征来源 - {actual_target_name}",
+        data={
+            "帖子id": actual_post_id,
+            "目标特征": actual_target_name,
+            "候选特征数": len(input_data["候选特征"]),
+            "模型": MODEL_NAME
+        }
+    ):
+        # 调用 agent
+        result = await Runner.run(agent, input=prompt)
+        output = result.final_output
+
+    # 解析 JSON
+    try:
+        if "```json" in output:
+            json_start = output.find("```json") + 7
+            json_end = output.find("```", json_start)
+            json_str = output[json_start:json_end].strip()
+        elif "{" in output and "}" in output:
+            json_start = output.find("{")
+            json_end = output.rfind("}") + 1
+            json_str = output[json_start:json_end]
+        else:
+            json_str = output
+
+        analysis_result = json.loads(json_str)
+
+        return {
+            "帖子id": actual_post_id,
+            "目标节点": actual_target_name,
+            "模型": MODEL_NAME,
+            "输入": input_data,
+            "输出": analysis_result
+        }
+    except Exception as e:
+        return {
+            "帖子id": actual_post_id,
+            "目标节点": actual_target_name,
+            "模型": MODEL_NAME,
+            "输入": input_data,
+            "输出": None,
+            "错误": str(e),
+            "原始输出": output
+        }
+
+
+# ===== 辅助函数 =====
+
+def get_all_target_names(post_graph: Dict) -> List[str]:
+    """获取所有可作为目标的特征名称(关键点标签)"""
+    tags = extract_tags_from_post_graph(post_graph)
+    # 返回所有关键点标签的名称
+    return [t["name"] for t in tags if t["dimension"] == "关键点"]
+
+
+def display_result(result: Dict):
+    """显示单个分析结果"""
+    output = result.get("输出")
+    if output:
+        print(f"\n目标关键特征: {output.get('目标关键特征', 'N/A')}")
+
+        # 固有资产判定
+        asset_check = output.get("固有资产判定", {})
+        if asset_check.get("是否固有资产"):
+            print(f"  → 固有资产: {asset_check.get('判定理由', '')[:60]}...")
+        else:
+            reasoning = output.get("推理类型分类", {})
+
+            # 显示单独推理
+            single = reasoning.get("单独推理", [])
+            if single:
+                print("  【单独推理】")
+                for item in single[:3]:  # 只显示前3个
+                    print(f"    [{item.get('可能性', 0):.2f}] {item.get('特征名称', '')}")
+
+            # 显示组合推理
+            combo = reasoning.get("组合推理", [])
+            if combo:
+                print("  【组合推理】")
+                for item in combo[:2]:  # 只显示前2个
+                    members = " + ".join(item.get("组合成员", []))
+                    print(f"    [{item.get('可能性', 0):.2f}] {members}")
+    else:
+        print(f"  分析失败: {result.get('错误', 'N/A')}")
+
+
+# ===== 主函数 =====
+
+async def main(
+    post_id: str = None,
+    target_name: str = None,
+    num_targets: int = 1,
+    current_time: str = None,
+    log_url: str = None
+):
+    """
+    主函数
+
+    Args:
+        post_id: 帖子ID,可选
+        target_name: 目标节点名称,可选(如果指定则只分析这一个)
+        num_targets: 要分析的目标特征数量(当 target_name 为空时生效)
+        current_time: 当前时间戳(从外部传入)
+        log_url: 日志链接(从外部传入)
+    """
+    config = PathConfig()
+
+    print(f"账号: {config.account_name}")
+    print(f"使用模型: {MODEL_NAME}")
+    if log_url:
+        print(f"Trace URL: {log_url}")
+    print()
+
+    # 获取帖子图谱文件
+    post_graph_files = get_post_graph_files(config)
+    if not post_graph_files:
+        print("错误: 没有找到帖子图谱文件")
+        return
+
+    # 选择帖子
+    if post_id:
+        target_file = next(
+            (f for f in post_graph_files if post_id in f.name),
+            None
+        )
+        if not target_file:
+            print(f"错误: 未找到帖子 {post_id}")
+            return
+    else:
+        target_file = post_graph_files[0]
+
+    # 加载帖子图谱
+    post_graph = load_post_graph(target_file)
+    actual_post_id = post_graph.get("meta", {}).get("postId", "unknown")
+    print(f"帖子ID: {actual_post_id}")
+
+    # 确定要分析的目标特征列表
+    if target_name:
+        target_names = [target_name]
+    else:
+        all_targets = get_all_target_names(post_graph)
+        target_names = all_targets[:num_targets]
+
+    print(f"待分析目标特征: {target_names}")
+    print("=" * 60)
+
+    # 输出目录
+    output_dir = config.intermediate_dir / "node_origin_analysis"
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    # 并发分析所有目标特征
+    async def analyze_single(name: str, index: int):
+        print(f"\n[{index}/{len(target_names)}] 开始分析: {name}")
+        result = await analyze_node_origin(
+            post_id=post_id,
+            target_name=name,
+            config=config
+        )
+        print(f"[{index}/{len(target_names)}] 完成: {name}")
+        display_result(result)
+        return {
+            "目标特征": result.get("目标节点"),
+            "固有资产判定": result.get("输出", {}).get("固有资产判定", {}),
+            "推理类型分类": result.get("输出", {}).get("推理类型分类", {}),
+            "输入": result.get("输入"),
+            "错误": result.get("错误")
+        }
+
+    # 创建并发任务
+    tasks = [
+        analyze_single(name, i)
+        for i, name in enumerate(target_names, 1)
+    ]
+
+    # 并发执行
+    all_results = await asyncio.gather(*tasks)
+
+    # 合并保存到一个文件
+    merged_output = {
+        "元数据": {
+            "current_time": current_time,
+            "log_url": log_url,
+            "model": MODEL_NAME
+        },
+        "帖子id": actual_post_id,
+        "分析结果列表": all_results
+    }
+
+    output_file = output_dir / f"{actual_post_id}_来源分析.json"
+    with open(output_file, "w", encoding="utf-8") as f:
+        json.dump(merged_output, f, ensure_ascii=False, indent=2)
+
+    print("\n" + "=" * 60)
+    print(f"完成! 共分析 {len(target_names)} 个目标特征")
+    print(f"结果已保存到: {output_file}")
+    if log_url:
+        print(f"Trace: {log_url}")
+
+
+if __name__ == "__main__":
+    import argparse
+
+    parser = argparse.ArgumentParser(description="分析节点来源")
+    parser.add_argument("--post-id", type=str, help="帖子ID")
+    parser.add_argument("--target", type=str, help="目标节点名称(指定则只分析这一个)")
+    parser.add_argument("--num", type=int, default=1, help="要分析的目标特征数量(默认1)")
+    parser.add_argument("--all", action="store_true", help="分析所有关键点")
+    args = parser.parse_args()
+
+    # 如果指定了 --all,则设置 num 为一个很大的数
+    if args.all:
+        args.num = 999
+
+    # 设置 trace
+    current_time, log_url = set_trace()
+
+    # 使用 trace 上下文包裹整个执行流程
+    with trace("节点来源分析"):
+        asyncio.run(main(
+            post_id=args.post_id,
+            target_name=args.target,
+            num_targets=args.num,
+            current_time=current_time,
+            log_url=log_url
+        ))

+ 425 - 0
script/data_processing/analyze_node_origin_v2.py

@@ -0,0 +1,425 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+特征来源分析脚本 V2
+
+基于过滤后的 how 解构结果,分析目标特征可能由哪些其他特征推导而来。
+
+输入:intermediate/filtered_results/ 中的过滤结果
+输出:特征来源分析结果
+"""
+
+import asyncio
+import json
+from pathlib import Path
+from typing import Dict, List, Optional
+import sys
+
+# 添加项目根目录到路径
+project_root = Path(__file__).parent.parent.parent
+sys.path.insert(0, str(project_root))
+
+from agents import Agent, Runner, ModelSettings, trace
+from agents.tracing.create import custom_span
+from lib.client import get_model
+from lib.my_trace import set_trace_smith as set_trace
+from script.data_processing.path_config import PathConfig
+
+# 模型配置
+MODEL_NAME = "google/gemini-3-pro-preview"
+# MODEL_NAME = 'anthropic/claude-sonnet-4.5'
+
+agent = Agent(
+    name="Feature Origin Analyzer",
+    model=get_model(MODEL_NAME),
+    model_settings=ModelSettings(
+        temperature=0.0,
+        max_tokens=65536,
+    ),
+    tools=[],
+)
+
+
+# ===== 数据提取函数 =====
+
+def extract_post_info(how_result: Dict) -> Dict:
+    """
+    从 how 解构结果中提取帖子信息(灵感点、目的点、关键点列表)
+
+    Args:
+        how_result: how解构结果
+
+    Returns:
+        包含三类点列表的字典,每个点含名称、描述、特征列表
+    """
+    result = {}
+
+    for point_type in ["灵感点", "目的点", "关键点"]:
+        point_list_key = f"{point_type}列表"
+        point_list = how_result.get(point_list_key, [])
+
+        extracted_points = []
+        for point in point_list:
+            # 提取特征名称列表
+            feature_names = []
+            for feature in point.get("特征列表", []):
+                feature_name = feature.get("特征名称", "")
+                if feature_name:
+                    feature_names.append(feature_name)
+
+            extracted_points.append({
+                "名称": point.get("名称", ""),
+                "描述": point.get("描述", ""),
+                "特征列表": feature_names
+            })
+
+        if extracted_points:
+            result[point_list_key] = extracted_points
+
+    return result
+
+
+def get_all_features(post_info: Dict) -> List[Dict]:
+    """
+    从帖子信息中提取所有特征(点+特征列表中的特征)
+
+    Args:
+        post_info: 帖子信息
+
+    Returns:
+        所有特征列表,包含名称和类型
+    """
+    features = []
+
+    for point_type in ["灵感点", "目的点", "关键点"]:
+        point_list_key = f"{point_type}列表"
+        for point in post_info.get(point_list_key, []):
+            # 添加点本身作为特征
+            features.append({
+                "特征名称": point["名称"],
+                "特征类型": point_type,
+                "描述": point.get("描述", "")
+            })
+
+    return features
+
+
+# ===== Prompt 构建 =====
+
+def build_prompt(target_feature: str, post_info: Dict) -> str:
+    """
+    构建分析 prompt
+
+    Args:
+        target_feature: 目标关键特征名称
+        post_info: 帖子信息
+
+    Returns:
+        prompt 文本
+    """
+    # 将帖子信息转为 JSON 格式
+    post_info_json = json.dumps(post_info, ensure_ascii=False, indent=4)
+
+    return f'''你是一个内容创作逆向工程分析专家。你的任务是分析给定的特征是如何从其他特征中推理得出的。
+请按照以下要求进行分析:
+
+## 目标关键特征
+{target_feature}
+
+## 帖子信息
+
+{post_info_json}
+
+## 分析任务
+将所有来源特征分为两类:
+
+### 1. 单独推理
+- 定义: 该特征单独存在时,可以独立推导出目标关键特征,无需其他特征辅助
+
+### 2. 组合推理
+- 定义: 2个或更多特征必须同时存在才能有效推导出目标关键特征
+
+## 输出格式
+使用JSON格式输出,结构如下:
+{{
+  "目标关键特征": "...",
+  "推理类型分类": {{
+    "单独推理": [
+      {{
+        "排名": 1,
+        "特征名称": "...",
+        "特征类型": "灵感点/目的点/关键点",
+        "可能性": 0.xx,
+        "推理说明": "..."
+      }}
+    ],
+    "组合推理": [
+      {{
+        "组合编号": 1,
+        "组合成员": ["...", "..."],
+        "成员类型": ["...", "..."],
+        "可能性": 0.xx,
+        "单独可能性": {{
+          "成员1": 0.xx,
+          "成员2": 0.xx
+        }},
+        "协同效应分析": {{
+          "单独平均值": 0.xx,
+          "协同增益": 0.xx,
+          "增益说明": "..."
+        }},
+        "推理说明": "..."
+      }}
+    ]
+  }}
+}}
+
+## 注意事项
+1. 可能性数值需要合理评估,范围在0-1之间
+2. 单独推理按可能性从高到低排序
+3. 组合推理必须包含2个或以上成员
+4. 协同增益 = 组合可能性 - 单独平均值
+5. 推理说明要清晰说明推导逻辑,避免空洞表述
+6. 每个特征只能属于一种推理类型,不能既是单独推理又是组合推理的成员
+7. 优先识别组合推理,剩余的特征作为单独推理
+8. 一般先有实质,再有形式,如,先有角色,再有服化道;除非形式是关键特征
+'''.strip()
+
+
+# ===== 主分析函数 =====
+
+async def analyze_feature_origin(
+    post_data: Dict,
+    target_feature: str = None
+) -> Dict:
+    """
+    分析单个帖子中目标特征的来源
+
+    Args:
+        post_data: 帖子数据(包含 how解构结果)
+        target_feature: 目标特征名称,如果为 None 则使用关键点的第一个
+
+    Returns:
+        分析结果
+    """
+    post_id = post_data.get("帖子id", "")
+    how_result = post_data.get("how解构结果", {})
+
+    # 提取帖子信息
+    post_info = extract_post_info(how_result)
+
+    if not post_info:
+        return {
+            "帖子id": post_id,
+            "模型": MODEL_NAME,
+            "输入": {"帖子信息": {}},
+            "输出": None,
+            "错误": "没有可分析的点"
+        }
+
+    # 确定目标特征
+    if target_feature is None:
+        # 默认使用关键点的第一个
+        key_points = post_info.get("关键点列表", [])
+        if key_points:
+            target_feature = key_points[0]["名称"]
+        else:
+            return {
+                "帖子id": post_id,
+                "模型": MODEL_NAME,
+                "输入": {"帖子信息": post_info},
+                "输出": None,
+                "错误": "没有找到关键点"
+            }
+
+    # 构建 prompt
+    prompt = build_prompt(target_feature, post_info)
+
+    # 使用 custom_span 标识分析流程
+    with custom_span(
+        name=f"分析特征来源 - {target_feature}",
+        data={
+            "帖子id": post_id,
+            "目标特征": target_feature,
+            "模型": MODEL_NAME
+        }
+    ):
+        # 调用 agent
+        result = await Runner.run(agent, input=prompt)
+        output = result.final_output
+
+    # 解析 JSON
+    try:
+        if "```json" in output:
+            json_start = output.find("```json") + 7
+            json_end = output.find("```", json_start)
+            json_str = output[json_start:json_end].strip()
+        elif "{" in output and "}" in output:
+            json_start = output.find("{")
+            json_end = output.rfind("}") + 1
+            json_str = output[json_start:json_end]
+        else:
+            json_str = output
+
+        analysis_result = json.loads(json_str)
+
+        return {
+            "帖子id": post_id,
+            "目标特征": target_feature,
+            "模型": MODEL_NAME,
+            "输入": {
+                "帖子信息": post_info,
+                "prompt": prompt
+            },
+            "输出": analysis_result
+        }
+    except Exception as e:
+        return {
+            "帖子id": post_id,
+            "目标特征": target_feature,
+            "模型": MODEL_NAME,
+            "输入": {
+                "帖子信息": post_info,
+                "prompt": prompt
+            },
+            "输出": None,
+            "错误": str(e),
+            "原始输出": output
+        }
+
+
+# ===== 主函数 =====
+
+async def main(
+    post_id: str = None,
+    target_feature: str = None,
+    current_time: str = None,
+    log_url: str = None
+):
+    """
+    主函数
+
+    Args:
+        post_id: 帖子ID,可选(默认使用第一个)
+        target_feature: 目标特征名称,可选(默认使用关键点第一个)
+        current_time: 当前时间戳(从外部传入)
+        log_url: 日志链接(从外部传入)
+    """
+    config = PathConfig()
+
+    # 获取输入目录
+    input_dir = config.intermediate_dir / "filtered_results"
+    output_dir = config.intermediate_dir / "feature_origin_analysis"
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    print(f"账号: {config.account_name}")
+    print(f"输入目录: {input_dir}")
+    print(f"输出目录: {output_dir}")
+    print(f"使用模型: {MODEL_NAME}")
+    if log_url:
+        print(f"Trace URL: {log_url}")
+    print()
+
+    # 获取输入文件
+    input_files = sorted(input_dir.glob("*_filtered.json"))
+    if not input_files:
+        print(f"错误: 在 {input_dir} 中没有找到任何 *_filtered.json 文件")
+        return
+
+    # 选择帖子
+    if post_id:
+        target_file = next(
+            (f for f in input_files if post_id in f.name),
+            None
+        )
+        if not target_file:
+            print(f"错误: 未找到帖子 {post_id}")
+            return
+    else:
+        target_file = input_files[0]  # 默认第一个
+
+    # 读取文件
+    with open(target_file, "r", encoding="utf-8") as f:
+        post_data = json.load(f)
+
+    actual_post_id = post_data.get("帖子id", "unknown")
+    print(f"帖子ID: {actual_post_id}")
+    print(f"目标特征: {target_feature or '(默认关键点第一个)'}")
+    print()
+
+    # 分析
+    result = await analyze_feature_origin(post_data, target_feature)
+
+    # 显示结果
+    output = result.get("输出")
+    if output:
+        print("=" * 60)
+        print("分析结果")
+        print("=" * 60)
+        print(f"\n目标关键特征: {output.get('目标关键特征', 'N/A')}\n")
+
+        reasoning = output.get("推理类型分类", {})
+
+        # 显示单独推理
+        single = reasoning.get("单独推理", [])
+        if single:
+            print("【单独推理】")
+            for item in single:
+                print(f"  #{item.get('排名', '-')} [{item.get('可能性', 0):.2f}] {item.get('特征名称', '')} ({item.get('特征类型', '')})")
+                print(f"      {item.get('推理说明', '')}")
+
+        # 显示组合推理
+        combo = reasoning.get("组合推理", [])
+        if combo:
+            print("\n【组合推理】")
+            for item in combo:
+                members = " + ".join(item.get("组合成员", []))
+                prob = item.get("可能性", 0)
+                synergy = item.get("协同效应分析", {})
+                gain = synergy.get("协同增益", 0)
+                print(f"  组合{item.get('组合编号', '-')}: [{prob:.2f}] {members}")
+                print(f"      协同增益: {gain:+.2f}")
+                print(f"      {item.get('推理说明', '')}")
+    else:
+        print(f"分析失败: {result.get('错误', 'N/A')}")
+
+    # 保存结果
+    target_name = result.get("目标特征", "unknown")
+    output_file = output_dir / f"{actual_post_id}_{target_name}_来源分析.json"
+
+    save_data = {
+        "元数据": {
+            "current_time": current_time,
+            "log_url": log_url,
+            "model": MODEL_NAME
+        },
+        **result
+    }
+
+    with open(output_file, "w", encoding="utf-8") as f:
+        json.dump(save_data, f, ensure_ascii=False, indent=2)
+
+    print(f"\n结果已保存到: {output_file}")
+    if log_url:
+        print(f"Trace: {log_url}")
+
+
+if __name__ == "__main__":
+    import argparse
+
+    parser = argparse.ArgumentParser(description="分析特征来源 V2")
+    parser.add_argument("--post-id", type=str, help="帖子ID")
+    parser.add_argument("--target", type=str, help="目标特征名称")
+    args = parser.parse_args()
+
+    # 设置 trace
+    current_time, log_url = set_trace()
+
+    # 使用 trace 上下文包裹整个执行流程
+    with trace("特征来源分析V2"):
+        asyncio.run(main(
+            post_id=args.post_id,
+            target_feature=args.target,
+            current_time=current_time,
+            log_url=log_url
+        ))