|
|
@@ -0,0 +1,310 @@
|
|
|
#!/usr/bin/env python3
"""Generate derivation visualization data.

Inputs: account_name, post_id, log_id
- Parses the topic-point list from input/{account_name}/解构内容/{post_id}.json.
- Reads derivation and evaluation JSON from
  output/{account_name}/推导日志/{post_id}/{log_id}/ and produces:
  1. output/{account_name}/整体推导结果/{post_id}.json
  2. output/{account_name}/整体推导路径可视化/{post_id}.json
"""
|
|
|
+
|
|
|
+import argparse
|
|
|
+import json
|
|
|
+import re
|
|
|
+from pathlib import Path
|
|
|
+from typing import Any
|
|
|
+
|
|
|
+
|
|
|
+def _collect_dimension_names(point_data: dict) -> dict[str, str]:
|
|
|
+ """从点的 实质/形式/意图 中收集 名称 -> dimension。"""
|
|
|
+ name_to_dim = {}
|
|
|
+ if "实质" in point_data and point_data["实质"]:
|
|
|
+ for key in ("具体元素", "具象概念", "抽象概念"):
|
|
|
+ for item in (point_data["实质"].get(key) or []):
|
|
|
+ n = item.get("名称")
|
|
|
+ if n:
|
|
|
+ name_to_dim[n] = "实质"
|
|
|
+ if "形式" in point_data and point_data["形式"]:
|
|
|
+ for key in ("具体元素形式", "具象概念形式", "整体形式"):
|
|
|
+ for item in (point_data["形式"].get(key) or []):
|
|
|
+ n = item.get("名称")
|
|
|
+ if n:
|
|
|
+ name_to_dim[n] = "形式"
|
|
|
+ if point_data.get("意图"):
|
|
|
+ for item in point_data["意图"]:
|
|
|
+ n = item.get("名称")
|
|
|
+ if n:
|
|
|
+ name_to_dim[n] = "意图"
|
|
|
+ return name_to_dim
|
|
|
+
|
|
|
+
|
|
|
def parse_topic_points_from_deconstruct(deconstruct_path: Path) -> list[dict[str, Any]]:
    """Parse the topic-point list from input/{account_name}/解构内容/{post_id}.json.

    Topic points come from each point's 分词结果 entries ("词"); each result
    dict carries: name, point, dimension, root_source, root_sources_desc.

    Args:
        deconstruct_path: path to the deconstruction JSON file.

    Returns:
        List of topic-point dicts in file order.

    Raises:
        FileNotFoundError: when the file does not exist.
    """
    if not deconstruct_path.exists():
        raise FileNotFoundError(f"解构内容文件不存在: {deconstruct_path}")
    with open(deconstruct_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    result: list[dict[str, Any]] = []
    for point_type in ("灵感点", "目的点", "关键点"):
        for point in data.get(point_type) or []:
            root_source = point.get("点", "")
            root_sources_desc = point.get("点描述", "")
            name_to_dim = _collect_dimension_names(point)
            for word_item in point.get("分词结果") or []:
                # `or ""` guards an explicit null "词" value — dict.get's
                # default does not apply when the key maps to None, which
                # would crash .strip().
                name = (word_item.get("词") or "").strip()
                if not name:
                    continue
                result.append({
                    "name": name,
                    "point": point_type,
                    # Fall back to 实质 when the word is not in any section.
                    "dimension": name_to_dim.get(name, "实质"),
                    "root_source": root_source,
                    "root_sources_desc": root_sources_desc,
                })
    return result
|
|
|
+
|
|
|
+
|
|
|
+def _topic_point_key(t: dict) -> tuple:
|
|
|
+ return (t["name"], t["point"], t["dimension"])
|
|
|
+
|
|
|
+
|
|
|
def load_derivation_logs(log_dir: Path) -> tuple[list[dict], list[dict]]:
    """Load every {round}_推导.json and {round}_评估.json under *log_dir*.

    Files not matching the "<digits>_推导" / "<digits>_评估" stem pattern
    are ignored.

    Returns:
        (derivations, evaluations), each ordered by ascending round number.

    Raises:
        FileNotFoundError: when *log_dir* is not a directory.
    """
    if not log_dir.is_dir():
        raise FileNotFoundError(f"推导日志目录不存在: {log_dir}")

    stem_pattern = re.compile(r"^(\d+)_(推导|评估)$")
    buckets: dict[str, dict[int, dict]] = {"推导": {}, "评估": {}}

    for json_path in log_dir.glob("*.json"):
        match = stem_pattern.match(json_path.stem)
        if match is None:
            continue
        with open(json_path, "r", encoding="utf-8") as fh:
            payload = json.load(fh)
        buckets[match.group(2)][int(match.group(1))] = payload

    all_rounds = sorted(set(buckets["推导"]) | set(buckets["评估"]))
    derivations = [buckets["推导"][r] for r in all_rounds if r in buckets["推导"]]
    evaluations = [buckets["评估"][r] for r in all_rounds if r in buckets["评估"]]
    return derivations, evaluations
|
|
|
+
|
|
|
+
|
|
|
def _sorted_point_dicts(keys: set, topic_by_key: dict) -> list[dict]:
    """Copy the topic points for *keys*, ordered by (name, point, dimension)."""
    ordered = sorted(keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2]))
    return [dict(topic_by_key[k]) for k in ordered]


def build_derivation_result(
    topic_points: list[dict],
    derivations: list[dict],
    evals: list[dict],
) -> list[dict]:
    """Build the per-round overall derivation result.

    For each round, reports: round number, topic points derived so far, topic
    points not yet derived, and points newly derived in this round. A topic
    point counts as derived when its name appears as a matched
    ``match_post_point`` in the round's evaluation.

    Args:
        topic_points: full topic-point dicts from the deconstruction file.
        derivations: per-round derivation logs (ascending round order).
        evals: per-round evaluation logs (ascending round order).

    Returns:
        One result dict per (derivation, evaluation) pair.
    """
    all_keys = {_topic_point_key(t) for t in topic_points}
    topic_by_key = {_topic_point_key(t): t for t in topic_points}

    result = []
    derived_names_so_far: set[str] = set()

    # NOTE(review): zip() pairs rounds positionally; if one round is missing
    # either its 推导 or 评估 file the pairing shifts silently — confirm
    # upstream guarantees both files exist per round.
    for i, (derivation, eval_data) in enumerate(zip(derivations, evals)):
        round_num = derivation.get("round", i + 1)

        # Names of post points matched by this round's evaluation.
        matched_post_points = set()
        for er in eval_data.get("eval_results") or []:
            if er.get("match_result") != "匹配":
                continue
            mp = (er.get("match_post_point") or "").strip()
            if mp:
                matched_post_points.add(mp)

        new_derived_names = matched_post_points - derived_names_so_far
        derived_names_so_far |= matched_post_points

        # Classify topic points by name against the cumulative derived set.
        derived_keys = {k for k in all_keys if topic_by_key[k]["name"] in derived_names_so_far}
        new_derived_keys = {k for k in all_keys if topic_by_key[k]["name"] in new_derived_names}
        not_derived_keys = all_keys - derived_keys

        result.append({
            "轮次": round_num,
            "推导成功的选题点": _sorted_point_dicts(derived_keys, topic_by_key),
            "未推导成功的选题点": _sorted_point_dicts(not_derived_keys, topic_by_key),
            "本次新推导成功的选题点": _sorted_point_dicts(new_derived_keys, topic_by_key),
        })
    return result
|
|
|
+
|
|
|
+
|
|
|
+def _to_tree_node(name: str, extra: dict | None = None) -> dict:
|
|
|
+ d = {"name": name}
|
|
|
+ if extra:
|
|
|
+ d.update(extra)
|
|
|
+ return d
|
|
|
+
|
|
|
+
|
|
|
+def _to_pattern_node(pattern_name: str) -> dict:
|
|
|
+ """将 pattern 字符串转为 input_pattern_nodes 的一项(简化版)。"""
|
|
|
+ items = [x.strip() for x in pattern_name.replace("+", " ").split() if x.strip()]
|
|
|
+ return {
|
|
|
+ "items": [{"name": x, "point": "关键点", "dimension": "形式", "type": "标签"} for x in items],
|
|
|
+ "match_items": items,
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
def build_visualize_edges(
    derivations: list[dict],
    evals: list[dict],
    topic_points: list[dict],
) -> tuple[list[dict], list[dict]]:
    """
    Build node_list (every post topic point that passed evaluation) and
    edge_list (only derivation paths whose output passed evaluation).

    Args:
        derivations: per-round derivation logs, ascending round order.
        evals: per-round evaluation logs.
        topic_points: topic-point dicts used to enrich matched nodes.

    Returns:
        (node_list, edge_list) for the visualization JSON.
    """
    # First occurrence of each topic-point name wins.
    topic_by_name = {}
    for t in topic_points:
        name = t["name"]
        if name not in topic_by_name:
            topic_by_name[name] = t

    # Map each derivation output string to its matched evaluation record.
    # NOTE(review): if the same output string is matched in several rounds,
    # the last evaluation overwrites earlier ones — confirm this is intended.
    derivation_output_to_match = {}
    for eval_data in evals:
        for er in eval_data.get("eval_results") or []:
            if er.get("match_result") != "匹配":
                continue
            out_point = (er.get("derivation_output_point") or "").strip()
            match_point = (er.get("match_post_point") or "").strip()
            if out_point and match_point:
                derivation_output_to_match[out_point] = {
                    "match_post_point": match_point,
                    "match_reason": er.get("match_reason", ""),
                    "eval": er,
                }

    node_list = []
    seen_nodes = set()  # matched post-point names already emitted as nodes
    edge_list = []
    # NOTE(review): level_by_name is populated but never read afterwards —
    # candidate for removal, or it is meant to be returned; confirm.
    level_by_name = {}

    for round_idx, derivation in enumerate(derivations):
        # Fall back to the positional round number when "round" is absent.
        round_num = derivation.get("round", round_idx + 1)
        for dr in derivation.get("derivation_results") or []:
            output_list = dr.get("output") or []
            matched_outputs = []
            for out_item in output_list:
                info = derivation_output_to_match.get(out_item)
                if not info:
                    continue
                mp = info["match_post_point"]
                if not mp:
                    continue
                matched_outputs.append(mp)
                # Emit each matched post point as a node only once, at the
                # round where it first appears.
                if mp not in seen_nodes:
                    seen_nodes.add(mp)
                    node = dict(topic_by_name.get(mp, {"name": mp, "point": "", "dimension": "", "root_source": "", "root_sources_desc": ""}))
                    node["level"] = round_num
                    if "original_word" not in node:
                        node["original_word"] = node.get("name", mp)
                    node["derivation_type"] = dr.get("method", "")
                    level_by_name[mp] = round_num
                    node_list.append(node)

            # Skip edges whose outputs all failed evaluation.
            if not matched_outputs:
                continue

            input_data = dr.get("input") or {}
            derived_nodes = input_data.get("derived_nodes") or []
            tree_nodes = input_data.get("tree_nodes") or []
            patterns = input_data.get("patterns") or []

            input_post_nodes = [{"name": x} for x in derived_nodes]
            input_tree_nodes = [_to_tree_node(x) for x in tree_nodes]
            # Patterns may be stored as plain strings (convert) or already
            # as dicts (pass through); mixed lists follow the first element.
            if patterns and isinstance(patterns[0], str):
                input_pattern_nodes = [_to_pattern_node(p) for p in patterns]
            elif patterns and isinstance(patterns[0], dict):
                input_pattern_nodes = patterns
            else:
                input_pattern_nodes = []

            output_nodes = [{"name": x} for x in matched_outputs]
            detail = {
                "reason": dr.get("reason", ""),
                "评估结果": "匹配成功",
            }
            if dr.get("tools"):
                detail["tools"] = dr["tools"]
            edge_list.append({
                "name": dr.get("method", "") or f"推导-{round_num}",
                "input_post_nodes": input_post_nodes,
                "input_tree_nodes": input_tree_nodes,
                "input_pattern_nodes": input_pattern_nodes,
                "output_nodes": output_nodes,
                "detail": detail,
            })

    return node_list, edge_list
|
|
|
+
|
|
|
+
|
|
|
def generate_visualize_data(account_name: str, post_id: str, log_id: str, base_dir: Path | None = None) -> None:
    """Main flow: read the deconstruction file and derivation logs, then write
    the overall-result JSON and the visualization JSON.

    Args:
        account_name: account folder name under input/ and output/.
        post_id: post identifier (file stem).
        log_id: derivation-log run identifier.
        base_dir: project root; defaults to this script's directory.

    Raises:
        FileNotFoundError: when inputs are missing.
        ValueError: when derivation or evaluation data is empty.
    """
    root = Path(__file__).resolve().parent if base_dir is None else base_dir

    deconstruct_path = root / "input" / account_name / "解构内容" / f"{post_id}.json"
    log_dir = root / "output" / account_name / "推导日志" / post_id / log_id

    topic_points = parse_topic_points_from_deconstruct(deconstruct_path)
    derivations, evals = load_derivation_logs(log_dir)
    if not derivations or not evals:
        raise ValueError(f"推导或评估数据为空: {log_dir}")

    # 2.1 Overall derivation result.
    result_dir = root / "output" / account_name / "整体推导结果"
    result_dir.mkdir(parents=True, exist_ok=True)
    result_path = result_dir / f"{post_id}.json"
    derivation_result = build_derivation_result(topic_points, derivations, evals)
    with open(result_path, "w", encoding="utf-8") as f:
        json.dump(derivation_result, f, ensure_ascii=False, indent=4)
    print(f"已写入整体推导结果: {result_path}")

    # 2.2 Overall derivation-path visualization.
    node_list, edge_list = build_visualize_edges(derivations, evals, topic_points)
    visualize_dir = root / "output" / account_name / "整体推导路径可视化"
    visualize_dir.mkdir(parents=True, exist_ok=True)
    visualize_path = visualize_dir / f"{post_id}.json"
    with open(visualize_path, "w", encoding="utf-8") as f:
        json.dump({"node_list": node_list, "edge_list": edge_list}, f, ensure_ascii=False, indent=4)
    print(f"已写入整体推导路径可视化: {visualize_path}")
|
|
|
+
|
|
|
+
|
|
|
def main(account_name, post_id, log_id):
    """Entry point: generate visualization data for one post's derivation run.

    Removed the commented-out argparse scaffolding that previously lived
    here; reintroduce a real CLI with argparse if needed.
    """
    generate_visualize_data(account_name=account_name, post_id=post_id, log_id=log_id)
|
|
|
+
|
|
|
+
|
|
|
if __name__ == "__main__":
    # Hard-coded sample invocation for ad-hoc runs.
    main("家有大志", "68fb6a5c000000000302e5de", "20260303221927")
|