| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345 |
- #!/usr/bin/env python3
"""
Generate derivation-visualization data.

Inputs: account_name, post_id, log_id.
- Parse the topic-point list from input/{account_name}/原始数据/解构内容/{post_id}.json
- Read the derivation and evaluation JSON files from
  output/{account_name}/推导日志/{post_id}/{log_id}/ and write:
  1. output/{account_name}/整体推导结果/{post_id}.json
  2. output/{account_name}/整体推导路径可视化/{post_id}.json
"""
- import argparse
- import json
- import re
- from pathlib import Path
- from typing import Any
def _collect_dimension_names(point_data: dict) -> dict[str, str]:
    """Map each element name listed under a point to the dimension it belongs to.

    The point's 实质 and 形式 sections are dicts whose sub-lists hold items
    carrying a 名称 field; 意图 is a flat list of such items. Sections are
    visited in the order 实质, 形式, 意图, so a name that occurs in several
    sections ends up mapped to the last one visited.

    Args:
        point_data: One point dict from the deconstruct JSON.

    Returns:
        A dict of 名称 -> "实质" | "形式" | "意图". Items with an empty or
        missing 名称 are ignored.
    """
    nested_sections = (
        ("实质", ("具体元素", "具象概念", "抽象概念")),
        ("形式", ("具体元素形式", "具象概念形式", "整体形式")),
    )
    dimension_of: dict[str, str] = {}

    for dimension, sub_keys in nested_sections:
        section = point_data.get(dimension)
        if not section:
            continue
        for sub_key in sub_keys:
            for entry in section.get(sub_key) or []:
                label = entry.get("名称")
                if label:
                    dimension_of[label] = dimension

    # 意图 has no sub-categories: it is directly a list of items.
    for entry in point_data.get("意图") or []:
        label = entry.get("名称")
        if label:
            dimension_of[label] = "意图"

    return dimension_of
def parse_topic_points_from_deconstruct(deconstruct_path: Path) -> list[dict[str, Any]]:
    """Parse the topic-point list out of a deconstruct JSON file.

    Every non-empty 词 found in a point's 分词结果 becomes one topic point.
    Points are read from the top-level lists 灵感点, 目的点 and 关键点, in
    that order.

    Args:
        deconstruct_path: Path of the deconstruct JSON file for one post.

    Returns:
        A list of dicts with the keys ``name`` (the stripped word),
        ``point`` (which of the three lists it came from), ``dimension``
        (looked up via ``_collect_dimension_names``; "实质" when the word
        is not listed there), ``root_source`` (the point's 点) and
        ``root_sources_desc`` (the point's 点描述).

    Raises:
        FileNotFoundError: If ``deconstruct_path`` does not exist.
    """
    if not deconstruct_path.exists():
        raise FileNotFoundError(f"解构内容文件不存在: {deconstruct_path}")
    with open(deconstruct_path, "r", encoding="utf-8") as f:
        data = json.load(f)

    result: list[dict[str, Any]] = []
    for point_type in ("灵感点", "目的点", "关键点"):
        for point in data.get(point_type) or []:
            root_source = point.get("点", "")
            root_sources_desc = point.get("点描述", "")
            name_to_dim = _collect_dimension_names(point)
            for word_item in point.get("分词结果") or []:
                # dict.get's default only covers a *missing* key; a JSON
                # null (or any non-string) must not reach .strip().
                raw_word = word_item.get("词")
                name = raw_word.strip() if isinstance(raw_word, str) else ""
                if not name:
                    continue
                result.append({
                    "name": name,
                    "point": point_type,
                    "dimension": name_to_dim.get(name, "实质"),
                    "root_source": root_source,
                    "root_sources_desc": root_sources_desc,
                })
    return result
def _topic_point_key(t: dict) -> tuple:
    """Return the (name, point, dimension) tuple that identifies a topic point."""
    return tuple(t[field] for field in ("name", "point", "dimension"))
def load_derivation_logs(log_dir: Path) -> tuple[list[dict], list[dict]]:
    """Load every ``{round}_推导.json`` and ``{round}_评估.json`` file in a log directory.

    Files whose stem does not match ``<digits>_推导`` or ``<digits>_评估``
    are ignored.

    Args:
        log_dir: Directory holding the per-round JSON logs.

    Returns:
        ``(derivations, evals)``: the parsed derivation files and the
        parsed evaluation files, each sorted by ascending round number.
        The two lists are built independently, so they differ in length
        when a round has only one of the two files.

    Raises:
        FileNotFoundError: If ``log_dir`` is not an existing directory.
    """
    if not log_dir.is_dir():
        raise FileNotFoundError(f"推导日志目录不存在: {log_dir}")

    # One round -> content mapping per file kind, keyed by the name suffix.
    by_kind: dict[str, dict[int, dict]] = {"推导": {}, "评估": {}}
    for json_path in log_dir.glob("*.json"):
        parsed = re.match(r"^(\d+)_(推导|评估)$", json_path.stem)
        if parsed is None:
            continue
        with open(json_path, "r", encoding="utf-8") as fh:
            payload = json.load(fh)
        by_kind[parsed.group(2)][int(parsed.group(1))] = payload

    derivations = [by_kind["推导"][n] for n in sorted(by_kind["推导"])]
    evals = [by_kind["评估"][n] for n in sorted(by_kind["评估"])]
    return derivations, evals
def build_derivation_result(
    topic_points: list[dict],
    derivations: list[dict],
    evals: list[dict],
) -> list[dict]:
    """Build the cumulative per-round derivation summary.

    A topic point counts as derived once its ``name`` appears as the
    matched post point of a matched evaluation result (``is_matched`` is
    True, or the legacy ``match_result == "匹配"``). Derived names
    accumulate across rounds.

    Derivations and evaluations are paired by their ``round`` field when
    every record carries one, so a round whose evaluation is missing is
    skipped instead of shifting all later rounds onto the wrong
    evaluation. When any record lacks ``round`` the two lists are paired
    positionally and the round number falls back to ``index + 1``.

    Args:
        topic_points: Topic points with ``name``, ``point``, ``dimension``.
        derivations: Per-round derivation records, in round order.
        evals: Per-round evaluation records, in round order.

    Returns:
        One dict per paired round with the keys ``轮次``,
        ``推导成功的选题点`` (all points derived so far),
        ``未推导成功的选题点`` (points not derived yet) and
        ``本次新推导成功的选题点`` (points first derived in this round).
        Each list holds copies of the topic-point dicts sorted by
        (name, point, dimension).
    """
    topic_by_key = {_topic_point_key(t): t for t in topic_points}
    # The key already is (name, point, dimension), so sorting the keys
    # themselves gives the output order.
    ordered_keys = sorted(topic_by_key)

    has_round_everywhere = all("round" in d for d in derivations) and all(
        "round" in e for e in evals
    )
    if has_round_everywhere:
        eval_by_round: dict = {}
        for eval_data in evals:
            # Keep the first evaluation seen for a round.
            eval_by_round.setdefault(eval_data["round"], eval_data)
        paired = [
            (d["round"], eval_by_round[d["round"]])
            for d in derivations
            if d["round"] in eval_by_round
        ]
    else:
        paired = [
            (d.get("round", i + 1), e)
            for i, (d, e) in enumerate(zip(derivations, evals))
        ]

    result = []
    derived_names_so_far: set[str] = set()
    for round_num, eval_data in paired:
        matched_post_points = set()
        for er in eval_data.get("eval_results") or []:
            # New format: is_matched; legacy format: match_result == "匹配".
            if not (er.get("is_matched") is True or er.get("match_result") == "匹配"):
                continue
            mp = (
                er.get("matched_post_point")
                or er.get("matched_post_topic")
                or er.get("match_post_point")
                or ""
            )
            mp = str(mp).strip()
            if mp:
                matched_post_points.add(mp)

        new_derived_names = matched_post_points - derived_names_so_far
        derived_names_so_far |= matched_post_points

        derived_list, not_derived_list, new_list = [], [], []
        for key in ordered_keys:
            point = topic_by_key[key]
            if point["name"] in derived_names_so_far:
                derived_list.append(dict(point))
                if point["name"] in new_derived_names:
                    new_list.append(dict(point))
            else:
                not_derived_list.append(dict(point))

        result.append({
            "轮次": round_num,
            "推导成功的选题点": derived_list,
            "未推导成功的选题点": not_derived_list,
            "本次新推导成功的选题点": new_list,
        })
    return result
def _tree_node_display_name(raw: str) -> str:
    """Return the last segment of a dotted persona-node path such as ``a.b.c``.

    The input is stripped first. Without a dot the stripped text is
    returned as is; if the text after the last dot is blank, the whole
    stripped text is returned instead.
    """
    cleaned = (raw or "").strip()
    _, dot, leaf = cleaned.rpartition(".")
    if not dot:
        return cleaned
    return leaf.strip() or cleaned
- def _to_tree_node(name: str, extra: dict | None = None) -> dict:
- d = {"name": name}
- if extra:
- d.update(extra)
- return d
def _to_pattern_node(pattern_name: str) -> dict:
    """Convert a pattern string into one (simplified) ``input_pattern_nodes`` entry.

    The string is split on ``+`` and on whitespace. Every resulting token
    becomes an item tagged with the fixed point "关键点", dimension "形式"
    and type "标签"; the bare token list is returned under ``match_items``.
    """
    # str.split() with no argument already trims tokens and drops empties.
    tokens = pattern_name.replace("+", " ").split()
    entries = []
    for token in tokens:
        entries.append({"name": token, "point": "关键点", "dimension": "形式", "type": "标签"})
    return {"items": entries, "match_items": tokens}
def _index_matched_outputs(evals: list[dict]) -> dict[tuple[int, str], tuple[str, str]]:
    """Index matched evaluation results by (round, derivation output name).

    The value is ``(matched post point, match reason)``. Only matched
    results (``is_matched`` is True, or legacy ``match_result == "匹配"``)
    with a non-empty output name and post point are indexed; the first
    result seen for a key wins.
    """
    index: dict[tuple[int, str], tuple[str, str]] = {}
    for position, eval_data in enumerate(evals):
        round_num = eval_data.get("round", position + 1)
        for er in eval_data.get("eval_results") or []:
            if not (er.get("is_matched") is True or er.get("match_result") == "匹配"):
                continue
            out_point = (er.get("derivation_output_point") or "").strip()
            post_point = str(
                er.get("matched_post_point")
                or er.get("matched_post_topic")
                or er.get("match_post_point")
                or ""
            ).strip()
            if not out_point or not post_point:
                continue
            reason = er.get("matched_reason") or er.get("match_reason") or ""
            index.setdefault((round_num, out_point), (post_point, str(reason).strip()))
    return index


def build_visualize_edges(
    derivations: list[dict],
    evals: list[dict],
    topic_points: list[dict],
) -> tuple[list[dict], list[dict]]:
    """Build the node list and edge list of the derivation-path visualization.

    Evaluation results are associated with derivation outputs purely by
    (round, output name); no ids are used. Rounds are processed in
    ascending order and each post point is emitted once: as one node, and
    as an output of exactly one edge, namely the first derivation result
    that reached it. Derivation results that contribute no new post point
    produce no edge.

    Args:
        derivations: Per-round derivation records (``round``,
            ``derivation_results`` with ``input``/``output``/``method``/
            ``reason``/``tools``).
        evals: Per-round evaluation records (``round``, ``eval_results``).
        topic_points: Parsed topic points; the first entry per ``name``
            supplies the node's attributes.

    Returns:
        ``(node_list, edge_list)``. Nodes are copies of the topic point
        (or a blank placeholder when the name is unknown) extended with
        ``level`` (the round), ``original_word`` and ``derivation_type``.
        Edges carry ``name``, the three input node lists, ``output_nodes``
        and a ``detail`` dict.
    """
    # Ascending round order so that earlier rounds claim output nodes first.
    derivations = sorted(derivations, key=lambda d: d.get("round", 0))
    evals = sorted(evals, key=lambda e: e.get("round", 0))

    topic_by_name: dict[str, dict] = {}
    for t in topic_points:
        topic_by_name.setdefault(t["name"], t)

    match_by_round_output = _index_matched_outputs(evals)

    node_list: list[dict] = []
    edge_list: list[dict] = []
    emitted: set[str] = set()  # post points already used as a node / edge output

    for round_idx, derivation in enumerate(derivations):
        round_num = derivation.get("round", round_idx + 1)
        for dr in derivation.get("derivation_results") or []:
            # New post points reached by this result, in output order:
            # post point -> (match reason, derivation output name).
            # Keeping only the first hit per post point prevents duplicate
            # output nodes inside a single edge.
            fresh: dict[str, tuple[str, str]] = {}
            for out_item in dr.get("output") or []:
                if not isinstance(out_item, str):
                    continue  # index keys are always strings
                # Evaluation keys are stripped, so strip here as well.
                out_name = out_item.strip()
                pair = match_by_round_output.get((round_num, out_name))
                if not pair:
                    continue
                post_point, reason = pair
                if post_point in emitted or post_point in fresh:
                    continue
                fresh[post_point] = (reason, out_name)
            if not fresh:
                continue
            emitted.update(fresh)

            for post_point in fresh:
                node = dict(topic_by_name.get(post_point, {
                    "name": post_point,
                    "point": "",
                    "dimension": "",
                    "root_source": "",
                    "root_sources_desc": "",
                }))
                node["level"] = round_num
                if "original_word" not in node:
                    node["original_word"] = node.get("name", post_point)
                node["derivation_type"] = dr.get("method", "")
                node_list.append(node)

            input_data = dr.get("input") or {}
            derived_nodes = input_data.get("derived_nodes") or []
            tree_nodes = input_data.get("tree_nodes") or []
            patterns = input_data.get("patterns") or []

            # The type of the first pattern decides how all are handled:
            # strings are converted, dicts are passed through unchanged.
            if patterns and isinstance(patterns[0], str):
                input_pattern_nodes = [_to_pattern_node(p) for p in patterns]
            elif patterns and isinstance(patterns[0], dict):
                input_pattern_nodes = patterns
            else:
                input_pattern_nodes = []

            reasons = [reason for reason, _ in fresh.values()]
            detail = {
                "reason": dr.get("reason", ""),
                "评估结果": "匹配成功",
            }
            if any(reasons):
                detail["匹配理由"] = reasons
            detail["待比对的推导选题点"] = [out_name for _, out_name in fresh.values()]
            if dr.get("tools"):
                detail["tools"] = dr["tools"]

            edge_list.append({
                "name": dr.get("method", "") or f"推导-{round_num}",
                "input_post_nodes": [{"name": x} for x in derived_nodes],
                "input_tree_nodes": [_to_tree_node(_tree_node_display_name(x)) for x in tree_nodes],
                "input_pattern_nodes": input_pattern_nodes,
                "output_nodes": [{"name": x} for x in fresh],
                "detail": detail,
            })
    return node_list, edge_list
def generate_visualize_data(account_name: str, post_id: str, log_id: str, base_dir: Path | None = None) -> None:
    """Read the deconstruct file and derivation logs, then write both result JSON files.

    Reads ``input/{account_name}/原始数据/解构内容/{post_id}.json`` and the
    logs under ``output/{account_name}/推导日志/{post_id}/{log_id}/``, then
    writes ``{post_id}.json`` into ``output/{account_name}/整体推导结果/``
    and ``output/{account_name}/整体推导路径可视化/`` (directories are
    created as needed). A confirmation line is printed after each write.

    Args:
        account_name: Account directory name.
        post_id: Post id; also the base name of the input and output files.
        log_id: Name of the log sub-directory to read.
        base_dir: Project root; defaults to the directory of this script.

    Raises:
        ValueError: If no derivation files or no evaluation files were loaded.
    """
    root = Path(__file__).resolve().parent if base_dir is None else base_dir
    account_output = root / "output" / account_name
    deconstruct_path = root / "input" / account_name / "原始数据" / "解构内容" / f"{post_id}.json"
    log_dir = account_output / "推导日志" / post_id / log_id

    def _dump(label: str, payload) -> None:
        # The output directory name doubles as the label in the message.
        target_dir = account_output / label
        target_dir.mkdir(parents=True, exist_ok=True)
        target = target_dir / f"{post_id}.json"
        with open(target, "w", encoding="utf-8") as f:
            json.dump(payload, f, ensure_ascii=False, indent=4)
        print(f"已写入{label}: {target}")

    topic_points = parse_topic_points_from_deconstruct(deconstruct_path)
    derivations, evals = load_derivation_logs(log_dir)
    if not (derivations and evals):
        raise ValueError(f"推导或评估数据为空: {log_dir}")

    # Step 1: cumulative per-round derivation result.
    _dump("整体推导结果", build_derivation_result(topic_points, derivations, evals))

    # Step 2: node/edge data for the derivation-path visualization.
    node_list, edge_list = build_visualize_edges(derivations, evals, topic_points)
    _dump("整体推导路径可视化", {"node_list": node_list, "edge_list": edge_list})
def main(account_name, post_id, log_id):
    """Generate the visualization data for one post using the default base directory.

    No command-line parsing is wired up; the three identifiers are passed
    in by the caller.
    """
    generate_visualize_data(account_name, post_id, log_id)
if __name__ == "__main__":
    # Hard-coded sample run: edit these three identifiers to process another post.
    account_name = "家有大志"
    post_id = "68fb6a5c000000000302e5de"
    log_id = "20260309010119"
    main(account_name=account_name, post_id=post_id, log_id=log_id)
|