| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- #!/usr/bin/env python3
- """
- 内容树查询 CLI(execution_id=56)
- 供 LLM 通过 Bash 调用。所有子命令输出 JSON 到 stdout。
- 默认只返回较高层级 + 后代统计;可按 id 拉子树或单节点;可关键词搜索。
- 子命令:
- overview 顶层概览:实质 / 形式两根 + 各自二级类,含后代数量与元素数量
- subtree <id> [--depth N] 以 id 为根的子树,默认 depth=2,最大 depth=4
- node <id> [--with-elements] 某个节点详情;--with-elements 时附该节点直接 elements
- elements <id> 某个分类的全部 elements(去重 distinct)
- search <text> [--source 实质|形式|both] [--limit N]
- 在 path / name / description / element_name 中模糊匹配,默认 limit=15
- """
- from __future__ import annotations
- import argparse
- import json
- import re
- import sys
- from pathlib import Path
# Default tree file: expected to sit next to this script.
DEFAULT_TREE = Path(__file__).resolve().parent / "category_tree_56.json"


def load_tree(path: Path = DEFAULT_TREE) -> tuple[dict[int, dict], dict[int, list[int]]]:
    """Load the category tree JSON and index it for lookups.

    Args:
        path: JSON file whose top-level object holds a ``"categories"`` list
            of category dicts. Defaults to ``DEFAULT_TREE``.

    Returns:
        A ``(nodes_by_id, children)`` pair. ``nodes_by_id`` maps each
        category's ``"id"`` to its dict (entries without an ``"id"`` key are
        skipped). ``children`` maps a parent id to the ids of its direct
        children, sorted by each child's ``"path"``; categories whose
        ``"parent_id"`` is missing or falsy are filed under parent ``0``.

    Raises:
        OSError: if the file cannot be read.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    raw = json.loads(path.read_text(encoding="utf-8"))
    # `or []` also covers an explicit `"categories": null`, which a plain
    # `.get("categories", [])` would hand to the loop as None.
    categories = raw.get("categories") or []
    nodes_by_id: dict[int, dict] = {c["id"]: c for c in categories if "id" in c}

    children: dict[int, list[int]] = {}
    for node in nodes_by_id.values():
        parent_id = node.get("parent_id") or 0
        children.setdefault(parent_id, []).append(node["id"])

    # Sort siblings by path so output order is deterministic.
    for sibling_ids in children.values():
        sibling_ids.sort(key=lambda i: nodes_by_id[i].get("path") or "")
    return nodes_by_id, children
def descendant_stats(node_id: int, children: dict[int, list[int]], nodes: dict[int, dict]) -> dict:
    """Count the categories below ``node_id`` and total their element counts.

    Walks the ``children`` index iteratively (explicit stack, no recursion).

    Args:
        node_id: Id of the category whose descendants are counted; the node
            itself is not included in the totals.
        children: Mapping of parent id -> list of direct child ids.
        nodes: Mapping of category id -> category dict.

    Returns:
        ``{"descendant_categories": int, "descendant_elements": int}`` where
        the element total is the sum of ``len(node["elements"])`` over every
        descendant (lists are summed per category, not de-duplicated across
        categories).
    """
    total_cats = 0
    total_elements = 0
    # Track visited ids so malformed data containing a parent/child cycle
    # cannot make the walk loop forever; the start node is pre-seeded so it
    # is never counted as its own descendant.
    seen = {node_id}
    pending = list(children.get(node_id, []))
    while pending:
        cid = pending.pop()
        if cid in seen:
            continue
        seen.add(cid)
        total_cats += 1
        node = nodes.get(cid)
        if node:
            total_elements += len(node.get("elements") or [])
        pending.extend(children.get(cid, []))
    return {"descendant_categories": total_cats, "descendant_elements": total_elements}
def thin_node(n: dict, *, with_elements: bool = False) -> dict:
    """Return a compact summary of a category dict.

    Args:
        n: Category dict; every field is read with ``.get`` so missing keys
            come back as ``None``.
        with_elements: When true, also include an ``"elements"`` list of
            ``{"name", "post_count"}`` dicts built from ``n["elements"]``.

    Returns:
        A dict with ``id``, ``name``, ``path``, ``level``, ``source_type``,
        ``description`` and ``self_element_count`` (length of the node's own
        element list), plus ``elements`` when requested.
    """
    elements = n.get("elements") or []
    out = {
        "id": n.get("id"),
        "name": n.get("name"),
        "path": n.get("path"),
        "level": n.get("level"),
        "source_type": n.get("source_type"),
        "description": n.get("description"),
        "self_element_count": len(elements),
    }
    if with_elements:
        summaries = []
        for e in elements:
            # Fall back to "post_count" only when "count" is absent/None, so
            # a legitimate count of 0 is reported as 0 rather than dropped.
            count = e.get("count")
            if count is None:
                count = e.get("post_count")
            summaries.append({"name": e.get("name"), "post_count": count})
        out["elements"] = summaries
    return out
def cmd_overview(nodes: dict[int, dict], children: dict[int, list[int]]) -> dict:
    """Build the top-level overview of the tree.

    A root is any node whose ``source_type`` is "实质" or "形式" and whose
    ``level`` is 1. Each root is summarised together with its descendant
    statistics and a list of its direct children (each child likewise
    carrying its own descendant statistics).

    Args:
        nodes: Mapping of category id -> category dict.
        children: Mapping of parent id -> list of direct child ids.

    Returns:
        ``{"roots": [...], "hint": str}``; roots are ordered by
        ``(source_type, name)``.
    """
    roots = [
        n for n in nodes.values()
        if n.get("source_type") in ("实质", "形式") and n.get("level") == 1
    ]
    # `or ""` keeps the sort key comparable when a root has no name; mixing
    # None and str in the key would raise TypeError.
    roots.sort(key=lambda n: (n.get("source_type"), n.get("name") or ""))

    out: dict = {"roots": []}
    for root in roots:
        root_id = root["id"]
        kids = [
            {**thin_node(nodes[kid]), **descendant_stats(kid, children, nodes)}
            for kid in children.get(root_id, [])
        ]
        out["roots"].append({
            **thin_node(root),
            **descendant_stats(root_id, children, nodes),
            "children": kids,
        })
    out["hint"] = "use `subtree <id>` to drill in, `search <text>` to keyword-find, `elements <id>` to list distinct elements of a category"
    return out
- def collect_subtree(node_id: int, depth: int, max_depth: int, nodes: dict[int, dict], children: dict[int, list[int]]) -> dict | None:
- n = nodes.get(node_id)
- if n is None:
- return None
- out: dict = thin_node(n)
- if depth < max_depth:
- out["children"] = [
- c for c in (
- collect_subtree(kid, depth + 1, max_depth, nodes, children)
- for kid in children.get(node_id, [])
- ) if c is not None
- ]
- if not out["children"]:
- out.pop("children")
- else:
- kids = children.get(node_id, [])
- if kids:
- out["children_truncated"] = [
- {"id": kid, "name": nodes[kid].get("name"), "path": nodes[kid].get("path")}
- for kid in kids
- ]
- return out
def cmd_subtree(nodes: dict[int, dict], children: dict[int, list[int]], node_id: int, depth: int) -> dict:
    """Return the subtree rooted at ``node_id``, expanded to a bounded depth.

    Args:
        nodes: Mapping of category id -> category dict.
        children: Mapping of parent id -> list of direct child ids.
        node_id: Id of the subtree root.
        depth: Requested expansion depth; values outside 1..4 are clamped
            into that range.

    Returns:
        The subtree dict, or ``{"error": ...}`` when the node is unknown.
    """
    # Clamp the requested depth into the supported 1..4 window.
    bounded = depth if depth < 4 else 4
    if bounded < 1:
        bounded = 1
    tree = collect_subtree(node_id, 1, bounded, nodes, children)
    if tree is not None:
        return tree
    return {"error": f"node {node_id} not found"}
def cmd_node(nodes: dict[int, dict], children: dict[int, list[int]], node_id: int, with_elements: bool) -> dict:
    """Return details for a single node.

    Args:
        nodes: Mapping of category id -> category dict.
        children: Mapping of parent id -> list of direct child ids.
        node_id: Id of the node to describe.
        with_elements: Whether to include the node's own element list.

    Returns:
        The node's thin summary extended with ``"parent"`` (only when the
        node has a truthy ``parent_id`` that exists in ``nodes``),
        ``"children"`` (thin summaries of direct children) and
        ``"descendant_stats"``; or ``{"error": ...}`` when the node is
        unknown.
    """
    node = nodes.get(node_id)
    if node is None:
        return {"error": f"node {node_id} not found"}

    detail = thin_node(node, with_elements=with_elements)

    pid = node.get("parent_id") or 0
    if pid and pid in nodes:
        detail["parent"] = thin_node(nodes[pid])

    child_summaries = []
    for child_id in children.get(node_id, []):
        child_summaries.append(thin_node(nodes[child_id]))
    detail["children"] = child_summaries

    detail["descendant_stats"] = descendant_stats(node_id, children, nodes)
    return detail
def cmd_elements(nodes: dict[int, dict], node_id: int) -> dict:
    """List the elements attached directly to one category.

    Args:
        nodes: Mapping of category id -> category dict.
        node_id: Id of the category whose elements are listed.

    Returns:
        ``{"id", "path", "source_type", "count", "elements"}`` where
        ``elements`` is a list of ``{"name", "post_count"}`` dicts in stored
        order and ``count`` is its length; or ``{"error": ...}`` when the
        node is unknown.
    """
    node = nodes.get(node_id)
    if node is None:
        return {"error": f"node {node_id} not found"}

    elems = node.get("elements") or []
    listed = []
    for e in elems:
        # Fall back to "post_count" only when "count" is absent/None, so a
        # legitimate count of 0 is reported as 0 rather than dropped.
        count = e.get("count")
        if count is None:
            count = e.get("post_count")
        listed.append({"name": e.get("name"), "post_count": count})

    return {
        "id": node_id,
        "path": node.get("path"),
        "source_type": node.get("source_type"),
        "count": len(elems),
        "elements": listed,
    }
def cmd_search(nodes: dict[int, dict], text: str, source: str, limit: int) -> dict:
    """Case-insensitive substring search over categories and their elements.

    Only nodes whose ``source_type`` is "实质" or "形式" are searched, further
    narrowed to ``source`` unless it is ``"both"``. A category scores 3 for a
    match in its name, 2 in its path and 1 in its description (summed);
    element names are matched separately.

    Args:
        nodes: Mapping of category id -> category dict.
        text: Query string; surrounding whitespace is stripped and the rest
            is matched literally (regex metacharacters are escaped).
        source: ``"实质"``, ``"形式"`` or ``"both"``.
        limit: Maximum number of category hits and of element hits returned;
            negative values are treated as 0.

    Returns:
        ``{"query", "categories", "elements", "truncated_categories",
        "truncated_elements"}`` with categories ordered by descending score,
        or ``{"error": "empty query"}`` for a blank query.
    """
    text = text.strip()
    if not text:
        return {"error": "empty query"}
    # A negative limit would slice from the end (hits[:-1]) and overstate
    # the truncation counts; clamp it so "limit" always means "at most N".
    limit = max(0, limit)
    pat = re.compile(re.escape(text), re.IGNORECASE)

    allowed = ("实质", "形式") if source == "both" else (source,)
    cat_hits: list[dict] = []
    elem_hits: list[dict] = []
    for n in nodes.values():
        st = n.get("source_type")
        if st not in allowed or st not in ("实质", "形式"):
            continue

        score = 0
        if pat.search(n.get("name") or ""):
            score += 3
        if pat.search(n.get("path") or ""):
            score += 2
        if pat.search(n.get("description") or ""):
            score += 1
        if score:
            cat_hits.append({**thin_node(n), "score": score})

        for e in n.get("elements") or []:
            ename = e.get("name") or ""
            if not pat.search(ename):
                continue
            # Fall back to "post_count" only when "count" is absent/None, so
            # a legitimate count of 0 is reported as 0 rather than dropped.
            count = e.get("count")
            if count is None:
                count = e.get("post_count")
            elem_hits.append({
                "category_id": n["id"],
                "category_path": n.get("path"),
                "source_type": st,
                "element": ename,
                "post_count": count,
            })

    # Stable sort: equal scores keep their iteration order.
    cat_hits.sort(key=lambda hit: -hit["score"])
    return {
        "query": text,
        "categories": cat_hits[:limit],
        "elements": elem_hits[:limit],
        "truncated_categories": max(0, len(cat_hits) - limit),
        "truncated_elements": max(0, len(elem_hits) - limit),
    }
def main() -> int:
    """Parse the command line, run the chosen subcommand, print JSON to stdout.

    Subcommands: ``overview``, ``subtree <id> [--depth N]``,
    ``node <id> [--with-elements]``, ``elements <id>`` and
    ``search <text> [--source ...] [--limit N]``.

    Returns:
        Process exit code: 0 after printing a subcommand result (including
        ``{"error": ...}`` results produced by the subcommands themselves),
        1 when the tree file cannot be read or parsed, 2 for an unrecognised
        subcommand.
    """
    ap = argparse.ArgumentParser()
    sub = ap.add_subparsers(dest="cmd", required=True)

    sub.add_parser("overview")

    s = sub.add_parser("subtree")
    s.add_argument("id", type=int)
    s.add_argument("--depth", type=int, default=2)

    s = sub.add_parser("node")
    s.add_argument("id", type=int)
    s.add_argument("--with-elements", action="store_true")

    s = sub.add_parser("elements")
    s.add_argument("id", type=int)

    s = sub.add_parser("search")
    s.add_argument("text")
    s.add_argument("--source", choices=["实质", "形式", "both"], default="both")
    s.add_argument("--limit", type=int, default=15)

    args = ap.parse_args()

    def emit(payload: dict) -> None:
        # ensure_ascii=False keeps CJK text readable in the output.
        json.dump(payload, sys.stdout, ensure_ascii=False, indent=2)
        sys.stdout.write("\n")

    try:
        nodes, children = load_tree()
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        # Callers parse stdout as JSON, so report the failure as JSON rather
        # than letting a traceback escape.
        emit({"error": f"failed to load category tree: {exc}"})
        return 1

    handlers = {
        "overview": lambda: cmd_overview(nodes, children),
        "subtree": lambda: cmd_subtree(nodes, children, args.id, args.depth),
        "node": lambda: cmd_node(nodes, children, args.id, args.with_elements),
        "elements": lambda: cmd_elements(nodes, args.id),
        "search": lambda: cmd_search(nodes, args.text, args.source, args.limit),
    }
    handler = handlers.get(args.cmd)
    if handler is None:
        return 2

    emit(handler())
    return 0


if __name__ == "__main__":
    sys.exit(main())
|