analyze_node_origin_v3.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. 节点来源分析脚本 V3
  5. 采用"法庭取证式"思维,通过两步验证法(竞品列举 + 排他性检验)严格推导特征来源。
  6. 核心改进:
  7. 1. 两步验证法:先列举竞品,再做排他性检验
  8. 2. 严格评分:无排他性证据时可能性不超过0.4
  9. 3. 微观逻辑优先:组合推理寻找化学反应而非宏观目的
  10. 输入:post_graph 目录中的帖子图谱文件
  11. 输出:节点来源分析结果
  12. """
  13. import asyncio
  14. import json
  15. from pathlib import Path
  16. from typing import Dict, List, Optional
  17. import sys
  18. # 添加项目根目录到路径
  19. project_root = Path(__file__).parent.parent.parent
  20. sys.path.insert(0, str(project_root))
  21. from agents import Agent, Runner, ModelSettings, trace
  22. from agents.tracing.create import custom_span
  23. from lib.client import get_model
  24. from lib.my_trace import set_trace_smith as set_trace
  25. from script.data_processing.path_config import PathConfig
  26. # 模型配置
  27. MODEL_NAME = "google/gemini-3-pro-preview"
  28. # MODEL_NAME = 'deepseek/deepseek-v3.2'
  29. # MODEL_NAME = 'anthropic/claude-sonnet-4.5'
  30. agent = Agent(
  31. name="Node Origin Analyzer V3",
  32. model=get_model(MODEL_NAME),
  33. model_settings=ModelSettings(
  34. temperature=0.0,
  35. max_tokens=65536,
  36. ),
  37. tools=[],
  38. )
  39. # ===== 数据提取函数 =====
  40. def get_post_graph_files(config: PathConfig) -> List[Path]:
  41. """获取所有帖子图谱文件"""
  42. post_graph_dir = config.intermediate_dir / "post_graph"
  43. return sorted(post_graph_dir.glob("*_帖子图谱.json"))
  44. def load_post_graph(file_path: Path) -> Dict:
  45. """加载帖子图谱"""
  46. with open(file_path, "r", encoding="utf-8") as f:
  47. return json.load(f)
  48. def extract_tags_from_post_graph(post_graph: Dict) -> List[Dict]:
  49. """
  50. 从帖子图谱中提取标签节点
  51. 筛选条件:type === "标签" 且 domain === "帖子"
  52. Returns:
  53. 标签节点列表
  54. """
  55. tags = []
  56. for node_id, node in post_graph.get("nodes", {}).items():
  57. if node.get("type") == "标签" and node.get("domain") == "帖子":
  58. tags.append({
  59. "id": node_id,
  60. "name": node.get("name", ""),
  61. "dimension": node.get("dimension", ""),
  62. "description": node.get("detail", {}).get("description", ""),
  63. "pointNames": node.get("detail", {}).get("pointNames", []),
  64. })
  65. return tags
  66. def prepare_analyze_input(
  67. post_graph: Dict,
  68. target_name: str = None
  69. ) -> Dict:
  70. """
  71. 准备分析输入数据
  72. Args:
  73. post_graph: 帖子图谱数据
  74. target_name: 目标节点名称,如果为 None 则使用关键点标签的第一个
  75. Returns:
  76. 分析输入数据结构
  77. """
  78. # 提取所有标签节点
  79. tags = extract_tags_from_post_graph(post_graph)
  80. if not tags:
  81. raise ValueError("帖子图谱中没有找到标签节点")
  82. # 确定目标节点
  83. if target_name:
  84. target_tag = next((t for t in tags if t["name"] == target_name), None)
  85. if not target_tag:
  86. raise ValueError(f"未找到目标节点: {target_name}")
  87. else:
  88. # 默认使用关键点标签的第一个
  89. key_point_tags = [t for t in tags if t["dimension"] == "关键点"]
  90. if not key_point_tags:
  91. raise ValueError("没有找到关键点标签")
  92. target_tag = key_point_tags[0]
  93. # 候选节点筛选逻辑:
  94. # - 排除目标节点本身
  95. # - 如果目标是灵感点或目的点,排除关键点(关键点由灵感点/目的点推导,不应反推)
  96. target_dimension = target_tag["dimension"]
  97. candidate_tags = []
  98. for t in tags:
  99. if t["name"] == target_tag["name"]:
  100. continue # 排除目标节点本身
  101. if target_dimension in ["灵感点", "目的点"] and t["dimension"] == "关键点":
  102. continue # 灵感点/目的点的候选集排除关键点
  103. candidate_tags.append(t)
  104. # 构建输入(包含特征类型信息)
  105. return {
  106. "目标特征": {
  107. "特征名称": target_tag["name"],
  108. "特征类型": target_tag["dimension"]
  109. },
  110. "候选特征": [
  111. {
  112. "特征名称": t["name"],
  113. "特征类型": t["dimension"]
  114. }
  115. for t in candidate_tags
  116. ],
  117. "边关系": []
  118. }
  119. # ===== Prompt 构建 =====
  120. def build_prompt(input_data: Dict) -> str:
  121. """
  122. 构建分析 prompt(V3 版本:法庭取证式两步验证法)
  123. Args:
  124. input_data: 分析输入数据(包含目标节点和候选节点,都带维度信息)
  125. Returns:
  126. prompt 文本
  127. """
  128. target = input_data["目标特征"]
  129. candidates = input_data["候选特征"]
  130. # 构建候选特征列表
  131. candidates_text = []
  132. for c in candidates:
  133. candidates_text.append(f"- {c['特征名称']} ({c['特征类型']})")
  134. candidates_section = "\n".join(candidates_text)
  135. return f'''# Role
  136. 你是一名严谨的内容逆向工程分析师,专门擅长拆解创意决策背后的逻辑链条。你的思维方式是"法庭取证式"的,只承认证据确凿的推导,坚决反对没有任何依据的"脑补"连接。
  137. # Task
  138. 分析给定的【帖子特征列表】是如何推导出【目标特征】的。
  139. **本次分析的目标特征是:{target['特征名称']}**
  140. # 核心推理协议
  141. 为了防止过度联想,你必须对每一个推理组合执行以下**两步验证法**。跳过步骤将视为分析失败。
  142. ## 步骤 1:列举强力竞品
  143. 不要默认必须选择目标({target['特征名称']})。基于【来源特征】的意图,思考还有什么其他形式能达到同样的效果?
  144. * *要求*:必须列出至少 2 个**除了目标以外**的合理选项(例如:如果是为了"互动",竞品可以是抽奖、投票、话题挑战;如果是为了"搞笑",竞品可以是段子、四格漫画)。
  145. ## 步骤 2:排他性检验
  146. 这是最关键的一步。检查【来源特征】中是否有具体的细节,能够**从逻辑上杀死**步骤 1 中的竞品?
  147. * *判定标准*:
  148. * 如果有特征明确指向"{target['特征名称']}"的独有属性,则具有排他性。
  149. * 如果仅仅是泛化的目的(如"为了搞笑"、"为了互动"),这些特征**无法排除**其他竞品。
  150. * **如果没有排他性证据,该组合的推导可能性严禁超过 0.4。**
  151. # 评分标准
  152. | 分数范围 | 等级 | 说明 |
  153. |---------|------|------|
  154. | 0.80 - 1.00 | 逻辑必然 | 存在无可辩驳的证据表明,必须采用目标形式,否则内容的核心功能或分发需求无法满足。 |
  155. | 0.50 - 0.79 | 高适配性 | 虽然没有绝对的强制性,但结合内容特性和市场/文化习惯,目标形式是最贴切、最有效的选择,其他形式会显得低效或别扭。 |
  156. | 0.20 - 0.49 | 创意偏好 | 目标形式是一个可行的、不错的创意选择,但其他形式也同样适用,甚至可能更优。决策更倾向于创意团队的偏好。 |
  157. | 0.00 - 0.19 | 弱关联 | 特征与目标形式之间缺乏有效的逻辑连接,关联性很弱或属于主观臆测。 |
  158. 如果没有合适的选项,无需强行推理。
  159. # 组合推理特别规则
  160. 微观逻辑优先:组合推理不应好高骛远。优先寻找微观的化学反应(例如 A+B 变成了 C),而不是宏观的目的(例如 A+B 为了引流)。组合数量通常小于等于 3 个。
  161. # 输入数据
  162. {candidates_section}
  163. # 输出格式 (JSON)
  164. 1. 在 `单独推理` 中,`来源特征` 字段**严禁出现** "+"、"和"、"&" 等连接符,必须是输入中的原话。
  165. 2. 如果你觉得两个特征必须在一起说才有意义,请直接跳过单独推理,将其放入 `组合推理`。
  166. 请严格按照以下 JSON 结构输出,不要包含任何 Markdown 格式以外的废话:
  167. ```json
  168. {{
  169. "目标关键特征": "{target['特征名称']}",
  170. "推理分析": {{
  171. "单独推理": [
  172. {{
  173. "来源特征": "...",
  174. "来源特征类型": "灵感点/目的点/关键点",
  175. "1_替代方案竞品": ["...", "..."],
  176. "2_排他性检验": "分析来源特征是否包含能排除上述竞品的证据。如果没有,请明确写出'无法排除竞品'。",
  177. "可能性": 0.xx,
  178. "结论": "..."
  179. }}
  180. ],
  181. "组合推理": [
  182. {{
  183. "组合成员": ["...", "..."],
  184. "成员类型": ["灵感点/目的点/关键点", "..."],
  185. "1_替代方案竞品": ["...", "..."],
  186. "2_排他性检验": "分析组合在一起后,是否产生了排除竞品的新逻辑?",
  187. "可能性": 0.xx,
  188. "结论": "..."
  189. }}
  190. ]
  191. }}
  192. }}
  193. ```
  194. '''.strip()
  195. # ===== 主分析函数 =====
  196. async def analyze_node_origin(
  197. post_id: str = None,
  198. target_name: str = None,
  199. config: PathConfig = None
  200. ) -> Dict:
  201. """
  202. 分析目标节点可能由哪些候选节点推导而来
  203. Args:
  204. post_id: 帖子ID,默认使用第一个帖子
  205. target_name: 目标节点名称,默认使用关键点标签的第一个
  206. config: 路径配置,如果为 None 则创建默认配置
  207. Returns:
  208. 分析结果
  209. """
  210. if config is None:
  211. config = PathConfig()
  212. # 获取帖子图谱文件
  213. post_graph_files = get_post_graph_files(config)
  214. if not post_graph_files:
  215. raise ValueError("没有找到帖子图谱文件")
  216. # 选择帖子
  217. if post_id:
  218. target_file = next(
  219. (f for f in post_graph_files if post_id in f.name),
  220. None
  221. )
  222. if not target_file:
  223. raise ValueError(f"未找到帖子: {post_id}")
  224. else:
  225. target_file = post_graph_files[0]
  226. # 加载帖子图谱
  227. post_graph = load_post_graph(target_file)
  228. actual_post_id = post_graph.get("meta", {}).get("postId", "unknown")
  229. # 准备输入数据
  230. input_data = prepare_analyze_input(post_graph, target_name)
  231. actual_target_name = input_data["目标特征"]["特征名称"]
  232. # 构建 prompt
  233. prompt = build_prompt(input_data)
  234. print(f"帖子ID: {actual_post_id}")
  235. print(f"目标特征: {actual_target_name}")
  236. print(f"候选特征数: {len(input_data['候选特征'])}")
  237. print()
  238. # 使用 custom_span 标识分析流程
  239. with custom_span(
  240. name=f"分析特征来源 V3 - {actual_target_name}",
  241. data={
  242. "帖子id": actual_post_id,
  243. "目标特征": actual_target_name,
  244. "候选特征数": len(input_data["候选特征"]),
  245. "模型": MODEL_NAME
  246. }
  247. ):
  248. # 调用 agent
  249. result = await Runner.run(agent, input=prompt)
  250. output = result.final_output
  251. # 解析 JSON
  252. try:
  253. if "```json" in output:
  254. json_start = output.find("```json") + 7
  255. json_end = output.find("```", json_start)
  256. json_str = output[json_start:json_end].strip()
  257. elif "{" in output and "}" in output:
  258. json_start = output.find("{")
  259. json_end = output.rfind("}") + 1
  260. json_str = output[json_start:json_end]
  261. else:
  262. json_str = output
  263. analysis_result = json.loads(json_str)
  264. return {
  265. "帖子id": actual_post_id,
  266. "目标节点": actual_target_name,
  267. "模型": MODEL_NAME,
  268. "输入": input_data,
  269. "输出": analysis_result
  270. }
  271. except Exception as e:
  272. return {
  273. "帖子id": actual_post_id,
  274. "目标节点": actual_target_name,
  275. "模型": MODEL_NAME,
  276. "输入": input_data,
  277. "输出": None,
  278. "错误": str(e),
  279. "原始输出": output
  280. }
  281. # ===== 图谱构建函数 =====
  282. def build_origin_graph(all_results: List[Dict], post_id: str) -> Dict:
  283. """
  284. 将分析结果转换为图谱格式
  285. Args:
  286. all_results: 所有目标特征的分析结果
  287. post_id: 帖子ID
  288. Returns:
  289. 图谱数据,包含 nodes 和 edges
  290. """
  291. nodes = {}
  292. edges = {}
  293. # 从输入收集所有特征节点
  294. for result in all_results:
  295. target_input = result.get("输入", {})
  296. # 添加目标节点
  297. target_info = target_input.get("目标特征", {})
  298. target_name = target_info.get("特征名称", "")
  299. target_type = target_info.get("特征类型", "关键点")
  300. node_id = f"帖子:{target_type}:标签:{target_name}"
  301. if node_id not in nodes:
  302. nodes[node_id] = {
  303. "name": target_name,
  304. "type": "标签",
  305. "dimension": target_type,
  306. "domain": "帖子",
  307. "detail": {}
  308. }
  309. # 添加候选特征节点
  310. for candidate in target_input.get("候选特征", []):
  311. c_name = candidate.get("特征名称", "")
  312. c_type = candidate.get("特征类型", "关键点")
  313. c_node_id = f"帖子:{c_type}:标签:{c_name}"
  314. if c_node_id not in nodes:
  315. nodes[c_node_id] = {
  316. "name": c_name,
  317. "type": "标签",
  318. "dimension": c_type,
  319. "domain": "帖子",
  320. "detail": {}
  321. }
  322. # 构建推导边
  323. for result in all_results:
  324. target_name = result.get("目标特征", "")
  325. target_input = result.get("输入", {})
  326. target_info = target_input.get("目标特征", {})
  327. target_type = target_info.get("特征类型", "关键点")
  328. target_node_id = f"帖子:{target_type}:标签:{target_name}"
  329. reasoning = result.get("推理分析", {})
  330. # 单独推理的边
  331. for item in reasoning.get("单独推理", []):
  332. source_name = item.get("来源特征", "")
  333. source_type = item.get("来源特征类型", "关键点")
  334. source_node_id = f"帖子:{source_type}:标签:{source_name}"
  335. probability = item.get("可能性", 0)
  336. edge_id = f"{source_node_id}|推导|{target_node_id}"
  337. edges[edge_id] = {
  338. "source": source_node_id,
  339. "target": target_node_id,
  340. "type": "推导",
  341. "score": probability,
  342. "detail": {
  343. "推理类型": "单独推理",
  344. "替代方案竞品": item.get("1_替代方案竞品", []),
  345. "排他性检验": item.get("2_排他性检验", ""),
  346. "结论": item.get("结论", "")
  347. }
  348. }
  349. # 组合推理的边(用虚拟节点表示组合)
  350. for item in reasoning.get("组合推理", []):
  351. members = item.get("组合成员", [])
  352. member_types = item.get("成员类型", [])
  353. probability = item.get("可能性", 0)
  354. # 创建组合虚拟节点(排序成员以保证唯一性)
  355. member_pairs = list(zip(members, member_types)) if len(member_types) == len(members) else [(m, "关键点") for m in members]
  356. sorted_pairs = sorted(member_pairs, key=lambda x: x[0])
  357. sorted_members = [p[0] for p in sorted_pairs]
  358. sorted_types = [p[1] for p in sorted_pairs]
  359. # 组合名称和ID包含类型信息
  360. combo_parts = [f"{sorted_types[i]}:{m}" for i, m in enumerate(sorted_members)]
  361. combo_name = " + ".join(combo_parts)
  362. combo_node_id = f"帖子:组合:组合:{combo_name}"
  363. if combo_node_id not in nodes:
  364. nodes[combo_node_id] = {
  365. "name": combo_name,
  366. "type": "组合",
  367. "dimension": "组合",
  368. "domain": "帖子",
  369. "detail": {
  370. "成员": sorted_members,
  371. "成员类型": sorted_types
  372. }
  373. }
  374. # 组合节点到目标的边
  375. edge_id = f"{combo_node_id}|推导|{target_node_id}"
  376. edges[edge_id] = {
  377. "source": combo_node_id,
  378. "target": target_node_id,
  379. "type": "推导",
  380. "score": probability,
  381. "detail": {
  382. "推理类型": "组合推理",
  383. "替代方案竞品": item.get("1_替代方案竞品", []),
  384. "排他性检验": item.get("2_排他性检验", ""),
  385. "结论": item.get("结论", "")
  386. }
  387. }
  388. # 成员到组合节点的边
  389. for i, member in enumerate(sorted_members):
  390. m_type = sorted_types[i]
  391. m_node_id = f"帖子:{m_type}:标签:{member}"
  392. m_edge_id = f"{m_node_id}|组成|{combo_node_id}"
  393. if m_edge_id not in edges:
  394. edges[m_edge_id] = {
  395. "source": m_node_id,
  396. "target": combo_node_id,
  397. "type": "组成",
  398. "score": 1.0,
  399. "detail": {}
  400. }
  401. return {
  402. "meta": {
  403. "postId": post_id,
  404. "type": "推导图谱",
  405. "version": "v3",
  406. "stats": {
  407. "nodeCount": len(nodes),
  408. "edgeCount": len(edges)
  409. }
  410. },
  411. "nodes": nodes,
  412. "edges": edges
  413. }
  414. # ===== 辅助函数 =====
  415. def get_all_target_names(post_graph: Dict, dimensions: List[str] = None) -> List[str]:
  416. """
  417. 获取所有可作为目标的特征名称
  418. Args:
  419. post_graph: 帖子图谱数据
  420. dimensions: 要包含的维度列表,默认只包含关键点
  421. 可选值: ["灵感点", "目的点", "关键点"]
  422. Returns:
  423. 特征名称列表
  424. """
  425. if dimensions is None:
  426. dimensions = ["关键点"]
  427. tags = extract_tags_from_post_graph(post_graph)
  428. return [t["name"] for t in tags if t["dimension"] in dimensions]
  429. def get_score_level(score: float) -> str:
  430. """根据分数返回等级"""
  431. if score >= 0.80:
  432. return "逻辑必然"
  433. elif score >= 0.50:
  434. return "高适配性"
  435. elif score >= 0.20:
  436. return "创意偏好"
  437. else:
  438. return "弱关联"
  439. def display_result(result: Dict):
  440. """显示单个分析结果"""
  441. output = result.get("输出")
  442. if output:
  443. print(f"\n目标关键特征: {output.get('目标关键特征', 'N/A')}")
  444. reasoning = output.get("推理分析", {})
  445. # 显示单独推理
  446. single = reasoning.get("单独推理", [])
  447. if single:
  448. print(" 【单独推理】")
  449. for item in single[:5]:
  450. score = item.get("可能性", 0)
  451. level = get_score_level(score)
  452. print(f" [{score:.2f} {level}] {item.get('来源特征', '')}")
  453. exclusivity = item.get("2_排他性检验", "")
  454. if len(exclusivity) > 60:
  455. exclusivity = exclusivity[:60] + "..."
  456. print(f" 排他性: {exclusivity}")
  457. # 显示组合推理
  458. combo = reasoning.get("组合推理", [])
  459. if combo:
  460. print(" 【组合推理】")
  461. for item in combo[:3]:
  462. members = " + ".join(item.get("组合成员", []))
  463. score = item.get("可能性", 0)
  464. level = get_score_level(score)
  465. print(f" [{score:.2f} {level}] {members}")
  466. exclusivity = item.get("2_排他性检验", "")
  467. if len(exclusivity) > 60:
  468. exclusivity = exclusivity[:60] + "..."
  469. print(f" 排他性: {exclusivity}")
  470. else:
  471. print(f" 分析失败: {result.get('错误', 'N/A')}")
  472. # ===== 单帖子处理函数 =====
  473. async def process_single_post(
  474. post_file: Path,
  475. config: PathConfig,
  476. target_name: str = None,
  477. num_targets: int = 999,
  478. dimensions: List[str] = None
  479. ):
  480. """
  481. 处理单个帖子
  482. Args:
  483. post_file: 帖子图谱文件路径
  484. config: 路径配置
  485. target_name: 目标节点名称,可选
  486. num_targets: 要分析的目标特征数量
  487. dimensions: 要分析的特征维度
  488. """
  489. if dimensions is None:
  490. dimensions = ["关键点"]
  491. # 为每个帖子生成独立的 trace
  492. current_time, log_url = set_trace()
  493. # 加载帖子图谱
  494. post_graph = load_post_graph(post_file)
  495. actual_post_id = post_graph.get("meta", {}).get("postId", "unknown")
  496. print(f"\n{'=' * 60}")
  497. print(f"帖子ID: {actual_post_id}")
  498. print(f"Trace URL: {log_url}")
  499. # 确定要分析的目标特征列表
  500. if target_name:
  501. target_names = [target_name]
  502. else:
  503. all_targets = get_all_target_names(post_graph, dimensions)
  504. target_names = all_targets[:num_targets]
  505. print(f"待分析目标特征: {target_names}")
  506. print("-" * 60)
  507. # 输出目录
  508. output_dir = config.intermediate_dir / "node_origin_analysis"
  509. output_dir.mkdir(parents=True, exist_ok=True)
  510. # 使用 trace 上下文包裹单个帖子的分析
  511. with trace(f"节点来源分析 V3 - {actual_post_id}"):
  512. # 并发分析所有目标特征
  513. async def analyze_single(name: str, index: int):
  514. print(f"\n[{index}/{len(target_names)}] 开始分析: {name}")
  515. result = await analyze_node_origin(
  516. post_id=actual_post_id,
  517. target_name=name,
  518. config=config
  519. )
  520. print(f"[{index}/{len(target_names)}] 完成: {name}")
  521. display_result(result)
  522. # 提取输出中的推理分析(V3 格式)
  523. output = result.get("输出", {})
  524. return {
  525. "目标特征": result.get("目标节点"),
  526. "推理分析": output.get("推理分析", {}),
  527. "输入": result.get("输入"),
  528. "错误": result.get("错误")
  529. }
  530. # 创建并发任务
  531. tasks = [
  532. analyze_single(name, i)
  533. for i, name in enumerate(target_names, 1)
  534. ]
  535. # 并发执行
  536. all_results = await asyncio.gather(*tasks)
  537. # 合并保存到一个文件
  538. merged_output = {
  539. "元数据": {
  540. "current_time": current_time,
  541. "log_url": log_url,
  542. "model": MODEL_NAME,
  543. "version": "v3"
  544. },
  545. "帖子id": actual_post_id,
  546. "分析结果列表": all_results
  547. }
  548. output_file = output_dir / f"{actual_post_id}_来源分析_v3.json"
  549. with open(output_file, "w", encoding="utf-8") as f:
  550. json.dump(merged_output, f, ensure_ascii=False, indent=2)
  551. # 生成推导关系图谱
  552. graph_output = build_origin_graph(all_results, actual_post_id)
  553. graph_file = output_dir / f"{actual_post_id}_推导图谱_v3.json"
  554. with open(graph_file, "w", encoding="utf-8") as f:
  555. json.dump(graph_output, f, ensure_ascii=False, indent=2)
  556. print(f"\n完成! 共分析 {len(target_names)} 个目标特征")
  557. print(f"分析结果: {output_file}")
  558. print(f"推导图谱: {graph_file}")
  559. print(f"Trace: {log_url}")
  560. return actual_post_id
  561. # ===== 主函数 =====
  562. async def main(
  563. post_id: str = None,
  564. target_name: str = None,
  565. num_targets: int = 999,
  566. dimensions: List[str] = None,
  567. all_posts: bool = False
  568. ):
  569. """
  570. 主函数
  571. Args:
  572. post_id: 帖子ID,可选(指定则只处理该帖子)
  573. target_name: 目标节点名称,可选(如果指定则只分析这一个)
  574. num_targets: 要分析的目标特征数量
  575. dimensions: 要分析的特征维度,默认只关键点
  576. all_posts: 是否处理所有帖子
  577. """
  578. if dimensions is None:
  579. dimensions = ["关键点"]
  580. config = PathConfig()
  581. print(f"账号: {config.account_name}")
  582. print(f"使用模型: {MODEL_NAME}")
  583. print(f"分析维度: {dimensions}")
  584. print(f"版本: V3 (法庭取证式两步验证法)")
  585. # 获取帖子图谱文件
  586. post_graph_files = get_post_graph_files(config)
  587. if not post_graph_files:
  588. print("错误: 没有找到帖子图谱文件")
  589. return
  590. # 确定要处理的帖子列表
  591. if post_id:
  592. # 指定了帖子ID
  593. target_file = next(
  594. (f for f in post_graph_files if post_id in f.name),
  595. None
  596. )
  597. if not target_file:
  598. print(f"错误: 未找到帖子 {post_id}")
  599. return
  600. files_to_process = [target_file]
  601. elif all_posts:
  602. # 处理所有帖子
  603. files_to_process = post_graph_files
  604. else:
  605. # 默认只处理第一个帖子
  606. files_to_process = [post_graph_files[0]]
  607. print(f"待处理帖子数: {len(files_to_process)}")
  608. # 逐个处理帖子(每个帖子独立的 trace)
  609. processed_posts = []
  610. for i, post_file in enumerate(files_to_process, 1):
  611. print(f"\n{'#' * 60}")
  612. print(f"# 处理帖子 {i}/{len(files_to_process)}")
  613. print(f"{'#' * 60}")
  614. post_id_result = await process_single_post(
  615. post_file=post_file,
  616. config=config,
  617. target_name=target_name,
  618. num_targets=num_targets,
  619. dimensions=dimensions
  620. )
  621. processed_posts.append(post_id_result)
  622. print(f"\n{'#' * 60}")
  623. print(f"# 全部完成! 共处理 {len(processed_posts)} 个帖子")
  624. print(f"{'#' * 60}")
  625. if __name__ == "__main__":
  626. import argparse
  627. parser = argparse.ArgumentParser(description="分析节点来源 (V3 法庭取证式)")
  628. parser.add_argument("--post-id", type=str, help="帖子ID(指定则只处理该帖子)")
  629. parser.add_argument("--target", type=str, help="目标节点名称(指定则只分析这一个特征)")
  630. parser.add_argument("--num", type=int, default=999, help="要分析的目标特征数量")
  631. parser.add_argument("--dims", type=str, nargs="+",
  632. choices=["灵感点", "目的点", "关键点"],
  633. help="指定要分析的维度(默认全部)")
  634. parser.add_argument("--all-posts", action="store_true", help="处理所有帖子")
  635. args = parser.parse_args()
  636. # 确定维度(默认所有维度)
  637. if args.dims:
  638. dimensions = args.dims
  639. else:
  640. dimensions = ["灵感点", "目的点", "关键点"]
  641. # 运行主函数(每个帖子内部会独立生成 trace)
  642. asyncio.run(main(
  643. post_id=args.post_id,
  644. target_name=args.target,
  645. num_targets=args.num,
  646. dimensions=dimensions,
  647. all_posts=args.all_posts
  648. ))