#!/usr/bin/env python3 """ 生成推导可视化数据。 输入参数:account_name, post_id, log_id - 从 input/{account_name}/解构内容/{post_id}.json 解析选题点列表 - 从 output/{account_name}/推导日志/{post_id}/{log_id}/ 读取推导与评估 JSON,生成: 1. output/{account_name}/整体推导结果/{post_id}.json 2. output/{account_name}/整体推导路径可视化/{post_id}.json """ import argparse import json import re from pathlib import Path from typing import Any def _collect_dimension_names(point_data: dict) -> dict[str, str]: """从点的 实质/形式/意图 中收集 名称 -> dimension。""" name_to_dim = {} if "实质" in point_data and point_data["实质"]: for key in ("具体元素", "具象概念", "抽象概念"): for item in (point_data["实质"].get(key) or []): n = item.get("名称") if n: name_to_dim[n] = "实质" if "形式" in point_data and point_data["形式"]: for key in ("具体元素形式", "具象概念形式", "整体形式"): for item in (point_data["形式"].get(key) or []): n = item.get("名称") if n: name_to_dim[n] = "形式" if point_data.get("意图"): for item in point_data["意图"]: n = item.get("名称") if n: name_to_dim[n] = "意图" return name_to_dim def parse_topic_points_from_deconstruct(deconstruct_path: Path) -> list[dict[str, Any]]: """ 从 input/{account_name}/解构内容/{post_id}.json 解析选题点列表。 选题点来自分词结果中的「词」,字段:name, point, dimension, root_source, root_sources_desc。 """ if not deconstruct_path.exists(): raise FileNotFoundError(f"解构内容文件不存在: {deconstruct_path}") with open(deconstruct_path, "r", encoding="utf-8") as f: data = json.load(f) result = [] for point_type in ("灵感点", "目的点", "关键点"): for point in data.get(point_type) or []: root_source = point.get("点", "") root_sources_desc = point.get("点描述", "") name_to_dim = _collect_dimension_names(point) for word_item in point.get("分词结果") or []: name = word_item.get("词", "").strip() if not name: continue dimension = name_to_dim.get(name, "实质") result.append({ "name": name, "point": point_type, "dimension": dimension, "root_source": root_source, "root_sources_desc": root_sources_desc, }) return result def _topic_point_key(t: dict) -> tuple: return (t["name"], t["point"], t["dimension"]) def load_derivation_logs(log_dir: Path) -> tuple[list[dict], list[dict]]: """ 从 output/{account_name}/推导日志/{post_id}/{log_id}/ 读取所有 {轮次}_推导.json 与 {轮次}_评估.json。 返回 (推导列表按轮次序, 评估列表按轮次序)。 """ if not log_dir.is_dir(): raise FileNotFoundError(f"推导日志目录不存在: {log_dir}") derivation_by_round = {} eval_by_round = {} for p in log_dir.glob("*.json"): base = p.stem m = re.match(r"^(\d+)_(推导|评估)$", base) if not m: continue round_num = int(m.group(1)) with open(p, "r", encoding="utf-8") as f: content = json.load(f) if m.group(2) == "推导": derivation_by_round[round_num] = content else: eval_by_round[round_num] = content rounds = sorted(set(derivation_by_round) | set(eval_by_round)) derivations = [derivation_by_round[r] for r in rounds if r in derivation_by_round] evals = [eval_by_round[r] for r in rounds if r in eval_by_round] return derivations, evals def build_derivation_result( topic_points: list[dict], derivations: list[dict], evals: list[dict], ) -> list[dict]: """ 生成整体推导结果:每轮 轮次、推导成功的选题点、未推导成功的选题点、本次新推导成功的选题点。 选题点用 topic_points 中的完整信息;按 name 判定是否被推导(评估中的 match_post_point)。 若之前推导成功的选题点 is_fully_derived=false,本轮变为 is_fully_derived=true,则算本次新推导成功的选题点, 且 matched_score、is_fully_derived 在本轮后更新为该轮评估值。 推导成功的选题点:使用当前已更新的 best (matched_score, is_fully_derived)。 本次新推导成功的选题点:用当轮评估的 matched_score、is_fully_derived。 未推导成功的选题点:不包含 matched_score、is_fully_derived。 """ all_keys = {_topic_point_key(t) for t in topic_points} topic_by_key = {_topic_point_key(t): t for t in topic_points} # 分轮次收集 (round_num, name) -> (matched_score, is_fully_derived),同一轮同名取首次出现 score_by_round_name: dict[tuple[int, str], tuple[float, bool]] = {} for round_idx, eval_data in enumerate(evals): round_num = eval_data.get("round", round_idx + 1) for er in eval_data.get("eval_results") or []: if not (er.get("is_matched") is True or er.get("match_result") == "匹配"): continue mp = (er.get("matched_post_point") or er.get("matched_post_topic") or er.get("match_post_point") or "").strip() if not mp: continue key = (round_num, mp) if key in score_by_round_name: continue score = er.get("matched_score") if score is None: score = 1.0 else: try: score = float(score) except (TypeError, ValueError): score = 1.0 is_fully = er.get("is_fully_derived", True) score_by_round_name[key] = (score, bool(is_fully)) result = [] derived_names_so_far: set[str] = set() fully_derived_names_so_far: set[str] = set() # 已出现过 is_fully_derived=true 的选题点 best_score_by_name: dict[str, tuple[float, bool]] = {} # name -> (matched_score, is_fully_derived),遇 is_fully=true 时更新 for i, (derivation, eval_data) in enumerate(zip(derivations, evals)): round_num = derivation.get("round", i + 1) eval_results = eval_data.get("eval_results") or [] matched_post_points = set() for er in eval_results: if not (er.get("is_matched") is True or er.get("match_result") == "匹配"): continue mp = er.get("matched_post_point") or er.get("matched_post_topic") or er.get("match_post_point") or "" if mp and str(mp).strip(): matched_post_points.add(str(mp).strip()) # 本轮每个匹配名的 (score, is_fully) this_round_scores: dict[str, tuple[float, bool]] = {} for name in matched_post_points: val = score_by_round_name.get((round_num, name)) if val is not None: this_round_scores[name] = val # 本次新推导成功:首次匹配 或 之前 is_fully=false 且本轮 is_fully=true new_derived_names = set() for name in matched_post_points: score, is_fully = this_round_scores.get(name, (None, False)) if name not in derived_names_so_far: new_derived_names.add(name) elif name not in fully_derived_names_so_far and is_fully: new_derived_names.add(name) # 更新推导集合与 best:首次出现或本轮 is_fully=true 时更新 best derived_names_so_far |= matched_post_points for name in matched_post_points: val = this_round_scores.get(name) if val is None: continue score, is_fully = val if name not in best_score_by_name: best_score_by_name[name] = (score, is_fully) elif is_fully: best_score_by_name[name] = (score, is_fully) if is_fully: fully_derived_names_so_far.add(name) derived_keys = {k for k in all_keys if topic_by_key[k]["name"] in derived_names_so_far} new_derived_keys = {k for k in all_keys if topic_by_key[k]["name"] in new_derived_names} not_derived_keys = all_keys - derived_keys sort_derived = sorted(derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2])) sort_new = sorted(new_derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2])) sort_not = sorted(not_derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2])) def add_score_fields(keys: set, sort_keys: list, round_for_score: int | None) -> list[dict]: """round_for_score: 用该轮评估的分数;若为 None 则不添加 score 字段。""" out = [] for k in sort_keys: if k not in keys: continue obj = dict(topic_by_key[k]) if round_for_score is not None: name = obj.get("name", "") val = score_by_round_name.get((round_for_score, name)) if val is not None: obj["matched_score"] = val[0] obj["is_fully_derived"] = val[1] else: obj["matched_score"] = None obj["is_fully_derived"] = False out.append(obj) return out # 推导成功的选题点:用当前已更新的 best (matched_score, is_fully_derived) derived_list = [] for k in sort_derived: if k not in derived_keys: continue obj = dict(topic_by_key[k]) name = obj.get("name", "") val = best_score_by_name.get(name) if val is not None: obj["matched_score"] = val[0] obj["is_fully_derived"] = val[1] else: obj["matched_score"] = None obj["is_fully_derived"] = False derived_list.append(obj) new_list = add_score_fields(new_derived_keys, sort_new, round_for_score=round_num) not_derived_list = [dict(topic_by_key[k]) for k in sort_not] # 不带 matched_score、is_fully_derived result.append({ "轮次": round_num, "推导成功的选题点": derived_list, "未推导成功的选题点": not_derived_list, "本次新推导成功的选题点": new_list, }) return result def _tree_node_display_name(raw: str) -> str: """人设节点可能是 a.b.c 路径形式,实际需要的是最后一段节点名 c。""" s = (raw or "").strip() if "." in s: return s.rsplit(".", 1)[-1].strip() or s return s def _to_tree_node(name: str, extra: dict | None = None) -> dict: d = {"name": name} if extra: d.update(extra) return d def _to_pattern_node(pattern_name: str) -> dict: """将 pattern 字符串转为 input_pattern_nodes 的一项(简化版)。""" items = [x.strip() for x in pattern_name.replace("+", " ").split() if x.strip()] return { "items": [{"name": x, "point": "关键点", "dimension": "形式", "type": "标签"} for x in items], "match_items": items, } def build_visualize_edges( derivations: list[dict], evals: list[dict], topic_points: list[dict], ) -> tuple[list[dict], list[dict]]: """ 生成 node_list(所有评估通过的帖子选题点)和 edge_list(只保留评估通过的推导路径)。 - node_list:同一轮内节点不重复,重复时保留 matched_score 更高的;节点带 matched_score、is_fully_derived。 - edge_list:边带 level(与 output 节点 level 一致);同一轮内 output 节点不重复;若前面轮次该节点匹配分更高则本轮不保留该节点。 评估数据支持 path_id(对应推导 derivation_results[].id)、item_id(output 中元素从 1 起的序号)、matched_score、is_fully_derived。 """ derivations = sorted(derivations, key=lambda d: d.get("round", 0)) evals = sorted(evals, key=lambda e: e.get("round", 0)) topic_by_name = {t["name"]: t for t in topic_points} # 评估匹配:(round_num, path_id, item_id) -> (matched_post_point, matched_reason, matched_score, is_fully_derived) # path_id = 推导中 derivation_results[].id,item_id = output 中元素从 1 起的序号 match_by_path_item: dict[tuple[int, int, int], tuple[str, str, float, bool]] = {} match_by_round_output: dict[tuple[int, str], tuple[str, str, float, bool]] = {} # 兼容无 path_id/item_id for round_idx, eval_data in enumerate(evals): round_num = eval_data.get("round", round_idx + 1) for er in eval_data.get("eval_results") or []: if not (er.get("is_matched") is True or er.get("match_result") == "匹配"): continue mp = (er.get("matched_post_point") or er.get("matched_post_topic") or er.get("match_post_point") or "").strip() if not mp: continue out_point = (er.get("derivation_output_point") or "").strip() reason = (er.get("matched_reason") or er.get("match_reason") or "").strip() score = er.get("matched_score") if score is None: score = 1.0 else: try: score = float(score) except (TypeError, ValueError): score = 1.0 is_fully = er.get("is_fully_derived", True) val = (mp, reason, score, bool(is_fully)) path_id = er.get("path_id") item_id = er.get("item_id") if path_id is not None and item_id is not None: try: match_by_path_item[(round_num, int(path_id), int(item_id))] = val except (TypeError, ValueError): pass if out_point: k = (round_num, out_point) if k not in match_by_round_output: match_by_round_output[k] = val # 按 (round_num, mp) 收集节点候选,同轮同节点保留 matched_score 最高的一条 node_candidates: dict[tuple[int, str], dict] = {} # (round_num, mp) -> node_dict (含 score, is_fully_derived) def get_match(round_num: int, path_id: int | None, item_id: int | None, out_item: str) -> tuple[str, str, float, bool] | None: if path_id is not None and item_id is not None: v = match_by_path_item.get((round_num, path_id, item_id)) if v is not None: return v return match_by_round_output.get((round_num, out_item)) edge_list = [] round_output_seen: set[tuple[int, str]] = set() # (round_num, node_name) 本轮已作为某边的 output best_score_by_node: dict[str, float] = {} # node_name -> 已出现过的最高 matched_score for round_idx, derivation in enumerate(derivations): round_num = derivation.get("round", round_idx + 1) for dr in derivation.get("derivation_results") or []: output_list = dr.get("output") or [] path_id = dr.get("id") matched: list[tuple[str, str, float, bool, str]] = [] # (mp, reason, score, is_fully, derivation_out) for i, out_item in enumerate(output_list): item_id = i + 1 v = get_match(round_num, path_id, item_id, out_item) if not v: continue mp, reason, score, is_fully = v matched.append((mp, reason, score, is_fully, out_item)) if not matched: continue # 同一轮内 output 节点不重复;若前面轮次该节点匹配分更高则本轮不保留 output_names_this_edge = [] for mp, reason, score, is_fully, out_item in matched: if (round_num, mp) in round_output_seen: continue if score <= best_score_by_node.get(mp, -1.0): continue output_names_this_edge.append((mp, reason, score, is_fully, out_item)) if not output_names_this_edge: continue for mp, _r, score, _f, _o in output_names_this_edge: round_output_seen.add((round_num, mp)) best_score_by_node[mp] = max(best_score_by_node.get(mp, -1.0), score) # 节点候选:同轮同节点保留匹配分更高的 for mp, _reason, score, is_fully, _out_item in output_names_this_edge: key = (round_num, mp) if key not in node_candidates or node_candidates[key].get("matched_score", 0) < score: node = dict(topic_by_name.get(mp, {"name": mp, "point": "", "dimension": "", "root_source": "", "root_sources_desc": ""})) node["level"] = round_num node.setdefault("original_word", node.get("name", mp)) node["derivation_type"] = dr.get("method", "") node["matched_score"] = score node["is_fully_derived"] = is_fully node_candidates[key] = node input_data = dr.get("input") or {} derived_nodes = input_data.get("derived_nodes") or [] tree_nodes = input_data.get("tree_nodes") or [] patterns = input_data.get("patterns") or [] input_post_nodes = [{"name": x} for x in derived_nodes] input_tree_nodes = [_to_tree_node(_tree_node_display_name(x)) for x in tree_nodes] if patterns and isinstance(patterns[0], str): input_pattern_nodes = [_to_pattern_node(p) for p in patterns] elif patterns and isinstance(patterns[0], dict): input_pattern_nodes = patterns else: input_pattern_nodes = [] output_nodes = [] reasons_list = [] derivation_points_list = [] for mp, reason, score, is_fully, out_item in output_names_this_edge: output_nodes.append({"name": mp, "matched_score": score, "is_fully_derived": is_fully}) reasons_list.append(reason) derivation_points_list.append(out_item) detail = { "reason": dr.get("reason", ""), "评估结果": "匹配成功", } if any(reasons_list): detail["匹配理由"] = reasons_list detail["待比对的推导选题点"] = derivation_points_list if dr.get("tools"): detail["tools"] = dr["tools"] edge_list.append({ "name": dr.get("method", "") or f"推导-{round_num}", "level": round_num, "input_post_nodes": input_post_nodes, "input_tree_nodes": input_tree_nodes, "input_pattern_nodes": input_pattern_nodes, "output_nodes": output_nodes, "detail": detail, }) node_list = list(node_candidates.values()) return node_list, edge_list def _find_project_root() -> Path: """从脚本所在目录向上查找包含 .git 的项目根目录。""" p = Path(__file__).resolve().parent while p != p.parent: if (p / ".git").is_dir(): return p p = p.parent return Path(__file__).resolve().parent def generate_visualize_data(account_name: str, post_id: str, log_id: str, base_dir: Path | None = None) -> None: """ 主流程:读取解构内容与推导日志,生成整体推导结果与整体推导路径可视化两个 JSON。 base_dir 默认为脚本所在目录;若其下 output/.../推导日志 不存在,则尝试项目根目录下的 output/...(兼容从项目根运行)。 """ if base_dir is None: base_dir = Path(__file__).resolve().parent input_dir = base_dir / "input" / account_name / "原始数据" / "解构内容" log_dir = base_dir / "output" / account_name / "推导日志" / post_id / log_id result_dir = base_dir / "output" / account_name / "整体推导结果" visualize_dir = base_dir / "output" / account_name / "整体推导路径可视化" # 兼容:若推导日志不在 base_dir 下,尝试项目根目录下的 output/ if not log_dir.is_dir(): project_root = _find_project_root() if project_root != base_dir: alt_log = project_root / "output" / account_name / "推导日志" / post_id / log_id if alt_log.is_dir(): log_dir = alt_log result_dir = project_root / "output" / account_name / "整体推导结果" visualize_dir = project_root / "output" / account_name / "整体推导路径可视化" deconstruct_path = input_dir / f"{post_id}.json" topic_points = parse_topic_points_from_deconstruct(deconstruct_path) derivations, evals = load_derivation_logs(log_dir) if not derivations or not evals: raise ValueError(f"推导或评估数据为空: {log_dir}") # 2.1 整体推导结果 derivation_result = build_derivation_result(topic_points, derivations, evals) result_dir.mkdir(parents=True, exist_ok=True) result_path = result_dir / f"{post_id}.json" with open(result_path, "w", encoding="utf-8") as f: json.dump(derivation_result, f, ensure_ascii=False, indent=4) print(f"已写入整体推导结果: {result_path}") # 2.2 整体推导路径可视化 node_list, edge_list = build_visualize_edges(derivations, evals, topic_points) visualize_path = visualize_dir / f"{post_id}.json" visualize_dir.mkdir(parents=True, exist_ok=True) with open(visualize_path, "w", encoding="utf-8") as f: json.dump({"node_list": node_list, "edge_list": edge_list}, f, ensure_ascii=False, indent=4) print(f"已写入整体推导路径可视化: {visualize_path}") def main(account_name, post_id, log_id): # parser = argparse.ArgumentParser(description="生成推导可视化数据") # parser.add_argument("account_name", help="账号名,如 家有大志") # parser.add_argument("post_id", help="帖子 ID") # parser.add_argument("log_id", help="推导日志 ID,如 20260303204232") # parser.add_argument("--base-dir", type=Path, default=None, help="项目根目录,默认为本脚本所在目录") # args = parser.parse_args() generate_visualize_data(account_name=account_name, post_id=post_id, log_id=log_id) if __name__ == "__main__": account_name="家有大志" post_id = "68fb6a5c000000000302e5de" log_id="20260310220945" main(account_name, post_id, log_id)