from __future__ import annotations

import html as html_lib
import json
import math
from dataclasses import dataclass, field
from pathlib import Path

from sqlalchemy import text

from examples.demand.db_manager import DatabaseManager

db = DatabaseManager()

# Category source types; also used as THEME keys and as score-file name prefixes.
SOURCE_TYPES = ["实质", "形式", "意图"]

# JSON field -> display dimension name (to support a new dimension, only this
# mapping needs a new entry).
SCORE_DIMS: dict[str, str] = {"score": "rov"}

# Per-source-type palette. "low"/"high" are RGB triples that get blended by
# score, and "no_score" is the background used when a node has no score.
# NOTE(review): "root_bg" and "line" are presumably consumed by the HTML/CSS
# rendering code further down the file -- confirm there.
THEME = {
    "实质": {
        "root_bg": "#e8841a",
        "line": "#d4a574",
        "low": (253, 243, 228),
        "high": (195, 80, 5),
        "no_score": "#f5e6d3",
    },
    "形式": {
        "root_bg": "#5b9bd5",
        "line": "#8cb9dc",
        "low": (232, 244, 253),
        "high": (30, 90, 165),
        "no_score": "#d6e9f8",
    },
    "意图": {
        "root_bg": "#70ad47",
        "line": "#8fc270",
        "low": (235, 249, 225),
        "high": (40, 120, 20),
        "no_score": "#dbefd0",
    },
}


# ---------------------------------------------------------------------------
# Color helpers (the Python side is used for the initial render; the JS side
# has an equivalent implementation that is used when switching dimensions)
# ---------------------------------------------------------------------------

def _luminance(r: int, g: int, b: int) -> float:
    """Return the perceived brightness of an 8-bit RGB color, scaled to 0..1.

    Uses the 0.299 / 0.587 / 0.114 channel weights.
    """
    return 0.299 * r / 255 + 0.587 * g / 255 + 0.114 * b / 255


def _blend(low: tuple, high: tuple, t: float) -> tuple[int, int, int]:
    """Linearly interpolate between two RGB triples.

    Args:
        low: Color returned for ``t == 0``.
        high: Color returned for ``t == 1``.
        t: Blend factor; values outside ``[0, 1]`` are clamped.

    Returns:
        The blended color, with each channel truncated to ``int``.
    """
    t = max(0.0, min(1.0, t))
    # Truncation (not rounding) is kept on purpose so the result stays in
    # step with the equivalent JS implementation mentioned above.
    return tuple(int(lo + (hi - lo) * t) for lo, hi in zip(low, high))


def _rgb_hex(rgb: tuple) -> str:
    """Format an ``(r, g, b)`` triple of ints as a ``#rrggbb`` string."""
    return f"#{rgb[0]:02x}{rgb[1]:02x}{rgb[2]:02x}"


def _normalize_score(score: float, max_score: float) -> float:
    """Map ``score`` onto a log scale relative to ``max_score``.

    Returns ``0.0`` when either value is non-positive; otherwise
    ``log1p(score) / log1p(max_score)``. The result exceeds 1.0 when
    ``score > max_score``.
    """
    if max_score <= 0 or score <= 0:
        return 0.0
    return math.log1p(score) / math.log1p(max_score)


# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------

@dataclass
class CatNode:
    """One category row plus the child nodes attached by ``build_trees``."""

    id: int
    name: str
    source_stable_id: int | None
    source_type: str
    description: str | None
    level: int | None
    parent_id: int | None
    element_count: int
    children: list[CatNode] = field(default_factory=list)

    @property
    def subtree_element_count(self) -> int:
        """Return ``element_count`` summed over this node and all descendants.

        A missing (``None``) ``element_count`` is counted as 0.
        """
        own = self.element_count or 0
        return own + sum(child.subtree_element_count for child in self.children)


# ---------------------------------------------------------------------------
# Data loading
# ---------------------------------------------------------------------------

def fetch_categories(eid: int) -> list[dict]:
    """Load all ``topic_pattern_category`` rows for one execution.

    Args:
        eid: Value matched against the ``execution_id`` column.

    Returns:
        One plain dict per row, ordered by ``source_type, level, id``.
    """
    session = db.get_session()
    try:
        rows = session.execute(
            text(
                "SELECT id, source_stable_id, source_type, name, description, "
                "level, parent_id, element_count "
                "FROM topic_pattern_category WHERE execution_id = :eid "
                "ORDER BY source_type, level, id"
            ),
            {"eid": eid},
        ).mappings().fetchall()
        return [dict(r) for r in rows]
    finally:
        # Always hand the session back, even when the query raises.
        session.close()


def load_scores(eid: int) -> tuple[dict[str, dict[str, dict]], list[str]]:
    """Read ``data/{eid}/*_分类.json`` and return ``(scores_data, dim_names)``.

    The ``data`` directory is resolved relative to this source file. Source
    types whose file does not exist are skipped.

    Args:
        eid: Execution id; used as the sub-directory name.

    Returns:
        A tuple ``(scores_data, dim_names)`` where ``scores_data`` has the
        shape ``{source_type: {category_path: {dim_name: value,
        "post_ids_count": int}}}`` and ``dim_names`` is the sorted list of
        dimension names that appeared in at least one item.
    """
    data_dir = Path(__file__).resolve().parent / "data" / str(eid)
    result: dict[str, dict[str, dict]] = {}
    dims_found: set[str] = set()

    for st in SOURCE_TYPES:
        fpath = data_dir / f"{st}_分类.json"
        if not fpath.exists():
            continue
        with open(fpath, encoding="utf-8") as f:
            items = json.load(f)

        lookup: dict[str, dict] = {}
        for item in items:
            cp = item.get("category_path", "")
            if not cp:
                # Items without a path cannot be matched to a tree node.
                continue
            entry: dict = {"post_ids_count": item.get("post_ids_count", 0)}
            for json_key, dim_name in SCORE_DIMS.items():
                if json_key in item:
                    entry[dim_name] = item[json_key]
                    dims_found.add(dim_name)
            lookup[cp] = entry
        result[st] = lookup

    return result, sorted(dims_found)


def build_trees(categories: list[dict]) -> dict[str, list[CatNode]]:
    """Assemble category rows into per-source-type forests.

    Args:
        categories: Row dicts as produced by ``fetch_categories``. ``id``,
            ``name`` and ``source_type`` are required keys; the rest are
            optional.

    Returns:
        ``{source_type: [root CatNode, ...]}``. A node is a root when its
        ``parent_id`` is ``None``, refers to an id that is not in
        ``categories``, or refers to the node itself. Children of every node
        are sorted by ``id``; roots keep the input order.

    Note:
        Parent cycles longer than a self-reference are not detected.
    """
    nodes: dict[int, CatNode] = {}
    for c in categories:
        n = CatNode(
            id=c["id"],
            name=c["name"],
            source_stable_id=c.get("source_stable_id"),
            source_type=c["source_type"],
            description=c.get("description"),
            level=c.get("level"),
            parent_id=c.get("parent_id"),
            element_count=c.get("element_count") or 0,
        )
        nodes[n.id] = n

    roots: dict[str, list[CatNode]] = {}
    for n in nodes.values():
        # Compare against None rather than relying on truthiness so that a
        # legitimate parent id of 0 is still resolved.
        parent = nodes.get(n.parent_id) if n.parent_id is not None else None
        # A row that names itself as parent would otherwise become its own
        # child: it would vanish from the forest and make
        # subtree_element_count recurse forever. Treat it as a root instead.
        if parent is not None and parent is not n:
            parent.children.append(n)
        else:
            roots.setdefault(n.source_type, []).append(n)

    for n in nodes.values():
        n.children.sort(key=lambda x: x.id)
    return roots
roots.setdefault(n.source_type, []).append(n) for n in nodes.values(): n.children.sort(key=lambda x: x.id) return roots # --------------------------------------------------------------------------- # HTML rendering # --------------------------------------------------------------------------- def _node_html( n: CatNode, th: dict, depth: int = 0, parent_path: str = "", scores: dict[str, dict] | None = None, max_score: float = 1.0, default_dim: str = "rov", ) -> str: has_ch = bool(n.children) ec = n.element_count or 0 cc = len(n.children) cur_path = n.name if not parent_path else f"{parent_path}>{n.name}" info = scores.get(cur_path) if scores else None score_val = info.get(default_dim) if info else None pcount = info.get("post_ids_count") if info else None if score_val is not None: t = _normalize_score(score_val, max_score) if score_val > 0 else 0.0 rgb = _blend(th["low"], th["high"], t) bg = _rgb_hex(rgb) lum = _luminance(*rgb) tc = "#fff" if lum < 0.55 else "#4a3520" else: bg = th["no_score"] tc = "#6b5240" parts: list[str] = [] if has_ch: parts.append('\u25BC') parts.append(f'{html_lib.escape(n.name)}') if n.source_stable_id is not None: parts.append(f'{n.source_stable_id}') # .sc 始终存在(JS 切换维度时需要更新),无分数时隐藏 if score_val is not None: parts.append(f'{score_val:.2f}') else: parts.append('') if ec: parts.append(f'{ec}') if pcount is not None: parts.append(f'{pcount}p') if cc: parts.append(f'{cc}\u25B6') onclick = ' onclick="tog(this)"' if has_ch else "" cls = "b e" if has_ch else "b" title_attr = f' title="{html_lib.escape(n.description)}"' if n.description else "" esc = html_lib.escape data_attr = f' data-path="{esc(cur_path)}" data-st="{esc(n.source_type)}"' h = f'