analyze_node_origin.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 节点来源分析脚本
  5. 给定一个目标节点,推断它可能由哪些候选节点推导而来。
  6. 输入:post_graph 目录中的帖子图谱文件
  7. 输出:节点来源分析结果
  8. """
  9. import asyncio
  10. import json
  11. from pathlib import Path
  12. from typing import Dict, List, Optional, TypedDict
  13. import sys
  14. # 添加项目根目录到路径
  15. project_root = Path(__file__).parent.parent.parent
  16. sys.path.insert(0, str(project_root))
  17. from agents import Agent, Runner, ModelSettings, trace
  18. from agents.tracing.create import custom_span
  19. from lib.client import get_model
  20. from lib.my_trace import set_trace_smith as set_trace
  21. from script.data_processing.path_config import PathConfig
  22. # 模型配置
  23. MODEL_NAME = "google/gemini-3-pro-preview"
  24. # MODEL_NAME = 'deepseek/deepseek-v3.2'
  25. # MODEL_NAME = 'anthropic/claude-sonnet-4.5'
  26. agent = Agent(
  27. name="Node Origin Analyzer",
  28. model=get_model(MODEL_NAME),
  29. model_settings=ModelSettings(
  30. temperature=0.0,
  31. max_tokens=65536,
  32. ),
  33. tools=[],
  34. )
  35. # ===== 类型定义 =====
  36. class NodeInfo(TypedDict):
  37. 名称: str
  38. 描述: str
  39. class EdgeInfo(TypedDict):
  40. from_node: str
  41. to_node: str
  42. 关系: Optional[str]
  43. 概率: Optional[float]
  44. class AnalyzeInput(TypedDict):
  45. 目标节点: NodeInfo
  46. 候选节点: List[NodeInfo]
  47. 边关系: List[EdgeInfo]
  48. class OriginPossibility(TypedDict):
  49. 来源节点: List[str]
  50. 概率: float
  51. 推理依据: str
  52. class AnalyzeOutput(TypedDict):
  53. 推理过程: str
  54. 来源可能性: List[OriginPossibility]
  55. # ===== 数据提取函数 =====
  56. def get_post_graph_files(config: PathConfig) -> List[Path]:
  57. """获取所有帖子图谱文件"""
  58. post_graph_dir = config.intermediate_dir / "post_graph"
  59. return sorted(post_graph_dir.glob("*_帖子图谱.json"))
  60. def load_post_graph(file_path: Path) -> Dict:
  61. """加载帖子图谱"""
  62. with open(file_path, "r", encoding="utf-8") as f:
  63. return json.load(f)
  64. def extract_tags_from_post_graph(post_graph: Dict) -> List[Dict]:
  65. """
  66. 从帖子图谱中提取标签节点
  67. 筛选条件:type === "标签" 且 domain === "帖子"
  68. Returns:
  69. 标签节点列表
  70. """
  71. tags = []
  72. for node_id, node in post_graph.get("nodes", {}).items():
  73. if node.get("type") == "标签" and node.get("domain") == "帖子":
  74. tags.append({
  75. "id": node_id,
  76. "name": node.get("name", ""),
  77. "dimension": node.get("dimension", ""),
  78. "description": node.get("detail", {}).get("description", ""),
  79. "pointNames": node.get("detail", {}).get("pointNames", []),
  80. })
  81. return tags
  82. def prepare_analyze_input(
  83. post_graph: Dict,
  84. target_name: str = None
  85. ) -> AnalyzeInput:
  86. """
  87. 准备分析输入数据
  88. Args:
  89. post_graph: 帖子图谱数据
  90. target_name: 目标节点名称,如果为 None 则使用关键点标签的第一个
  91. Returns:
  92. AnalyzeInput 数据结构
  93. """
  94. # 提取所有标签节点
  95. tags = extract_tags_from_post_graph(post_graph)
  96. if not tags:
  97. raise ValueError("帖子图谱中没有找到标签节点")
  98. # 确定目标节点
  99. if target_name:
  100. target_tag = next((t for t in tags if t["name"] == target_name), None)
  101. if not target_tag:
  102. raise ValueError(f"未找到目标节点: {target_name}")
  103. else:
  104. # 默认使用关键点标签的第一个
  105. key_point_tags = [t for t in tags if t["dimension"] == "关键点"]
  106. if not key_point_tags:
  107. raise ValueError("没有找到关键点标签")
  108. target_tag = key_point_tags[0]
  109. # 候选节点(排除目标节点)
  110. candidate_tags = [t for t in tags if t["name"] != target_tag["name"]]
  111. # 构建输入(包含特征类型信息)
  112. return {
  113. "目标特征": {
  114. "特征名称": target_tag["name"],
  115. "特征类型": target_tag["dimension"]
  116. },
  117. "候选特征": [
  118. {
  119. "特征名称": t["name"],
  120. "特征类型": t["dimension"]
  121. }
  122. for t in candidate_tags
  123. ],
  124. "边关系": [] # 暂时为空
  125. }
  126. # ===== Prompt 构建 =====
  127. def build_prompt(input_data: Dict, edges: List[EdgeInfo] = None) -> str:
  128. """
  129. 构建分析 prompt
  130. Args:
  131. input_data: 分析输入数据(包含目标节点和候选节点,都带维度信息)
  132. edges: 边关系列表
  133. Returns:
  134. prompt 文本
  135. """
  136. target = input_data["目标特征"]
  137. candidates = input_data["候选特征"]
  138. edges = edges or []
  139. # 构建候选特征列表
  140. candidates_text = []
  141. for c in candidates:
  142. candidates_text.append(f"- {c['特征名称']} ({c['特征类型']})")
  143. candidates_section = "\n".join(candidates_text)
  144. # 构建边关系文本
  145. if edges:
  146. edges_text = []
  147. for e in edges:
  148. edge_str = f"- {e['from_node']} → {e['to_node']}"
  149. if e.get("关系"):
  150. edge_str += f":{e['关系']}"
  151. if e.get("概率") is not None:
  152. edge_str += f"(概率: {e['概率']:.2f})"
  153. edges_text.append(edge_str)
  154. edges_section = "\n".join(edges_text)
  155. else:
  156. edges_section = "(暂无已知关系)"
  157. return f'''你是一个内容创作逆向工程分析专家。你的任务是分析给定的目标特征可能由哪些候选特征推导而来。
  158. ## 目标关键特征
  159. {target['特征名称']} ({target['特征类型']})
  160. ## 候选特征
  161. {candidates_section}
  162. ## 已知特征关系(仅供参考)
  163. {edges_section}
  164. ## 第一步:固有资产判定
  165. 首先判断目标特征是否为**固有资产/前提条件**:
  166. **固有资产的特征**:
  167. - 账号的基础设定(如:萌宠账号的"猫咪主角"、美食博主的"厨艺技能")
  168. - 创作者本身拥有的资源(如:有猫、会画画、有专业知识)
  169. - 不是针对某个内容"选择"的,而是账号/创作者的固有属性
  170. **判定方法**:
  171. 问自己:这个特征是"创作者选择加入的"还是"创作者本来就有的"?
  172. - 如果是"本来就有的" → 固有资产,不需要从其他特征推导
  173. - 如果是"选择加入的" → 可推导特征,继续分析
  174. **重要**:如果目标特征是固有资产,应该返回空的来源列表,并说明原因。
  175. ## 第二步:因果方向检验(仅当目标特征非固有资产时)
  176. 在判断"候选特征 A 是否能推导出目标特征 T"之前,必须进行因果方向检验:
  177. 1. **正向概率 P(A→T)**:假设 A 存在,推导出 T 的概率
  178. 2. **反向概率 P(T→A)**:假设 T 存在,推导出 A 的概率
  179. **判定规则**:
  180. - 只有当 P(A→T) > P(T→A) 时,A 才能作为 T 的来源特征
  181. - 如果 P(T→A) >= P(A→T),说明 A 更可能是 T 的结果/表现形式
  182. **警惕"利用关系"伪装成"因果关系"**:
  183. - 错误:因为要"提供情绪价值",所以选择了"猫咪主角"
  184. - 正确:因为已有"猫咪主角"(固有资产),所以用它来"提供情绪价值"
  185. - 区别:"提供情绪价值"是对猫咪的利用方式,不是选择猫咪的原因
  186. ## 输出格式
  187. 使用JSON格式输出,结构如下:
  188. {{
  189. "目标关键特征": "...",
  190. "固有资产判定": {{
  191. "是否固有资产": true/false,
  192. "判定理由": "..."
  193. }},
  194. "推理类型分类": {{
  195. "单独推理": [
  196. {{
  197. "排名": 1,
  198. "特征名称": "...",
  199. "特征类型": "灵感点/目的点/关键点",
  200. "正向概率": 0.xx,
  201. "反向概率": 0.xx,
  202. "可能性": 0.xx,
  203. "推理说明": "..."
  204. }}
  205. ],
  206. "组合推理": [
  207. {{
  208. "组合编号": 1,
  209. "组合成员": ["...", "..."],
  210. "成员类型": ["...", "..."],
  211. "正向概率": 0.xx,
  212. "反向概率": 0.xx,
  213. "可能性": 0.xx,
  214. "单独可能性": {{
  215. "成员1": 0.xx,
  216. "成员2": 0.xx
  217. }},
  218. "协同效应分析": {{
  219. "单独平均值": 0.xx,
  220. "协同增益": 0.xx,
  221. "增益说明": "..."
  222. }},
  223. "推理说明": "..."
  224. }}
  225. ],
  226. "排除特征": [
  227. {{
  228. "特征名称": "...",
  229. "特征类型": "...",
  230. "正向概率": 0.xx,
  231. "反向概率": 0.xx,
  232. "排除原因": "..."
  233. }}
  234. ]
  235. }}
  236. }}
  237. **注意**:如果目标特征是固有资产,"单独推理"和"组合推理"应为空数组,所有候选特征都应放入"排除特征"。
  238. ## 注意事项
  239. 1. **固有资产优先判定**:先判断目标特征是否为固有资产
  240. 2. **警惕利用关系**:目的点对关键点的"利用"不等于"推导"
  241. 3. 可能性数值需要合理评估,范围在0-1之间
  242. 4. 单独推理按可能性从高到低排序
  243. 5. 组合推理必须包含2个或以上成员
  244. 6. 推理说明要清晰说明推导逻辑
  245. '''.strip()
  246. # ===== 主分析函数 =====
  247. async def analyze_node_origin(
  248. post_id: str = None,
  249. target_name: str = None,
  250. config: PathConfig = None
  251. ) -> Dict:
  252. """
  253. 分析目标节点可能由哪些候选节点推导而来
  254. Args:
  255. post_id: 帖子ID,默认使用第一个帖子
  256. target_name: 目标节点名称,默认使用关键点标签的第一个
  257. config: 路径配置,如果为 None 则创建默认配置
  258. Returns:
  259. 分析结果
  260. """
  261. if config is None:
  262. config = PathConfig()
  263. # 获取帖子图谱文件
  264. post_graph_files = get_post_graph_files(config)
  265. if not post_graph_files:
  266. raise ValueError("没有找到帖子图谱文件")
  267. # 选择帖子
  268. if post_id:
  269. target_file = next(
  270. (f for f in post_graph_files if post_id in f.name),
  271. None
  272. )
  273. if not target_file:
  274. raise ValueError(f"未找到帖子: {post_id}")
  275. else:
  276. target_file = post_graph_files[0] # 默认第一个
  277. # 加载帖子图谱
  278. post_graph = load_post_graph(target_file)
  279. actual_post_id = post_graph.get("meta", {}).get("postId", "unknown")
  280. # 准备输入数据
  281. input_data = prepare_analyze_input(post_graph, target_name)
  282. actual_target_name = input_data["目标特征"]["特征名称"]
  283. # 构建 prompt
  284. prompt = build_prompt(input_data, input_data.get("边关系", []))
  285. print(f"帖子ID: {actual_post_id}")
  286. print(f"目标特征: {actual_target_name}")
  287. print(f"候选特征数: {len(input_data['候选特征'])}")
  288. print()
  289. # 调试:打印 prompt(取消注释以启用)
  290. # print("=" * 40)
  291. # print("Prompt 预览:")
  292. # print(prompt[:2000])
  293. # print("...")
  294. # print("=" * 40)
  295. # 使用 custom_span 标识分析流程
  296. with custom_span(
  297. name=f"分析特征来源 - {actual_target_name}",
  298. data={
  299. "帖子id": actual_post_id,
  300. "目标特征": actual_target_name,
  301. "候选特征数": len(input_data["候选特征"]),
  302. "模型": MODEL_NAME
  303. }
  304. ):
  305. # 调用 agent
  306. result = await Runner.run(agent, input=prompt)
  307. output = result.final_output
  308. # 解析 JSON
  309. try:
  310. if "```json" in output:
  311. json_start = output.find("```json") + 7
  312. json_end = output.find("```", json_start)
  313. json_str = output[json_start:json_end].strip()
  314. elif "{" in output and "}" in output:
  315. json_start = output.find("{")
  316. json_end = output.rfind("}") + 1
  317. json_str = output[json_start:json_end]
  318. else:
  319. json_str = output
  320. analysis_result = json.loads(json_str)
  321. return {
  322. "帖子id": actual_post_id,
  323. "目标节点": actual_target_name,
  324. "模型": MODEL_NAME,
  325. "输入": input_data,
  326. "输出": analysis_result
  327. }
  328. except Exception as e:
  329. return {
  330. "帖子id": actual_post_id,
  331. "目标节点": actual_target_name,
  332. "模型": MODEL_NAME,
  333. "输入": input_data,
  334. "输出": None,
  335. "错误": str(e),
  336. "原始输出": output
  337. }
  338. # ===== 辅助函数 =====
  339. def get_all_target_names(post_graph: Dict) -> List[str]:
  340. """获取所有可作为目标的特征名称(关键点标签)"""
  341. tags = extract_tags_from_post_graph(post_graph)
  342. # 返回所有关键点标签的名称
  343. return [t["name"] for t in tags if t["dimension"] == "关键点"]
  344. def display_result(result: Dict):
  345. """显示单个分析结果"""
  346. output = result.get("输出")
  347. if output:
  348. print(f"\n目标关键特征: {output.get('目标关键特征', 'N/A')}")
  349. # 固有资产判定
  350. asset_check = output.get("固有资产判定", {})
  351. if asset_check.get("是否固有资产"):
  352. print(f" → 固有资产: {asset_check.get('判定理由', '')[:60]}...")
  353. else:
  354. reasoning = output.get("推理类型分类", {})
  355. # 显示单独推理
  356. single = reasoning.get("单独推理", [])
  357. if single:
  358. print(" 【单独推理】")
  359. for item in single[:3]: # 只显示前3个
  360. print(f" [{item.get('可能性', 0):.2f}] {item.get('特征名称', '')}")
  361. # 显示组合推理
  362. combo = reasoning.get("组合推理", [])
  363. if combo:
  364. print(" 【组合推理】")
  365. for item in combo[:2]: # 只显示前2个
  366. members = " + ".join(item.get("组合成员", []))
  367. print(f" [{item.get('可能性', 0):.2f}] {members}")
  368. else:
  369. print(f" 分析失败: {result.get('错误', 'N/A')}")
  370. # ===== 主函数 =====
  371. async def main(
  372. post_id: str = None,
  373. target_name: str = None,
  374. num_targets: int = 1,
  375. current_time: str = None,
  376. log_url: str = None
  377. ):
  378. """
  379. 主函数
  380. Args:
  381. post_id: 帖子ID,可选
  382. target_name: 目标节点名称,可选(如果指定则只分析这一个)
  383. num_targets: 要分析的目标特征数量(当 target_name 为空时生效)
  384. current_time: 当前时间戳(从外部传入)
  385. log_url: 日志链接(从外部传入)
  386. """
  387. config = PathConfig()
  388. print(f"账号: {config.account_name}")
  389. print(f"使用模型: {MODEL_NAME}")
  390. if log_url:
  391. print(f"Trace URL: {log_url}")
  392. print()
  393. # 获取帖子图谱文件
  394. post_graph_files = get_post_graph_files(config)
  395. if not post_graph_files:
  396. print("错误: 没有找到帖子图谱文件")
  397. return
  398. # 选择帖子
  399. if post_id:
  400. target_file = next(
  401. (f for f in post_graph_files if post_id in f.name),
  402. None
  403. )
  404. if not target_file:
  405. print(f"错误: 未找到帖子 {post_id}")
  406. return
  407. else:
  408. target_file = post_graph_files[0]
  409. # 加载帖子图谱
  410. post_graph = load_post_graph(target_file)
  411. actual_post_id = post_graph.get("meta", {}).get("postId", "unknown")
  412. print(f"帖子ID: {actual_post_id}")
  413. # 确定要分析的目标特征列表
  414. if target_name:
  415. target_names = [target_name]
  416. else:
  417. all_targets = get_all_target_names(post_graph)
  418. target_names = all_targets[:num_targets]
  419. print(f"待分析目标特征: {target_names}")
  420. print("=" * 60)
  421. # 输出目录
  422. output_dir = config.intermediate_dir / "node_origin_analysis"
  423. output_dir.mkdir(parents=True, exist_ok=True)
  424. # 并发分析所有目标特征
  425. async def analyze_single(name: str, index: int):
  426. print(f"\n[{index}/{len(target_names)}] 开始分析: {name}")
  427. result = await analyze_node_origin(
  428. post_id=post_id,
  429. target_name=name,
  430. config=config
  431. )
  432. print(f"[{index}/{len(target_names)}] 完成: {name}")
  433. display_result(result)
  434. return {
  435. "目标特征": result.get("目标节点"),
  436. "固有资产判定": result.get("输出", {}).get("固有资产判定", {}),
  437. "推理类型分类": result.get("输出", {}).get("推理类型分类", {}),
  438. "输入": result.get("输入"),
  439. "错误": result.get("错误")
  440. }
  441. # 创建并发任务
  442. tasks = [
  443. analyze_single(name, i)
  444. for i, name in enumerate(target_names, 1)
  445. ]
  446. # 并发执行
  447. all_results = await asyncio.gather(*tasks)
  448. # 合并保存到一个文件
  449. merged_output = {
  450. "元数据": {
  451. "current_time": current_time,
  452. "log_url": log_url,
  453. "model": MODEL_NAME
  454. },
  455. "帖子id": actual_post_id,
  456. "分析结果列表": all_results
  457. }
  458. output_file = output_dir / f"{actual_post_id}_来源分析.json"
  459. with open(output_file, "w", encoding="utf-8") as f:
  460. json.dump(merged_output, f, ensure_ascii=False, indent=2)
  461. print("\n" + "=" * 60)
  462. print(f"完成! 共分析 {len(target_names)} 个目标特征")
  463. print(f"结果已保存到: {output_file}")
  464. if log_url:
  465. print(f"Trace: {log_url}")
  466. if __name__ == "__main__":
  467. import argparse
  468. parser = argparse.ArgumentParser(description="分析节点来源")
  469. parser.add_argument("--post-id", type=str, help="帖子ID")
  470. parser.add_argument("--target", type=str, help="目标节点名称(指定则只分析这一个)")
  471. parser.add_argument("--num", type=int, default=1, help="要分析的目标特征数量(默认1)")
  472. parser.add_argument("--all", action="store_true", help="分析所有关键点")
  473. args = parser.parse_args()
  474. # 如果指定了 --all,则设置 num 为一个很大的数
  475. if args.all:
  476. args.num = 999
  477. # 设置 trace
  478. current_time, log_url = set_trace()
  479. # 使用 trace 上下文包裹整个执行流程
  480. with trace("节点来源分析"):
  481. asyncio.run(main(
  482. post_id=args.post_id,
  483. target_name=args.target,
  484. num_targets=args.num,
  485. current_time=current_time,
  486. log_url=log_url
  487. ))