#!/usr/bin/env python3 """ 生成推导可视化数据。 输入参数:account_name, post_id, log_id - 从 input/{account_name}/解构内容/{post_id}.json 解析选题点列表 - 从 output/{account_name}/推导日志/{post_id}/{log_id}/ 读取推导与评估 JSON,生成: 1. output/{account_name}/整体推导结果/{post_id}.json 2. output/{account_name}/整体推导路径可视化/{post_id}.json """ import argparse import json import re from pathlib import Path from typing import Any def _collect_dimension_names(point_data: dict) -> dict[str, str]: """从点的 实质/形式/意图 中收集 名称 -> dimension。""" name_to_dim = {} if "实质" in point_data and point_data["实质"]: for key in ("具体元素", "具象概念", "抽象概念"): for item in (point_data["实质"].get(key) or []): n = item.get("名称") if n: name_to_dim[n] = "实质" if "形式" in point_data and point_data["形式"]: for key in ("具体元素形式", "具象概念形式", "整体形式"): for item in (point_data["形式"].get(key) or []): n = item.get("名称") if n: name_to_dim[n] = "形式" if point_data.get("意图"): for item in point_data["意图"]: n = item.get("名称") if n: name_to_dim[n] = "意图" return name_to_dim def parse_topic_points_from_deconstruct(deconstruct_path: Path) -> list[dict[str, Any]]: """ 从 input/{account_name}/解构内容/{post_id}.json 解析选题点列表。 选题点来自分词结果中的「词」,字段:name, point, dimension, root_source, root_sources_desc。 """ if not deconstruct_path.exists(): raise FileNotFoundError(f"解构内容文件不存在: {deconstruct_path}") with open(deconstruct_path, "r", encoding="utf-8") as f: data = json.load(f) result = [] for point_type in ("灵感点", "目的点", "关键点"): for point in data.get(point_type) or []: root_source = point.get("点", "") root_sources_desc = point.get("点描述", "") name_to_dim = _collect_dimension_names(point) for word_item in point.get("分词结果") or []: name = word_item.get("词", "").strip() if not name: continue dimension = name_to_dim.get(name, "实质") result.append({ "name": name, "point": point_type, "dimension": dimension, "root_source": root_source, "root_sources_desc": root_sources_desc, }) return result def _topic_point_key(t: dict) -> tuple: return (t["name"], t["point"], t["dimension"]) def load_derivation_logs(log_dir: Path) -> tuple[list[dict], list[dict]]: """ 从 output/{account_name}/推导日志/{post_id}/{log_id}/ 读取所有 {轮次}_推导.json 与 {轮次}_评估.json。 返回 (推导列表按轮次序, 评估列表按轮次序)。 """ if not log_dir.is_dir(): raise FileNotFoundError(f"推导日志目录不存在: {log_dir}") derivation_by_round = {} eval_by_round = {} for p in log_dir.glob("*.json"): base = p.stem m = re.match(r"^(\d+)_(推导|评估)$", base) if not m: continue round_num = int(m.group(1)) with open(p, "r", encoding="utf-8") as f: content = json.load(f) if m.group(2) == "推导": derivation_by_round[round_num] = content else: eval_by_round[round_num] = content rounds = sorted(set(derivation_by_round) | set(eval_by_round)) derivations = [derivation_by_round[r] for r in rounds if r in derivation_by_round] evals = [eval_by_round[r] for r in rounds if r in eval_by_round] return derivations, evals def build_derivation_result( topic_points: list[dict], derivations: list[dict], evals: list[dict], ) -> list[dict]: """ 生成整体推导结果:每轮 轮次、推导成功的选题点、未推导成功的选题点、本次新推导成功的选题点。 选题点用 topic_points 中的完整信息;按 name 判定是否被推导(评估中的 match_post_point)。 """ all_keys = {_topic_point_key(t) for t in topic_points} topic_by_key = {_topic_point_key(t): t for t in topic_points} result = [] derived_names_so_far: set[str] = set() for i, (derivation, eval_data) in enumerate(zip(derivations, evals)): round_num = derivation.get("round", i + 1) eval_results = eval_data.get("eval_results") or [] matched_post_points = set() for er in eval_results: # 新格式: is_matched;旧格式: match_result == "匹配" if not (er.get("is_matched") is True or er.get("match_result") == "匹配"): continue mp = er.get("matched_post_point") or er.get("matched_post_topic") or er.get("match_post_point") or "" if mp and str(mp).strip(): matched_post_points.add(str(mp).strip()) new_derived_names = matched_post_points - derived_names_so_far derived_names_so_far |= matched_post_points # 推导成功的选题点:name 在 derived_names_so_far 中的选题点(每 name 取一条,与 topic_points 顺序一致) derived_keys = {k for k in all_keys if topic_by_key[k]["name"] in derived_names_so_far} new_derived_keys = {k for k in all_keys if topic_by_key[k]["name"] in new_derived_names} not_derived_keys = all_keys - derived_keys derived_list = [dict(topic_by_key[k]) for k in sorted(derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2]))] new_list = [dict(topic_by_key[k]) for k in sorted(new_derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2]))] not_derived_list = [dict(topic_by_key[k]) for k in sorted(not_derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2]))] result.append({ "轮次": round_num, "推导成功的选题点": derived_list, "未推导成功的选题点": not_derived_list, "本次新推导成功的选题点": new_list, }) return result def _tree_node_display_name(raw: str) -> str: """人设节点可能是 a.b.c 路径形式,实际需要的是最后一段节点名 c。""" s = (raw or "").strip() if "." in s: return s.rsplit(".", 1)[-1].strip() or s return s def _to_tree_node(name: str, extra: dict | None = None) -> dict: d = {"name": name} if extra: d.update(extra) return d def _to_pattern_node(pattern_name: str) -> dict: """将 pattern 字符串转为 input_pattern_nodes 的一项(简化版)。""" items = [x.strip() for x in pattern_name.replace("+", " ").split() if x.strip()] return { "items": [{"name": x, "point": "关键点", "dimension": "形式", "type": "标签"} for x in items], "match_items": items, } def build_visualize_edges( derivations: list[dict], evals: list[dict], topic_points: list[dict], ) -> tuple[list[dict], list[dict]]: """ 生成 node_list(所有评估通过的帖子选题点)和 edge_list(只保留评估通过的推导路径)。 按轮次从小到大处理,保证每个输出节点最多只出现在一条边的 output_nodes 里,且保留的是前面轮次的数据。 """ # 按轮次从小到大排序,确保优先使用前面轮次的输出节点 derivations = sorted(derivations, key=lambda d: d.get("round", 0)) evals = sorted(evals, key=lambda e: e.get("round", 0)) topic_by_name = {} for t in topic_points: name = t["name"] if name not in topic_by_name: topic_by_name[name] = t # 不依赖 id,仅用 (round, derivation_output_point) 与推导的 output 节点名匹配关联评估结果 # key=(round_num, derivation_output_point) -> (matched_post_point, matched_reason);同轮同节点取首次匹配 match_by_round_output: dict[tuple[int, str], tuple[str, str]] = {} for round_idx, eval_data in enumerate(evals): round_num = eval_data.get("round", round_idx + 1) for er in eval_data.get("eval_results") or []: if not (er.get("is_matched") is True or er.get("match_result") == "匹配"): continue out_point = (er.get("derivation_output_point") or "").strip() mp = er.get("matched_post_point") or er.get("matched_post_topic") or er.get("match_post_point") or "" matched_reason = er.get("matched_reason") or er.get("match_reason") or "" if out_point and mp and str(mp).strip(): mp = str(mp).strip() k = (round_num, out_point) if k not in match_by_round_output: match_by_round_output[k] = (mp, str(matched_reason).strip() if matched_reason else "") node_list = [] seen_nodes = set() edge_list = [] level_by_name = {} output_nodes_seen: set[str] = set() # 已在之前边的 output_nodes 中出现过的节点,避免同一输出节点对应多条边 for round_idx, derivation in enumerate(derivations): round_num = derivation.get("round", round_idx + 1) for dr in derivation.get("derivation_results") or []: output_list = dr.get("output") or [] matched_outputs = [] matched_reasons = [] matched_derivation_outputs = [] for out_item in output_list: key = (round_num, out_item) pair = match_by_round_output.get(key) if not pair: continue mp, reason = pair matched_outputs.append(mp) matched_reasons.append(reason) matched_derivation_outputs.append(out_item) if mp not in seen_nodes: seen_nodes.add(mp) node = dict(topic_by_name.get(mp, {"name": mp, "point": "", "dimension": "", "root_source": "", "root_sources_desc": ""})) node["level"] = round_num if "original_word" not in node: node["original_word"] = node.get("name", mp) node["derivation_type"] = dr.get("method", "") level_by_name[mp] = round_num node_list.append(node) if not matched_outputs: continue # 只保留尚未在之前边的 output_nodes 中出现过的节点,避免同一输出节点对应多条边 output_names_this_edge = [x for x in matched_outputs if x not in output_nodes_seen] if not output_names_this_edge: continue output_nodes_seen.update(output_names_this_edge) input_data = dr.get("input") or {} derived_nodes = input_data.get("derived_nodes") or [] tree_nodes = input_data.get("tree_nodes") or [] patterns = input_data.get("patterns") or [] input_post_nodes = [{"name": x} for x in derived_nodes] input_tree_nodes = [_to_tree_node(_tree_node_display_name(x)) for x in tree_nodes] if patterns and isinstance(patterns[0], str): input_pattern_nodes = [_to_pattern_node(p) for p in patterns] elif patterns and isinstance(patterns[0], dict): input_pattern_nodes = patterns else: input_pattern_nodes = [] output_nodes = [{"name": x} for x in output_names_this_edge] # 与 output_names_this_edge 顺序对应的匹配理由、推导输出节点名 mp_to_reason = dict(zip(matched_outputs, matched_reasons)) mp_to_derivation_out = dict(zip(matched_outputs, matched_derivation_outputs)) reason_for_this_edge = [mp_to_reason.get(name, "") for name in output_names_this_edge] derivation_points_this_edge = [mp_to_derivation_out.get(name, "") for name in output_names_this_edge] detail = { "reason": dr.get("reason", ""), "评估结果": "匹配成功", } if any(reason_for_this_edge): detail["匹配理由"] = reason_for_this_edge detail["待比对的推导选题点"] = derivation_points_this_edge if dr.get("tools"): detail["tools"] = dr["tools"] edge_list.append({ "name": dr.get("method", "") or f"推导-{round_num}", "input_post_nodes": input_post_nodes, "input_tree_nodes": input_tree_nodes, "input_pattern_nodes": input_pattern_nodes, "output_nodes": output_nodes, "detail": detail, }) return node_list, edge_list def generate_visualize_data(account_name: str, post_id: str, log_id: str, base_dir: Path | None = None) -> None: """ 主流程:读取解构内容与推导日志,生成整体推导结果与整体推导路径可视化两个 JSON。 """ if base_dir is None: base_dir = Path(__file__).resolve().parent input_dir = base_dir / "input" / account_name / "原始数据" / "解构内容" log_dir = base_dir / "output" / account_name / "推导日志" / post_id / log_id result_dir = base_dir / "output" / account_name / "整体推导结果" visualize_dir = base_dir / "output" / account_name / "整体推导路径可视化" deconstruct_path = input_dir / f"{post_id}.json" topic_points = parse_topic_points_from_deconstruct(deconstruct_path) derivations, evals = load_derivation_logs(log_dir) if not derivations or not evals: raise ValueError(f"推导或评估数据为空: {log_dir}") # 2.1 整体推导结果 derivation_result = build_derivation_result(topic_points, derivations, evals) result_dir.mkdir(parents=True, exist_ok=True) result_path = result_dir / f"{post_id}.json" with open(result_path, "w", encoding="utf-8") as f: json.dump(derivation_result, f, ensure_ascii=False, indent=4) print(f"已写入整体推导结果: {result_path}") # 2.2 整体推导路径可视化 node_list, edge_list = build_visualize_edges(derivations, evals, topic_points) visualize_path = visualize_dir / f"{post_id}.json" visualize_dir.mkdir(parents=True, exist_ok=True) with open(visualize_path, "w", encoding="utf-8") as f: json.dump({"node_list": node_list, "edge_list": edge_list}, f, ensure_ascii=False, indent=4) print(f"已写入整体推导路径可视化: {visualize_path}") def main(account_name, post_id, log_id): # parser = argparse.ArgumentParser(description="生成推导可视化数据") # parser.add_argument("account_name", help="账号名,如 家有大志") # parser.add_argument("post_id", help="帖子 ID") # parser.add_argument("log_id", help="推导日志 ID,如 20260303204232") # parser.add_argument("--base-dir", type=Path, default=None, help="项目根目录,默认为本脚本所在目录") # args = parser.parse_args() generate_visualize_data(account_name=account_name, post_id=post_id, log_id=log_id) if __name__ == "__main__": account_name="家有大志" post_id = "68fb6a5c000000000302e5de" log_id="20260309010119" main(account_name, post_id, log_id)