generate_visualize_data.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565
  1. #!/usr/bin/env python3
"""
Generate the data behind the derivation visualisation.

Inputs: account_name, post_id, log_id
- Topic points are parsed from input/{account_name}/原始数据/解构内容/{post_id}.json
- Derivation and evaluation JSON files are read from
  output/{account_name}/推导日志/{post_id}/{log_id}/ and used to produce:
  1. output/{account_name}/整体推导结果/{post_id}.json
  2. output/{account_name}/整体推导路径可视化/{post_id}.json
"""
  10. import argparse
  11. import json
  12. import re
  13. from pathlib import Path
  14. from typing import Any
  15. def _collect_dimension_names(point_data: dict) -> dict[str, str]:
  16. """从点的 实质/形式/意图 中收集 名称 -> dimension。"""
  17. name_to_dim = {}
  18. if "实质" in point_data and point_data["实质"]:
  19. for key in ("具体元素", "具象概念", "抽象概念"):
  20. for item in (point_data["实质"].get(key) or []):
  21. n = item.get("名称")
  22. if n:
  23. name_to_dim[n] = "实质"
  24. if "形式" in point_data and point_data["形式"]:
  25. for key in ("具体元素形式", "具象概念形式", "整体形式"):
  26. for item in (point_data["形式"].get(key) or []):
  27. n = item.get("名称")
  28. if n:
  29. name_to_dim[n] = "形式"
  30. if point_data.get("意图"):
  31. for item in point_data["意图"]:
  32. n = item.get("名称")
  33. if n:
  34. name_to_dim[n] = "意图"
  35. return name_to_dim
  36. def parse_topic_points_from_deconstruct(deconstruct_path: Path) -> list[dict[str, Any]]:
  37. """
  38. 从 input/{account_name}/解构内容/{post_id}.json 解析选题点列表。
  39. 选题点来自分词结果中的「词」,字段:name, point, dimension, root_source, root_sources_desc。
  40. """
  41. if not deconstruct_path.exists():
  42. raise FileNotFoundError(f"解构内容文件不存在: {deconstruct_path}")
  43. with open(deconstruct_path, "r", encoding="utf-8") as f:
  44. data = json.load(f)
  45. result = []
  46. for point_type in ("灵感点", "目的点", "关键点"):
  47. for point in data.get(point_type) or []:
  48. root_source = point.get("点", "")
  49. root_sources_desc = point.get("点描述", "")
  50. name_to_dim = _collect_dimension_names(point)
  51. for word_item in point.get("分词结果") or []:
  52. name = word_item.get("词", "").strip()
  53. if not name:
  54. continue
  55. dimension = name_to_dim.get(name, "实质")
  56. result.append({
  57. "name": name,
  58. "point": point_type,
  59. "dimension": dimension,
  60. "root_source": root_source,
  61. "root_sources_desc": root_sources_desc,
  62. })
  63. return result
  64. def _topic_point_key(t: dict) -> tuple:
  65. return (t["name"], t["point"], t["dimension"])
  66. def load_derivation_logs(log_dir: Path) -> tuple[list[dict], list[dict]]:
  67. """
  68. 从 output/{account_name}/推导日志/{post_id}/{log_id}/ 读取所有 {轮次}_推导.json 与 {轮次}_评估.json。
  69. 返回 (推导列表按轮次序, 评估列表按轮次序)。
  70. """
  71. if not log_dir.is_dir():
  72. raise FileNotFoundError(f"推导日志目录不存在: {log_dir}")
  73. derivation_by_round = {}
  74. eval_by_round = {}
  75. for p in log_dir.glob("*.json"):
  76. base = p.stem
  77. m = re.match(r"^(\d+)_(推导|评估)$", base)
  78. if not m:
  79. continue
  80. round_num = int(m.group(1))
  81. with open(p, "r", encoding="utf-8") as f:
  82. content = json.load(f)
  83. if m.group(2) == "推导":
  84. derivation_by_round[round_num] = content
  85. else:
  86. eval_by_round[round_num] = content
  87. rounds = sorted(set(derivation_by_round) | set(eval_by_round))
  88. derivations = [derivation_by_round[r] for r in rounds if r in derivation_by_round]
  89. evals = [eval_by_round[r] for r in rounds if r in eval_by_round]
  90. return derivations, evals
  91. def build_derivation_result(
  92. topic_points: list[dict],
  93. derivations: list[dict],
  94. evals: list[dict],
  95. ) -> list[dict]:
  96. """
  97. 生成整体推导结果:每轮 轮次、推导成功的选题点、未推导成功的选题点、本次新推导成功的选题点。
  98. 选题点用 topic_points 中的完整信息;按 name 判定是否被推导(评估中的 match_post_point)。
  99. 若之前推导成功的选题点 is_fully_derived=false,本轮变为 is_fully_derived=true,则算本次新推导成功的选题点,
  100. 且 matched_score、is_fully_derived 在本轮后更新为该轮评估值。
  101. 推导成功的选题点:使用当前已更新的 best (matched_score, is_fully_derived)。
  102. 本次新推导成功的选题点:用当轮评估的 matched_score、is_fully_derived。
  103. 未推导成功的选题点:不包含 matched_score、is_fully_derived。
  104. """
  105. all_keys = {_topic_point_key(t) for t in topic_points}
  106. topic_by_key = {_topic_point_key(t): t for t in topic_points}
  107. # 分轮次收集 (round_num, name) -> (matched_score, is_fully_derived),同一轮同名保留 matched_score 最高的
  108. score_by_round_name: dict[tuple[int, str], tuple[float, bool]] = {}
  109. for round_idx, eval_data in enumerate(evals):
  110. round_num = eval_data.get("round", round_idx + 1)
  111. for er in eval_data.get("eval_results") or []:
  112. if not (er.get("is_matched") is True or er.get("match_result") == "匹配"):
  113. continue
  114. mp = (er.get("matched_post_point") or er.get("matched_post_topic") or er.get("match_post_point") or "").strip()
  115. if not mp:
  116. continue
  117. score = er.get("matched_score")
  118. if score is None:
  119. score = 1.0
  120. else:
  121. try:
  122. score = float(score)
  123. except (TypeError, ValueError):
  124. score = 1.0
  125. is_fully = er.get("is_fully_derived", True)
  126. key = (round_num, mp)
  127. if key not in score_by_round_name or score > score_by_round_name[key][0]:
  128. score_by_round_name[key] = (score, bool(is_fully))
  129. result = []
  130. derived_names_so_far: set[str] = set()
  131. fully_derived_names_so_far: set[str] = set() # 已出现过 is_fully_derived=true 的选题点
  132. # name -> (matched_score, is_fully_derived),一旦 is_fully_derived=True,后续轮次不再更新 matched_score
  133. best_score_by_name: dict[str, tuple[float, bool]] = {}
  134. for i, (derivation, eval_data) in enumerate(zip(derivations, evals)):
  135. round_num = derivation.get("round", i + 1)
  136. eval_results = eval_data.get("eval_results") or []
  137. matched_post_points = set()
  138. for er in eval_results:
  139. if not (er.get("is_matched") is True or er.get("match_result") == "匹配"):
  140. continue
  141. mp = er.get("matched_post_point") or er.get("matched_post_topic") or er.get("match_post_point") or ""
  142. if mp and str(mp).strip():
  143. matched_post_points.add(str(mp).strip())
  144. # 本轮每个匹配名的 (score, is_fully)
  145. this_round_scores: dict[str, tuple[float, bool]] = {}
  146. for name in matched_post_points:
  147. val = score_by_round_name.get((round_num, name))
  148. if val is not None:
  149. this_round_scores[name] = val
  150. # 本次新推导成功:首次匹配 或 之前 is_fully=false 且本轮 is_fully=true
  151. new_derived_names = set()
  152. for name in matched_post_points:
  153. score, is_fully = this_round_scores.get(name, (None, False))
  154. if name not in derived_names_so_far:
  155. new_derived_names.add(name)
  156. elif name not in fully_derived_names_so_far and is_fully:
  157. new_derived_names.add(name)
  158. # 更新推导集合与 best:
  159. # - 首次出现时写入
  160. # - 若尚未 fully 且本轮 fully,则更新为 fully,并锁定,不再被后续轮次覆盖
  161. # - 若尚未 fully 且本轮仍为部分推导,可用更高分数更新
  162. derived_names_so_far |= matched_post_points
  163. for name in matched_post_points:
  164. val = this_round_scores.get(name)
  165. if val is None:
  166. continue
  167. score, is_fully = val
  168. if name not in best_score_by_name:
  169. best_score_by_name[name] = (score, is_fully)
  170. else:
  171. prev_score, prev_fully = best_score_by_name[name]
  172. # 已经 fully 的节点,后续轮次不再更新 matched_score
  173. if prev_fully:
  174. pass
  175. else:
  176. if is_fully:
  177. best_score_by_name[name] = (score, True)
  178. else:
  179. # 都是部分推导时,可以用更高分覆盖
  180. if score > prev_score:
  181. best_score_by_name[name] = (score, False)
  182. if is_fully:
  183. fully_derived_names_so_far.add(name)
  184. derived_keys = {k for k in all_keys if topic_by_key[k]["name"] in derived_names_so_far}
  185. new_derived_keys = {k for k in all_keys if topic_by_key[k]["name"] in new_derived_names}
  186. not_derived_keys = all_keys - derived_keys
  187. sort_derived = sorted(derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2]))
  188. sort_new = sorted(new_derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2]))
  189. sort_not = sorted(not_derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2]))
  190. def add_score_fields(keys: set, sort_keys: list, round_for_score: int | None) -> list[dict]:
  191. """round_for_score: 用该轮评估的分数;若为 None 则不添加 score 字段。"""
  192. out = []
  193. for k in sort_keys:
  194. if k not in keys:
  195. continue
  196. obj = dict(topic_by_key[k])
  197. if round_for_score is not None:
  198. name = obj.get("name", "")
  199. val = score_by_round_name.get((round_for_score, name))
  200. if val is not None:
  201. obj["matched_score"] = val[0]
  202. obj["is_fully_derived"] = val[1]
  203. else:
  204. obj["matched_score"] = None
  205. obj["is_fully_derived"] = False
  206. out.append(obj)
  207. return out
  208. # 推导成功的选题点:用当前已更新的 best (matched_score, is_fully_derived)
  209. derived_list = []
  210. for k in sort_derived:
  211. if k not in derived_keys:
  212. continue
  213. obj = dict(topic_by_key[k])
  214. name = obj.get("name", "")
  215. val = best_score_by_name.get(name)
  216. if val is not None:
  217. obj["matched_score"] = val[0]
  218. obj["is_fully_derived"] = val[1]
  219. else:
  220. obj["matched_score"] = None
  221. obj["is_fully_derived"] = False
  222. derived_list.append(obj)
  223. new_list = add_score_fields(new_derived_keys, sort_new, round_for_score=round_num)
  224. not_derived_list = [dict(topic_by_key[k]) for k in sort_not] # 不带 matched_score、is_fully_derived
  225. result.append({
  226. "轮次": round_num,
  227. "推导成功的选题点": derived_list,
  228. "未推导成功的选题点": not_derived_list,
  229. "本次新推导成功的选题点": new_list,
  230. })
  231. return result
  232. def _tree_node_display_name(raw: str) -> str:
  233. """人设节点可能是 a.b.c 路径形式,实际需要的是最后一段节点名 c。"""
  234. s = (raw or "").strip()
  235. if "." in s:
  236. return s.rsplit(".", 1)[-1].strip() or s
  237. return s
  238. def _to_tree_node(name: str, extra: dict | None = None) -> dict:
  239. d = {"name": name}
  240. if extra:
  241. d.update(extra)
  242. return d
  243. def _to_pattern_node(pattern_name: str) -> dict:
  244. """将 pattern 字符串转为 input_pattern_nodes 的一项(简化版)。"""
  245. items = [x.strip() for x in pattern_name.replace("+", " ").split() if x.strip()]
  246. return {
  247. "items": [{"name": x, "point": "关键点", "dimension": "形式", "type": "标签"} for x in items],
  248. "match_items": items,
  249. }
  250. def build_visualize_edges(
  251. derivations: list[dict],
  252. evals: list[dict],
  253. topic_points: list[dict],
  254. ) -> tuple[list[dict], list[dict]]:
  255. """
  256. 生成 node_list(所有评估通过的帖子选题点)和 edge_list(只保留评估通过的推导路径)。
  257. - node_list:同一轮内节点不重复,重复时保留 matched_score 更高的;节点带 matched_score、is_fully_derived。
  258. - edge_list:边带 level(与 output 节点 level 一致);同一轮内 output 节点不重复;若前面轮次该节点匹配分更高则本轮不保留该节点。
  259. 评估数据支持 path_id(对应推导 derivation_results[].id)、item_id(output 中元素从 1 起的序号)、matched_score、is_fully_derived。
  260. """
  261. derivations = sorted(derivations, key=lambda d: d.get("round", 0))
  262. evals = sorted(evals, key=lambda e: e.get("round", 0))
  263. topic_by_name = {t["name"]: t for t in topic_points}
  264. # 评估匹配:(round_num, path_id, item_id) -> (matched_post_point, matched_reason, matched_score, is_fully_derived)
  265. # path_id = 推导中 derivation_results[].id,item_id = output 中元素从 1 起的序号
  266. match_by_path_item: dict[tuple[int, int, int], tuple[str, str, float, bool]] = {}
  267. match_by_round_output: dict[tuple[int, str], tuple[str, str, float, bool]] = {} # 兼容无 path_id/item_id
  268. for round_idx, eval_data in enumerate(evals):
  269. round_num = eval_data.get("round", round_idx + 1)
  270. for er in eval_data.get("eval_results") or []:
  271. if not (er.get("is_matched") is True or er.get("match_result") == "匹配"):
  272. continue
  273. mp = (er.get("matched_post_point") or er.get("matched_post_topic") or er.get("match_post_point") or "").strip()
  274. if not mp:
  275. continue
  276. out_point = (er.get("derivation_output_point") or "").strip()
  277. reason = (er.get("matched_reason") or er.get("match_reason") or "").strip()
  278. score = er.get("matched_score")
  279. if score is None:
  280. score = 1.0
  281. else:
  282. try:
  283. score = float(score)
  284. except (TypeError, ValueError):
  285. score = 1.0
  286. is_fully = er.get("is_fully_derived", True)
  287. val = (mp, reason, score, bool(is_fully))
  288. path_id = er.get("path_id")
  289. item_id = er.get("item_id")
  290. if path_id is not None and item_id is not None:
  291. try:
  292. match_by_path_item[(round_num, int(path_id), int(item_id))] = val
  293. except (TypeError, ValueError):
  294. pass
  295. if out_point:
  296. k = (round_num, out_point)
  297. if k not in match_by_round_output:
  298. match_by_round_output[k] = val
  299. def get_match(round_num: int, path_id: int | None, item_id: int | None, out_item: str) -> tuple[str, str, float, bool] | None:
  300. if path_id is not None and item_id is not None:
  301. v = match_by_path_item.get((round_num, path_id, item_id))
  302. if v is not None:
  303. return v
  304. return match_by_round_output.get((round_num, out_item))
  305. # 第一遍:按 (round_num, mp) 聚合节点最佳信息(不考虑边是否最终保留)
  306. # (round_num, mp) -> (score, is_fully_derived, derivation_output_point, method)
  307. best_node_info_by_round_mp: dict[tuple[int, str], tuple[float, bool, str, str]] = {}
  308. for round_idx, derivation in enumerate(derivations):
  309. round_num = derivation.get("round", round_idx + 1)
  310. for dr in derivation.get("derivation_results") or []:
  311. output_list = dr.get("output") or []
  312. path_id = dr.get("id")
  313. for i, out_item in enumerate(output_list):
  314. item_id = i + 1
  315. v = get_match(round_num, path_id, item_id, out_item)
  316. if not v:
  317. continue
  318. mp, _reason, score, is_fully = v
  319. key = (round_num, mp)
  320. prev = best_node_info_by_round_mp.get(key)
  321. if prev is None or score > prev[0]:
  322. best_node_info_by_round_mp[key] = (score, bool(is_fully), out_item, dr.get("method", ""))
  323. edge_list = []
  324. round_output_seen: set[tuple[int, str]] = set() # (round_num, node_name) 本轮已作为某边的 output
  325. prev_best_by_node: dict[str, tuple[float, bool]] = {} # node_name -> (score, is_fully) of last included round
  326. for round_idx, derivation in enumerate(derivations):
  327. round_num = derivation.get("round", round_idx + 1)
  328. for dr in derivation.get("derivation_results") or []:
  329. output_list = dr.get("output") or []
  330. path_id = dr.get("id")
  331. matched: list[tuple[str, str, float, bool, str]] = [] # (mp, reason, score, is_fully, derivation_out)
  332. for i, out_item in enumerate(output_list):
  333. item_id = i + 1
  334. v = get_match(round_num, path_id, item_id, out_item)
  335. if not v:
  336. continue
  337. mp, reason, score, is_fully = v
  338. matched.append((mp, reason, score, is_fully, out_item))
  339. if not matched:
  340. continue
  341. # 同一轮内 output 节点不重复;若前面轮次该节点已完全推导,或分数未提升且未从 false 变 true,则本轮跳过;
  342. # 并且只保留与 node_list 中该轮该节点的最高分记录一致的边
  343. output_names_this_edge = []
  344. for mp, reason, score, is_fully, out_item in matched:
  345. if (round_num, mp) in round_output_seen:
  346. continue
  347. prev = prev_best_by_node.get(mp)
  348. if prev is not None:
  349. prev_score, prev_fully = prev
  350. if prev_fully:
  351. continue
  352. if not is_fully and score <= prev_score:
  353. continue
  354. best_info = best_node_info_by_round_mp.get((round_num, mp))
  355. if not best_info or score < best_info[0]:
  356. continue
  357. output_names_this_edge.append((mp, reason, score, is_fully, out_item))
  358. if not output_names_this_edge:
  359. continue
  360. for mp, _r, score, is_fully, _o in output_names_this_edge:
  361. round_output_seen.add((round_num, mp))
  362. prev = prev_best_by_node.get(mp)
  363. if prev is None or (not prev[1] and (is_fully or score > prev[0])):
  364. prev_best_by_node[mp] = (score, is_fully)
  365. input_data = dr.get("input") or {}
  366. derived_nodes = input_data.get("derived_nodes") or []
  367. tree_nodes = input_data.get("tree_nodes") or []
  368. patterns = input_data.get("patterns") or []
  369. input_post_nodes = [{"name": x} for x in derived_nodes]
  370. input_tree_nodes = [_to_tree_node(_tree_node_display_name(x)) for x in tree_nodes]
  371. if patterns and isinstance(patterns[0], str):
  372. input_pattern_nodes = [_to_pattern_node(p) for p in patterns]
  373. elif patterns and isinstance(patterns[0], dict):
  374. input_pattern_nodes = patterns
  375. else:
  376. input_pattern_nodes = []
  377. output_nodes = []
  378. reasons_list = []
  379. derivation_points_list = []
  380. for mp, reason, score, is_fully, out_item in output_names_this_edge:
  381. output_nodes.append({"name": mp, "matched_score": score, "is_fully_derived": is_fully})
  382. reasons_list.append(reason)
  383. derivation_points_list.append(out_item)
  384. detail = {
  385. "reason": dr.get("reason", ""),
  386. "评估结果": "匹配成功",
  387. }
  388. if any(reasons_list):
  389. detail["匹配理由"] = reasons_list
  390. detail["待比对的推导选题点"] = derivation_points_list
  391. if dr.get("tools"):
  392. detail["tools"] = dr["tools"]
  393. edge_list.append({
  394. "name": dr.get("method", "") or f"推导-{round_num}",
  395. "level": round_num,
  396. "input_post_nodes": input_post_nodes,
  397. "input_tree_nodes": input_tree_nodes,
  398. "input_pattern_nodes": input_pattern_nodes,
  399. "output_nodes": output_nodes,
  400. "detail": detail,
  401. })
  402. # 根据按 (round, mp) 聚合后的最佳信息生成 node_list
  403. # 规则:节点首次出现保留;is_fully_derived 从 false 变 true 时保留;
  404. # is_fully_derived=false 且分数高于之前已保留轮次时保留;其余情况跳过
  405. prev_node_best: dict[str, tuple[float, bool]] = {} # mp -> (score, is_fully) of last included round
  406. node_list: list[dict] = []
  407. for (round_num, mp), (score, is_fully, out_item, method) in sorted(
  408. best_node_info_by_round_mp.items(), key=lambda x: (x[0][0], x[0][1])
  409. ):
  410. prev = prev_node_best.get(mp)
  411. if prev is None:
  412. should_include = True
  413. else:
  414. prev_score, prev_fully = prev
  415. if prev_fully:
  416. should_include = False
  417. elif is_fully:
  418. should_include = True
  419. elif score > prev_score:
  420. should_include = True
  421. else:
  422. should_include = False
  423. if not should_include:
  424. continue
  425. prev_node_best[mp] = (score, is_fully)
  426. base = dict(topic_by_name.get(mp, {"name": mp, "point": "", "dimension": "", "root_source": "", "root_sources_desc": ""}))
  427. base["level"] = round_num
  428. base.setdefault("original_word", base.get("name", mp))
  429. base["derivation_type"] = method
  430. base["matched_score"] = score
  431. base["is_fully_derived"] = is_fully
  432. base["derivation_output_point"] = out_item
  433. node_list.append(base)
  434. node_list.sort(key=lambda n: (n.get("level", 0), str(n.get("name", ""))))
  435. return node_list, edge_list
  436. def _find_project_root() -> Path:
  437. """从脚本所在目录向上查找包含 .git 的项目根目录。"""
  438. p = Path(__file__).resolve().parent
  439. while p != p.parent:
  440. if (p / ".git").is_dir():
  441. return p
  442. p = p.parent
  443. return Path(__file__).resolve().parent
  444. def generate_visualize_data(account_name: str, post_id: str, log_id: str, base_dir: Path | None = None) -> None:
  445. """
  446. 主流程:读取解构内容与推导日志,生成整体推导结果与整体推导路径可视化两个 JSON。
  447. base_dir 默认为脚本所在目录;若其下 output/.../推导日志 不存在,则尝试项目根目录下的 output/...(兼容从项目根运行)。
  448. """
  449. if base_dir is None:
  450. base_dir = Path(__file__).resolve().parent
  451. input_dir = base_dir / "input" / account_name / "原始数据" / "解构内容"
  452. log_dir = base_dir / "output" / account_name / "推导日志" / post_id / log_id
  453. result_dir = base_dir / "output" / account_name / "整体推导结果"
  454. visualize_dir = base_dir / "output" / account_name / "整体推导路径可视化"
  455. # 兼容:若推导日志不在 base_dir 下,尝试项目根目录下的 output/
  456. if not log_dir.is_dir():
  457. project_root = _find_project_root()
  458. if project_root != base_dir:
  459. alt_log = project_root / "output" / account_name / "推导日志" / post_id / log_id
  460. if alt_log.is_dir():
  461. log_dir = alt_log
  462. result_dir = project_root / "output" / account_name / "整体推导结果"
  463. visualize_dir = project_root / "output" / account_name / "整体推导路径可视化"
  464. deconstruct_path = input_dir / f"{post_id}.json"
  465. topic_points = parse_topic_points_from_deconstruct(deconstruct_path)
  466. derivations, evals = load_derivation_logs(log_dir)
  467. if not derivations or not evals:
  468. raise ValueError(f"推导或评估数据为空: {log_dir}")
  469. # 2.1 整体推导结果
  470. derivation_result = build_derivation_result(topic_points, derivations, evals)
  471. result_dir.mkdir(parents=True, exist_ok=True)
  472. result_path = result_dir / f"{post_id}.json"
  473. with open(result_path, "w", encoding="utf-8") as f:
  474. json.dump(derivation_result, f, ensure_ascii=False, indent=4)
  475. print(f"已写入整体推导结果: {result_path}")
  476. # 2.2 整体推导路径可视化
  477. node_list, edge_list = build_visualize_edges(derivations, evals, topic_points)
  478. visualize_path = visualize_dir / f"{post_id}.json"
  479. visualize_dir.mkdir(parents=True, exist_ok=True)
  480. with open(visualize_path, "w", encoding="utf-8") as f:
  481. json.dump({"node_list": node_list, "edge_list": edge_list}, f, ensure_ascii=False, indent=4)
  482. print(f"已写入整体推导路径可视化: {visualize_path}")
  483. def main(account_name, post_id, log_id):
  484. # parser = argparse.ArgumentParser(description="生成推导可视化数据")
  485. # parser.add_argument("account_name", help="账号名,如 家有大志")
  486. # parser.add_argument("post_id", help="帖子 ID")
  487. # parser.add_argument("log_id", help="推导日志 ID,如 20260303204232")
  488. # parser.add_argument("--base-dir", type=Path, default=None, help="项目根目录,默认为本脚本所在目录")
  489. # args = parser.parse_args()
  490. generate_visualize_data(account_name=account_name, post_id=post_id, log_id=log_id)
  491. if __name__ == "__main__":
  492. account_name="家有大志"
  493. items = [
  494. {"post_id":"68fb6a5c000000000302e5de","log_id":"20260319134630"},
  495. {"post_id":"69185d49000000000d00f94e","log_id":"20260319140603"},
  496. {"post_id":"6921937a000000001b0278d1","log_id":"20260319141843"}
  497. ]
  498. for item in items:
  499. post_id = item["post_id"]
  500. log_id = item["log_id"]
  501. main(account_name, post_id, log_id)