#!/usr/bin/env python3 """ 生成推导可视化数据。 输入参数:account_name, post_id, log_id - 从 input/{account_name}/解构内容/{post_id}.json 解析选题点列表 - 从 output/{account_name}/推导日志/{post_id}/{log_id}/ 读取推导与评估 JSON,生成: 1. output/{account_name}/整体推导结果/{post_id}.json 2. output/{account_name}/整体推导路径可视化/{post_id}.json """ import argparse import json import re from pathlib import Path from typing import Any def _collect_dimension_names(point_data: dict) -> dict[str, str]: """从点的 实质/形式/意图 中收集 名称 -> dimension。""" name_to_dim = {} if "实质" in point_data and point_data["实质"]: for key in ("具体元素", "具象概念", "抽象概念"): for item in (point_data["实质"].get(key) or []): n = item.get("名称") if n: name_to_dim[n] = "实质" if "形式" in point_data and point_data["形式"]: for key in ("具体元素形式", "具象概念形式", "整体形式"): for item in (point_data["形式"].get(key) or []): n = item.get("名称") if n: name_to_dim[n] = "形式" if point_data.get("意图"): for item in point_data["意图"]: n = item.get("名称") if n: name_to_dim[n] = "意图" return name_to_dim def parse_topic_points_from_deconstruct(deconstruct_path: Path) -> list[dict[str, Any]]: """ 从 input/{account_name}/解构内容/{post_id}.json 解析选题点列表。 选题点来自分词结果中的「词」,字段:name, point, dimension, root_source, root_sources_desc。 """ if not deconstruct_path.exists(): raise FileNotFoundError(f"解构内容文件不存在: {deconstruct_path}") with open(deconstruct_path, "r", encoding="utf-8") as f: data = json.load(f) result = [] for point_type in ("灵感点", "目的点", "关键点"): for point in data.get(point_type) or []: root_source = point.get("点", "") root_sources_desc = point.get("点描述", "") name_to_dim = _collect_dimension_names(point) for word_item in point.get("分词结果") or []: name = word_item.get("词", "").strip() if not name: continue dimension = name_to_dim.get(name, "实质") result.append({ "name": name, "point": point_type, "dimension": dimension, "root_source": root_source, "root_sources_desc": root_sources_desc, }) return result def _topic_point_key(t: dict) -> tuple: return (t["name"], t["point"], t["dimension"]) def load_derivation_logs(log_dir: Path) -> tuple[list[dict], list[dict]]: """ 从 output/{account_name}/推导日志/{post_id}/{log_id}/ 读取所有 {轮次}_推导.json 与 {轮次}_评估.json。 返回 (推导列表按轮次序, 评估列表按轮次序)。 """ if not log_dir.is_dir(): raise FileNotFoundError(f"推导日志目录不存在: {log_dir}") derivation_by_round = {} eval_by_round = {} for p in log_dir.glob("*.json"): base = p.stem m = re.match(r"^(\d+)_(推导|评估)$", base) if not m: continue round_num = int(m.group(1)) with open(p, "r", encoding="utf-8") as f: content = json.load(f) if m.group(2) == "推导": derivation_by_round[round_num] = content else: eval_by_round[round_num] = content rounds = sorted(set(derivation_by_round) | set(eval_by_round)) derivations = [derivation_by_round[r] for r in rounds if r in derivation_by_round] evals = [eval_by_round[r] for r in rounds if r in eval_by_round] return derivations, evals def build_derivation_result( topic_points: list[dict], derivations: list[dict], evals: list[dict], ) -> list[dict]: """ 生成整体推导结果:每轮 轮次、推导成功的选题点、未推导成功的选题点、本次新推导成功的选题点。 选题点用 topic_points 中的完整信息;按 name 判定是否被推导(评估中的 match_post_point)。 """ all_keys = {_topic_point_key(t) for t in topic_points} topic_by_key = {_topic_point_key(t): t for t in topic_points} result = [] derived_names_so_far: set[str] = set() for i, (derivation, eval_data) in enumerate(zip(derivations, evals)): round_num = derivation.get("round", i + 1) eval_results = eval_data.get("eval_results") or [] matched_post_points = set() for er in eval_results: if er.get("match_result") != "匹配": continue mp = (er.get("match_post_point") or "").strip() if mp: matched_post_points.add(mp) new_derived_names = matched_post_points - derived_names_so_far derived_names_so_far |= matched_post_points # 推导成功的选题点:name 在 derived_names_so_far 中的选题点(每 name 取一条,与 topic_points 顺序一致) derived_keys = {k for k in all_keys if topic_by_key[k]["name"] in derived_names_so_far} new_derived_keys = {k for k in all_keys if topic_by_key[k]["name"] in new_derived_names} not_derived_keys = all_keys - derived_keys derived_list = [dict(topic_by_key[k]) for k in sorted(derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2]))] new_list = [dict(topic_by_key[k]) for k in sorted(new_derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2]))] not_derived_list = [dict(topic_by_key[k]) for k in sorted(not_derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2]))] result.append({ "轮次": round_num, "推导成功的选题点": derived_list, "未推导成功的选题点": not_derived_list, "本次新推导成功的选题点": new_list, }) return result def _to_tree_node(name: str, extra: dict | None = None) -> dict: d = {"name": name} if extra: d.update(extra) return d def _to_pattern_node(pattern_name: str) -> dict: """将 pattern 字符串转为 input_pattern_nodes 的一项(简化版)。""" items = [x.strip() for x in pattern_name.replace("+", " ").split() if x.strip()] return { "items": [{"name": x, "point": "关键点", "dimension": "形式", "type": "标签"} for x in items], "match_items": items, } def build_visualize_edges( derivations: list[dict], evals: list[dict], topic_points: list[dict], ) -> tuple[list[dict], list[dict]]: """ 生成 node_list(所有评估通过的帖子选题点)和 edge_list(只保留评估通过的推导路径)。 """ topic_by_name = {} for t in topic_points: name = t["name"] if name not in topic_by_name: topic_by_name[name] = t derivation_output_to_match = {} for eval_data in evals: for er in eval_data.get("eval_results") or []: if er.get("match_result") != "匹配": continue out_point = (er.get("derivation_output_point") or "").strip() match_point = (er.get("match_post_point") or "").strip() if out_point and match_point: derivation_output_to_match[out_point] = { "match_post_point": match_point, "match_reason": er.get("match_reason", ""), "eval": er, } node_list = [] seen_nodes = set() edge_list = [] level_by_name = {} for round_idx, derivation in enumerate(derivations): round_num = derivation.get("round", round_idx + 1) for dr in derivation.get("derivation_results") or []: output_list = dr.get("output") or [] matched_outputs = [] for out_item in output_list: info = derivation_output_to_match.get(out_item) if not info: continue mp = info["match_post_point"] if not mp: continue matched_outputs.append(mp) if mp not in seen_nodes: seen_nodes.add(mp) node = dict(topic_by_name.get(mp, {"name": mp, "point": "", "dimension": "", "root_source": "", "root_sources_desc": ""})) node["level"] = round_num if "original_word" not in node: node["original_word"] = node.get("name", mp) node["derivation_type"] = dr.get("method", "") level_by_name[mp] = round_num node_list.append(node) if not matched_outputs: continue input_data = dr.get("input") or {} derived_nodes = input_data.get("derived_nodes") or [] tree_nodes = input_data.get("tree_nodes") or [] patterns = input_data.get("patterns") or [] input_post_nodes = [{"name": x} for x in derived_nodes] input_tree_nodes = [_to_tree_node(x) for x in tree_nodes] if patterns and isinstance(patterns[0], str): input_pattern_nodes = [_to_pattern_node(p) for p in patterns] elif patterns and isinstance(patterns[0], dict): input_pattern_nodes = patterns else: input_pattern_nodes = [] output_nodes = [{"name": x} for x in matched_outputs] detail = { "reason": dr.get("reason", ""), "评估结果": "匹配成功", } if dr.get("tools"): detail["tools"] = dr["tools"] edge_list.append({ "name": dr.get("method", "") or f"推导-{round_num}", "input_post_nodes": input_post_nodes, "input_tree_nodes": input_tree_nodes, "input_pattern_nodes": input_pattern_nodes, "output_nodes": output_nodes, "detail": detail, }) return node_list, edge_list def generate_visualize_data(account_name: str, post_id: str, log_id: str, base_dir: Path | None = None) -> None: """ 主流程:读取解构内容与推导日志,生成整体推导结果与整体推导路径可视化两个 JSON。 """ if base_dir is None: base_dir = Path(__file__).resolve().parent input_dir = base_dir / "input" / account_name / "原始数据" / "解构内容" log_dir = base_dir / "output" / account_name / "推导日志" / post_id / log_id result_dir = base_dir / "output" / account_name / "整体推导结果" visualize_dir = base_dir / "output" / account_name / "整体推导路径可视化" deconstruct_path = input_dir / f"{post_id}.json" topic_points = parse_topic_points_from_deconstruct(deconstruct_path) derivations, evals = load_derivation_logs(log_dir) if not derivations or not evals: raise ValueError(f"推导或评估数据为空: {log_dir}") # 2.1 整体推导结果 derivation_result = build_derivation_result(topic_points, derivations, evals) result_dir.mkdir(parents=True, exist_ok=True) result_path = result_dir / f"{post_id}.json" with open(result_path, "w", encoding="utf-8") as f: json.dump(derivation_result, f, ensure_ascii=False, indent=4) print(f"已写入整体推导结果: {result_path}") # 2.2 整体推导路径可视化 node_list, edge_list = build_visualize_edges(derivations, evals, topic_points) visualize_path = visualize_dir / f"{post_id}.json" visualize_dir.mkdir(parents=True, exist_ok=True) with open(visualize_path, "w", encoding="utf-8") as f: json.dump({"node_list": node_list, "edge_list": edge_list}, f, ensure_ascii=False, indent=4) print(f"已写入整体推导路径可视化: {visualize_path}") def main(account_name, post_id, log_id): # parser = argparse.ArgumentParser(description="生成推导可视化数据") # parser.add_argument("account_name", help="账号名,如 家有大志") # parser.add_argument("post_id", help="帖子 ID") # parser.add_argument("log_id", help="推导日志 ID,如 20260303204232") # parser.add_argument("--base-dir", type=Path, default=None, help="项目根目录,默认为本脚本所在目录") # args = parser.parse_args() generate_visualize_data(account_name=account_name, post_id=post_id, log_id=log_id) if __name__ == "__main__": account_name="家有大志" post_id = "68fb6a5c000000000302e5de" log_id="20260304161832" main(account_name, post_id, log_id)