"""Render the topic-pattern category tree of one execution as a standalone HTML page.

Categories are read from the ``topic_pattern_category`` table, optional scores
from ``data/<eid>/<source_type>_分类.json`` next to this file, and the result is
a single HTML document with collapsible nodes coloured by score.

NOTE(review): the copy of this file that was reviewed had its layout collapsed
and every HTML tag stripped from its string literals (including the page's
whole <style>/<script>).  The Python logic below follows what was still
visible; the markup, CSS and JS are a reconstruction built around the hooks
that survived (classes ``b``/``e``/``sc``/``active``, ``tog(this)``,
``data-path``/``data-st``, the ``{{EID}}``/``{{DIM_BTNS}}``/``{{BODY}}``
placeholders and the visible text).  Diff against the pre-corruption file if
one exists.
"""

from __future__ import annotations

import html as html_lib
import json
import math
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path

SOURCE_TYPES = ["实质", "形式", "意图"]

# JSON field -> displayed dimension name (add an entry here to expose a new dimension).
SCORE_DIMS: dict[str, str] = {"score": "rov"}

THEME = {
    "实质": {
        "root_bg": "#e8841a",
        "line": "#d4a574",
        "low": (253, 243, 228),
        "high": (195, 80, 5),
        "no_score": "#f5e6d3",
    },
    "形式": {
        "root_bg": "#5b9bd5",
        "line": "#8cb9dc",
        "low": (232, 244, 253),
        "high": (30, 90, 165),
        "no_score": "#d6e9f8",
    },
    "意图": {
        "root_bg": "#70ad47",
        "line": "#8fc270",
        "low": (235, 249, 225),
        "high": (40, 120, 20),
        "no_score": "#dbefd0",
    },
}

# Text colours of a node box: on a dark fill, on a light fill, and without a score.
_TEXT_ON_DARK = "#fff"
_TEXT_ON_LIGHT = "#4a3520"
_TEXT_NO_SCORE = "#6b5240"

_PLACEHOLDER_RE = re.compile(r"\{\{([A-Z_]+)\}\}")


# ---------------------------------------------------------------------------
# Database handle
# ---------------------------------------------------------------------------

def _get_db():
    """Return the shared ``DatabaseManager``, creating it on first use.

    The manager used to be built at import time.  It is created lazily so that
    the pure tree/rendering helpers can be imported without the DB stack; once
    created it is cached as the module global ``db``.
    """
    manager = globals().get("db")
    if manager is None:
        from examples.demand.db_manager import DatabaseManager

        manager = DatabaseManager()
        globals()["db"] = manager
    return manager


def __getattr__(name: str):
    """Module-level attribute hook that keeps ``<module>.db`` available."""
    if name == "db":
        return _get_db()
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


# ---------------------------------------------------------------------------
# Color helpers (used for the first render; the page's JS mirrors them for
# switching the score dimension)
# ---------------------------------------------------------------------------

def _luminance(r: int, g: int, b: int) -> float:
    """Return the weighted brightness of an 8-bit RGB colour in the range 0..1."""
    return 0.299 * r / 255 + 0.587 * g / 255 + 0.114 * b / 255


def _blend(low: tuple, high: tuple, t: float) -> tuple[int, int, int]:
    """Linearly interpolate from *low* to *high*; *t* is clamped to [0, 1]."""
    t = max(0.0, min(1.0, t))
    return tuple(int(lo + (hi - lo) * t) for lo, hi in zip(low, high))


def _rgb_hex(rgb: tuple) -> str:
    """Format an ``(r, g, b)`` triple as ``#rrggbb``."""
    return f"#{rgb[0]:02x}{rgb[1]:02x}{rgb[2]:02x}"


def _normalize_score(score: float, max_score: float) -> float:
    """Map *score* onto 0..1 on a log scale relative to *max_score*.

    Returns 0.0 when either value is not positive.
    """
    if max_score <= 0 or score <= 0:
        return 0.0
    return math.log1p(score) / math.log1p(max_score)


def _as_score(raw: object) -> float | None:
    """Return *raw* as a finite float, or ``None`` if it is not usable as a score."""
    if raw is None or isinstance(raw, bool):
        return None
    try:
        value = float(raw)
    except (TypeError, ValueError):
        return None
    return value if math.isfinite(value) else None


def _box_colors(score: float | None, th: dict, max_score: float) -> tuple[str, str]:
    """Return ``(background, text colour)`` for a node box with the given score."""
    if score is None:
        return th["no_score"], _TEXT_NO_SCORE
    rgb = _blend(th["low"], th["high"], _normalize_score(score, max_score))
    text_color = _TEXT_ON_DARK if _luminance(*rgb) < 0.55 else _TEXT_ON_LIGHT
    return _rgb_hex(rgb), text_color


# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------

@dataclass
class CatNode:
    """One category row plus the child nodes attached by ``build_trees``."""

    id: int
    name: str
    source_stable_id: int | None
    source_type: str
    description: str | None
    level: int | None
    parent_id: int | None
    element_count: int
    children: list[CatNode] = field(default_factory=list)

    @property
    def subtree_element_count(self) -> int:
        """Sum of ``element_count`` over this node and all of its descendants."""
        return (self.element_count or 0) + sum(
            child.subtree_element_count for child in self.children
        )


# ---------------------------------------------------------------------------
# Data loading
# ---------------------------------------------------------------------------

def fetch_categories(eid: int) -> list[dict]:
    """Return the ``topic_pattern_category`` rows of execution *eid* as dicts.

    Rows are ordered by ``source_type, level, id``.  The session is always
    closed, also when the query raises.
    """
    from sqlalchemy import text

    session = _get_db().get_session()
    try:
        rows = session.execute(
            text(
                "SELECT id, source_stable_id, source_type, name, description, "
                "level, parent_id, element_count "
                "FROM topic_pattern_category WHERE execution_id = :eid "
                "ORDER BY source_type, level, id"
            ),
            {"eid": eid},
        ).mappings().fetchall()
        return [dict(r) for r in rows]
    finally:
        session.close()


def load_scores(eid: int) -> tuple[dict[str, dict[str, dict]], list[str]]:
    """Read ``data/{eid}/*_分类.json`` and return ``(scores_data, dim_names)``.

    ``scores_data`` has the shape
    ``{source_type: {category_path: {dim_name: float, "post_ids_count": ...}}}``.
    ``dim_names`` is the sorted list of dimensions for which at least one
    usable value was found.  Missing files are skipped.  Items without a
    ``category_path`` are ignored, and a score that is ``null``, non-numeric or
    not finite is left out of its entry instead of being stored.
    """
    data_dir = Path(__file__).resolve().parent / "data" / str(eid)
    result: dict[str, dict[str, dict]] = {}
    dims_found: set[str] = set()

    for st in SOURCE_TYPES:
        fpath = data_dir / f"{st}_分类.json"
        if not fpath.exists():
            continue
        with open(fpath, encoding="utf-8") as f:
            items = json.load(f)

        lookup: dict[str, dict] = {}
        for item in items:
            if not isinstance(item, dict):
                continue
            cp = item.get("category_path", "")
            if not cp:
                continue
            entry: dict = {"post_ids_count": item.get("post_ids_count", 0)}
            for json_key, dim_name in SCORE_DIMS.items():
                score = _as_score(item.get(json_key))
                if score is not None:
                    entry[dim_name] = score
                    dims_found.add(dim_name)
            lookup[cp] = entry
        result[st] = lookup

    return result, sorted(dims_found)


def build_trees(categories: list[dict]) -> dict[str, list[CatNode]]:
    """Link category rows into trees and return the roots grouped by source type.

    A row is a root when it has no ``parent_id`` or its parent is not among
    *categories*.  Children of every node are sorted by ``id``.
    """
    nodes: dict[int, CatNode] = {}
    for c in categories:
        node = CatNode(
            id=c["id"],
            name=c["name"],
            source_stable_id=c.get("source_stable_id"),
            source_type=c["source_type"],
            description=c.get("description"),
            level=c.get("level"),
            parent_id=c.get("parent_id"),
            element_count=c.get("element_count") or 0,
        )
        nodes[node.id] = node

    roots: dict[str, list[CatNode]] = {}
    for node in nodes.values():
        # ``is not None`` rather than truthiness, so a parent id of 0 still links.
        if node.parent_id is not None and node.parent_id in nodes:
            nodes[node.parent_id].children.append(node)
        else:
            roots.setdefault(node.source_type, []).append(node)

    for node in nodes.values():
        node.children.sort(key=lambda x: x.id)
    return roots


# ---------------------------------------------------------------------------
# HTML rendering
# ---------------------------------------------------------------------------

def _node_html(
    n: CatNode,
    th: dict,
    depth: int = 0,
    parent_path: str = "",
    scores: dict[str, dict] | None = None,
    max_score: float = 1.0,
    default_dim: str = "rov",
) -> str:
    """Render *n* and its subtree.

    Args:
        n: node to render.
        th: theme entry (one value of ``THEME``) for the node's source type.
        depth: nesting level; only passed on to the recursive calls.
        parent_path: ``>``-joined names of the ancestors, empty for a root.
        scores: ``{category_path: entry}`` lookup for this source type.
        max_score: largest score of *default_dim*, used to normalise colours.
        default_dim: score dimension shown on first render.
    """
    esc = html_lib.escape
    has_children = bool(n.children)
    cur_path = f"{parent_path}>{n.name}" if parent_path else n.name

    info = scores.get(cur_path) if scores else None
    score_val = _as_score(info.get(default_dim)) if info else None
    post_count = info.get("post_ids_count") if info else None
    bg, text_color = _box_colors(score_val, th, max_score)

    parts: list[str] = []
    if has_children:
        parts.append('<span class="ar">\u25BC</span>')
    parts.append(f'<span class="nm">{esc(n.name)}</span>')
    if n.source_stable_id is not None:
        parts.append(f'<span class="sid">{esc(str(n.source_stable_id))}</span>')
    # The .sc span is always emitted (the JS dimension switch updates it);
    # it is hidden while the node has no score.
    if score_val is not None:
        parts.append(f'<span class="sc">{score_val:.2f}</span>')
    else:
        parts.append('<span class="sc" style="display:none"></span>')
    if n.element_count:
        parts.append(f'<span class="ec">{esc(str(n.element_count))}</span>')
    if post_count is not None:
        parts.append(f'<span class="pc">{esc(str(post_count))}p</span>')
    if has_children:
        parts.append(f'<span class="cc">{len(n.children)}\u25B6</span>')

    onclick = ' onclick="tog(this)"' if has_children else ""
    cls = "b e" if has_children else "b"
    title_attr = f' title="{esc(n.description)}"' if n.description else ""
    data_attr = f' data-path="{esc(cur_path)}" data-st="{esc(n.source_type)}"'

    out: list[str] = [
        '<div class="n">\n',
        f'<div class="{cls}"{onclick}{title_attr}{data_attr}'
        f' style="background:{bg};color:{text_color}">',
        "".join(parts),
        "</div>\n",
    ]
    if has_children:
        out.append(f'<div class="ch" style="border-color:{th["line"]}">\n')
        out.extend(
            _node_html(child, th, depth + 1, cur_path, scores, max_score, default_dim)
            for child in n.children
        )
        out.append("</div>\n")
    out.append("</div>\n")
    return "".join(out)


def _section_html(
    source_type: str,
    roots: list[CatNode],
    scores: dict[str, dict] | None,
    max_score: float,
    default_dim: str = "rov",
) -> str:
    """Render one source type: a header with the total element count, then its trees.

    An unknown *source_type* falls back to the ``"实质"`` theme.
    """
    th = THEME.get(source_type, THEME["实质"])
    total = sum(r.subtree_element_count for r in roots)
    out: list[str] = [
        '<div class="sec">\n',
        f'<div class="sec-h" style="background:{th["root_bg"]}" onclick="tog(this)">'
        f"\u25BC {html_lib.escape(source_type)} ({total})</div>\n",
        '<div class="sec-b">\n',
    ]
    out.extend(
        _node_html(
            r,
            th,
            depth=0,
            parent_path="",
            scores=scores,
            max_score=max_score,
            default_dim=default_dim,
        )
        for r in roots
    )
    out.append("</div>\n</div>\n")
    return "".join(out)


def _max_scores(
    all_scores: dict[str, dict[str, dict]], dims: list[str]
) -> dict[str, dict[str, float]]:
    """Return ``{dim: {source_type: max score}}``; 0.0 where no usable score exists."""
    result: dict[str, dict[str, float]] = {}
    for dim in dims:
        per_type: dict[str, float] = {}
        for st in SOURCE_TYPES:
            candidates = (
                _as_score(entry.get(dim)) for entry in all_scores.get(st, {}).values()
            )
            per_type[st] = max((v for v in candidates if v is not None), default=0.0)
        result[dim] = per_type
    return result


def _safe_json(obj: object) -> str:
    """Serialise *obj* for embedding inside a ``<script>`` element.

    ``<`` only occurs inside JSON strings, so replacing it with its ``\\u003c``
    escape keeps the value intact while making ``</script>`` and ``<!--``
    impossible in the output.
    """
    return json.dumps(obj, ensure_ascii=False).replace("<", "\\u003c")


def _dim_buttons_html(dims: list[str]) -> str:
    """Render one button per score dimension (the first is active), or a notice."""
    if not dims:
        return '<span class="no-dim">无得分数据</span>'
    buttons = []
    for i, dim in enumerate(dims):
        active = " active" if i == 0 else ""
        label = html_lib.escape(dim)
        buttons.append(
            f'<button class="dim-btn{active}" data-dim="{label}"'
            f' onclick="setDim(this.dataset.dim, this)">{label}</button>'
        )
    return "".join(buttons)


def _fill_template(template: str, values: dict[str, str]) -> str:
    """Substitute ``{{NAME}}`` placeholders in one pass.

    A single pass means text inserted for one placeholder (category names,
    JSON) is never re-scanned for further placeholders.
    """
    return _PLACEHOLDER_RE.sub(
        lambda m: values.get(m.group(1), m.group(0)), template
    )


def render_tree_html(
    eid: int,
    categories: list[dict],
    all_scores: dict[str, dict[str, dict]],
    dims: list[str],
) -> str:
    """Build the page from already-loaded data (no database or file access).

    Args:
        eid: execution id shown in the page title and header.
        categories: rows as returned by ``fetch_categories``.
        all_scores: first element of the ``load_scores`` result.
        dims: second element of the ``load_scores`` result; the first entry is
            the dimension used for the initial colouring (``"rov"`` if empty).
    """
    trees = build_trees(categories)
    max_scores = _max_scores(all_scores, dims)
    default_dim = dims[0] if dims else "rov"

    sections = [
        _section_html(
            st,
            trees[st],
            all_scores.get(st, {}),
            # A maximum of 0 (no positive score) falls back to 1.0.
            max_scores.get(default_dim, {}).get(st, 1.0) or 1.0,
            default_dim,
        )
        for st in SOURCE_TYPES
        if st in trees
    ]

    return _fill_template(
        _HTML_TEMPLATE,
        {
            "EID": html_lib.escape(str(eid)),
            "DIM_BTNS": _dim_buttons_html(dims),
            "BODY": "\n".join(sections),
            "SCORES_JSON": _safe_json(all_scores),
            "THEME_JSON": _safe_json(THEME),
            "MAX_JSON": _safe_json(max_scores),
        },
    )


def generate_tree_html(eid: int) -> str:
    """Load categories and scores of execution *eid* and return the HTML page."""
    categories = fetch_categories(eid)
    all_scores, dims = load_scores(eid)
    return render_tree_html(eid, categories, all_scores, dims)


# NOTE(review): CSS and JS below are reconstructed (see module docstring).
# The JS colour functions mirror _normalize_score/_blend/_rgb_hex/_luminance.
_HTML_TEMPLATE = """<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<title>分类树 · execution_id={{EID}}</title>
<style>
body { font-family: -apple-system, "PingFang SC", "Microsoft YaHei", sans-serif;
       margin: 16px; background: #fafafa; color: #333; }
h1 { font-size: 20px; margin: 0 0 6px; }
.meta { font-size: 13px; color: #666; }
.lg { margin-left: 12px; }
.lg-bar { display: inline-block; width: 80px; height: 10px; vertical-align: middle;
          border-radius: 3px; background: linear-gradient(90deg, #fdf3e4, #c35005); }
.dims { margin: 8px 0 14px; font-size: 13px; }
.dim-btn { margin-left: 6px; padding: 2px 10px; border: 1px solid #bbb;
           border-radius: 4px; background: #fff; cursor: pointer; }
.dim-btn.active { background: #444; border-color: #444; color: #fff; }
.no-dim { margin-left: 6px; color: #999; }
.sec { margin-bottom: 18px; }
.sec-h { display: inline-block; padding: 5px 12px; border-radius: 6px;
         color: #fff; font-weight: 600; cursor: pointer; }
.sec-b { margin-top: 6px; }
.sec.c > .sec-b { display: none; }
.n { margin: 3px 0; }
.ch { margin-left: 14px; padding-left: 12px; border-left: 2px solid #ccc; }
.n.c > .ch { display: none; }
.b { display: inline-block; padding: 2px 8px; border-radius: 4px; font-size: 13px; }
.b.e { cursor: pointer; }
.b span { margin-right: 5px; }
.ar { font-size: 10px; }
.sc { font-weight: 600; }
.sid, .ec, .pc, .cc { font-size: 11px; opacity: 0.75; }
.n.c > .b > .ar { display: none; }
.n:not(.c) > .b > .cc { display: none; }
</style>
</head>
<body>
<div class="hdr">
<h1>选题模式分类树</h1>
<div class="meta">execution_id = {{EID}} <span class="lg">低分 <span class="lg-bar"></span> 高分</span></div>
<div class="dims">得分维度 {{DIM_BTNS}}</div>
</div>
{{BODY}}
<script>
var SCORES = {{SCORES_JSON}};
var THEMES = {{THEME_JSON}};
var MAX_SCORES = {{MAX_JSON}};

function tog(el) {
  el.parentNode.classList.toggle('c');
}

function norm(score, maxScore) {
  if (maxScore <= 0 || score <= 0) { return 0; }
  return Math.log1p(score) / Math.log1p(maxScore);
}

function blend(low, high, t) {
  t = Math.max(0, Math.min(1, t));
  return low.map(function (lo, i) { return Math.trunc(lo + (high[i] - lo) * t); });
}

function rgbHex(rgb) {
  return '#' + rgb.map(function (v) { return ('0' + v.toString(16)).slice(-2); }).join('');
}

function luminance(rgb) {
  return (0.299 * rgb[0] + 0.587 * rgb[1] + 0.114 * rgb[2]) / 255;
}

function paint(box, dim) {
  var st = box.getAttribute('data-st');
  var theme = THEMES[st] || THEMES[Object.keys(THEMES)[0]];
  var info = (SCORES[st] || {})[box.getAttribute('data-path')];
  var val = info ? info[dim] : null;
  var label = box.querySelector('.sc');
  if (typeof val === 'number') {
    var maxScore = (MAX_SCORES[dim] || {})[st] || 1;
    var rgb = blend(theme.low, theme.high, norm(val, maxScore));
    box.style.background = rgbHex(rgb);
    box.style.color = luminance(rgb) < 0.55 ? '#fff' : '#4a3520';
    if (label) { label.textContent = val.toFixed(2); label.style.display = ''; }
  } else {
    box.style.background = theme.no_score;
    box.style.color = '#6b5240';
    if (label) { label.textContent = ''; label.style.display = 'none'; }
  }
}

function setDim(dim, btn) {
  var buttons = document.querySelectorAll('.dim-btn');
  for (var i = 0; i < buttons.length; i++) {
    buttons[i].classList.toggle('active', buttons[i] === btn);
  }
  var boxes = document.querySelectorAll('.b[data-path]');
  for (var j = 0; j < boxes.length; j++) { paint(boxes[j], dim); }
}
</script>
</body>
</html>
"""


# ---------------------------------------------------------------------------
# Entry
# ---------------------------------------------------------------------------

def main(execution_id: int = 58) -> None:
    """Generate the page for *execution_id* and write it under ``new_result/``."""
    html_content = generate_tree_html(execution_id)
    out = (
        Path(__file__).resolve().parent
        / "new_result"
        / f"topic_pattern_tree_{execution_id}.html"
    )
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(html_content, encoding="utf-8")
    print(f"已生成: {out}")


if __name__ == "__main__":
    # Optional first CLI argument overrides the default execution id.
    main(int(sys.argv[1]) if len(sys.argv) > 1 else 58)