- """
- Pattern dimension analysis tool.
- Overview:
- 1. Read the per-round evaluation results under one derivation-log directory and accumulate fields such as matched_post_point / derivation_output_point.
- 2. For each round, map every derivation_output_point to its ancestor at depth cluster_level in the persona trees (the set of already-derived dimension nodes).
- 3. From deduped_patterns, keep the patterns in which already-derived dimension nodes make up at least half of the items, and mark each item as derived or not.
- Input parameters:
- - account_name: account name
- - post_id: post ID
- - log_id: derivation-log directory name (e.g. 20260313210921)
- - cluster_level: target depth at which to look up ancestor nodes in the persona trees (the root is depth 0)
- """
- import json
- import sys
- from pathlib import Path
- from typing import Any, Dict, List, Optional, Tuple, Set
- # Ensure utils / tools resolve whether the script is run directly or imported as a package (keeps IDE navigation working)
- _root = Path(__file__).resolve().parent.parent
- if str(_root) not in sys.path:
- sys.path.insert(0, str(_root))
- from tools.find_tree_node import _load_trees  # loads the three persona trees
- _BASE_INPUT = Path(__file__).resolve().parent.parent / "input"
- _BASE_OUTPUT = Path(__file__).resolve().parent.parent / "output"
- # Keys of the pattern library (kept consistent with find_pattern)
- TOP_KEYS = [
- "depth_4",
- ]
- SUB_KEYS = ["two_x", "one_x", "zero_x"]
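- # Illustrative sketch of the expected processed_edge_data.json layout (an assumption inferred from
- # _load_raw_patterns / _slim_pattern_for_dedupe below, not an authoritative schema):
- #   {"depth_4": {"two_x": [{"id": ..., "support": 0.42, "items": [{"name": "..."}]}, ...],
- #                "one_x": [...], "zero_x": [...]}}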
- # ---------------------------------------------------------------------------
- # 1. Read the derivation log: accumulate matched_post_point round by round
- # ---------------------------------------------------------------------------
- def _round_eval_dir(account_name: str, post_id: str, log_id: str) -> Path:
- """
- Derivation-log directory:
- ../output/{account_name}/推导日志/{post_id}/{log_id}/
- """
- return _BASE_OUTPUT / account_name / "推导日志" / post_id / log_id
- def _load_round_matched_points(
- account_name: str,
- post_id: str,
- log_id: str,
- ) -> List[Dict[str, Any]]:
- """
- Read every {round}_评估.json file under the given log directory, sort them by round, and produce:
- [
- {
- "round": 1,
- "round_points": [
- {
- "matched_post_point": "叙事结构",
- "derivation_output_point": "叙事编排",
- "matched_score": 0.9151,
- "is_fully_derived": true,
- },
- ...
- ],
- "cumulative_points": [
- ... the cumulative, deduplicated list up to this round (keyed by derivation_output_point) ...
- ],
- },
- ...
- ]
- """
- base_dir = _round_eval_dir(account_name, post_id, log_id)
- if not base_dir.is_dir():
- return []
- eval_files: List[Tuple[int, Path]] = []
- for p in base_dir.glob("*.json"):
- name = p.name
- # Only process *_评估.json files
- if not name.endswith("评估.json"):
- continue
- try:
- round_str = name.split("_", 1)[0]
- r = int(round_str)
- except Exception:
- continue
- eval_files.append((r, p))
- eval_files.sort(key=lambda x: x[0])
- results: List[Dict[str, Any]] = []
- cumulative: List[Dict[str, Any]] = []
- cumulative_set: Set[str] = set()  # deduplicated by derivation_output_point
- for r, path in eval_files:
- try:
- with open(path, "r", encoding="utf-8") as f:
- data = json.load(f)
- except Exception:
- continue
- eval_results = data.get("eval_results") or []
- round_points: List[Dict[str, Any]] = []
- seen_in_round: Set[str] = set()
- for item in eval_results:
- if not isinstance(item, dict):
- continue
- if not item.get("is_matched"):
- continue
- dop = item.get("derivation_output_point")
- if dop is None:
- continue
- dop = str(dop).strip()
- if not dop:
- continue
- # Deduplicate by derivation_output_point within the current round
- if dop in seen_in_round:
- continue
- seen_in_round.add(dop)
- mpp = item.get("matched_post_point")
- entry: Dict[str, Any] = {
- "matched_post_point": str(mpp).strip() if mpp is not None else None,
- "derivation_output_point": dop,
- "matched_score": item.get("matched_score"),
- "is_fully_derived": item.get("is_fully_derived"),
- }
- round_points.append(entry)
- # Append to the cumulative list (deduplicated by derivation_output_point)
- for entry in round_points:
- dop = entry["derivation_output_point"]
- if dop not in cumulative_set:
- cumulative_set.add(dop)
- cumulative.append(entry)
- results.append(
- {
- "round": r,
- "round_points": round_points,
- "cumulative_points": list(cumulative),
- }
- )
- return results
- # ---------------------------------------------------------------------------
- # 2. Load and deduplicate the pattern library
- # ---------------------------------------------------------------------------
- def _pattern_file(account_name: str) -> Path:
- """pattern 库文件:../input/{account_name}/原始数据/pattern/processed_edge_data.json"""
- return _BASE_INPUT / account_name / "原始数据" / "pattern" / "processed_edge_data.json"
- def _load_raw_patterns(account_name: str) -> List[Dict[str, Any]]:
- """
- Read every raw pattern from the pattern library (keep the items structure; no merging).
- Each element of the returned list is a pattern dict as it appears in the original JSON (the point / dimension fields of each item are ignored here).
- """
- path = _pattern_file(account_name)
- if not path.is_file():
- return []
- with open(path, "r", encoding="utf-8") as f:
- data = json.load(f)
- patterns: List[Dict[str, Any]] = []
- for top in TOP_KEYS:
- block = data.get(top)
- if not isinstance(block, dict):
- continue
- for sub in SUB_KEYS:
- items = block.get(sub) or []
- if isinstance(items, list):
- for p in items:
- if isinstance(p, dict):
- patterns.append(p)
- return patterns
- def _slim_pattern_for_dedupe(p: Dict[str, Any]) -> Tuple[float, List[str]]:
- """
- Extract a pattern's support plus its deduplicated item-name list (merged by name, order ignored),
- so the result lines up with the deduplication logic in find_pattern.py.
- """
- items = p.get("items") or []
- names = [str(it.get("name") or "").strip() for it in items if isinstance(it, dict)]
- seen: Set[str] = set()
- unique: List[str] = []
- for n in names:
- if n and n not in seen:
- seen.add(n)
- unique.append(n)
- try:
- support = float(p.get("support", 0.0))
- except (TypeError, ValueError):
- support = 0.0
- return support, unique
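- # Illustrative (hypothetical pattern): {"support": 0.42, "items": [{"name": "B"}, {"name": "A"}, {"name": "A"}]}
- # would be slimmed to (0.42, ["B", "A"]) -- duplicate names dropped, first-seen order preserved.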
- def _dedupe_patterns(raw_patterns: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
- """
- Deduplicate patterns by their set of item names (order-insensitive), mirroring find_pattern.py:
- - key = sorted(unique item names)
- - for each key, keep only the pattern with the highest support (its original items structure is preserved for later scoring)
- """
- key_to_best: Dict[Tuple[str, ...], Dict[str, Any]] = {}
- key_to_support: Dict[Tuple[str, ...], float] = {}
- for p in raw_patterns:
- support, unique = _slim_pattern_for_dedupe(p)
- if not unique:
- continue
- key = tuple(sorted(unique))
- best_support = key_to_support.get(key)
- if best_support is None or support > best_support:
- key_to_support[key] = support
- key_to_best[key] = p
- return list(key_to_best.values())
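- # Illustrative (hypothetical data): two patterns whose item-name sets are both {"A", "B"}, one with
- # support 0.3 and one with support 0.7, share the key ("A", "B"); only the support-0.7 pattern survives.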
- # ---------------------------------------------------------------------------
- # 3. Persona-tree node index & cluster-node search
- # ---------------------------------------------------------------------------
- class TreeIndex:
- """
- Persona-tree index:
- - node_info: node -> { "parent": parent node name, "children": [child node names...], "depth": depth, "dimension": dimension name }
- - roots: dimension name -> root node name (the dimension name itself)
- - merged_tree: the 实质/形式/意图 trees merged into a single JSON object (top-level keys are 实质/形式/意图)
- """
- def __init__(self, account_name: str) -> None:
- self.account_name = account_name
- self.node_info: Dict[str, Dict[str, Any]] = {}
- self.roots: Dict[str, str] = {}
- # The three trees merged into one JSON object: {"实质": {...}, "形式": {...}, "意图": {...}}
- self.merged_tree: Dict[str, Dict[str, Any]] = {}
- self._build()
- def _build(self) -> None:
- trees = _load_trees(self.account_name)
- # 1) Merge the three trees into a single JSON object: {"实质": {...}, "形式": {...}, "意图": {...}}
- merged: Dict[str, Dict[str, Any]] = {}
- for dim_name, root in trees:
- if isinstance(root, dict):
- merged[dim_name] = root
- self.merged_tree = merged
- # 2) Build the parent/children structure from the merged JSON
- for dim_name, root in merged.items():
- root_name = dim_name
- self.roots[dim_name] = root_name
- if root_name not in self.node_info:
- self.node_info[root_name] = {
- "parent": None,
- "children": [],
- "dimension": dim_name,
- "depth": 0,
- }
- def walk(parent_name: str, node_dict: Dict[str, Any]):
- children = node_dict.get("children") or {}
- for name, child in children.items():
- if not isinstance(child, dict):
- continue
- if name not in self.node_info:
- self.node_info[name] = {
- "parent": parent_name,
- "children": [],
- "dimension": dim_name,
- "depth": None, # 稍后统一计算
- }
- else:
- # Only update parent when it would not create a self-reference (the tree may contain a parent and child with the same name)
- if name != parent_name:
- self.node_info[name]["parent"] = parent_name
- self.node_info[name]["dimension"] = dim_name
- # Maintain the parent node's children list
- if parent_name not in self.node_info:
- self.node_info[parent_name] = {
- "parent": None,
- "children": [],
- "dimension": dim_name,
- "depth": 0,
- }
- if name not in self.node_info[parent_name]["children"]:
- self.node_info[parent_name]["children"].append(name)
- walk(name, child)
- walk(root_name, root)
- # Compute every node's depth in one pass (BFS from each root)
- from collections import deque
- q = deque()
- for dim_name, root_name in self.roots.items():
- if root_name not in self.node_info:
- continue
- self.node_info[root_name]["depth"] = 0
- q.append(root_name)
- while q:
- cur = q.popleft()
- cur_depth = self.node_info[cur].get("depth", 0) or 0
- for child in self.node_info[cur].get("children", []):
- self.node_info.setdefault(child, {})
- if self.node_info[child].get("depth") is None:
- self.node_info[child]["depth"] = cur_depth + 1
- q.append(child)
- def find_ancestor_at_level(self, node_name: str, level: int) -> Optional[str]:
- """
- Find the ancestor of node_name whose depth == level in the persona trees.
- - If node_name itself is at depth == level, return node_name.
- - If node_name is shallower than level (depth < level), return node_name.
- - Otherwise walk up the parent chain and return the first ancestor with depth == level.
- """
- info = self.node_info.get(node_name)
- if not info:
- return None
- depth = info.get("depth")
- if depth is None:
- return None
- if depth <= level:
- return node_name
- cur = node_name
- visited: Set[str] = set()
- while cur and cur not in visited:
- visited.add(cur)
- cur_info = self.node_info.get(cur) or {}
- cur_depth = cur_info.get("depth") or 0
- if cur_depth == level:
- return cur
- if cur_depth < level:
- return cur
- parent = cur_info.get("parent")
- if parent is None:
- return cur
- cur = parent
- return None
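- # Illustrative (hypothetical chain): with depths 实质(0) -> 节点A(1) -> 节点B(2) -> 节点C(3),
- # find_ancestor_at_level("节点C", 2) walks up to "节点B", while find_ancestor_at_level("节点A", 2)
- # returns "节点A" itself because it is already shallower than the target level.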
- # Cluster search (no longer restricted to a single dimension)
- def find_clusters(
- self,
- elements: List[str],
- cluster_level: int,
- ) -> List[Dict[str, Any]]:
- """
- Find cluster nodes for the given element list across all persona trees (the elements no longer need to share a dimension).
- Rules (fixed clustering level cluster_level):
- - Clustering is only evaluated at nodes with depth == cluster_level:
- * if a node's subtree contains >= 2 of the elements,
- and no higher-level (smaller-depth) cluster node already exists on that path, the node becomes a cluster node.
- - Elements that cannot be clustered upward get their depth == cluster_level ancestor,
- if one exists, as a "single-element cluster" node.
- - Returns:
- [
- {
- "cluster_node": "节点名",
- "from_elements": ["元素A", "元素B", ...]
- },
- ...
- ]
- """
- # Keep only elements that actually exist in the persona trees
- elem_set: Set[str] = set()
- for e in elements:
- e = str(e).strip()
- if not e:
- continue
- info = self.node_info.get(e)
- if not info:
- continue
- elem_set.add(e)
- if not elem_set:
- return []
- # First count how many of the elements fall inside each node's subtree (starting from the roots of all dimensions).
- # Note: the persona-tree data may contain unexpected cycles or duplicate references; the visited set guards against infinite recursion.
- subtree_count: Dict[str, int] = {}
- def dfs_count(node: str, visited: Set[str]) -> int:
- if node in visited:
- # Cycle detected: return 0 to avoid infinite recursion
- return 0
- visited.add(node)
- cnt = 1 if node in elem_set else 0
- for ch in self.node_info.get(node, {}).get("children", []):
- cnt += dfs_count(ch, visited)
- subtree_count[node] = cnt
- return cnt
- for root_name in self.roots.values():
- dfs_count(root_name, set())
- # Then, top-down, prefer the "upper" cluster nodes (but only at the cluster_level depth):
- # - once a node is selected as a cluster node, none of its descendants can be selected (clustering as high up as possible);
- # a visited set is again used to keep unexpected cycles from recursing too deep.
- clusters: Set[str] = set()
- def dfs_select(node: str, ancestor_selected: bool, visited: Set[str]) -> None:
- if node in visited:
- return
- visited.add(node)
- info = self.node_info.get(node) or {}
- depth = info.get("depth", 0) or 0
- cnt = subtree_count.get(node, 0)
- selected_here = False
- # Select the current node as a cluster node only if no ancestor has been selected, it sits at depth cluster_level, and it covers enough elements
- if (not ancestor_selected) and depth == cluster_level and cnt >= 2:
- clusters.add(node)
- selected_here = True
- # If an ancestor or the current node is already selected, descendants are no longer eligible as cluster nodes
- for ch in info.get("children", []):
- dfs_select(ch, ancestor_selected or selected_here, visited)
- for root_name in self.roots.values():
- dfs_select(root_name, False, set())
- if not clusters:
- return []
- # Collect the elements actually covered by each cluster node
- cluster_to_elements: Dict[str, Set[str]] = {c: set() for c in clusters}
- for e in elem_set:
- cur = e
- visited: Set[str] = set()
- while cur and cur not in visited:
- visited.add(cur)
- if cur in clusters:
- cluster_to_elements[cur].add(e)
- parent = self.node_info.get(cur, {}).get("parent")
- if parent is None:
- break
- cur = parent
- out: List[Dict[str, Any]] = []
- # 1) Multi-element clusters: only count elements covered by cluster nodes that are actually emitted,
- # so nodes covering fewer than 2 elements are not treated as "covered", which would otherwise drop elements.
- covered_elems: Set[str] = set()
- for node in clusters:
- elems = sorted(cluster_to_elements.get(node) or [])
- if len(elems) < 2:
- # The main clustering logic only keeps nodes that cover at least 2 elements
- continue
- out.append(
- {
- "cluster_node": node,
- "from_elements": elems,
- }
- )
- for e in elems:
- covered_elems.add(e)
- # 2) Elements that could not be clustered upward each get a "single-element cluster"
- uncovered = elem_set - covered_elems
- # Group uncovered elements by their ancestor at depth cluster_level, so that several elements under
- # the same ancestor merge into one cluster instead of producing multiple single-element clusters.
- single_clusters: Dict[str, Set[str]] = {}
- for e in uncovered:
- # For a single-element cluster, cluster_node should be the ancestor node, not the element itself.
- # The ancestor at depth == cluster_level is always used here.
- info_e = self.node_info.get(e) or {}
- parent = info_e.get("parent")
- cur = parent
- best_ancestor: Optional[str] = None
- visited_chain: Set[str] = set()
- while cur and cur not in visited_chain:
- visited_chain.add(cur)
- info = self.node_info.get(cur) or {}
- depth = info.get("depth", 0) or 0
- if depth == cluster_level:
- best_ancestor = cur
- break
- parent = info.get("parent")
- if parent is None:
- break
- cur = parent
- if best_ancestor:
- single_clusters.setdefault(best_ancestor, set()).add(e)
- for anc, elems in single_clusters.items():
- out.append(
- {
- "cluster_node": anc,
- "from_elements": sorted(elems),
- }
- )
- # For stable output, sort by the number of from_elements (descending), then by node name on ties
- out.sort(key=lambda x: (-len(x["from_elements"]), x["cluster_node"]))
- return out
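- # Illustrative walk-through (hypothetical nodes, cluster_level=2): if elements 元素1 and 元素2 both sit
- # below the depth-2 node 节点X, while 元素3's only depth-2 ancestor 节点Y covers nothing else, the result is
- #   [{"cluster_node": "节点X", "from_elements": ["元素1", "元素2"]},
- #    {"cluster_node": "节点Y", "from_elements": ["元素3"]}]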
- # ---------------------------------------------------------------------------
- # 4. Pattern & cluster analysis for a single round
- # ---------------------------------------------------------------------------
- def _analyze_single_round(
- patterns: List[Dict[str, Any]],
- tree_index: TreeIndex,
- cumulative_points: List[Dict[str, Any]],
- cluster_level: int,
- ) -> Dict[str, Any]:
- """
- Run the dimension analysis for one round (given its cumulative point list):
- 1. Take each derivation_output_point from cumulative_points and
- look up its ancestor at depth cluster_level in the persona trees → derived_ancestor_set (the set of already-derived dimension nodes).
- 2. From the deduplicated patterns, keep those in which derived_ancestor_set nodes make up at least half of the items (patterns with fewer than 5 items are skipped).
- 3. Mark every item of the selected patterns as derived or not:
- - item name in derived_ancestor_set → is_derived=True (already-derived dimension)
- - otherwise → is_derived=False (not-yet-derived dimension)
- 4. Aggregate the derived_dims / underived_dims lists.
- Returned structure:
- {
- "cumulative_points": [...], # the original cumulative point objects
- "derived_ancestor_nodes": [...], # cluster_level ancestors of all derivation_output_point values (the already-derived dimension nodes)
- "patterns": [...], # the selected patterns with is_derived flags on their items
- "derived_dims": [...], # already-derived dimension nodes (deduplicated, appearing in the selected patterns)
- "underived_dims": [...], # not-yet-derived dimension nodes (deduplicated, excluding derived nodes)
- "patterns_count": int,
- "derived_dim_count": int,
- "underived_dim_count": int,
- }
- """
- # 1. Collect derived_ancestor_set and record, for each ancestor node, the matched_post_point values it came from
- derived_ancestor_set: Set[str] = set()
- ancestor_to_mpps: Dict[str, List[str]] = {}  # ancestor node -> [matched_post_point, ...]
- for entry in cumulative_points:
- dop = entry.get("derivation_output_point")
- if not dop:
- continue
- ancestor = tree_index.find_ancestor_at_level(str(dop).strip(), cluster_level)
- if not ancestor:
- continue
- derived_ancestor_set.add(ancestor)
- mpp = entry.get("matched_post_point") or ""
- if mpp and mpp not in ancestor_to_mpps.get(ancestor, []):
- ancestor_to_mpps.setdefault(ancestor, []).append(mpp)
- # 2. Select patterns: already-derived dimension nodes must make up >= 50% of all items (patterns with fewer than 5 items are skipped)
- filtered_patterns: List[Dict[str, Any]] = []
- for p in patterns:
- items = p.get("items") or []
- item_names = [
- str(it.get("name") or "").strip()
- for it in items
- if isinstance(it, dict)
- ]
- if not item_names:
- continue
- if len(item_names) < 5:
- continue
- derived_count = sum(1 for name in item_names if name in derived_ancestor_set)
- if derived_count / len(item_names) >= 0.5:
- filtered_patterns.append(p)
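- # Worked example of the threshold (hypothetical items): a 5-item pattern with 3 items in
- # derived_ancestor_set passes (3/5 = 60% >= 50%); any pattern with fewer than 5 items is skipped outright.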
- print(
- f"filtered_patterns: {len(filtered_patterns)}, "
- f"derived_ancestor_set: {len(derived_ancestor_set)}"
- )
- def _node_label(name: str, is_derived: bool) -> str:
- """
- Return a formatted label:
- - derived node: 'node_name->dimension(mpp1,mpp2,...)'
- - not-yet-derived node: 'node_name->dimension'
- """
- dim = (tree_index.node_info.get(name) or {}).get("dimension") or ""
- base = f"{name}->{dim}" if dim else name
- if is_derived:
- mpps = ancestor_to_mpps.get(name) or []
- if mpps:
- return f"{base}({','.join(mpps)})"
- return base
- # 3. Classify the items of the selected patterns and aggregate the dimension lists
- derived_dims: List[str] = []
- underived_dims: List[str] = []
- derived_dims_seen: Set[str] = set()
- underived_dims_seen: Set[str] = set()
- scored_patterns: List[Dict[str, Any]] = []
- for p in filtered_patterns:
- items = p.get("items") or []
- tagged_items: List[Dict[str, Any]] = []
- for it in items:
- if not isinstance(it, dict):
- continue
- name = str(it.get("name") or "").strip()
- is_derived = name in derived_ancestor_set
- tagged_items.append(
- {
- "name": name,
- "is_derived": is_derived,
- }
- )
- if is_derived:
- if name and name not in derived_dims_seen:
- derived_dims_seen.add(name)
- derived_dims.append(_node_label(name, is_derived=True))
- else:
- if name and name not in underived_dims_seen:
- underived_dims_seen.add(name)
- underived_dims.append(_node_label(name, is_derived=False))
- scored_patterns.append(
- {
- "id": p.get("id"),
- "support": p.get("support"),
- "items": tagged_items,
- }
- )
- # Drop from underived_dims any node that also appears in derived_dims
- underived_dims = [d for d in underived_dims if d.split("->")[0] not in derived_dims_seen]
- # Sort by the number of is_derived=True items (descending), then by total item count (descending) on ties
- scored_patterns.sort(
- key=lambda x: (
- sum(1 for it in x.get("items", []) if it.get("is_derived")),
- len(x.get("items", [])),
- ),
- reverse=True,
- )
- return {
- "cumulative_points": list(cumulative_points),
- "derived_ancestor_nodes": sorted(derived_ancestor_set),
- "patterns": scored_patterns,
- "derived_dims": derived_dims,
- "underived_dims": underived_dims,
- "patterns_count": len(scored_patterns),
- "derived_dim_count": len(derived_dims),
- "underived_dim_count": len(underived_dims),
- }
- def pattern_dimension_analyze(
- account_name: str,
- post_id: str,
- log_id: str,
- cluster_level: int = 2,
- ) -> Dict[str, Any]:
- """
- Main entry point of the pattern dimension analysis.
- Parameters
- ----------
- account_name : account name (locates the data directories under input / output)
- post_id : post ID (locates the derivation log)
- log_id : derivation-log directory name (../output/{account_name}/推导日志/{post_id}/{log_id}/)
- cluster_level : target depth for ancestor lookup in the persona trees (the root is depth 0), default 2
- Overview
- --------
- For every round:
- 1. Map each derivation_output_point to its ancestor at depth cluster_level in the persona trees → the set of already-derived dimension nodes.
- 2. Select the patterns in which already-derived dimension nodes reach the required share of items.
- 3. Mark each pattern item as derived or not and aggregate derived_dims / underived_dims.
- """
- eval_dir = _round_eval_dir(account_name, post_id, log_id)
- if not eval_dir.is_dir():
- raise FileNotFoundError(f"推导日志目录不存在: {eval_dir}")
- round_infos = _load_round_matched_points(account_name, post_id, log_id)
- if not round_infos:
- return {
- "account_name": account_name,
- "post_id": post_id,
- "log_id": log_id,
- "cluster_level": cluster_level,
- "rounds": [],
- "message": "未在指定日志目录下找到任何评估结果文件(*_评估.json)",
- }
- tree_index = TreeIndex(account_name)
- # The pattern library is loaded & deduplicated once for the whole analysis, avoiding repeated IO and parsing per round
- raw_patterns = _load_raw_patterns(account_name)
- deduped_patterns = _dedupe_patterns(raw_patterns)
- print(f"deduped_patterns len: {len(deduped_patterns)}")
- rounds_output: List[Dict[str, Any]] = []
- for info in round_infos:
- r = info["round"]
- cumulative_points = info["cumulative_points"]
- analyzed = _analyze_single_round(
- patterns=deduped_patterns,
- tree_index=tree_index,
- cumulative_points=cumulative_points,
- cluster_level=cluster_level,
- )
- analyzed["round"] = r
- rounds_output.append(analyzed)
- return {
- "account_name": account_name,
- "post_id": post_id,
- "log_id": log_id,
- "cluster_level": cluster_level,
- "rounds": rounds_output,
- }
- def main(account_name, post_id, log_id) -> None:
- """本地简单测试:以家有大志账号的一次推导日志做分析,并将结果写入输出目录。"""
- result = pattern_dimension_analyze(
- account_name=account_name,
- post_id=post_id,
- log_id=log_id,
- cluster_level=3,
- )
- # Optionally print the first 4000 characters to the console for a quick look (disabled)
- # print(json.dumps(result, ensure_ascii=False, indent=2)[:4000] + "...")
- # Write the output file: ../output/{account_name}/推导日志/{post_id}/{log_id}/{post_id}_pattern_dimension_analyze.json
- out_dir = _round_eval_dir(account_name, post_id, log_id)
- out_dir.mkdir(parents=True, exist_ok=True)
- output_file_name = f"{post_id}_pattern_dimension_analyze.json"
- out_path = out_dir / output_file_name
- with open(out_path, "w", encoding="utf-8") as f:
- json.dump(result, f, ensure_ascii=False, indent=2)
- print(f"\n分析结果已写入: {out_path}")
- if __name__ == "__main__":
- account_name = "家有大志"
- items = [
- {"post_id": "68fb6a5c000000000302e5de", "log_id": "20260317214307"},
- {"post_id": "69185d49000000000d00f94e", "log_id": "20260317214841"},
- {"post_id": "6921937a000000001b0278d1", "log_id": "20260317215616"}
- ]
- for item in items:
- post_id = item["post_id"]
- log_id = item["log_id"]
- main(account_name, post_id, log_id)