#!/usr/bin/env python3 """ 生成推导可视化数据。 输入参数:account_name, post_id, log_id - 从 input/{account_name}/解构内容/{post_id}.json 解析选题点列表 - 从 output/{account_name}/推导日志/{post_id}/{log_id}/ 读取推导与评估 JSON,生成: 1. output/{account_name}/整体推导结果/{post_id}.json 2. output/{account_name}/整体推导路径可视化/{post_id}.json """ import argparse import json import re from pathlib import Path from typing import Any, Dict, List, Optional def _walk_tree_children_for_persona( children: Any, persona_by_name: Dict[str, Dict[str, Any]] ) -> None: """递归遍历人设树 children,按节点名(与 input_tree_nodes 短名一致)登记 type / 常量标记。""" if not isinstance(children, dict): return for name, node in children.items(): if not isinstance(node, dict): continue if name not in persona_by_name: persona_by_name[name] = { "name": name, "type": node.get("_type"), "is_constant": bool(node.get("_is_constant", False)), "is_local_constant": bool(node.get("_is_local_constant", False)), } sub = node.get("children") if isinstance(sub, dict): _walk_tree_children_for_persona(sub, persona_by_name) def build_persona_by_name_from_tree_dir(tree_dir: Path) -> Dict[str, Dict[str, Any]]: """ 从 input/{account}/处理后数据/tree 下所有人设树 JSON(如 *_point_tree_how.json)构建 name -> 人设节点信息。 同名节点以首次出现为准,与 process_pipeline_tree_data.build_persona_by_name 用法一致。 """ persona_by_name: Dict[str, Dict[str, Any]] = {} if not tree_dir.is_dir(): return persona_by_name for path in sorted(tree_dir.glob("*_point_tree_how.json")): with open(path, "r", encoding="utf-8") as f: data = json.load(f) if not isinstance(data, dict): continue for _dim, root in data.items(): if not isinstance(root, dict): continue ch = root.get("children") _walk_tree_children_for_persona(ch, persona_by_name) return persona_by_name def _node_obj_for_used_tree( name: str, node: Optional[Dict[str, Any]], persona: Optional[Dict[str, Any]], ) -> Dict[str, Any]: """与 process_pipeline_tree_data._node_obj 一致:合并人设与 edge 上节点字段。""" type_val = None is_constant = False is_local_constant = False if persona is not None: type_val = persona.get("type") if "is_constant" in persona: is_constant = bool(persona["is_constant"]) if "is_local_constant" in persona: is_local_constant = bool(persona["is_local_constant"]) if node is not None: t = node.get("type") if t is not None and len(t) > 0: type_val = t if "is_constant" in node: is_constant = bool(node["is_constant"]) if "is_local_constant" in node: is_local_constant = bool(node["is_local_constant"]) return { "name": name, "type": type_val, "is_constant": is_constant, "is_local_constant": is_local_constant, } def _dedup_node_objs(nodes: List[Dict[str, Any]]) -> List[Dict[str, Any]]: seen = set() out = [] for n in nodes: key = (n["name"], n.get("type"), n["is_constant"], n["is_local_constant"]) if key not in seen: seen.add(key) out.append(n) return out def extract_used_tree_nodes_from_edge( edge: Dict[str, Any], persona_by_name: Dict[str, Dict[str, Any]], ) -> List[Dict[str, Any]]: """与 process_pipeline_tree_data.extract_used_tree_nodes_from_edge 一致。""" used: List[Dict[str, Any]] = [] for node in edge.get("input_tree_nodes") or []: name = node.get("name") if name is None or name == "": continue persona = persona_by_name.get(name) used.append(_node_obj_for_used_tree(name, node, persona)) for pn in edge.get("input_pattern_nodes") or []: for item in pn.get("match_items") or []: if item is None or item == "": continue persona = persona_by_name.get(item) used.append(_node_obj_for_used_tree(item, None, persona)) return _dedup_node_objs(used) def enrich_visualize_with_used_tree_nodes( data: Dict[str, Any], persona_by_name: Dict[str, Dict[str, Any]], ) -> Dict[str, Any]: """ 为 edge_list 每条 edge 增加 used_tree_nodes,顶层增加 all_used_tree_nodes(与 process_pipeline 一致)。 """ edge_list = data.get("edge_list") if not edge_list: data["all_used_tree_nodes"] = [] return data all_used: List[Dict[str, Any]] = [] for edge in edge_list: used = extract_used_tree_nodes_from_edge(edge, persona_by_name) edge["used_tree_nodes"] = used all_used.extend(used) data["all_used_tree_nodes"] = _dedup_node_objs(all_used) return data def _collect_dimension_names(point_data: dict) -> dict[str, str]: """从点的 实质/形式/意图 中收集 名称 -> dimension。""" name_to_dim = {} if "实质" in point_data and point_data["实质"]: for key in ("具体元素", "具象概念", "抽象概念"): for item in (point_data["实质"].get(key) or []): n = item.get("名称") if n: name_to_dim[n] = "实质" if "形式" in point_data and point_data["形式"]: for key in ("具体元素形式", "具象概念形式", "整体形式"): for item in (point_data["形式"].get(key) or []): n = item.get("名称") if n: name_to_dim[n] = "形式" if point_data.get("意图"): for item in point_data["意图"]: n = item.get("名称") if n: name_to_dim[n] = "意图" return name_to_dim def parse_topic_points_from_deconstruct(deconstruct_path: Path) -> list[dict[str, Any]]: """ 从 input/{account_name}/解构内容/{post_id}.json 解析选题点列表。 - 新格式(Agent):灵感点/目的点/关键点 下为「选题点」「选题点元素」(元素名称、元素类型)。 - 旧格式:「点」「分词结果」中的「词」等。 输出字段:name, point, dimension, root_source, root_sources_desc。 """ if not deconstruct_path.exists(): raise FileNotFoundError(f"解构内容文件不存在: {deconstruct_path}") with open(deconstruct_path, "r", encoding="utf-8") as f: data = json.load(f) result_agent: list[dict[str, Any]] = [] for point_type in ("灵感点", "目的点", "关键点"): for point in data.get(point_type) or []: if not isinstance(point, dict): continue root_source = (point.get("选题点") or point.get("点") or "").strip() root_sources_desc = point.get("选题点描述") or point.get("点描述") or "" for el in point.get("选题点元素") or []: if not isinstance(el, dict): continue name = (el.get("元素名称") or "").strip() if not name: continue et = el.get("元素类型") or "实质" if et not in ("实质", "形式", "意图"): et = "实质" result_agent.append( { "name": name, "point": point_type, "dimension": et, "root_source": root_source, "root_sources_desc": root_sources_desc, } ) if result_agent: return result_agent result = [] for point_type in ("灵感点", "目的点", "关键点"): for point in data.get(point_type) or []: root_source = point.get("点", "") root_sources_desc = point.get("点描述", "") name_to_dim = _collect_dimension_names(point) for word_item in point.get("分词结果") or []: name = word_item.get("词", "").strip() if not name: continue dimension = name_to_dim.get(name, "实质") result.append({ "name": name, "point": point_type, "dimension": dimension, "root_source": root_source, "root_sources_desc": root_sources_desc, }) return result def _topic_point_key(t: dict) -> tuple: return (t["name"], t["point"], t["dimension"]) def load_derivation_logs(log_dir: Path) -> tuple[list[dict], list[dict]]: """ 从 output/{account_name}/推导日志/{post_id}/{log_id}/ 读取所有 {轮次}_推导.json 与 {轮次}_评估.json。 返回 (推导列表按轮次序, 评估列表按轮次序)。 """ if not log_dir.is_dir(): raise FileNotFoundError(f"推导日志目录不存在: {log_dir}") derivation_by_round = {} eval_by_round = {} for p in log_dir.glob("*.json"): base = p.stem m = re.match(r"^(\d+)_(推导|评估)$", base) if not m: continue round_num = int(m.group(1)) with open(p, "r", encoding="utf-8") as f: content = json.load(f) if m.group(2) == "推导": derivation_by_round[round_num] = content else: eval_by_round[round_num] = content rounds = sorted(set(derivation_by_round) | set(eval_by_round)) derivations = [derivation_by_round[r] for r in rounds if r in derivation_by_round] evals = [eval_by_round[r] for r in rounds if r in eval_by_round] return derivations, evals def build_derivation_result( topic_points: list[dict], derivations: list[dict], evals: list[dict], ) -> list[dict]: """ 生成整体推导结果:每轮 轮次、推导成功的选题点、未推导成功的选题点、本次新推导成功的选题点。 选题点用 topic_points 中的完整信息;按 name 判定是否被推导(评估中的 match_post_point)。 若之前推导成功的选题点 is_fully_derived=false,本轮变为 is_fully_derived=true,则算本次新推导成功的选题点, 且 matched_score、is_fully_derived 在本轮后更新为该轮评估值。 推导成功的选题点:使用当前已更新的 best (matched_score, is_fully_derived)。 本次新推导成功的选题点:用当轮评估的 matched_score、is_fully_derived。 未推导成功的选题点:不包含 matched_score、is_fully_derived。 """ all_keys = {_topic_point_key(t) for t in topic_points} topic_by_key = {_topic_point_key(t): t for t in topic_points} # 分轮次收集 (round_num, name) -> (matched_score, is_fully_derived),同一轮同名保留 matched_score 最高的 score_by_round_name: dict[tuple[int, str], tuple[float, bool]] = {} for round_idx, eval_data in enumerate(evals): round_num = eval_data.get("round", round_idx + 1) for er in eval_data.get("eval_results") or []: if not (er.get("is_matched") is True or er.get("match_result") == "匹配"): continue mp = (er.get("matched_post_point") or er.get("matched_post_topic") or er.get("match_post_point") or "").strip() if not mp: continue score = er.get("matched_score") if score is None: score = 1.0 else: try: score = float(score) except (TypeError, ValueError): score = 1.0 is_fully = er.get("is_fully_derived", True) key = (round_num, mp) if key not in score_by_round_name or score > score_by_round_name[key][0]: score_by_round_name[key] = (score, bool(is_fully)) result = [] derived_names_so_far: set[str] = set() fully_derived_names_so_far: set[str] = set() # 已出现过 is_fully_derived=true 的选题点 # name -> (matched_score, is_fully_derived),一旦 is_fully_derived=True,后续轮次不再更新 matched_score best_score_by_name: dict[str, tuple[float, bool]] = {} for i, (derivation, eval_data) in enumerate(zip(derivations, evals)): round_num = derivation.get("round", i + 1) eval_results = eval_data.get("eval_results") or [] matched_post_points = set() for er in eval_results: if not (er.get("is_matched") is True or er.get("match_result") == "匹配"): continue mp = er.get("matched_post_point") or er.get("matched_post_topic") or er.get("match_post_point") or "" if mp and str(mp).strip(): matched_post_points.add(str(mp).strip()) # 本轮每个匹配名的 (score, is_fully) this_round_scores: dict[str, tuple[float, bool]] = {} for name in matched_post_points: val = score_by_round_name.get((round_num, name)) if val is not None: this_round_scores[name] = val # 本次新推导成功:首次匹配 或 之前 is_fully=false 且本轮 is_fully=true new_derived_names = set() for name in matched_post_points: score, is_fully = this_round_scores.get(name, (None, False)) if name not in derived_names_so_far: new_derived_names.add(name) elif name not in fully_derived_names_so_far and is_fully: new_derived_names.add(name) # 更新推导集合与 best: # - 首次出现时写入 # - 若尚未 fully 且本轮 fully,则更新为 fully,并锁定,不再被后续轮次覆盖 # - 若尚未 fully 且本轮仍为部分推导,可用更高分数更新 derived_names_so_far |= matched_post_points for name in matched_post_points: val = this_round_scores.get(name) if val is None: continue score, is_fully = val if name not in best_score_by_name: best_score_by_name[name] = (score, is_fully) else: prev_score, prev_fully = best_score_by_name[name] # 已经 fully 的节点,后续轮次不再更新 matched_score if prev_fully: pass else: if is_fully: best_score_by_name[name] = (score, True) else: # 都是部分推导时,可以用更高分覆盖 if score > prev_score: best_score_by_name[name] = (score, False) if is_fully: fully_derived_names_so_far.add(name) derived_keys = {k for k in all_keys if topic_by_key[k]["name"] in derived_names_so_far} new_derived_keys = {k for k in all_keys if topic_by_key[k]["name"] in new_derived_names} not_derived_keys = all_keys - derived_keys sort_derived = sorted(derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2])) sort_new = sorted(new_derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2])) sort_not = sorted(not_derived_keys, key=lambda k: (topic_by_key[k]["name"], k[1], k[2])) def add_score_fields(keys: set, sort_keys: list, round_for_score: int | None) -> list[dict]: """round_for_score: 用该轮评估的分数;若为 None 则不添加 score 字段。""" out = [] for k in sort_keys: if k not in keys: continue obj = dict(topic_by_key[k]) if round_for_score is not None: name = obj.get("name", "") val = score_by_round_name.get((round_for_score, name)) if val is not None: obj["matched_score"] = val[0] obj["is_fully_derived"] = val[1] else: obj["matched_score"] = None obj["is_fully_derived"] = False out.append(obj) return out # 推导成功的选题点:用当前已更新的 best (matched_score, is_fully_derived) derived_list = [] for k in sort_derived: if k not in derived_keys: continue obj = dict(topic_by_key[k]) name = obj.get("name", "") val = best_score_by_name.get(name) if val is not None: obj["matched_score"] = val[0] obj["is_fully_derived"] = val[1] else: obj["matched_score"] = None obj["is_fully_derived"] = False derived_list.append(obj) new_list = add_score_fields(new_derived_keys, sort_new, round_for_score=round_num) not_derived_list = [dict(topic_by_key[k]) for k in sort_not] # 不带 matched_score、is_fully_derived result.append({ "轮次": round_num, "推导成功的选题点": derived_list, "未推导成功的选题点": not_derived_list, "本次新推导成功的选题点": new_list, }) return result def _tree_node_display_name(raw: str) -> str: """人设节点可能是 a.b.c 路径形式,实际需要的是最后一段节点名 c。""" s = (raw or "").strip() if "." in s: return s.rsplit(".", 1)[-1].strip() or s return s def _to_tree_node(name: str, extra: dict | None = None) -> dict: d = {"name": name} if extra: d.update(extra) return d def _to_pattern_node(pattern_name: str) -> dict: """将 pattern 字符串转为 input_pattern_nodes 的一项(简化版)。""" items = [x.strip() for x in pattern_name.replace("+", " ").split() if x.strip()] return { "items": [{"name": x, "point": "关键点", "dimension": "形式", "type": "标签"} for x in items], "match_items": items, } def build_visualize_edges( derivations: list[dict], evals: list[dict], topic_points: list[dict], ) -> tuple[list[dict], list[dict]]: """ 生成 node_list(所有评估通过的帖子选题点)和 edge_list(只保留评估通过的推导路径)。 - node_list:同一轮内节点不重复,重复时保留 matched_score 更高的;节点带 matched_score、is_fully_derived。 - edge_list:边带 level(与 output 节点 level 一致);同一轮内 output 节点不重复;若前面轮次该节点匹配分更高则本轮不保留该节点。 评估数据支持 path_id(对应推导 derivation_results[].id)、derivation_output_point(与推导 output 中字符串对齐)、matched_score、is_fully_derived;不按 item_id 对齐。 """ derivations = sorted(derivations, key=lambda d: d.get("round", 0)) evals = sorted(evals, key=lambda e: e.get("round", 0)) topic_by_name = {t["name"]: t for t in topic_points} # 评估匹配:(round_num, path_id, derivation_output_point) -> (matched_post_point, matched_reason, matched_score, is_fully_derived) match_by_path_out: dict[tuple[int, int, str], tuple[str, str, float, bool]] = {} match_by_round_output: dict[tuple[int, str], tuple[str, str, float, bool]] = {} # 兼容无 path_id 的旧数据 for round_idx, eval_data in enumerate(evals): round_num = eval_data.get("round", round_idx + 1) for er in eval_data.get("eval_results") or []: if not (er.get("is_matched") is True or er.get("match_result") == "匹配"): continue mp = (er.get("matched_post_point") or er.get("matched_post_topic") or er.get("match_post_point") or "").strip() if not mp: continue out_point = (er.get("derivation_output_point") or "").strip() reason = (er.get("matched_reason") or er.get("match_reason") or "").strip() score = er.get("matched_score") if score is None: score = 1.0 else: try: score = float(score) except (TypeError, ValueError): score = 1.0 is_fully = er.get("is_fully_derived", True) val = (mp, reason, score, bool(is_fully)) path_id = er.get("path_id") if path_id is not None and out_point: try: match_by_path_out[(round_num, int(path_id), out_point)] = val except (TypeError, ValueError): pass if out_point: k = (round_num, out_point) if k not in match_by_round_output: match_by_round_output[k] = val def get_match(round_num: int, path_id: int | None, out_item: str) -> tuple[str, str, float, bool] | None: out_item = (out_item or "").strip() if not out_item: return None if path_id is not None: v = match_by_path_out.get((round_num, path_id, out_item)) if v is not None: return v return match_by_round_output.get((round_num, out_item)) # 第一遍:按 (round_num, mp) 聚合节点最佳信息(不考虑边是否最终保留) # (round_num, mp) -> (score, is_fully_derived, derivation_output_point, method) best_node_info_by_round_mp: dict[tuple[int, str], tuple[float, bool, str, str]] = {} for round_idx, derivation in enumerate(derivations): round_num = derivation.get("round", round_idx + 1) for dr in derivation.get("derivation_results") or []: output_list = dr.get("output") or [] path_id = dr.get("id") for out_item in output_list: v = get_match(round_num, path_id, out_item) if not v: continue mp, _reason, score, is_fully = v key = (round_num, mp) prev = best_node_info_by_round_mp.get(key) if prev is None or score > prev[0]: best_node_info_by_round_mp[key] = (score, bool(is_fully), out_item, dr.get("method", "")) edge_list = [] round_output_seen: set[tuple[int, str]] = set() # (round_num, node_name) 本轮已作为某边的 output prev_best_by_node: dict[str, tuple[float, bool]] = {} # node_name -> (score, is_fully) of last included round for round_idx, derivation in enumerate(derivations): round_num = derivation.get("round", round_idx + 1) for dr in derivation.get("derivation_results") or []: output_list = dr.get("output") or [] path_id = dr.get("id") matched: list[tuple[str, str, float, bool, str]] = [] # (mp, reason, score, is_fully, derivation_out) for out_item in output_list: v = get_match(round_num, path_id, out_item) if not v: continue mp, reason, score, is_fully = v matched.append((mp, reason, score, is_fully, out_item)) if not matched: continue # 同一轮内 output 节点不重复;若前面轮次该节点已完全推导,或分数未提升且未从 false 变 true,则本轮跳过; # 并且只保留与 node_list 中该轮该节点的最高分记录一致的边 output_names_this_edge = [] for mp, reason, score, is_fully, out_item in matched: if (round_num, mp) in round_output_seen: continue prev = prev_best_by_node.get(mp) if prev is not None: prev_score, prev_fully = prev if prev_fully: continue if not is_fully and score <= prev_score: continue best_info = best_node_info_by_round_mp.get((round_num, mp)) if not best_info or score < best_info[0]: continue output_names_this_edge.append((mp, reason, score, is_fully, out_item)) if not output_names_this_edge: continue for mp, _r, score, is_fully, _o in output_names_this_edge: round_output_seen.add((round_num, mp)) prev = prev_best_by_node.get(mp) if prev is None or (not prev[1] and (is_fully or score > prev[0])): prev_best_by_node[mp] = (score, is_fully) input_data = dr.get("input") or {} derived_nodes = input_data.get("derived_nodes") or [] tree_nodes = input_data.get("tree_nodes") or [] patterns = input_data.get("patterns") or [] input_post_nodes = [{"name": x} for x in derived_nodes] input_tree_nodes = [_to_tree_node(_tree_node_display_name(x)) for x in tree_nodes] if patterns and isinstance(patterns[0], str): input_pattern_nodes = [_to_pattern_node(p) for p in patterns] elif patterns and isinstance(patterns[0], dict): input_pattern_nodes = patterns else: input_pattern_nodes = [] output_nodes = [] reasons_list = [] compare_detail_list = [] for mp, reason, score, is_fully, out_item in output_names_this_edge: output_nodes.append({"name": mp, "matched_score": score, "is_fully_derived": is_fully}) reasons_list.append(reason) compare_detail_list.append( f"待比对推导选题点:{out_item} -> 帖子选题点:{mp} ({score})" ) detail = { "reason": dr.get("reason", ""), "评估结果": "匹配成功", } if any(reasons_list): detail["匹配理由"] = reasons_list detail["比对详情"] = compare_detail_list if dr.get("tools"): detail["tools"] = dr["tools"] edge_list.append({ "name": dr.get("method", "") or f"推导-{round_num}", "level": round_num, "input_post_nodes": input_post_nodes, "input_tree_nodes": input_tree_nodes, "input_pattern_nodes": input_pattern_nodes, "output_nodes": output_nodes, "detail": detail, }) # 根据按 (round, mp) 聚合后的最佳信息生成 node_list # 规则:节点首次出现保留;is_fully_derived 从 false 变 true 时保留; # is_fully_derived=false 且分数高于之前已保留轮次时保留;其余情况跳过 prev_node_best: dict[str, tuple[float, bool]] = {} # mp -> (score, is_fully) of last included round node_list: list[dict] = [] for (round_num, mp), (score, is_fully, out_item, method) in sorted( best_node_info_by_round_mp.items(), key=lambda x: (x[0][0], x[0][1]) ): prev = prev_node_best.get(mp) if prev is None: should_include = True else: prev_score, prev_fully = prev if prev_fully: should_include = False elif is_fully: should_include = True elif score > prev_score: should_include = True else: should_include = False if not should_include: continue prev_node_best[mp] = (score, is_fully) base = dict(topic_by_name.get(mp, {"name": mp, "point": "", "dimension": "", "root_source": "", "root_sources_desc": ""})) base["level"] = round_num base.setdefault("original_word", base.get("name", mp)) base["derivation_type"] = method base["matched_score"] = score base["is_fully_derived"] = is_fully base["derivation_output_point"] = out_item node_list.append(base) node_list.sort(key=lambda n: (n.get("level", 0), str(n.get("name", "")))) return node_list, edge_list def _find_project_root() -> Path: """从脚本所在目录向上查找包含 .git 的项目根目录。""" p = Path(__file__).resolve().parent while p != p.parent: if (p / ".git").is_dir(): return p p = p.parent return Path(__file__).resolve().parent def generate_visualize_data(account_name: str, post_id: str, log_id: str, base_dir: Path | None = None) -> None: """ 主流程:读取解构内容与推导日志,生成整体推导结果与整体推导路径可视化两个 JSON。 base_dir 默认为脚本所在目录;若其下 output/.../推导日志 不存在,则尝试项目根目录下的 output/...(兼容从项目根运行)。 """ if base_dir is None: base_dir = Path(__file__).resolve().parent input_dir = base_dir / "input" / account_name / "原始数据" / "解构内容" log_dir = base_dir / "output" / account_name / "推导日志" / post_id / log_id result_dir = base_dir / "output" / account_name / "整体推导结果" visualize_dir = base_dir / "output" / account_name / "整体推导路径可视化" # 兼容:若推导日志不在 base_dir 下,尝试项目根目录下的 output/ if not log_dir.is_dir(): project_root = _find_project_root() if project_root != base_dir: alt_log = project_root / "output" / account_name / "推导日志" / post_id / log_id if alt_log.is_dir(): log_dir = alt_log result_dir = project_root / "output" / account_name / "整体推导结果" visualize_dir = project_root / "output" / account_name / "整体推导路径可视化" deconstruct_path = input_dir / f"{post_id}.json" topic_points = parse_topic_points_from_deconstruct(deconstruct_path) derivations, evals = load_derivation_logs(log_dir) if not derivations or not evals: raise ValueError(f"推导或评估数据为空: {log_dir}") # 2.1 整体推导结果 derivation_result = build_derivation_result(topic_points, derivations, evals) result_dir.mkdir(parents=True, exist_ok=True) result_path = result_dir / f"{post_id}.json" with open(result_path, "w", encoding="utf-8") as f: json.dump(derivation_result, f, ensure_ascii=False, indent=4) print(f"已写入整体推导结果: {result_path}") # 2.2 整体推导路径可视化(人设节点补全:used_tree_nodes / all_used_tree_nodes,数据来自处理后数据/tree 人设树) node_list, edge_list = build_visualize_edges(derivations, evals, topic_points) tree_dir = base_dir / "input" / account_name / "处理后数据" / "tree" persona_by_name = build_persona_by_name_from_tree_dir(tree_dir) if persona_by_name: print( f"已加载人设树节点: {len(persona_by_name)} 个(目录: {tree_dir.name})" ) else: print( f"警告: 未从人设树目录加载到节点(请确认存在 *_point_tree_how.json): {tree_dir}" ) visualize_payload: Dict[str, Any] = {"node_list": node_list, "edge_list": edge_list} enrich_visualize_with_used_tree_nodes(visualize_payload, persona_by_name) visualize_path = visualize_dir / f"{post_id}.json" visualize_dir.mkdir(parents=True, exist_ok=True) with open(visualize_path, "w", encoding="utf-8") as f: json.dump(visualize_payload, f, ensure_ascii=False, indent=4) print(f"已写入整体推导路径可视化: {visualize_path}") def main(account_name, post_id, log_id): # parser = argparse.ArgumentParser(description="生成推导可视化数据") # parser.add_argument("account_name", help="账号名,如 家有大志") # parser.add_argument("post_id", help="帖子 ID") # parser.add_argument("log_id", help="推导日志 ID,如 20260303204232") # parser.add_argument("--base-dir", type=Path, default=None, help="项目根目录,默认为本脚本所在目录") # args = parser.parse_args() generate_visualize_data(account_name=account_name, post_id=post_id, log_id=log_id) if __name__ == "__main__": from tools.pattern_dimension_analyze import main as pattern_dimension_analyze_main # account_name="阿里多多酱" # items = [ # {"post_id": "6915dfc400000000070224d9", "log_id": "20260322135142"}, # {"post_id":"69002ba70000000007008bcc","log_id":"20260322213934"}, # ] # account_name="摸鱼阿希" # items = [ # {"post_id": "68ae91ce000000001d016b8b", "log_id": "20260322202416"}, # {"post_id":"689c63ac000000001d015119","log_id":"20260322203119"}, # ] # account_name = "每天心理学" # items = [ # {"post_id": "6949df27000000001d03e0e9", "log_id": "20260322205512"}, # {"post_id": "6951c718000000001e0105b7", "log_id": "20260322211126"}, # ] account_name = "空间点阵设计研究室" items = [ {"post_id": "687ee6fc000000001c032bb1", "log_id": "20260322211748"}, {"post_id": "68843a4d000000001c037591", "log_id": "20260322213024"}, ] for item in items: post_id = item["post_id"] log_id = item["log_id"] main(account_name, post_id, log_id) pattern_dimension_analyze_main(account_name, post_id, log_id)