| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 节点来源分析脚本
- 给定一个目标节点,推断它可能由哪些候选节点推导而来。
- 输入:post_graph 目录中的帖子图谱文件
- 输出:节点来源分析结果
- """
- import asyncio
- import json
- from pathlib import Path
- from typing import Dict, List, Optional, TypedDict
- import sys
- # 添加项目根目录到路径
- project_root = Path(__file__).parent.parent.parent
- sys.path.insert(0, str(project_root))
- from agents import Agent, Runner, ModelSettings, trace
- from agents.tracing.create import custom_span
- from lib.client import get_model
- from lib.my_trace import set_trace_smith as set_trace
- from script.data_processing.path_config import PathConfig
- # 模型配置
- MODEL_NAME = "google/gemini-3-pro-preview"
- # MODEL_NAME = 'deepseek/deepseek-v3.2'
- # MODEL_NAME = 'anthropic/claude-sonnet-4.5'
- agent = Agent(
- name="Node Origin Analyzer",
- model=get_model(MODEL_NAME),
- model_settings=ModelSettings(
- temperature=0.0,
- max_tokens=65536,
- ),
- tools=[],
- )
- # ===== 类型定义 =====
- class NodeInfo(TypedDict):
- 名称: str
- 描述: str
- class EdgeInfo(TypedDict):
- from_node: str
- to_node: str
- 关系: Optional[str]
- 概率: Optional[float]
- class AnalyzeInput(TypedDict):
- 目标节点: NodeInfo
- 候选节点: List[NodeInfo]
- 边关系: List[EdgeInfo]
- class OriginPossibility(TypedDict):
- 来源节点: List[str]
- 概率: float
- 推理依据: str
- class AnalyzeOutput(TypedDict):
- 推理过程: str
- 来源可能性: List[OriginPossibility]
- # ===== 数据提取函数 =====
- def get_post_graph_files(config: PathConfig) -> List[Path]:
- """获取所有帖子图谱文件"""
- post_graph_dir = config.intermediate_dir / "post_graph"
- return sorted(post_graph_dir.glob("*_帖子图谱.json"))
- def load_post_graph(file_path: Path) -> Dict:
- """加载帖子图谱"""
- with open(file_path, "r", encoding="utf-8") as f:
- return json.load(f)
- def extract_tags_from_post_graph(post_graph: Dict) -> List[Dict]:
- """
- 从帖子图谱中提取标签节点
- 筛选条件:type === "标签" 且 domain === "帖子"
- Returns:
- 标签节点列表
- """
- tags = []
- for node_id, node in post_graph.get("nodes", {}).items():
- if node.get("type") == "标签" and node.get("domain") == "帖子":
- tags.append({
- "id": node_id,
- "name": node.get("name", ""),
- "dimension": node.get("dimension", ""),
- "description": node.get("detail", {}).get("description", ""),
- "pointNames": node.get("detail", {}).get("pointNames", []),
- })
- return tags
- def prepare_analyze_input(
- post_graph: Dict,
- target_name: str = None
- ) -> AnalyzeInput:
- """
- 准备分析输入数据
- Args:
- post_graph: 帖子图谱数据
- target_name: 目标节点名称,如果为 None 则使用关键点标签的第一个
- Returns:
- AnalyzeInput 数据结构
- """
- # 提取所有标签节点
- tags = extract_tags_from_post_graph(post_graph)
- if not tags:
- raise ValueError("帖子图谱中没有找到标签节点")
- # 确定目标节点
- if target_name:
- target_tag = next((t for t in tags if t["name"] == target_name), None)
- if not target_tag:
- raise ValueError(f"未找到目标节点: {target_name}")
- else:
- # 默认使用关键点标签的第一个
- key_point_tags = [t for t in tags if t["dimension"] == "关键点"]
- if not key_point_tags:
- raise ValueError("没有找到关键点标签")
- target_tag = key_point_tags[0]
- # 候选节点(排除目标节点)
- candidate_tags = [t for t in tags if t["name"] != target_tag["name"]]
- # 构建输入(包含特征类型信息)
- return {
- "目标特征": {
- "特征名称": target_tag["name"],
- "特征类型": target_tag["dimension"]
- },
- "候选特征": [
- {
- "特征名称": t["name"],
- "特征类型": t["dimension"]
- }
- for t in candidate_tags
- ],
- "边关系": [] # 暂时为空
- }
- # ===== Prompt 构建 =====
- def build_prompt(input_data: Dict, edges: List[EdgeInfo] = None) -> str:
- """
- 构建分析 prompt
- Args:
- input_data: 分析输入数据(包含目标节点和候选节点,都带维度信息)
- edges: 边关系列表
- Returns:
- prompt 文本
- """
- target = input_data["目标特征"]
- candidates = input_data["候选特征"]
- edges = edges or []
- # 构建候选特征列表
- candidates_text = []
- for c in candidates:
- candidates_text.append(f"- {c['特征名称']} ({c['特征类型']})")
- candidates_section = "\n".join(candidates_text)
- # 构建边关系文本
- if edges:
- edges_text = []
- for e in edges:
- edge_str = f"- {e['from_node']} → {e['to_node']}"
- if e.get("关系"):
- edge_str += f":{e['关系']}"
- if e.get("概率") is not None:
- edge_str += f"(概率: {e['概率']:.2f})"
- edges_text.append(edge_str)
- edges_section = "\n".join(edges_text)
- else:
- edges_section = "(暂无已知关系)"
- return f'''你是一个内容创作逆向工程分析专家。你的任务是分析给定的目标特征可能由哪些候选特征推导而来。
- ## 目标关键特征
- {target['特征名称']} ({target['特征类型']})
- ## 候选特征
- {candidates_section}
- ## 已知特征关系(仅供参考)
- {edges_section}
- ## 第一步:固有资产判定
- 首先判断目标特征是否为**固有资产/前提条件**:
- **固有资产的特征**:
- - 账号的基础设定(如:萌宠账号的"猫咪主角"、美食博主的"厨艺技能")
- - 创作者本身拥有的资源(如:有猫、会画画、有专业知识)
- - 不是针对某个内容"选择"的,而是账号/创作者的固有属性
- **判定方法**:
- 问自己:这个特征是"创作者选择加入的"还是"创作者本来就有的"?
- - 如果是"本来就有的" → 固有资产,不需要从其他特征推导
- - 如果是"选择加入的" → 可推导特征,继续分析
- **重要**:如果目标特征是固有资产,应该返回空的来源列表,并说明原因。
- ## 第二步:因果方向检验(仅当目标特征非固有资产时)
- 在判断"候选特征 A 是否能推导出目标特征 T"之前,必须进行因果方向检验:
- 1. **正向概率 P(A→T)**:假设 A 存在,推导出 T 的概率
- 2. **反向概率 P(T→A)**:假设 T 存在,推导出 A 的概率
- **判定规则**:
- - 只有当 P(A→T) > P(T→A) 时,A 才能作为 T 的来源特征
- - 如果 P(T→A) >= P(A→T),说明 A 更可能是 T 的结果/表现形式
- **警惕"利用关系"伪装成"因果关系"**:
- - 错误:因为要"提供情绪价值",所以选择了"猫咪主角"
- - 正确:因为已有"猫咪主角"(固有资产),所以用它来"提供情绪价值"
- - 区别:"提供情绪价值"是对猫咪的利用方式,不是选择猫咪的原因
- ## 输出格式
- 使用JSON格式输出,结构如下:
- {{
- "目标关键特征": "...",
- "固有资产判定": {{
- "是否固有资产": true/false,
- "判定理由": "..."
- }},
- "推理类型分类": {{
- "单独推理": [
- {{
- "排名": 1,
- "特征名称": "...",
- "特征类型": "灵感点/目的点/关键点",
- "正向概率": 0.xx,
- "反向概率": 0.xx,
- "可能性": 0.xx,
- "推理说明": "..."
- }}
- ],
- "组合推理": [
- {{
- "组合编号": 1,
- "组合成员": ["...", "..."],
- "成员类型": ["...", "..."],
- "正向概率": 0.xx,
- "反向概率": 0.xx,
- "可能性": 0.xx,
- "单独可能性": {{
- "成员1": 0.xx,
- "成员2": 0.xx
- }},
- "协同效应分析": {{
- "单独平均值": 0.xx,
- "协同增益": 0.xx,
- "增益说明": "..."
- }},
- "推理说明": "..."
- }}
- ],
- "排除特征": [
- {{
- "特征名称": "...",
- "特征类型": "...",
- "正向概率": 0.xx,
- "反向概率": 0.xx,
- "排除原因": "..."
- }}
- ]
- }}
- }}
- **注意**:如果目标特征是固有资产,"单独推理"和"组合推理"应为空数组,所有候选特征都应放入"排除特征"。
- ## 注意事项
- 1. **固有资产优先判定**:先判断目标特征是否为固有资产
- 2. **警惕利用关系**:目的点对关键点的"利用"不等于"推导"
- 3. 可能性数值需要合理评估,范围在0-1之间
- 4. 单独推理按可能性从高到低排序
- 5. 组合推理必须包含2个或以上成员
- 6. 推理说明要清晰说明推导逻辑
- '''.strip()
- # ===== 主分析函数 =====
- async def analyze_node_origin(
- post_id: str = None,
- target_name: str = None,
- config: PathConfig = None
- ) -> Dict:
- """
- 分析目标节点可能由哪些候选节点推导而来
- Args:
- post_id: 帖子ID,默认使用第一个帖子
- target_name: 目标节点名称,默认使用关键点标签的第一个
- config: 路径配置,如果为 None 则创建默认配置
- Returns:
- 分析结果
- """
- if config is None:
- config = PathConfig()
- # 获取帖子图谱文件
- post_graph_files = get_post_graph_files(config)
- if not post_graph_files:
- raise ValueError("没有找到帖子图谱文件")
- # 选择帖子
- if post_id:
- target_file = next(
- (f for f in post_graph_files if post_id in f.name),
- None
- )
- if not target_file:
- raise ValueError(f"未找到帖子: {post_id}")
- else:
- target_file = post_graph_files[0] # 默认第一个
- # 加载帖子图谱
- post_graph = load_post_graph(target_file)
- actual_post_id = post_graph.get("meta", {}).get("postId", "unknown")
- # 准备输入数据
- input_data = prepare_analyze_input(post_graph, target_name)
- actual_target_name = input_data["目标特征"]["特征名称"]
- # 构建 prompt
- prompt = build_prompt(input_data, input_data.get("边关系", []))
- print(f"帖子ID: {actual_post_id}")
- print(f"目标特征: {actual_target_name}")
- print(f"候选特征数: {len(input_data['候选特征'])}")
- print()
- # 调试:打印 prompt(取消注释以启用)
- # print("=" * 40)
- # print("Prompt 预览:")
- # print(prompt[:2000])
- # print("...")
- # print("=" * 40)
- # 使用 custom_span 标识分析流程
- with custom_span(
- name=f"分析特征来源 - {actual_target_name}",
- data={
- "帖子id": actual_post_id,
- "目标特征": actual_target_name,
- "候选特征数": len(input_data["候选特征"]),
- "模型": MODEL_NAME
- }
- ):
- # 调用 agent
- result = await Runner.run(agent, input=prompt)
- output = result.final_output
- # 解析 JSON
- try:
- if "```json" in output:
- json_start = output.find("```json") + 7
- json_end = output.find("```", json_start)
- json_str = output[json_start:json_end].strip()
- elif "{" in output and "}" in output:
- json_start = output.find("{")
- json_end = output.rfind("}") + 1
- json_str = output[json_start:json_end]
- else:
- json_str = output
- analysis_result = json.loads(json_str)
- return {
- "帖子id": actual_post_id,
- "目标节点": actual_target_name,
- "模型": MODEL_NAME,
- "输入": input_data,
- "输出": analysis_result
- }
- except Exception as e:
- return {
- "帖子id": actual_post_id,
- "目标节点": actual_target_name,
- "模型": MODEL_NAME,
- "输入": input_data,
- "输出": None,
- "错误": str(e),
- "原始输出": output
- }
- # ===== 辅助函数 =====
- def get_all_target_names(post_graph: Dict) -> List[str]:
- """获取所有可作为目标的特征名称(关键点标签)"""
- tags = extract_tags_from_post_graph(post_graph)
- # 返回所有关键点标签的名称
- return [t["name"] for t in tags if t["dimension"] == "关键点"]
- def display_result(result: Dict):
- """显示单个分析结果"""
- output = result.get("输出")
- if output:
- print(f"\n目标关键特征: {output.get('目标关键特征', 'N/A')}")
- # 固有资产判定
- asset_check = output.get("固有资产判定", {})
- if asset_check.get("是否固有资产"):
- print(f" → 固有资产: {asset_check.get('判定理由', '')[:60]}...")
- else:
- reasoning = output.get("推理类型分类", {})
- # 显示单独推理
- single = reasoning.get("单独推理", [])
- if single:
- print(" 【单独推理】")
- for item in single[:3]: # 只显示前3个
- print(f" [{item.get('可能性', 0):.2f}] {item.get('特征名称', '')}")
- # 显示组合推理
- combo = reasoning.get("组合推理", [])
- if combo:
- print(" 【组合推理】")
- for item in combo[:2]: # 只显示前2个
- members = " + ".join(item.get("组合成员", []))
- print(f" [{item.get('可能性', 0):.2f}] {members}")
- else:
- print(f" 分析失败: {result.get('错误', 'N/A')}")
- # ===== 主函数 =====
- async def main(
- post_id: str = None,
- target_name: str = None,
- num_targets: int = 1,
- current_time: str = None,
- log_url: str = None
- ):
- """
- 主函数
- Args:
- post_id: 帖子ID,可选
- target_name: 目标节点名称,可选(如果指定则只分析这一个)
- num_targets: 要分析的目标特征数量(当 target_name 为空时生效)
- current_time: 当前时间戳(从外部传入)
- log_url: 日志链接(从外部传入)
- """
- config = PathConfig()
- print(f"账号: {config.account_name}")
- print(f"使用模型: {MODEL_NAME}")
- if log_url:
- print(f"Trace URL: {log_url}")
- print()
- # 获取帖子图谱文件
- post_graph_files = get_post_graph_files(config)
- if not post_graph_files:
- print("错误: 没有找到帖子图谱文件")
- return
- # 选择帖子
- if post_id:
- target_file = next(
- (f for f in post_graph_files if post_id in f.name),
- None
- )
- if not target_file:
- print(f"错误: 未找到帖子 {post_id}")
- return
- else:
- target_file = post_graph_files[0]
- # 加载帖子图谱
- post_graph = load_post_graph(target_file)
- actual_post_id = post_graph.get("meta", {}).get("postId", "unknown")
- print(f"帖子ID: {actual_post_id}")
- # 确定要分析的目标特征列表
- if target_name:
- target_names = [target_name]
- else:
- all_targets = get_all_target_names(post_graph)
- target_names = all_targets[:num_targets]
- print(f"待分析目标特征: {target_names}")
- print("=" * 60)
- # 输出目录
- output_dir = config.intermediate_dir / "node_origin_analysis"
- output_dir.mkdir(parents=True, exist_ok=True)
- # 并发分析所有目标特征
- async def analyze_single(name: str, index: int):
- print(f"\n[{index}/{len(target_names)}] 开始分析: {name}")
- result = await analyze_node_origin(
- post_id=post_id,
- target_name=name,
- config=config
- )
- print(f"[{index}/{len(target_names)}] 完成: {name}")
- display_result(result)
- return {
- "目标特征": result.get("目标节点"),
- "固有资产判定": result.get("输出", {}).get("固有资产判定", {}),
- "推理类型分类": result.get("输出", {}).get("推理类型分类", {}),
- "输入": result.get("输入"),
- "错误": result.get("错误")
- }
- # 创建并发任务
- tasks = [
- analyze_single(name, i)
- for i, name in enumerate(target_names, 1)
- ]
- # 并发执行
- all_results = await asyncio.gather(*tasks)
- # 合并保存到一个文件
- merged_output = {
- "元数据": {
- "current_time": current_time,
- "log_url": log_url,
- "model": MODEL_NAME
- },
- "帖子id": actual_post_id,
- "分析结果列表": all_results
- }
- output_file = output_dir / f"{actual_post_id}_来源分析.json"
- with open(output_file, "w", encoding="utf-8") as f:
- json.dump(merged_output, f, ensure_ascii=False, indent=2)
- print("\n" + "=" * 60)
- print(f"完成! 共分析 {len(target_names)} 个目标特征")
- print(f"结果已保存到: {output_file}")
- if log_url:
- print(f"Trace: {log_url}")
- if __name__ == "__main__":
- import argparse
- parser = argparse.ArgumentParser(description="分析节点来源")
- parser.add_argument("--post-id", type=str, help="帖子ID")
- parser.add_argument("--target", type=str, help="目标节点名称(指定则只分析这一个)")
- parser.add_argument("--num", type=int, default=1, help="要分析的目标特征数量(默认1)")
- parser.add_argument("--all", action="store_true", help="分析所有关键点")
- args = parser.parse_args()
- # 如果指定了 --all,则设置 num 为一个很大的数
- if args.all:
- args.num = 999
- # 设置 trace
- current_time, log_url = set_trace()
- # 使用 trace 上下文包裹整个执行流程
- with trace("节点来源分析"):
- asyncio.run(main(
- post_id=args.post_id,
- target_name=args.target,
- num_targets=args.num,
- current_time=current_time,
- log_url=log_url
- ))
|