#!/usr/bin/env python3
"""
Generate derivation visualization data.

Inputs: account_name, post_id, log_id
- Parse the topic-point list from
  input/{account_name}/原始数据/解构内容/{post_id}.json
- Read the derivation and evaluation JSON files from
  output/{account_name}/推导日志/{post_id}/{log_id}/ and generate:
  1. output/{account_name}/整体推导结果/{post_id}.json
  2. output/{account_name}/整体推导路径可视化/{post_id}.json
"""
import argparse
import json
import re
import warnings
from pathlib import Path
from typing import Any
  15. def _collect_dimension_names(point_data: dict) -> dict[str, str]:
  16. """从点的 实质/形式/意图 中收集 名称 -> dimension。"""
  17. name_to_dim = {}
  18. if "实质" in point_data and point_data["实质"]:
  19. for key in ("具体元素", "具象概念", "抽象概念"):
  20. for item in (point_data["实质"].get(key) or []):
  21. n = item.get("名称")
  22. if n:
  23. name_to_dim[n] = "实质"
  24. if "形式" in point_data and point_data["形式"]:
  25. for key in ("具体元素形式", "具象概念形式", "整体形式"):
  26. for item in (point_data["形式"].get(key) or []):
  27. n = item.get("名称")
  28. if n:
  29. name_to_dim[n] = "形式"
  30. if point_data.get("意图"):
  31. for item in point_data["意图"]:
  32. n = item.get("名称")
  33. if n:
  34. name_to_dim[n] = "意图"
  35. return name_to_dim
  36. def parse_topic_points_from_deconstruct(deconstruct_path: Path) -> list[dict[str, Any]]:
  37. """
  38. 从 input/{account_name}/解构内容/{post_id}.json 解析选题点列表。
  39. 选题点来自分词结果中的「词」,字段:name, point, dimension, root_source, root_sources_desc。
  40. """
  41. if not deconstruct_path.exists():
  42. raise FileNotFoundError(f"解构内容文件不存在: {deconstruct_path}")
  43. with open(deconstruct_path, "r", encoding="utf-8") as f:
  44. data = json.load(f)
  45. result = []
  46. for point_type in ("灵感点", "目的点", "关键点"):
  47. for point in data.get(point_type) or []:
  48. root_source = point.get("点", "")
  49. root_sources_desc = point.get("点描述", "")
  50. name_to_dim = _collect_dimension_names(point)
  51. for word_item in point.get("分词结果") or []:
  52. name = word_item.get("词", "").strip()
  53. if not name:
  54. continue
  55. dimension = name_to_dim.get(name, "实质")
  56. result.append({
  57. "name": name,
  58. "point": point_type,
  59. "dimension": dimension,
  60. "root_source": root_source,
  61. "root_sources_desc": root_sources_desc,
  62. })
  63. return result
  64. def _topic_point_key(t: dict) -> tuple:
  65. return (t["name"], t["point"], t["dimension"])
  66. def load_derivation_logs(log_dir: Path) -> tuple[list[dict], list[dict]]:
  67. """
  68. 从 output/{account_name}/推导日志/{post_id}/{log_id}/ 读取所有 {轮次}_推导.json 与 {轮次}_评估.json。
  69. 返回 (推导列表按轮次序, 评估列表按轮次序)。
  70. """
  71. if not log_dir.is_dir():
  72. raise FileNotFoundError(f"推导日志目录不存在: {log_dir}")
  73. derivation_by_round = {}
  74. eval_by_round = {}
  75. for p in log_dir.glob("*.json"):
  76. base = p.stem
  77. m = re.match(r"^(\d+)_(推导|评估)$", base)
  78. if not m:
  79. continue
  80. round_num = int(m.group(1))
  81. with open(p, "r", encoding="utf-8") as f:
  82. content = json.load(f)
  83. if m.group(2) == "推导":
  84. derivation_by_round[round_num] = content
  85. else:
  86. eval_by_round[round_num] = content
  87. rounds = sorted(set(derivation_by_round) | set(eval_by_round))
  88. derivations = [derivation_by_round[r] for r in rounds if r in derivation_by_round]
  89. evals = [eval_by_round[r] for r in rounds if r in eval_by_round]
  90. return derivations, evals
  91. def build_derivation_result(
  92. topic_points: list[dict],
  93. derivations: list[dict],
  94. evals: list[dict],
  95. ) -> list[dict]:
  96. """
  97. 生成整体推导结果:每轮 轮次、推导成功的选题点、未推导成功的选题点、本次新推导成功的选题点。
  98. 选题点用 topic_points 中的完整信息;按 name 判定是否被推导(评估中的 match_post_point)。
  99. """
  100. all_keys = {_topic_point_key(t) for t in topic_points}
  101. topic_by_key = {_topic_point_key(t): t for t in topic_points}
  102. result = []
  103. derived_names_so_far: set[str] = set()
  104. for i, (derivation, eval_data) in enumerate(zip(derivations, evals)):
  105. round_num = derivation.get("round", i + 1)
  106. eval_results = eval_data.get("eval_results") or []
  107. matched_post_points = set()
  108. for er in eval_results:
  109. if er.get("match_result") != "匹配":
  110. continue
  111. mp = (er.get("match_post_point") or "").strip()
  112. if mp:
  113. matched_post_points.add(mp)
  114. new_derived_names = matched_post_points - derived_names_so_far
  115. derived_names_so_far |= matched_post_points
  116. # 推导成功的选题点:name 在 derived_names_so_far 中的选题点(每 name 取一条,与 topic_points 顺序一致)
  117. derived_keys = {k for k in all_keys if topic_by_key[k]["name"] in derived_names_so_far}
  118. new_derived_keys = {k for k in all_keys if topic_by_key[k]["name"] in new_derived_names}
  119. not_derived_keys = all_keys - derived_keys
  120. derived_list = [dict(topic_by_key[k]) for k in sorted(derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2]))]
  121. new_list = [dict(topic_by_key[k]) for k in sorted(new_derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2]))]
  122. not_derived_list = [dict(topic_by_key[k]) for k in sorted(not_derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2]))]
  123. result.append({
  124. "轮次": round_num,
  125. "推导成功的选题点": derived_list,
  126. "未推导成功的选题点": not_derived_list,
  127. "本次新推导成功的选题点": new_list,
  128. })
  129. return result
  130. def _to_tree_node(name: str, extra: dict | None = None) -> dict:
  131. d = {"name": name}
  132. if extra:
  133. d.update(extra)
  134. return d
  135. def _to_pattern_node(pattern_name: str) -> dict:
  136. """将 pattern 字符串转为 input_pattern_nodes 的一项(简化版)。"""
  137. items = [x.strip() for x in pattern_name.replace("+", " ").split() if x.strip()]
  138. return {
  139. "items": [{"name": x, "point": "关键点", "dimension": "形式", "type": "标签"} for x in items],
  140. "match_items": items,
  141. }
  142. def build_visualize_edges(
  143. derivations: list[dict],
  144. evals: list[dict],
  145. topic_points: list[dict],
  146. ) -> tuple[list[dict], list[dict]]:
  147. """
  148. 生成 node_list(所有评估通过的帖子选题点)和 edge_list(只保留评估通过的推导路径)。
  149. """
  150. topic_by_name = {}
  151. for t in topic_points:
  152. name = t["name"]
  153. if name not in topic_by_name:
  154. topic_by_name[name] = t
  155. derivation_output_to_match = {}
  156. for eval_data in evals:
  157. for er in eval_data.get("eval_results") or []:
  158. if er.get("match_result") != "匹配":
  159. continue
  160. out_point = (er.get("derivation_output_point") or "").strip()
  161. match_point = (er.get("match_post_point") or "").strip()
  162. if out_point and match_point:
  163. derivation_output_to_match[out_point] = {
  164. "match_post_point": match_point,
  165. "match_reason": er.get("match_reason", ""),
  166. "eval": er,
  167. }
  168. node_list = []
  169. seen_nodes = set()
  170. edge_list = []
  171. level_by_name = {}
  172. for round_idx, derivation in enumerate(derivations):
  173. round_num = derivation.get("round", round_idx + 1)
  174. for dr in derivation.get("derivation_results") or []:
  175. output_list = dr.get("output") or []
  176. matched_outputs = []
  177. for out_item in output_list:
  178. info = derivation_output_to_match.get(out_item)
  179. if not info:
  180. continue
  181. mp = info["match_post_point"]
  182. if not mp:
  183. continue
  184. matched_outputs.append(mp)
  185. if mp not in seen_nodes:
  186. seen_nodes.add(mp)
  187. node = dict(topic_by_name.get(mp, {"name": mp, "point": "", "dimension": "", "root_source": "", "root_sources_desc": ""}))
  188. node["level"] = round_num
  189. if "original_word" not in node:
  190. node["original_word"] = node.get("name", mp)
  191. node["derivation_type"] = dr.get("method", "")
  192. level_by_name[mp] = round_num
  193. node_list.append(node)
  194. if not matched_outputs:
  195. continue
  196. input_data = dr.get("input") or {}
  197. derived_nodes = input_data.get("derived_nodes") or []
  198. tree_nodes = input_data.get("tree_nodes") or []
  199. patterns = input_data.get("patterns") or []
  200. input_post_nodes = [{"name": x} for x in derived_nodes]
  201. input_tree_nodes = [_to_tree_node(x) for x in tree_nodes]
  202. if patterns and isinstance(patterns[0], str):
  203. input_pattern_nodes = [_to_pattern_node(p) for p in patterns]
  204. elif patterns and isinstance(patterns[0], dict):
  205. input_pattern_nodes = patterns
  206. else:
  207. input_pattern_nodes = []
  208. output_nodes = [{"name": x} for x in matched_outputs]
  209. detail = {
  210. "reason": dr.get("reason", ""),
  211. "评估结果": "匹配成功",
  212. }
  213. if dr.get("tools"):
  214. detail["tools"] = dr["tools"]
  215. edge_list.append({
  216. "name": dr.get("method", "") or f"推导-{round_num}",
  217. "input_post_nodes": input_post_nodes,
  218. "input_tree_nodes": input_tree_nodes,
  219. "input_pattern_nodes": input_pattern_nodes,
  220. "output_nodes": output_nodes,
  221. "detail": detail,
  222. })
  223. return node_list, edge_list
  224. def generate_visualize_data(account_name: str, post_id: str, log_id: str, base_dir: Path | None = None) -> None:
  225. """
  226. 主流程:读取解构内容与推导日志,生成整体推导结果与整体推导路径可视化两个 JSON。
  227. """
  228. if base_dir is None:
  229. base_dir = Path(__file__).resolve().parent
  230. input_dir = base_dir / "input" / account_name / "原始数据" / "解构内容"
  231. log_dir = base_dir / "output" / account_name / "推导日志" / post_id / log_id
  232. result_dir = base_dir / "output" / account_name / "整体推导结果"
  233. visualize_dir = base_dir / "output" / account_name / "整体推导路径可视化"
  234. deconstruct_path = input_dir / f"{post_id}.json"
  235. topic_points = parse_topic_points_from_deconstruct(deconstruct_path)
  236. derivations, evals = load_derivation_logs(log_dir)
  237. if not derivations or not evals:
  238. raise ValueError(f"推导或评估数据为空: {log_dir}")
  239. # 2.1 整体推导结果
  240. derivation_result = build_derivation_result(topic_points, derivations, evals)
  241. result_dir.mkdir(parents=True, exist_ok=True)
  242. result_path = result_dir / f"{post_id}.json"
  243. with open(result_path, "w", encoding="utf-8") as f:
  244. json.dump(derivation_result, f, ensure_ascii=False, indent=4)
  245. print(f"已写入整体推导结果: {result_path}")
  246. # 2.2 整体推导路径可视化
  247. node_list, edge_list = build_visualize_edges(derivations, evals, topic_points)
  248. visualize_path = visualize_dir / f"{post_id}.json"
  249. visualize_dir.mkdir(parents=True, exist_ok=True)
  250. with open(visualize_path, "w", encoding="utf-8") as f:
  251. json.dump({"node_list": node_list, "edge_list": edge_list}, f, ensure_ascii=False, indent=4)
  252. print(f"已写入整体推导路径可视化: {visualize_path}")
  253. def main(account_name, post_id, log_id):
  254. # parser = argparse.ArgumentParser(description="生成推导可视化数据")
  255. # parser.add_argument("account_name", help="账号名,如 家有大志")
  256. # parser.add_argument("post_id", help="帖子 ID")
  257. # parser.add_argument("log_id", help="推导日志 ID,如 20260303204232")
  258. # parser.add_argument("--base-dir", type=Path, default=None, help="项目根目录,默认为本脚本所在目录")
  259. # args = parser.parse_args()
  260. generate_visualize_data(account_name=account_name, post_id=post_id, log_id=log_id)
  261. if __name__ == "__main__":
  262. account_name="家有大志"
  263. post_id = "68fb6a5c000000000302e5de"
  264. log_id="20260304161832"
  265. main(account_name, post_id, log_id)