| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 节点来源分析脚本
- 给定一个目标节点,推断它可能由哪些候选节点推导而来。
- 输入:post_graph 目录中的帖子图谱文件
- 输出:节点来源分析结果
- """
- import asyncio
- import json
- from pathlib import Path
- from typing import Dict, List, Optional, TypedDict
- import sys
- # 添加项目根目录到路径
- project_root = Path(__file__).parent.parent.parent
- sys.path.insert(0, str(project_root))
- from agents import Agent, Runner, ModelSettings, trace
- from agents.tracing.create import custom_span
- from lib.client import get_model
- from lib.my_trace import set_trace_smith as set_trace
- from script.data_processing.path_config import PathConfig
- # 模型配置
- MODEL_NAME = "google/gemini-3-pro-preview"
- # MODEL_NAME = 'deepseek/deepseek-v3.2'
- # MODEL_NAME = 'anthropic/claude-sonnet-4.5'
- agent = Agent(
- name="Node Origin Analyzer",
- model=get_model(MODEL_NAME),
- model_settings=ModelSettings(
- temperature=0.0,
- max_tokens=65536,
- ),
- tools=[],
- )
- # ===== 类型定义 =====
- class NodeInfo(TypedDict):
- 名称: str
- 描述: str
- class EdgeInfo(TypedDict):
- from_node: str
- to_node: str
- 关系: Optional[str]
- 概率: Optional[float]
- class AnalyzeInput(TypedDict):
- 目标节点: NodeInfo
- 候选节点: List[NodeInfo]
- 边关系: List[EdgeInfo]
- class OriginPossibility(TypedDict):
- 来源节点: List[str]
- 概率: float
- 推理依据: str
- class AnalyzeOutput(TypedDict):
- 推理过程: str
- 来源可能性: List[OriginPossibility]
- # ===== 数据提取函数 =====
- def get_post_graph_files(config: PathConfig) -> List[Path]:
- """获取所有帖子图谱文件"""
- post_graph_dir = config.intermediate_dir / "post_graph"
- return sorted(post_graph_dir.glob("*_帖子图谱.json"))
- def load_post_graph(file_path: Path) -> Dict:
- """加载帖子图谱"""
- with open(file_path, "r", encoding="utf-8") as f:
- return json.load(f)
- def extract_tags_from_post_graph(post_graph: Dict) -> List[Dict]:
- """
- 从帖子图谱中提取标签节点
- 筛选条件:type === "标签" 且 domain === "帖子"
- Returns:
- 标签节点列表
- """
- tags = []
- for node_id, node in post_graph.get("nodes", {}).items():
- if node.get("type") == "标签" and node.get("domain") == "帖子":
- tags.append({
- "id": node_id,
- "name": node.get("name", ""),
- "dimension": node.get("dimension", ""),
- "description": node.get("detail", {}).get("description", ""),
- "pointNames": node.get("detail", {}).get("pointNames", []),
- })
- return tags
- def prepare_analyze_input(
- post_graph: Dict,
- target_name: str = None
- ) -> AnalyzeInput:
- """
- 准备分析输入数据
- Args:
- post_graph: 帖子图谱数据
- target_name: 目标节点名称,如果为 None 则使用关键点标签的第一个
- Returns:
- AnalyzeInput 数据结构
- """
- # 提取所有标签节点
- tags = extract_tags_from_post_graph(post_graph)
- if not tags:
- raise ValueError("帖子图谱中没有找到标签节点")
- # 确定目标节点
- if target_name:
- target_tag = next((t for t in tags if t["name"] == target_name), None)
- if not target_tag:
- raise ValueError(f"未找到目标节点: {target_name}")
- else:
- # 默认使用关键点标签的第一个
- key_point_tags = [t for t in tags if t["dimension"] == "关键点"]
- if not key_point_tags:
- raise ValueError("没有找到关键点标签")
- target_tag = key_point_tags[0]
- # 候选节点(排除目标节点)
- candidate_tags = [t for t in tags if t["name"] != target_tag["name"]]
- # 构建输入(包含特征类型信息)
- return {
- "目标特征": {
- "特征名称": target_tag["name"],
- "特征类型": target_tag["dimension"]
- },
- "候选特征": [
- {
- "特征名称": t["name"],
- "特征类型": t["dimension"]
- }
- for t in candidate_tags
- ],
- "边关系": [] # 暂时为空
- }
- # ===== Prompt 构建 =====
- def build_prompt(input_data: Dict, edges: List[EdgeInfo] = None) -> str:
- """
- 构建分析 prompt
- Args:
- input_data: 分析输入数据(包含目标节点和候选节点,都带维度信息)
- edges: 边关系列表
- Returns:
- prompt 文本
- """
- target = input_data["目标特征"]
- candidates = input_data["候选特征"]
- edges = edges or []
- # 构建候选特征列表
- candidates_text = []
- for c in candidates:
- candidates_text.append(f"- {c['特征名称']} ({c['特征类型']})")
- candidates_section = "\n".join(candidates_text)
- # 构建边关系文本
- if edges:
- edges_text = []
- for e in edges:
- edge_str = f"- {e['from_node']} → {e['to_node']}"
- if e.get("关系"):
- edge_str += f":{e['关系']}"
- if e.get("概率") is not None:
- edge_str += f"(概率: {e['概率']:.2f})"
- edges_text.append(edge_str)
- edges_section = "\n".join(edges_text)
- else:
- edges_section = "(暂无已知关系)"
- return f'''你是一个内容创作逆向工程分析专家。你的任务是分析给定的目标特征可能由哪些候选特征推导而来。
- ## 目标关键特征
- {target['特征名称']} ({target['特征类型']})
- ## 候选特征
- {candidates_section}
- ## 已知特征关系(仅供参考)
- {edges_section}
- ## 第一步:固有资产判定
- 首先判断目标特征是否为**固有资产/前提条件**:
- **固有资产的特征**:
- - 账号的基础设定(如:萌宠账号的"猫咪主角"、美食博主的"厨艺技能")
- - 创作者本身拥有的资源(如:有猫、会画画、有专业知识)
- - 不是针对某个内容"选择"的,而是账号/创作者的固有属性
- **判定方法**:
- 问自己:这个特征是"创作者选择加入的"还是"创作者本来就有的"?
- - 如果是"本来就有的" → 固有资产,不需要从其他特征推导
- - 如果是"选择加入的" → 可推导特征,继续分析
- **重要**:如果目标特征是固有资产,应该返回空的来源列表,并说明原因。
- ## 第二步:因果方向检验(仅当目标特征非固有资产时)
- 在判断"候选特征 A 是否能推导出目标特征 T"之前,必须进行因果方向检验:
- 1. **正向概率 P(A→T)**:假设 A 存在,推导出 T 的概率
- 2. **反向概率 P(T→A)**:假设 T 存在,推导出 A 的概率
- **判定规则**:
- - 只有当 P(A→T) > P(T→A) 时,A 才能作为 T 的来源特征
- - 如果 P(T→A) >= P(A→T),说明 A 更可能是 T 的结果/表现形式
- **警惕"利用关系"伪装成"因果关系"**:
- - 错误:因为要"提供情绪价值",所以选择了"猫咪主角"
- - 正确:因为已有"猫咪主角"(固有资产),所以用它来"提供情绪价值"
- - 区别:"提供情绪价值"是对猫咪的利用方式,不是选择猫咪的原因
- ## 输出格式
- 使用JSON格式输出,结构如下:
- {{
- "目标关键特征": "...",
- "固有资产判定": {{
- "是否固有资产": true/false,
- "判定理由": "..."
- }},
- "推理类型分类": {{
- "单独推理": [
- {{
- "排名": 1,
- "特征名称": "...",
- "特征类型": "灵感点/目的点/关键点",
- "正向概率": 0.xx,
- "反向概率": 0.xx,
- "可能性": 0.xx,
- "推理说明": "..."
- }}
- ],
- "组合推理": [
- {{
- "组合编号": 1,
- "组合成员": ["...", "..."],
- "成员类型": ["...", "..."],
- "正向概率": 0.xx,
- "反向概率": 0.xx,
- "可能性": 0.xx,
- "单独可能性": {{
- "成员1": 0.xx,
- "成员2": 0.xx
- }},
- "协同效应分析": {{
- "单独平均值": 0.xx,
- "协同增益": 0.xx,
- "增益说明": "..."
- }},
- "推理说明": "..."
- }}
- ],
- "排除特征": [
- {{
- "特征名称": "...",
- "特征类型": "...",
- "正向概率": 0.xx,
- "反向概率": 0.xx,
- "排除原因": "..."
- }}
- ]
- }}
- }}
- **注意**:如果目标特征是固有资产,"单独推理"和"组合推理"应为空数组,所有候选特征都应放入"排除特征"。
- ## 注意事项
- 1. **固有资产优先判定**:先判断目标特征是否为固有资产
- 2. **警惕利用关系**:目的点对关键点的"利用"不等于"推导"
- 3. 可能性数值需要合理评估,范围在0-1之间
- 4. 单独推理按可能性从高到低排序
- 5. 组合推理必须包含2个或以上成员
- 6. 推理说明要清晰说明推导逻辑
- '''.strip()
- # ===== 主分析函数 =====
- async def analyze_node_origin(
- post_id: str = None,
- target_name: str = None,
- config: PathConfig = None
- ) -> Dict:
- """
- 分析目标节点可能由哪些候选节点推导而来
- Args:
- post_id: 帖子ID,默认使用第一个帖子
- target_name: 目标节点名称,默认使用关键点标签的第一个
- config: 路径配置,如果为 None 则创建默认配置
- Returns:
- 分析结果
- """
- if config is None:
- config = PathConfig()
- # 获取帖子图谱文件
- post_graph_files = get_post_graph_files(config)
- if not post_graph_files:
- raise ValueError("没有找到帖子图谱文件")
- # 选择帖子
- if post_id:
- target_file = next(
- (f for f in post_graph_files if post_id in f.name),
- None
- )
- if not target_file:
- raise ValueError(f"未找到帖子: {post_id}")
- else:
- target_file = post_graph_files[0] # 默认第一个
- # 加载帖子图谱
- post_graph = load_post_graph(target_file)
- actual_post_id = post_graph.get("meta", {}).get("postId", "unknown")
- # 准备输入数据
- input_data = prepare_analyze_input(post_graph, target_name)
- actual_target_name = input_data["目标特征"]["特征名称"]
- # 构建 prompt
- prompt = build_prompt(input_data, input_data.get("边关系", []))
- print(f"帖子ID: {actual_post_id}")
- print(f"目标特征: {actual_target_name}")
- print(f"候选特征数: {len(input_data['候选特征'])}")
- print()
- # 调试:打印 prompt(取消注释以启用)
- # print("=" * 40)
- # print("Prompt 预览:")
- # print(prompt[:2000])
- # print("...")
- # print("=" * 40)
- # 使用 custom_span 标识分析流程
- with custom_span(
- name=f"分析特征来源 - {actual_target_name}",
- data={
- "帖子id": actual_post_id,
- "目标特征": actual_target_name,
- "候选特征数": len(input_data["候选特征"]),
- "模型": MODEL_NAME
- }
- ):
- # 调用 agent
- result = await Runner.run(agent, input=prompt)
- output = result.final_output
- # 解析 JSON
- try:
- if "```json" in output:
- json_start = output.find("```json") + 7
- json_end = output.find("```", json_start)
- json_str = output[json_start:json_end].strip()
- elif "{" in output and "}" in output:
- json_start = output.find("{")
- json_end = output.rfind("}") + 1
- json_str = output[json_start:json_end]
- else:
- json_str = output
- analysis_result = json.loads(json_str)
- return {
- "帖子id": actual_post_id,
- "目标节点": actual_target_name,
- "模型": MODEL_NAME,
- "输入": input_data,
- "输出": analysis_result
- }
- except Exception as e:
- return {
- "帖子id": actual_post_id,
- "目标节点": actual_target_name,
- "模型": MODEL_NAME,
- "输入": input_data,
- "输出": None,
- "错误": str(e),
- "原始输出": output
- }
- # ===== 图谱构建函数 =====
- def build_origin_graph(all_results: List[Dict], post_id: str) -> Dict:
- """
- 将分析结果转换为图谱格式
- Args:
- all_results: 所有目标特征的分析结果
- post_id: 帖子ID
- Returns:
- 图谱数据,包含 nodes 和 edges
- """
- nodes = {}
- edges = {}
- # 从输入收集所有特征节点(不添加额外信息)
- for result in all_results:
- target_input = result.get("输入", {})
- # 添加目标节点
- target_info = target_input.get("目标特征", {})
- target_name = target_info.get("特征名称", "")
- target_type = target_info.get("特征类型", "关键点")
- node_id = f"帖子:{target_type}:标签:{target_name}"
- if node_id not in nodes:
- nodes[node_id] = {
- "name": target_name,
- "type": "标签",
- "dimension": target_type,
- "domain": "帖子",
- "detail": {}
- }
- # 添加候选特征节点
- for candidate in target_input.get("候选特征", []):
- c_name = candidate.get("特征名称", "")
- c_type = candidate.get("特征类型", "关键点")
- c_node_id = f"帖子:{c_type}:标签:{c_name}"
- if c_node_id not in nodes:
- nodes[c_node_id] = {
- "name": c_name,
- "type": "标签",
- "dimension": c_type,
- "domain": "帖子",
- "detail": {}
- }
- # 构建推导边
- for result in all_results:
- target_name = result.get("目标特征", "")
- target_input = result.get("输入", {})
- target_info = target_input.get("目标特征", {})
- target_type = target_info.get("特征类型", "关键点")
- target_node_id = f"帖子:{target_type}:标签:{target_name}"
- reasoning = result.get("推理类型分类", {})
- # 单独推理的边
- for item in reasoning.get("单独推理", []):
- source_name = item.get("特征名称", "")
- source_type = item.get("特征类型", "关键点")
- source_node_id = f"帖子:{source_type}:标签:{source_name}"
- probability = item.get("可能性", 0)
- edge_id = f"{source_node_id}|推导|{target_node_id}"
- edges[edge_id] = {
- "source": source_node_id,
- "target": target_node_id,
- "type": "推导",
- "score": probability,
- "detail": {
- "推理类型": "单独推理",
- "正向概率": item.get("正向概率", 0),
- "反向概率": item.get("反向概率", 0),
- "推理说明": item.get("推理说明", "")
- }
- }
- # 组合推理的边(用虚拟节点表示组合)
- for item in reasoning.get("组合推理", []):
- members = item.get("组合成员", [])
- member_types = item.get("成员类型", [])
- probability = item.get("可能性", 0)
- # 创建组合虚拟节点(排序成员以保证唯一性)
- # 将成员和类型配对后排序
- member_pairs = list(zip(members, member_types)) if len(member_types) == len(members) else [(m, "关键点") for m in members]
- sorted_pairs = sorted(member_pairs, key=lambda x: x[0])
- sorted_members = [p[0] for p in sorted_pairs]
- sorted_types = [p[1] for p in sorted_pairs]
- # 组合名称和ID包含类型信息
- combo_parts = [f"{sorted_types[i]}:{m}" for i, m in enumerate(sorted_members)]
- combo_name = " + ".join(combo_parts)
- combo_node_id = f"帖子:组合:组合:{combo_name}"
- if combo_node_id not in nodes:
- nodes[combo_node_id] = {
- "name": combo_name,
- "type": "组合",
- "dimension": "组合",
- "domain": "帖子",
- "detail": {
- "成员": sorted_members,
- "成员类型": sorted_types
- }
- }
- # 组合节点到目标的边
- edge_id = f"{combo_node_id}|推导|{target_node_id}"
- edges[edge_id] = {
- "source": combo_node_id,
- "target": target_node_id,
- "type": "推导",
- "score": probability,
- "detail": {
- "推理类型": "组合推理",
- "正向概率": item.get("正向概率", 0),
- "反向概率": item.get("反向概率", 0),
- "协同增益": item.get("协同效应分析", {}).get("协同增益", 0),
- "推理说明": item.get("推理说明", "")
- }
- }
- # 成员到组合节点的边
- for i, member in enumerate(sorted_members):
- m_type = sorted_types[i]
- m_node_id = f"帖子:{m_type}:标签:{member}"
- m_edge_id = f"{m_node_id}|组成|{combo_node_id}"
- if m_edge_id not in edges:
- edges[m_edge_id] = {
- "source": m_node_id,
- "target": combo_node_id,
- "type": "组成",
- "score": 1.0,
- "detail": {}
- }
- return {
- "meta": {
- "postId": post_id,
- "type": "推导图谱",
- "stats": {
- "nodeCount": len(nodes),
- "edgeCount": len(edges)
- }
- },
- "nodes": nodes,
- "edges": edges
- }
- # ===== 辅助函数 =====
- def get_all_target_names(post_graph: Dict) -> List[str]:
- """获取所有可作为目标的特征名称(关键点标签)"""
- tags = extract_tags_from_post_graph(post_graph)
- # 返回所有关键点标签的名称
- return [t["name"] for t in tags if t["dimension"] == "关键点"]
- def display_result(result: Dict):
- """显示单个分析结果"""
- output = result.get("输出")
- if output:
- print(f"\n目标关键特征: {output.get('目标关键特征', 'N/A')}")
- # 固有资产判定
- asset_check = output.get("固有资产判定", {})
- if asset_check.get("是否固有资产"):
- print(f" → 固有资产: {asset_check.get('判定理由', '')[:60]}...")
- else:
- reasoning = output.get("推理类型分类", {})
- # 显示单独推理
- single = reasoning.get("单独推理", [])
- if single:
- print(" 【单独推理】")
- for item in single[:3]: # 只显示前3个
- print(f" [{item.get('可能性', 0):.2f}] {item.get('特征名称', '')}")
- # 显示组合推理
- combo = reasoning.get("组合推理", [])
- if combo:
- print(" 【组合推理】")
- for item in combo[:2]: # 只显示前2个
- members = " + ".join(item.get("组合成员", []))
- print(f" [{item.get('可能性', 0):.2f}] {members}")
- else:
- print(f" 分析失败: {result.get('错误', 'N/A')}")
- # ===== 主函数 =====
- async def main(
- post_id: str = None,
- target_name: str = None,
- num_targets: int = 1,
- current_time: str = None,
- log_url: str = None
- ):
- """
- 主函数
- Args:
- post_id: 帖子ID,可选
- target_name: 目标节点名称,可选(如果指定则只分析这一个)
- num_targets: 要分析的目标特征数量(当 target_name 为空时生效)
- current_time: 当前时间戳(从外部传入)
- log_url: 日志链接(从外部传入)
- """
- config = PathConfig()
- print(f"账号: {config.account_name}")
- print(f"使用模型: {MODEL_NAME}")
- if log_url:
- print(f"Trace URL: {log_url}")
- print()
- # 获取帖子图谱文件
- post_graph_files = get_post_graph_files(config)
- if not post_graph_files:
- print("错误: 没有找到帖子图谱文件")
- return
- # 选择帖子
- if post_id:
- target_file = next(
- (f for f in post_graph_files if post_id in f.name),
- None
- )
- if not target_file:
- print(f"错误: 未找到帖子 {post_id}")
- return
- else:
- target_file = post_graph_files[0]
- # 加载帖子图谱
- post_graph = load_post_graph(target_file)
- actual_post_id = post_graph.get("meta", {}).get("postId", "unknown")
- print(f"帖子ID: {actual_post_id}")
- # 确定要分析的目标特征列表
- if target_name:
- target_names = [target_name]
- else:
- all_targets = get_all_target_names(post_graph)
- target_names = all_targets[:num_targets]
- print(f"待分析目标特征: {target_names}")
- print("=" * 60)
- # 输出目录
- output_dir = config.intermediate_dir / "node_origin_analysis"
- output_dir.mkdir(parents=True, exist_ok=True)
- # 并发分析所有目标特征
- async def analyze_single(name: str, index: int):
- print(f"\n[{index}/{len(target_names)}] 开始分析: {name}")
- result = await analyze_node_origin(
- post_id=post_id,
- target_name=name,
- config=config
- )
- print(f"[{index}/{len(target_names)}] 完成: {name}")
- display_result(result)
- return {
- "目标特征": result.get("目标节点"),
- "固有资产判定": result.get("输出", {}).get("固有资产判定", {}),
- "推理类型分类": result.get("输出", {}).get("推理类型分类", {}),
- "输入": result.get("输入"),
- "错误": result.get("错误")
- }
- # 创建并发任务
- tasks = [
- analyze_single(name, i)
- for i, name in enumerate(target_names, 1)
- ]
- # 并发执行
- all_results = await asyncio.gather(*tasks)
- # 合并保存到一个文件
- merged_output = {
- "元数据": {
- "current_time": current_time,
- "log_url": log_url,
- "model": MODEL_NAME
- },
- "帖子id": actual_post_id,
- "分析结果列表": all_results
- }
- output_file = output_dir / f"{actual_post_id}_来源分析.json"
- with open(output_file, "w", encoding="utf-8") as f:
- json.dump(merged_output, f, ensure_ascii=False, indent=2)
- # 生成推导关系图谱
- graph_output = build_origin_graph(all_results, actual_post_id)
- graph_file = output_dir / f"{actual_post_id}_推导图谱.json"
- with open(graph_file, "w", encoding="utf-8") as f:
- json.dump(graph_output, f, ensure_ascii=False, indent=2)
- print("\n" + "=" * 60)
- print(f"完成! 共分析 {len(target_names)} 个目标特征")
- print(f"分析结果: {output_file}")
- print(f"推导图谱: {graph_file}")
- if log_url:
- print(f"Trace: {log_url}")
- if __name__ == "__main__":
- import argparse
- parser = argparse.ArgumentParser(description="分析节点来源")
- parser.add_argument("--post-id", type=str, help="帖子ID")
- parser.add_argument("--target", type=str, help="目标节点名称(指定则只分析这一个)")
- parser.add_argument("--num", type=int, default=1, help="要分析的目标特征数量(默认1)")
- parser.add_argument("--all", action="store_true", help="分析所有关键点")
- args = parser.parse_args()
- # 如果指定了 --all,则设置 num 为一个很大的数
- if args.all:
- args.num = 999
- # 设置 trace
- current_time, log_url = set_trace()
- # 使用 trace 上下文包裹整个执行流程
- with trace("节点来源分析"):
- asyncio.run(main(
- post_id=args.post_id,
- target_name=args.target,
- num_targets=args.num,
- current_time=current_time,
- log_url=log_url
- ))
|