""" Pattern 维度分析 Tool 功能概述: 1. 读取某次整体推导日志目录下各轮评估结果,累计 matched_post_point / derivation_output_point 等字段。 2. 每轮通过 derivation_output_point 在人设树中找到 cluster_level 层祖先节点(已推导维度节点集合)。 3. 从 deduped_patterns 中筛选包含已推导维度节点的 pattern,并对各元素标记是否已推导。 输入参数: - account_name: 账号名称 - post_id: 帖子 ID - log_id: 推导日志目录名(形如 20260313210921) - cluster_level: 在人设树中查找祖先节点的目标深度(root 为 0 层) """ import json import sys from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Set # 保证直接运行或作为包加载时都能解析 utils / tools(IDE 可跳转) _root = Path(__file__).resolve().parent.parent if str(_root) not in sys.path: sys.path.insert(0, str(_root)) from tools.find_tree_node import _load_trees # 加载三棵人设树 _BASE_INPUT = Path(__file__).resolve().parent.parent / "input" _BASE_OUTPUT = Path(__file__).resolve().parent.parent / "output" # pattern 库 key 定义(与 find_pattern 中保持一致) TOP_KEYS = [ "depth_4", ] SUB_KEYS = ["two_x", "one_x", "zero_x"] # --------------------------------------------------------------------------- # 1. 读取推导日志:按轮次累计 matched_post_point # --------------------------------------------------------------------------- def _round_eval_dir(account_name: str, post_id: str, log_id: str) -> Path: """ 推导日志目录: ../output/{account_name}/推导日志/{post_id}/{log_id}/ """ return _BASE_OUTPUT / account_name / "推导日志" / post_id / log_id def _load_round_matched_points( account_name: str, post_id: str, log_id: str, ) -> List[Dict[str, Any]]: """ 读取指定日志目录下所有 {轮次}.评估.json,按轮次排序,生成: [ { "round": 1, "round_points": [ { "matched_post_point": "叙事结构", "derivation_output_point": "叙事编排", "matched_score": 0.9151, "is_fully_derived": true, }, ... ], "cumulative_points": [ ... 累计到本轮的去重列表(以 derivation_output_point 为去重 key) ... ], }, ... ] """ base_dir = _round_eval_dir(account_name, post_id, log_id) if not base_dir.is_dir(): return [] eval_files: List[Tuple[int, Path]] = [] for p in base_dir.glob("*.json"): name = p.name # 只处理 *_评估.json if not name.endswith("评估.json"): continue try: round_str = name.split("_", 1)[0] r = int(round_str) except Exception: continue eval_files.append((r, p)) eval_files.sort(key=lambda x: x[0]) results: List[Dict[str, Any]] = [] cumulative: List[Dict[str, Any]] = [] cumulative_set: Set[str] = set() # 以 derivation_output_point 去重 for r, path in eval_files: try: with open(path, "r", encoding="utf-8") as f: data = json.load(f) except Exception: continue eval_results = data.get("eval_results") or [] round_points: List[Dict[str, Any]] = [] seen_in_round: Set[str] = set() for item in eval_results: if not isinstance(item, dict): continue if not item.get("is_matched"): continue dop = item.get("derivation_output_point") if dop is None: continue dop = str(dop).strip() if not dop: continue # 本轮内按 derivation_output_point 去重 if dop in seen_in_round: continue seen_in_round.add(dop) mpp = item.get("matched_post_point") entry: Dict[str, Any] = { "matched_post_point": str(mpp).strip() if mpp is not None else None, "derivation_output_point": dop, "matched_score": item.get("matched_score"), "is_fully_derived": item.get("is_fully_derived"), } round_points.append(entry) # 累加到累计列表(按 derivation_output_point 去重) for entry in round_points: dop = entry["derivation_output_point"] if dop not in cumulative_set: cumulative_set.add(dop) cumulative.append(entry) results.append( { "round": r, "round_points": round_points, "cumulative_points": list(cumulative), } ) return results # --------------------------------------------------------------------------- # 2. 读取 pattern 库并按 matched_post_point 打分 # --------------------------------------------------------------------------- def _pattern_file(account_name: str) -> Path: """pattern 库文件:../input/{account_name}/原始数据/pattern/processed_edge_data.json""" return _BASE_INPUT / account_name / "原始数据" / "pattern" / "processed_edge_data.json" def _load_raw_patterns(account_name: str) -> List[Dict[str, Any]]: """ 读取 pattern 库中所有原始 pattern(保留 items 结构,不做合并)。 返回列表中每个元素形如原始 JSON 中的 pattern(此处不关心 item 的 point / dimension 字段)。 """ path = _pattern_file(account_name) if not path.is_file(): return [] with open(path, "r", encoding="utf-8") as f: data = json.load(f) patterns: List[Dict[str, Any]] = [] for top in TOP_KEYS: block = data.get(top) if not isinstance(block, dict): continue for sub in SUB_KEYS: items = block.get(sub) or [] if isinstance(items, list): for p in items: if isinstance(p, dict): patterns.append(p) return patterns def _slim_pattern_for_dedupe(p: Dict[str, Any]) -> Tuple[float, List[str]]: """ 提取 pattern 的 support 与去重后的 item name 列表(按名称合并,不关心顺序), 用于与 find_pattern.py 中的去重逻辑对齐。 """ items = p.get("items") or [] names = [str(it.get("name") or "").strip() for it in items if isinstance(it, dict)] seen: Set[str] = set() unique: List[str] = [] for n in names: if n and n not in seen: seen.add(n) unique.append(n) try: support = float(p.get("support", 0.0)) except (TypeError, ValueError): support = 0.0 return support, unique def _dedupe_patterns(raw_patterns: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ 按 pattern 的 item name 集合去重(不区分顺序),与 find_pattern.py 的思路一致: - key 为 sorted(unique item names) - 同一个 key 仅保留 support 最大的 pattern(保留其原始 items 结构,方便后续打分) """ key_to_best: Dict[Tuple[str, ...], Dict[str, Any]] = {} key_to_support: Dict[Tuple[str, ...], float] = {} for p in raw_patterns: support, unique = _slim_pattern_for_dedupe(p) if not unique: continue key = tuple(sorted(unique)) best_support = key_to_support.get(key) if best_support is None or support > best_support: key_to_support[key] = support key_to_best[key] = p return list(key_to_best.values()) # --------------------------------------------------------------------------- # 3. 人设树节点信息 & 聚类节点搜索 # --------------------------------------------------------------------------- class TreeIndex: """ 人设树索引: - node_info: 节点 -> { "parent": 父节点名称, "children": [子节点名称...], "depth": 深度, "dimension": 维度名 } - roots: 维度名 -> 根节点名称(即维度名本身) - merged_tree: 将实质/形式/意图三棵树合并后的单个 JSON(顶层 key 为实质/形式/意图) """ def __init__(self, account_name: str) -> None: self.account_name = account_name self.node_info: Dict[str, Dict[str, Any]] = {} self.roots: Dict[str, str] = {} # 三棵树合并后的 JSON:{"实质": {...}, "形式": {...}, "意图": {...}} self.merged_tree: Dict[str, Dict[str, Any]] = {} self._build() def _build(self) -> None: trees = _load_trees(self.account_name) # 1)先将三棵树合并成一个 JSON:{"实质": {...}, "形式": {...}, "意图": {...}} merged: Dict[str, Dict[str, Any]] = {} for dim_name, root in trees: if isinstance(root, dict): merged[dim_name] = root self.merged_tree = merged # 2)基于合并后的 JSON 构建 parent/children 结构 for dim_name, root in merged.items(): root_name = dim_name self.roots[dim_name] = root_name if root_name not in self.node_info: self.node_info[root_name] = { "parent": None, "children": [], "dimension": dim_name, "depth": 0, } def walk(parent_name: str, node_dict: Dict[str, Any]): children = node_dict.get("children") or {} for name, child in children.items(): if not isinstance(child, dict): continue if name not in self.node_info: self.node_info[name] = { "parent": parent_name, "children": [], "dimension": dim_name, "depth": None, # 稍后统一计算 } else: # 仅当不会形成自引用时才更新 parent(树中可能存在同名的父子节点) if name != parent_name: self.node_info[name]["parent"] = parent_name self.node_info[name]["dimension"] = dim_name # 维护父节点的 children if parent_name not in self.node_info: self.node_info[parent_name] = { "parent": None, "children": [], "dimension": dim_name, "depth": 0, } if name not in self.node_info[parent_name]["children"]: self.node_info[parent_name]["children"].append(name) walk(name, child) walk(root_name, root) # 统一计算各节点深度(从根开始 BFS) from collections import deque q = deque() for dim_name, root_name in self.roots.items(): if root_name not in self.node_info: continue self.node_info[root_name]["depth"] = 0 q.append(root_name) while q: cur = q.popleft() cur_depth = self.node_info[cur].get("depth", 0) or 0 for child in self.node_info[cur].get("children", []): self.node_info.setdefault(child, {}) if self.node_info[child].get("depth") is None: self.node_info[child]["depth"] = cur_depth + 1 q.append(child) def find_ancestor_at_level(self, node_name: str, level: int) -> Optional[str]: """ 在人设树中找到 node_name 的 depth == level 的祖先节点。 - 若 node_name 自身 depth == level,直接返回自身。 - 若 node_name depth < level(比目标层浅),返回自身。 - 否则沿 parent 链向上查找,返回第一个 depth == level 的祖先节点。 """ info = self.node_info.get(node_name) if not info: return None depth = info.get("depth") if depth is None: return None if depth <= level: return node_name cur = node_name visited: Set[str] = set() while cur and cur not in visited: visited.add(cur) cur_info = self.node_info.get(cur) or {} cur_depth = cur_info.get("depth") or 0 if cur_depth == level: return cur if cur_depth < level: return cur parent = cur_info.get("parent") if parent is None: return cur cur = parent return None # 聚类搜索(不再区分维度) def find_clusters( self, elements: List[str], cluster_level: int, ) -> List[Dict[str, Any]]: """ 在所有人设树中,为给定元素列表寻找聚类节点(不再要求 dimension 一致)。 规则(固定聚类层级 cluster_level): - 仅在 depth == cluster_level 的节点上做聚类判断: * 若某节点子树中包含的元素数量 >= 2, 且在该路径上尚未存在更高层(深度更小)的聚类节点,则将其视为一个聚类节点。 - 对无法向上形成聚类的元素,为其寻找 depth == cluster_level 的祖先节点, 若存在则作为该元素的「单元素聚类」节点。 - 返回: [ { "cluster_node": "节点名", "from_elements": ["元素A", "元素B", ...] }, ... ] """ # 过滤出真实存在于人设树中的元素 elem_set: Set[str] = set() for e in elements: e = str(e).strip() if not e: continue info = self.node_info.get(e) if not info: continue elem_set.add(e) if not elem_set: return [] # 先计算每个节点子树中包含的元素数量(跨所有维度的根) # 注意:人设树数据中可能存在意外的环或重复引用,这里通过 visited 集合避免递归死循环。 subtree_count: Dict[str, int] = {} def dfs_count(node: str, visited: Set[str]) -> int: if node in visited: # 检测到环,直接返回 0,避免无限递归 return 0 visited.add(node) cnt = 1 if node in elem_set else 0 for ch in self.node_info.get(node, {}).get("children", []): cnt += dfs_count(ch, visited) subtree_count[node] = cnt return cnt for root_name in self.roots.values(): dfs_count(root_name, set()) # 再自上而下优先选择「更上层」聚类节点(但仅在 cluster_level 层): # - 若当前节点已作为聚类节点,则其子孙不再作为聚类节点(保证尽量向上聚类); # 同样需要防止意外的环导致递归过深,这里使用 visited 集合。 clusters: Set[str] = set() def dfs_select(node: str, ancestor_selected: bool, visited: Set[str]) -> None: if node in visited: return visited.add(node) info = self.node_info.get(node) or {} depth = info.get("depth", 0) or 0 cnt = subtree_count.get(node, 0) selected_here = False # 仅当祖先尚未被选中、当前节点位于 cluster_level 层且满足条件时,选当前节点为聚类节点 if (not ancestor_selected) and depth == cluster_level and cnt >= 2: clusters.add(node) selected_here = True # 祖先已经被选中或当前节点被选中,则子孙不再作为聚类节点 for ch in info.get("children", []): dfs_select(ch, ancestor_selected or selected_here, visited) for root_name in self.roots.values(): dfs_select(root_name, False, set()) if not clusters: return [] # 统计每个聚类节点下真实覆盖的元素列表 cluster_to_elements: Dict[str, Set[str]] = {c: set() for c in clusters} for e in elem_set: cur = e visited: Set[str] = set() while cur and cur not in visited: visited.add(cur) if cur in clusters: cluster_to_elements[cur].add(e) parent = self.node_info.get(cur, {}).get("parent") if parent is None: break cur = parent out: List[Dict[str, Any]] = [] # 1)多元素聚类:仅统计真正输出的聚类节点所覆盖的元素, # 避免把「元素数不足 2 的节点」也算作已覆盖,从而导致元素丢失。 covered_elems: Set[str] = set() for node in clusters: elems = sorted(cluster_to_elements.get(node) or []) if len(elems) < 2: # 主聚类逻辑只考虑覆盖至少 2 个元素的节点 continue out.append( { "cluster_node": node, "from_elements": elems, } ) for e in elems: covered_elems.add(e) # 2)对无法向上形成聚类的元素,给一个「单元素聚类」 uncovered = elem_set - covered_elems # 将未覆盖元素按「cluster_level 层级的祖先节点」分组,确保同一个祖先节点下的 # 多个元素合并为一个聚类,而不是多个单元素聚类。 single_clusters: Dict[str, Set[str]] = {} for e in uncovered: # 单元素聚类时,cluster_node 应为「祖先节点」,不直接使用元素自身。 # 这里固定选择 depth == cluster_level 的祖先节点。 info_e = self.node_info.get(e) or {} parent = info_e.get("parent") cur = parent best_ancestor: Optional[str] = None visited_chain: Set[str] = set() while cur and cur not in visited_chain: visited_chain.add(cur) info = self.node_info.get(cur) or {} depth = info.get("depth", 0) or 0 if depth == cluster_level: best_ancestor = cur break parent = info.get("parent") if parent is None: break cur = parent if best_ancestor: single_clusters.setdefault(best_ancestor, set()).add(e) for anc, elems in single_clusters.items(): out.append( { "cluster_node": anc, "from_elements": sorted(elems), } ) # 为了输出更稳定,按 from_elements 的元素数量从大到小排序,数量相同再按节点名排序 out.sort(key=lambda x: (-len(x["from_elements"]), x["cluster_node"])) return out # --------------------------------------------------------------------------- # 4. 对单轮数据执行 pattern & 聚类分析 # --------------------------------------------------------------------------- def _analyze_single_round( patterns: List[Dict[str, Any]], tree_index: TreeIndex, cumulative_points: List[Dict[str, Any]], cluster_level: int, ) -> Dict[str, Any]: """ 对某一轮(给定累计 point 列表)执行维度分析: 1. 从 cumulative_points 中提取 derivation_output_point, 在人设树中找到每个节点的 cluster_level 层祖先 → derived_ancestor_set(已推导维度节点集合)。 2. 从 deduped_patterns 中筛选出包含 derived_ancestor_set 中节点的 pattern。 3. 对筛选出 pattern 的每个元素标记是否已推导: - 元素在 derived_ancestor_set 中 → is_derived=True(已推导维度) - 其他 → is_derived=False(未推导维度) 4. 汇总 derived_dims / underived_dims 列表。 返回结构: { "cumulative_points": [...], # 原始累计 point 对象列表 "derived_ancestor_nodes": [...], # 所有 derivation_output_point 对应的 cluster_level 层祖先节点(已推导维度节点集合) "patterns": [...], # 筛选后带 is_derived 标记的 pattern 列表 "derived_dims": [...], # 已推导维度节点(去重,出现于筛选 pattern 中) "underived_dims": [...], # 未推导维度节点(去重,排除已推导节点) "patterns_count": int, "derived_dim_count": int, "underived_dim_count": int, } """ # 1. 收集 derived_ancestor_set,同时记录每个祖先节点对应的 matched_post_point 来源 derived_ancestor_set: Set[str] = set() ancestor_to_mpps: Dict[str, List[str]] = {} # 祖先节点 -> [matched_post_point, ...] for entry in cumulative_points: dop = entry.get("derivation_output_point") if not dop: continue ancestor = tree_index.find_ancestor_at_level(str(dop).strip(), cluster_level) if not ancestor: continue derived_ancestor_set.add(ancestor) mpp = entry.get("matched_post_point") or "" if mpp and mpp not in ancestor_to_mpps.get(ancestor, []): ancestor_to_mpps.setdefault(ancestor, []).append(mpp) # 2. 筛选 pattern:已推导维度节点占所有元素的比例 >= 50% filtered_patterns: List[Dict[str, Any]] = [] for p in patterns: items = p.get("items") or [] item_names = [ str(it.get("name") or "").strip() for it in items if isinstance(it, dict) ] if not item_names: continue if len(item_names) < 5: continue derived_count = sum(1 for name in item_names if name in derived_ancestor_set) if derived_count / len(item_names) >= 0.5: filtered_patterns.append(p) print( f"filtered_patterns: {len(filtered_patterns)}, " f"derived_ancestor_set: {len(derived_ancestor_set)}" ) def _node_label(name: str, is_derived: bool) -> str: """ 返回格式化标签: - 已推导节点:'node_name->dimension(mpp1,mpp2,...)' - 未推导节点:'node_name->dimension' """ dim = (tree_index.node_info.get(name) or {}).get("dimension") or "" base = f"{name}->{dim}" if dim else name if is_derived: mpps = ancestor_to_mpps.get(name) or [] if mpps: return f"{base}({','.join(mpps)})" return base # 3. 对筛选 pattern 元素分类并汇总维度列表 derived_dims: List[str] = [] underived_dims: List[str] = [] derived_dims_seen: Set[str] = set() underived_dims_seen: Set[str] = set() scored_patterns: List[Dict[str, Any]] = [] for p in filtered_patterns: items = p.get("items") or [] tagged_items: List[Dict[str, Any]] = [] for it in items: if not isinstance(it, dict): continue name = str(it.get("name") or "").strip() is_derived = name in derived_ancestor_set tagged_items.append( { "name": name, "is_derived": is_derived, } ) if is_derived: if name and name not in derived_dims_seen: derived_dims_seen.add(name) derived_dims.append(_node_label(name, is_derived=True)) else: if name and name not in underived_dims_seen: underived_dims_seen.add(name) underived_dims.append(_node_label(name, is_derived=False)) scored_patterns.append( { "id": p.get("id"), "support": p.get("support"), "items": tagged_items, } ) # 从 underived_dims 中排除与 derived_dims 重叠的节点 underived_dims = [d for d in underived_dims if d.split("->")[0] not in derived_dims_seen] # 按 is_derived=True 的元素数量从高到低排序,数量相同再按元素总数从高到低 scored_patterns.sort( key=lambda x: ( sum(1 for it in x.get("items", []) if it.get("is_derived")), len(x.get("items", [])), ), reverse=True, ) return { "cumulative_points": list(cumulative_points), "derived_ancestor_nodes": sorted(derived_ancestor_set), "patterns": scored_patterns, "derived_dims": derived_dims, "underived_dims": underived_dims, "patterns_count": len(scored_patterns), "derived_dim_count": len(derived_dims), "underived_dim_count": len(underived_dims), } def pattern_dimension_analyze( account_name: str, post_id: str, log_id: str, cluster_level: int = 2, ) -> Dict[str, Any]: """ Pattern 维度分析主入口。 参数 ------- account_name : 账号名(用于定位 input / output 下的数据目录) post_id : 帖子 ID(用于定位推导日志) log_id : 推导日志目录名(../output/{account_name}/推导日志/{post_id}/{log_id}/) cluster_level : 在人设树中查找祖先节点的目标深度(root 为 0 层),默认 2 逻辑概述 -------- 每一轮: 1. 从 derivation_output_point 在人设树中找到 cluster_level 层祖先节点 → 已推导维度节点集合。 2. 筛选包含已推导维度节点的 pattern。 3. 标记每个 pattern 元素是否已推导,汇总 derived_dims / underived_dims。 """ eval_dir = _round_eval_dir(account_name, post_id, log_id) if not eval_dir.is_dir(): raise FileNotFoundError(f"推导日志目录不存在: {eval_dir}") round_infos = _load_round_matched_points(account_name, post_id, log_id) if not round_infos: return { "account_name": account_name, "post_id": post_id, "log_id": log_id, "cluster_level": cluster_level, "rounds": [], "message": "未在指定日志目录下找到任何评估结果文件(*_评估.json)", } tree_index = TreeIndex(account_name) # pattern 库只在整体分析时读取 & 去重一次,避免每一轮重复 IO 与解析 raw_patterns = _load_raw_patterns(account_name) deduped_patterns = _dedupe_patterns(raw_patterns) print(f"deduped_patterns len: {len(deduped_patterns)}") rounds_output: List[Dict[str, Any]] = [] for info in round_infos: r = info["round"] cumulative_points = info["cumulative_points"] analyzed = _analyze_single_round( patterns=deduped_patterns, tree_index=tree_index, cumulative_points=cumulative_points, cluster_level=cluster_level, ) analyzed["round"] = r rounds_output.append(analyzed) return { "account_name": account_name, "post_id": post_id, "log_id": log_id, "cluster_level": cluster_level, "rounds": rounds_output, } def main(account_name, post_id, log_id) -> None: """本地简单测试:以家有大志账号的一次推导日志做分析,并将结果写入输出目录。""" result = pattern_dimension_analyze( account_name=account_name, post_id=post_id, log_id=log_id, cluster_level=3, ) # 控制台打印前 4000 字符,便于快速查看 # print(json.dumps(result, ensure_ascii=False, indent=2)[:4000] + "...") # 写入输出文件:../output/{account_name}/推导日志/{post_id}/{log_id}/pattern_dimension_analyze.json out_dir = _round_eval_dir(account_name, post_id, log_id) out_dir.mkdir(parents=True, exist_ok=True) output_file_name = f"{post_id}_pattern_dimension_analyze.json" out_path = out_dir / output_file_name with open(out_path, "w", encoding="utf-8") as f: json.dump(result, f, ensure_ascii=False, indent=2) print(f"\n分析结果已写入: {out_path}") if __name__ == "__main__": account_name = "家有大志" items = [ {"post_id": "68fb6a5c000000000302e5de", "log_id": "20260317214307"}, {"post_id": "69185d49000000000d00f94e", "log_id": "20260317214841"}, {"post_id": "6921937a000000001b0278d1", "log_id": "20260317215616"} ] for item in items: post_id = item["post_id"] log_id = item["log_id"] main(account_name, post_id, log_id)