#!/usr/bin/env python3
"""Content-tree query CLI (execution_id=56).

Intended to be invoked by an LLM through Bash. Every subcommand writes JSON to
stdout. By default only the upper levels plus descendant statistics are
returned; a subtree or a single node can be fetched by id, and categories and
elements can be searched by keyword.

Subcommands:
  overview
      Level-1 nodes whose source_type is 实质 or 形式, each with its direct
      children and descendant category / element counts.
  subtree ID [--depth N]
      Subtree rooted at ID. Default depth is 2; depth is clamped to 1..4.
  node ID [--with-elements]
      Details of one node; --with-elements also lists the node's own elements.
  elements ID
      All elements attached directly to one category.
  search TEXT [--source 实质|形式|both] [--limit N]
      Case-insensitive substring match against category name / path /
      description and against element names. Default limit is 15.
"""
from __future__ import annotations

import argparse
import json
import re
import sys
from pathlib import Path

DEFAULT_TREE = Path(__file__).resolve().parent / "category_tree_56.json"

# The only source_type values that `overview` and `search` consider.
_SOURCE_TYPES = ("实质", "形式")


def load_tree(path: Path = DEFAULT_TREE) -> tuple[dict[int, dict], dict[int, list[int]]]:
    """Load the category tree JSON and index it.

    Args:
        path: JSON file holding an object with a ``categories`` list.

    Returns:
        ``(nodes_by_id, children)`` where ``nodes_by_id`` maps a category id to
        its raw dict and ``children`` maps a parent id to the ids of its direct
        children, sorted by the children's ``path``. Categories with a missing
        or falsy ``parent_id`` are filed under parent id ``0``.

    Raises:
        OSError: the file cannot be read.
        ValueError: the file is not valid UTF-8 / JSON.
    """
    raw = json.loads(path.read_text(encoding="utf-8"))
    # `or []` also covers an explicit `"categories": null` in the file.
    categories = raw.get("categories") or []
    nodes_by_id: dict[int, dict] = {c["id"]: c for c in categories if "id" in c}

    children: dict[int, list[int]] = {}
    for cat in nodes_by_id.values():
        children.setdefault(cat.get("parent_id") or 0, []).append(cat["id"])
    for ids in children.values():
        ids.sort(key=lambda i: nodes_by_id[i].get("path") or "")
    return nodes_by_id, children


def descendant_stats(node_id: int, children: dict[int, list[int]], nodes: dict[int, dict]) -> dict:
    """Count the categories below ``node_id`` and sum their element-list lengths.

    The element total is the sum of ``len(elements)`` over every descendant
    category; the node's own elements are not included and nothing is
    de-duplicated across categories.

    Returns:
        ``{"descendant_categories": int, "descendant_elements": int}``.
    """
    total_cats = 0
    total_elements = 0
    # Guard against malformed data (a parent_id cycle or a self-parent), which
    # would otherwise make this traversal loop forever.
    seen = {node_id}
    stack = list(children.get(node_id, []))
    while stack:
        cid = stack.pop()
        if cid in seen:
            continue
        seen.add(cid)
        total_cats += 1
        node = nodes.get(cid)
        if node:
            total_elements += len(node.get("elements") or [])
        stack.extend(children.get(cid, []))
    return {"descendant_categories": total_cats, "descendant_elements": total_elements}


def _post_count(element: dict):
    """Return the element's ``count``, falling back to ``post_count``.

    Only a missing / ``None`` ``count`` triggers the fallback, so a genuine
    count of ``0`` is reported as ``0`` rather than being discarded.
    """
    count = element.get("count")
    if count is None:
        count = element.get("post_count")
    return count


def _element_view(element: dict) -> dict:
    """Project a raw element dict to the ``{name, post_count}`` output shape."""
    return {"name": element.get("name"), "post_count": _post_count(element)}


def thin_node(n: dict, *, with_elements: bool = False) -> dict:
    """Return the compact output form of a category node.

    Args:
        n: raw category dict.
        with_elements: when true, add an ``elements`` list with the node's own
            elements as ``{name, post_count}`` dicts.
    """
    own_elements = n.get("elements") or []
    out = {
        "id": n.get("id"),
        "name": n.get("name"),
        "path": n.get("path"),
        "level": n.get("level"),
        "source_type": n.get("source_type"),
        "description": n.get("description"),
        "self_element_count": len(own_elements),
    }
    if with_elements:
        out["elements"] = [_element_view(e) for e in own_elements]
    return out


def cmd_overview(nodes: dict[int, dict], children: dict[int, list[int]]) -> dict:
    """Build the ``overview`` payload.

    Roots are the level-1 nodes whose ``source_type`` is 实质 or 形式, ordered
    by ``(source_type, name)``. Each root carries its descendant statistics and
    its direct children (each with their own descendant statistics).
    """
    roots = [
        n for n in nodes.values()
        if n.get("source_type") in _SOURCE_TYPES and n.get("level") == 1
    ]
    # `or ""` keeps the sort from raising when a root has no name.
    roots.sort(key=lambda n: (n.get("source_type"), n.get("name") or ""))

    out: dict = {"roots": []}
    for root in roots:
        rid = root["id"]
        kids = [
            {**thin_node(nodes[kid]), **descendant_stats(kid, children, nodes)}
            for kid in children.get(rid, [])
        ]
        out["roots"].append({
            **thin_node(root),
            **descendant_stats(rid, children, nodes),
            "children": kids,
        })
    # The hint names each follow-up command together with its required argument.
    out["hint"] = (
        "use `subtree ID` to drill in, `search TEXT` to keyword-find, "
        "`elements ID` to list distinct elements of a category"
    )
    return out


def collect_subtree(node_id: int, depth: int, max_depth: int,
                    nodes: dict[int, dict], children: dict[int, list[int]]) -> dict | None:
    """Recursively build the subtree rooted at ``node_id``.

    Args:
        node_id: id of the subtree root.
        depth: 1-based depth of ``node_id`` within the requested subtree.
        max_depth: deepest level that is expanded in full.

    Returns:
        The thin node, with ``children`` (expanded) while ``depth < max_depth``
        or ``children_truncated`` (id / name / path stubs) once the depth limit
        is reached; either key is omitted when the node has no children.
        ``None`` when ``node_id`` is unknown.
    """
    node = nodes.get(node_id)
    if node is None:
        return None

    out: dict = thin_node(node)
    kid_ids = children.get(node_id, [])
    if depth < max_depth:
        expanded = []
        for kid in kid_ids:
            child = collect_subtree(kid, depth + 1, max_depth, nodes, children)
            if child is not None:
                expanded.append(child)
        if expanded:
            out["children"] = expanded
    elif kid_ids:
        out["children_truncated"] = [
            {"id": kid, "name": nodes[kid].get("name"), "path": nodes[kid].get("path")}
            for kid in kid_ids
        ]
    return out


def cmd_subtree(nodes: dict[int, dict], children: dict[int, list[int]], node_id: int, depth: int) -> dict:
    """Build the ``subtree`` payload; ``depth`` is clamped to the range 1..4."""
    depth = max(1, min(depth, 4))
    sub = collect_subtree(node_id, 1, depth, nodes, children)
    if sub is None:
        return {"error": f"node {node_id} not found"}
    return sub


def cmd_node(nodes: dict[int, dict], children: dict[int, list[int]], node_id: int, with_elements: bool) -> dict:
    """Build the ``node`` payload: the node, its parent (if known), its direct
    children and its descendant statistics."""
    node = nodes.get(node_id)
    if node is None:
        return {"error": f"node {node_id} not found"}

    out = thin_node(node, with_elements=with_elements)
    parent_id = node.get("parent_id") or 0
    if parent_id and parent_id in nodes:
        out["parent"] = thin_node(nodes[parent_id])
    out["children"] = [thin_node(nodes[kid]) for kid in children.get(node_id, [])]
    out["descendant_stats"] = descendant_stats(node_id, children, nodes)
    return out


def cmd_elements(nodes: dict[int, dict], node_id: int) -> dict:
    """Build the ``elements`` payload: the elements attached directly to one
    category, as stored in the tree file."""
    node = nodes.get(node_id)
    if node is None:
        return {"error": f"node {node_id} not found"}
    elems = node.get("elements") or []
    return {
        "id": node_id,
        "path": node.get("path"),
        "source_type": node.get("source_type"),
        "count": len(elems),
        "elements": [_element_view(e) for e in elems],
    }


def cmd_search(nodes: dict[int, dict], text: str, source: str, limit: int) -> dict:
    """Build the ``search`` payload.

    ``text`` is matched as a literal, case-insensitive substring. A category
    scores 3 for a name hit, 2 for a path hit and 1 for a description hit;
    category hits are returned by descending score. Element hits match on the
    element name and keep tree-iteration order.

    Args:
        nodes: category index from ``load_tree``.
        text: query string; surrounding whitespace is ignored.
        source: ``"实质"``, ``"形式"`` or ``"both"``.
        limit: maximum number of category hits and of element hits to return.
            Negative values are treated as 0.

    Returns:
        Dict with ``query``, ``categories``, ``elements`` and the number of
        hits cut off by ``limit`` for each list; ``{"error": "empty query"}``
        when ``text`` is blank.
    """
    text = text.strip()
    if not text:
        return {"error": "empty query"}
    # A negative limit would slice from the end of the hit lists and report
    # bogus truncation counts, so clamp it.
    limit = max(0, limit)
    pat = re.compile(re.escape(text), re.IGNORECASE)

    cat_hits: list[dict] = []
    elem_hits: list[dict] = []
    for node in nodes.values():
        st = node.get("source_type")
        if source != "both" and st != source:
            continue
        if st not in _SOURCE_TYPES:
            continue

        score = 0
        if pat.search(node.get("name") or ""):
            score += 3
        if pat.search(node.get("path") or ""):
            score += 2
        if pat.search(node.get("description") or ""):
            score += 1
        if score:
            cat_hits.append({**thin_node(node), "score": score})

        for e in node.get("elements") or []:
            ename = e.get("name") or ""
            if pat.search(ename):
                elem_hits.append({
                    "category_id": node["id"],
                    "category_path": node.get("path"),
                    "source_type": st,
                    "element": ename,
                    "post_count": _post_count(e),
                })

    # list.sort is stable, so equal scores keep tree-iteration order.
    cat_hits.sort(key=lambda hit: -hit["score"])
    return {
        "query": text,
        "categories": cat_hits[:limit],
        "elements": elem_hits[:limit],
        "truncated_categories": max(0, len(cat_hits) - limit),
        "truncated_elements": max(0, len(elem_hits) - limit),
    }


def _emit(payload: dict) -> None:
    """Write ``payload`` to stdout as indented, non-ASCII-preserving JSON."""
    json.dump(payload, sys.stdout, ensure_ascii=False, indent=2)
    sys.stdout.write("\n")


def main() -> int:
    """Parse the command line, run one subcommand and print its JSON result.

    Returns:
        0 after printing a subcommand result (including ``{"error": ...}``
        results such as an unknown node id), 1 when the tree file cannot be
        loaded, 2 for an unrecognised subcommand.
    """
    parser = argparse.ArgumentParser()
    sub = parser.add_subparsers(dest="cmd", required=True)

    sub.add_parser("overview")

    s = sub.add_parser("subtree")
    s.add_argument("id", type=int)
    s.add_argument("--depth", type=int, default=2)

    s = sub.add_parser("node")
    s.add_argument("id", type=int)
    s.add_argument("--with-elements", action="store_true")

    s = sub.add_parser("elements")
    s.add_argument("id", type=int)

    s = sub.add_parser("search")
    s.add_argument("text")
    s.add_argument("--source", choices=["实质", "形式", "both"], default="both")
    s.add_argument("--limit", type=int, default=15)

    args = parser.parse_args()

    try:
        nodes, children = load_tree()
    except (OSError, ValueError) as exc:
        # Callers parse stdout as JSON, so report load failures as JSON too
        # (ValueError covers json.JSONDecodeError and UnicodeDecodeError).
        _emit({"error": f"failed to load tree {DEFAULT_TREE}: {exc}"})
        return 1

    if args.cmd == "overview":
        out = cmd_overview(nodes, children)
    elif args.cmd == "subtree":
        out = cmd_subtree(nodes, children, args.id, args.depth)
    elif args.cmd == "node":
        out = cmd_node(nodes, children, args.id, args.with_elements)
    elif args.cmd == "elements":
        out = cmd_elements(nodes, args.id)
    elif args.cmd == "search":
        out = cmd_search(nodes, args.text, args.source, args.limit)
    else:
        return 2

    _emit(out)
    return 0


if __name__ == "__main__":
    sys.exit(main())