| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761 |
- """
- Pattern 维度聚类分析 Tool
- 功能概述:
- 1. 读取某次整体推导日志目录下各轮评估结果,累计每轮已匹配的 matched_post_point。
- 2. 基于帖子与 pattern 库的匹配结果,对 pattern 元素做打分与分类(已推导/未推导)。
- 3. 在账号人设树(实质/形式/意图)中,分别为「已推导元素」「未推导元素」寻找聚类节点。
- 输入参数:
- - account_name: 账号名称
- - post_id: 帖子 ID
- - log_id: 推导日志目录名(形如 20260313210921)
- """
- import json
- import sys
- from pathlib import Path
- from typing import Any, Dict, List, Optional, Tuple, Set
- # 保证直接运行或作为包加载时都能解析 utils / tools(IDE 可跳转)
- _root = Path(__file__).resolve().parent.parent
- if str(_root) not in sys.path:
- sys.path.insert(0, str(_root))
- from tools.point_match import _load_match_data # 帖子选题点与人设树节点匹配分
- from tools.find_tree_node import _load_trees # 加载三棵人设树
- _BASE_INPUT = Path(__file__).resolve().parent.parent / "input"
- _BASE_OUTPUT = Path(__file__).resolve().parent.parent / "output"
- # pattern 库 key 定义(与 find_pattern 中保持一致)
- TOP_KEYS = [
- "depth_max_with_name",
- "depth_mixed",
- "depth_max_concrete",
- "depth2_medium",
- "depth1_abstract",
- ]
- SUB_KEYS = ["two_x", "one_x", "zero_x"]
# ---------------------------------------------------------------------------
# 1. Derivation-log reading: accumulate matched_post_point per round
# ---------------------------------------------------------------------------
def _round_eval_dir(account_name: str, post_id: str, log_id: str) -> Path:
    """Return the derivation-log directory for one run.

    Layout: ../output/{account_name}/推导日志/{post_id}/{log_id}/
    """
    return _BASE_OUTPUT.joinpath(account_name, "推导日志", post_id, log_id)
def _load_round_matched_points(
    account_name: str,
    post_id: str,
    log_id: str,
) -> List[Dict[str, Any]]:
    """Read every per-round evaluation file and accumulate matched points.

    Scans the log directory for ``{round}_评估.json`` files, sorts them by
    round number, and produces::

        [
            {
                "round": 1,
                "round_points": [...this round's matched points, deduped...],
                "cumulative_points": [...points accumulated up to this round...],
            },
            ...
        ]

    Files whose name lacks a numeric round prefix or whose JSON cannot be
    read are skipped (best effort).
    """
    base_dir = _round_eval_dir(account_name, post_id, log_id)
    if not base_dir.is_dir():
        return []

    # Collect (round, path) pairs; only *_评估.json with a numeric prefix.
    eval_files: List[Tuple[int, Path]] = []
    for p in base_dir.glob("*.json"):
        name = p.name
        if not name.endswith("评估.json"):
            continue
        try:
            r = int(name.split("_", 1)[0])
        except ValueError:
            # No parsable round number in front of the underscore.
            continue
        eval_files.append((r, p))
    eval_files.sort(key=lambda x: x[0])

    results: List[Dict[str, Any]] = []
    cumulative: List[str] = []
    cumulative_set: Set[str] = set()
    for r, path in eval_files:
        try:
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # Unreadable file or invalid JSON (JSONDecodeError is a ValueError).
            continue
        eval_results = data.get("eval_results") or []
        round_points: List[str] = []
        round_seen: Set[str] = set()  # O(1) dedupe within the round
        for item in eval_results:
            if not isinstance(item, dict):
                continue
            if not item.get("is_matched"):
                continue
            # Pick the post-point field depending on derivation completeness:
            # - is_fully_derived is False -> use derivation_output_point
            # - otherwise (True or missing) -> matched_post_point (legacy data)
            if item.get("is_fully_derived") is False:
                mp = item.get("derivation_output_point")
            else:
                mp = item.get("matched_post_point")
            if mp is None:
                continue
            mp = str(mp).strip()
            if not mp or mp in round_seen:
                continue
            round_seen.add(mp)
            round_points.append(mp)
        # Fold this round's points into the running cumulative list.
        for mp in round_points:
            if mp not in cumulative_set:
                cumulative_set.add(mp)
                cumulative.append(mp)
        results.append(
            {
                "round": r,
                "round_points": round_points,
                "cumulative_points": list(cumulative),
            }
        )
    return results
# ---------------------------------------------------------------------------
# 2. Load the pattern library and score it against matched_post_point
# ---------------------------------------------------------------------------
def _pattern_file(account_name: str) -> Path:
    """Pattern-library file: ../input/{account_name}/原始数据/pattern/processed_edge_data.json."""
    return _BASE_INPUT.joinpath(account_name, "原始数据", "pattern", "processed_edge_data.json")
def _load_raw_patterns(account_name: str) -> List[Dict[str, Any]]:
    """Load every raw pattern from the pattern library (items kept, no merging).

    Each returned element has the shape of a pattern object in the original
    JSON; the per-item point / dimension fields are irrelevant at this stage.
    Returns an empty list when the library file is missing.
    """
    path = _pattern_file(account_name)
    if not path.is_file():
        return []
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)

    patterns: List[Dict[str, Any]] = []
    for top_key in TOP_KEYS:
        block = data.get(top_key)
        if not isinstance(block, dict):
            continue
        for sub_key in SUB_KEYS:
            candidates = block.get(sub_key) or []
            if not isinstance(candidates, list):
                continue
            # Keep only well-formed (dict) pattern entries.
            patterns.extend(c for c in candidates if isinstance(c, dict))
    return patterns
- def _slim_pattern_for_dedupe(p: Dict[str, Any]) -> Tuple[float, List[str]]:
- """
- 提取 pattern 的 support 与去重后的 item name 列表(按名称合并,不关心顺序),
- 用于与 find_pattern.py 中的去重逻辑对齐。
- """
- items = p.get("items") or []
- names = [str(it.get("name") or "").strip() for it in items if isinstance(it, dict)]
- seen: Set[str] = set()
- unique: List[str] = []
- for n in names:
- if n and n not in seen:
- seen.add(n)
- unique.append(n)
- try:
- support = float(p.get("support", 0.0))
- except (TypeError, ValueError):
- support = 0.0
- return support, unique
def _dedupe_patterns(raw_patterns: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Deduplicate patterns by their (order-insensitive) item-name set.

    Mirrors find_pattern.py: the key is the sorted tuple of unique item
    names, and for each key only the pattern with the highest support
    survives (its original items structure is kept for later scoring).
    """
    # key -> (best support seen, the pattern carrying it)
    best: Dict[Tuple[str, ...], Tuple[float, Dict[str, Any]]] = {}
    for pattern in raw_patterns:
        support, unique_names = _slim_pattern_for_dedupe(pattern)
        if not unique_names:
            continue
        key = tuple(sorted(unique_names))
        current = best.get(key)
        if current is None or support > current[0]:
            best[key] = (support, pattern)
    return [pattern for _, pattern in best.values()]
def _score_patterns_by_matched_points(
    patterns: List[Dict[str, Any]],
    account_name: str,
    post_id: str,
    matched_post_points: List[str],
    match_threshold: float,
) -> List[Dict[str, Any]]:
    """
    Score each pattern's elements against the matched_post_point list.

    - Match scores come from ../input/{account_name}/match_data/{post_id}_匹配_all.json,
      looked up by the key (post point, persona-tree node).
    - For every element (item) of every pattern:
        * item["name"] is treated as a persona-tree node name
        * the score against each matched_post_point is looked up and the
          maximum is kept
    - Only patterns with at least one element scoring >= match_threshold are
      returned.

    Returned pattern shape (items no longer carry point / dimension fields):

        {
            "id": xxx,
            "support": xxx,
            "items": [
                {
                    "name": "xxx",
                    "type": "xxx",
                    "matched_post_point": "xxx" | null,
                    "matched_score": float,
                },
                ...
            ],
        }
    """
    if not patterns or not matched_post_points:
        return []
    match_lookup = _load_match_data(account_name, post_id)
    # Normalize post points and drop blanks coming from upstream data.
    matched_post_points = [str(x).strip() for x in matched_post_points if str(x).strip()]
    if not matched_post_points:
        return []
    results: List[Dict[str, Any]] = []
    for p in patterns:
        items = p.get("items") or []
        if not isinstance(items, list):
            continue
        scored_items: List[Dict[str, Any]] = []
        # Highest score across all items of this pattern; decides whether the
        # pattern is kept at all.
        max_item_score = 0.0
        for it in items:
            if not isinstance(it, dict):
                continue
            name = str(it.get("name") or "").strip()
            _type = str(it.get("type") or "").strip()
            best_score = 0.0
            best_post_point: Optional[str] = None
            if name:
                for post_point in matched_post_points:
                    # A post point identical to the node name counts as a
                    # perfect match, bypassing the lookup table.
                    if post_point == name:
                        s = 1.0
                    else:
                        score = match_lookup.get((post_point, name))
                        if score is None:
                            continue
                        try:
                            s = float(score)
                        except (TypeError, ValueError):
                            continue
                    # Strict ">" keeps the earliest post point on score ties.
                    if s > best_score:
                        best_score = s
                        best_post_point = post_point
            if best_score > max_item_score:
                max_item_score = best_score
            scored_items.append(
                {
                    "name": name,
                    "type": _type,
                    "matched_post_point": best_post_point,
                    "matched_score": round(best_score, 6),
                }
            )
        if not scored_items:
            continue
        if max_item_score < match_threshold:
            # This pattern did not match the post strongly enough this round.
            continue
        results.append(
            {
                "id": p.get("id"),
                "support": p.get("support"),
                "items": scored_items,
            }
        )
    return results
# ---------------------------------------------------------------------------
# 3. Persona-tree node info & cluster-node search
# ---------------------------------------------------------------------------
class TreeIndex:
    """
    Index over the account's persona trees.

    Attributes:
    - node_info: node name -> {"parent": parent node name,
      "children": [child names...], "depth": depth from root,
      "dimension": dimension name}
    - roots: dimension name -> root node name (the dimension name itself)
    - merged_tree: the three trees (substance/form/intent) merged into one
      JSON whose top-level keys are 实质 / 形式 / 意图
    """
    def __init__(self, account_name: str) -> None:
        self.account_name = account_name
        self.node_info: Dict[str, Dict[str, Any]] = {}
        self.roots: Dict[str, str] = {}
        # Merged JSON of the three trees: {"实质": {...}, "形式": {...}, "意图": {...}}
        self.merged_tree: Dict[str, Dict[str, Any]] = {}
        self._build()

    def _build(self) -> None:
        """Load the trees and populate merged_tree, roots and node_info."""
        trees = _load_trees(self.account_name)
        # 1) Merge the three trees into one JSON keyed by dimension name.
        merged: Dict[str, Dict[str, Any]] = {}
        for dim_name, root in trees:
            if isinstance(root, dict):
                merged[dim_name] = root
        self.merged_tree = merged
        # 2) Build the parent/children structure from the merged JSON.
        for dim_name, root in merged.items():
            root_name = dim_name
            self.roots[dim_name] = root_name
            if root_name not in self.node_info:
                self.node_info[root_name] = {
                    "parent": None,
                    "children": [],
                    "dimension": dim_name,
                    "depth": 0,
                }

            def walk(parent_name: str, node_dict: Dict[str, Any]):
                # Recursively register node_dict's children under parent_name.
                children = node_dict.get("children") or {}
                for name, child in children.items():
                    if not isinstance(child, dict):
                        continue
                    if name not in self.node_info:
                        self.node_info[name] = {
                            "parent": parent_name,
                            "children": [],
                            "dimension": dim_name,
                            "depth": None,  # computed later in one pass
                        }
                    else:
                        # Only update parent when it would not create a
                        # self-reference (the tree may contain same-named
                        # parent/child nodes).
                        if name != parent_name:
                            self.node_info[name]["parent"] = parent_name
                        self.node_info[name]["dimension"] = dim_name
                    # Maintain the parent's children list.
                    if parent_name not in self.node_info:
                        self.node_info[parent_name] = {
                            "parent": None,
                            "children": [],
                            "dimension": dim_name,
                            "depth": 0,
                        }
                    if name not in self.node_info[parent_name]["children"]:
                        self.node_info[parent_name]["children"].append(name)
                    walk(name, child)

            walk(root_name, root)
        # Compute every node's depth with a BFS from the roots.
        from collections import deque
        q = deque()
        for dim_name, root_name in self.roots.items():
            if root_name not in self.node_info:
                continue
            self.node_info[root_name]["depth"] = 0
            q.append(root_name)
        while q:
            cur = q.popleft()
            cur_depth = self.node_info[cur].get("depth", 0) or 0
            for child in self.node_info[cur].get("children", []):
                self.node_info.setdefault(child, {})
                # depth is None until first visit — doubles as the BFS
                # "visited" marker, so shared/cyclic children are not
                # re-enqueued.
                if self.node_info[child].get("depth") is None:
                    self.node_info[child]["depth"] = cur_depth + 1
                    q.append(child)

    # Cluster search (dimension-agnostic).
    def find_clusters(
        self,
        elements: List[str],
        cluster_level: int,
    ) -> List[Dict[str, Any]]:
        """
        Find cluster nodes for the given elements across all persona trees
        (elements no longer need to share a dimension).

        Rules (fixed clustering depth cluster_level):
        - Only nodes at depth == cluster_level are considered for clustering:
            * a node whose subtree contains >= 2 elements becomes a cluster
              node, provided no shallower cluster node was already selected
              on the same path.
        - Elements not covered by any such cluster get a "single-element
          cluster" at their depth == cluster_level ancestor, if one exists.
        - Returns:
            [
                {
                    "cluster_node": "node name",
                    "from_elements": ["element A", "element B", ...]
                },
                ...
            ]
        """
        # Keep only elements that actually exist in the persona trees.
        elem_set: Set[str] = set()
        for e in elements:
            e = str(e).strip()
            if not e:
                continue
            info = self.node_info.get(e)
            if not info:
                continue
            elem_set.add(e)
        if not elem_set:
            return []
        # First count, per node, how many elements its subtree contains
        # (across all dimension roots).
        # NOTE: the tree data may contain unexpected cycles or duplicated
        # references; the visited set guards against infinite recursion.
        subtree_count: Dict[str, int] = {}

        def dfs_count(node: str, visited: Set[str]) -> int:
            if node in visited:
                # Cycle detected: contribute 0 instead of recursing forever.
                return 0
            visited.add(node)
            cnt = 1 if node in elem_set else 0
            for ch in self.node_info.get(node, {}).get("children", []):
                cnt += dfs_count(ch, visited)
            subtree_count[node] = cnt
            return cnt

        for root_name in self.roots.values():
            dfs_count(root_name, set())
        # Then select cluster nodes top-down (but only at cluster_level):
        # - once a node is selected, its descendants may not be selected
        #   (prefer clustering as high up as possible);
        # - again guarded against cycles with a visited set.
        clusters: Set[str] = set()

        def dfs_select(node: str, ancestor_selected: bool, visited: Set[str]) -> None:
            if node in visited:
                return
            visited.add(node)
            info = self.node_info.get(node) or {}
            depth = info.get("depth", 0) or 0
            cnt = subtree_count.get(node, 0)
            selected_here = False
            # Select this node only if no ancestor was selected, it sits at
            # cluster_level and its subtree holds at least two elements.
            if (not ancestor_selected) and depth == cluster_level and cnt >= 2:
                clusters.add(node)
                selected_here = True
            # Descendants are blocked once an ancestor (or this node) is selected.
            for ch in info.get("children", []):
                dfs_select(ch, ancestor_selected or selected_here, visited)

        for root_name in self.roots.values():
            dfs_select(root_name, False, set())
        if not clusters:
            return []
        # Collect, for each cluster node, the elements it actually covers by
        # walking each element's parent chain upward.
        cluster_to_elements: Dict[str, Set[str]] = {c: set() for c in clusters}
        for e in elem_set:
            cur = e
            visited: Set[str] = set()
            while cur and cur not in visited:
                visited.add(cur)
                if cur in clusters:
                    cluster_to_elements[cur].add(e)
                parent = self.node_info.get(cur, {}).get("parent")
                if parent is None:
                    break
                cur = parent
        out: List[Dict[str, Any]] = []
        # 1) Multi-element clusters: only elements covered by clusters that
        #    are actually emitted count as "covered", so elements under
        #    nodes with fewer than 2 elements are not silently lost.
        covered_elems: Set[str] = set()
        for node in clusters:
            elems = sorted(cluster_to_elements.get(node) or [])
            if len(elems) < 2:
                # The main clustering only emits nodes covering >= 2 elements.
                continue
            out.append(
                {
                    "cluster_node": node,
                    "from_elements": elems,
                }
            )
            for e in elems:
                covered_elems.add(e)
        # 2) Elements that could not join an upward cluster get a
        #    "single-element cluster".
        uncovered = elem_set - covered_elems
        # Group uncovered elements by their cluster_level-depth ancestor so
        # several elements under the same ancestor merge into one cluster
        # rather than producing multiple single-element clusters.
        single_clusters: Dict[str, Set[str]] = {}
        for e in uncovered:
            # The cluster node must be an ancestor, never the element itself;
            # it is fixed at the ancestor whose depth == cluster_level.
            info_e = self.node_info.get(e) or {}
            parent = info_e.get("parent")
            cur = parent
            best_ancestor: Optional[str] = None
            visited_chain: Set[str] = set()
            while cur and cur not in visited_chain:
                visited_chain.add(cur)
                info = self.node_info.get(cur) or {}
                depth = info.get("depth", 0) or 0
                if depth == cluster_level:
                    best_ancestor = cur
                    break
                parent = info.get("parent")
                if parent is None:
                    break
                cur = parent
            if best_ancestor:
                single_clusters.setdefault(best_ancestor, set()).add(e)
        for anc, elems in single_clusters.items():
            out.append(
                {
                    "cluster_node": anc,
                    "from_elements": sorted(elems),
                }
            )
        # Stable output: sort by element count descending, then node name.
        out.sort(key=lambda x: (-len(x["from_elements"]), x["cluster_node"]))
        return out
# ---------------------------------------------------------------------------
# 4. Per-round pattern & clustering analysis
# ---------------------------------------------------------------------------
def _analyze_single_round(
    account_name: str,
    post_id: str,
    patterns: List[Dict[str, Any]],
    tree_index: TreeIndex,
    cumulative_points: List[str],
    match_threshold: float,
    cluster_level: int,
) -> Dict[str, Any]:
    """Analyze one round, given its cumulative matched_post_point list.

    Steps:
    - keep only patterns whose best element match score is >= match_threshold
    - split pattern elements into "derived" (score >= threshold) and
      "underived" elements
    - search the persona trees (dimension-agnostic) for cluster nodes of
      each group

    Returns a dict with the scored patterns, both cluster lists and count
    statistics (patterns_count, derived/underived cluster counts).
    """
    patterns = _score_patterns_by_matched_points(
        patterns=patterns,
        account_name=account_name,
        post_id=post_id,
        matched_post_points=cumulative_points,
        match_threshold=match_threshold,
    )
    # Derived / underived element lists (no longer split by dimension).
    derived_elems: List[str] = []
    underived_elems: List[str] = []
    for p in patterns:
        for it in p.get("items", []):
            if not isinstance(it, dict):
                continue
            node_name = str(it.get("name") or "").strip()
            if not node_name:
                continue
            score = float(it.get("matched_score") or 0.0)
            if score >= match_threshold:
                derived_elems.append(node_name)
            else:
                underived_elems.append(node_name)
    # Deduplicate (order-preserving) so repeated elements do not skew the
    # statistics or the clustering.
    derived_unique: List[str] = list(dict.fromkeys(derived_elems))
    underived_unique: List[str] = list(dict.fromkeys(underived_elems))
    clusters: Dict[str, Any] = {
        "derived": [],
        "underived": [],
    }
    # Clusters over derived elements.
    if derived_unique:
        c = tree_index.find_clusters(derived_unique, cluster_level=cluster_level)
        clusters["derived"] = c or []
    # Clusters over underived elements.
    if underived_unique:
        c = tree_index.find_clusters(underived_unique, cluster_level=cluster_level)
        clusters["underived"] = c or []
    # Within one round, drop any underived cluster whose cluster_node already
    # appears among the derived clusters, to avoid duplicate display.
    if isinstance(clusters.get("derived"), list) and isinstance(clusters.get("underived"), list):
        derived_nodes = {
            str(item.get("cluster_node"))
            for item in clusters["derived"]
            if isinstance(item, dict) and item.get("cluster_node") is not None
        }
        if derived_nodes:
            filtered_underived = []
            for item in clusters["underived"]:
                if not isinstance(item, dict):
                    continue
                node = str(item.get("cluster_node"))
                if node in derived_nodes:
                    continue
                filtered_underived.append(item)
            clusters["underived"] = filtered_underived
    return {
        "matched_post_points": list(cumulative_points),
        "patterns": patterns,
        "clusters": clusters,
        # Statistics:
        # - patterns_count: patterns that took part in this round's analysis
        # - derived_cluster_count: number of derived-element cluster nodes
        # - underived_cluster_count: number of underived-element cluster nodes
        "patterns_count": len(patterns),
        "derived_cluster_count": len(clusters["derived"]) if isinstance(clusters.get("derived"), list) else 0,
        "underived_cluster_count": len(clusters["underived"]) if isinstance(clusters.get("underived"), list) else 0,
    }
def pattern_dimension_analyze(
    account_name: str,
    post_id: str,
    log_id: str,
    match_threshold: float = 0.6,
    cluster_level: int = 2,
) -> Dict[str, Any]:
    """
    Main entry point of the pattern-dimension analysis.

    Parameters
    ----------
    account_name : account name (locates the data directories under input / output)
    post_id : post ID (locates the derivation log and the post's match data)
    log_id : derivation-log directory name
        (../output/{account_name}/推导日志/{post_id}/{log_id}/)
    match_threshold : minimum match score between a pattern element and a
        matched_post_point, default 0.6
    cluster_level : depth at which cluster nodes are searched in the persona
        trees (root is depth 0), default 2

    Raises
    ------
    FileNotFoundError : when the derivation-log directory does not exist.
    """
    eval_dir = _round_eval_dir(account_name, post_id, log_id)
    if not eval_dir.is_dir():
        raise FileNotFoundError(f"推导日志目录不存在: {eval_dir}")
    round_infos = _load_round_matched_points(account_name, post_id, log_id)
    if not round_infos:
        return {
            "account_name": account_name,
            "post_id": post_id,
            "log_id": log_id,
            "match_threshold": match_threshold,
            "cluster_level": cluster_level,
            "rounds": [],
            "message": "未在指定日志目录下找到任何评估结果文件(*_评估.json)",
        }
    tree_index = TreeIndex(account_name)
    # Load & dedupe the pattern library once for the whole analysis to avoid
    # repeating the IO and parsing in every round.
    raw_patterns = _load_raw_patterns(account_name)
    deduped_patterns = _dedupe_patterns(raw_patterns)
    rounds_output: List[Dict[str, Any]] = []
    for info in round_infos:
        analyzed = _analyze_single_round(
            account_name=account_name,
            post_id=post_id,
            patterns=deduped_patterns,
            tree_index=tree_index,
            cumulative_points=info["cumulative_points"],
            match_threshold=match_threshold,
            cluster_level=cluster_level,
        )
        analyzed["round"] = info["round"]
        rounds_output.append(analyzed)
    return {
        "account_name": account_name,
        "post_id": post_id,
        "log_id": log_id,
        "match_threshold": match_threshold,
        "cluster_level": cluster_level,
        "rounds": rounds_output,
    }
def main() -> None:
    """Local smoke test: analyze one derivation log of the 家有大志 account and write the result to the output directory."""
    account_name = "家有大志"
    post_id = "68fb6a5c000000000302e5de"
    # Update log_id to the latest actual run before executing.
    log_id = "20260317112639"
    analysis = pattern_dimension_analyze(
        account_name=account_name,
        post_id=post_id,
        log_id=log_id,
        match_threshold=0.5,
        cluster_level=2,
    )
    # The result file lives next to the evaluation files:
    # ../output/{account_name}/推导日志/{post_id}/{log_id}/{post_id}_pattern_dimension_analyze.json
    out_dir = _round_eval_dir(account_name, post_id, log_id)
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / f"{post_id}_pattern_dimension_analyze.json"
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(analysis, f, ensure_ascii=False, indent=2)
    print(f"\n分析结果已写入: {out_path}")


if __name__ == "__main__":
    main()
|