| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """plan_tool.py — procedure-dsl 的「计划」内置工具 (技能本地, 仅 Cyber/run_cyber 引擎)。
- 为什么是工具而不是"写 understanding.md":
- 弱模型读了"先想清楚工序"的 prompt 也照样跳过、直接堆骨架。把这一步做成**工具**,
- LLM 必须**调用一次**把【工序分解 + 每工序步骤逐条展开 + 每工序覆盖原文哪些章节】
- 作为结构化参数交上来 —— 工具当场校验完整性 (混合门禁), 通过就**自动据计划生成
- workflow.json 骨架**, 结果回灌进对话锚定后续填充。于是:
- · understanding 重新独立成一步 (但由工具承载, 不靠自觉)
- · 工序由计划显式决定 (章节认领是声明式, 不再靠词汇模糊匹配)
- · workflow.json 结构严格按计划生成 (工序数/步骤数/顺序锁定, LLM 只填值)
- 设计边界 (遵守"runner 零业务知识"): 本模块是**技能本地**, 不进 agent/tools/builtin/,
- run_cyber 仅 `import plan_tool` 触发 @tool 注册 + 调 set_plan_context 注入原文/路径。
- Claude SDK 路 (run_procedure_dsl) 没有 repo 工具系统, 那条路 understanding 仍走 prompt。
- """
- # ⚠ 不能用 `from __future__ import annotations`: 它把类型注解变成字符串, 而框架
- # SchemaGenerator 靠运行时 get_origin() 内省真实类型 — 拿字符串会把 procedures 这种
- # List[Dict] 退化成 "string", LLM 就不知道该传数组了。
- import json
- import re
- from pathlib import Path
- from typing import Any, Dict, List, Optional
- from agent.tools import tool, ToolResult, ToolContext
- # ===========================================================================
- # 运行时上下文 (run_cyber.main() 在跑 runner 前注入; 工具闭包读它, 避免让 LLM 传路径)
- # ===========================================================================
# Process-wide context shared between the runner and the tool in this module.
_PLAN_CTX: Dict[str, Any] = {}


def set_plan_context(*, body_text: str = "", out_dir: Optional[Path] = None,
                     case_id: Any = None, source: Optional[dict] = None,
                     ocr: str = "") -> None:
    """Replace the shared plan context with the values for the current run.

    Any previous content of ``_PLAN_CTX`` is discarded. Falsy ``body_text`` /
    ``ocr`` are stored as ``""``, a falsy ``source`` as ``{}``, and a falsy
    ``out_dir`` as ``None``; a truthy ``out_dir`` is wrapped in ``Path``.

    Args:
        body_text: Body text of the source article.
        out_dir: Output directory (anything ``Path`` accepts), or ``None``.
        case_id: Opaque case identifier, stored as-is.
        source: Source metadata mapping, stored as-is when truthy.
        ocr: OCR text of the article's figures.
    """
    target_dir = None
    if out_dir:
        target_dir = Path(out_dir)

    fresh: Dict[str, Any] = {}
    fresh["body_text"] = body_text if body_text else ""
    fresh["ocr"] = ocr if ocr else ""
    fresh["out_dir"] = target_dir
    fresh["case_id"] = case_id
    fresh["source"] = source if source else {}

    # Mutate in place so every holder of the module-level dict sees the update.
    _PLAN_CTX.clear()
    _PLAN_CTX.update(fresh)
- # ===========================================================================
- # 原文章节解析 (与 lint-case.py 同口径: 行首 0N| 标号, 排除 "图 0N|" 配图说明)
- # ===========================================================================
# A section header is a line that starts (after optional whitespace) with a
# two-digit "0N" number followed by a vertical bar. Both the ASCII bar and the
# fullwidth bar (U+FF5C, written as an escape so it survives text
# normalization) are accepted. Anchoring at line start keeps figure captions
# such as "图 0N|" from being counted as sections.
_SEC_RE = re.compile(r'(?m)^\s*(0\d)\s*[|\uFF5C]')


def _source_sections(body: str) -> List[tuple]:
    """Extract the numbered sections of the source text.

    Args:
        body: Full article text; ``None`` or ``""`` yields an empty list.

    Returns:
        A list of ``(section_number, title)`` tuples in document order. The
        title is the first non-blank line after the header's bar (looking no
        further than the next section header), stripped and cut to 24
        characters; it is ``""`` when no such line exists.
    """
    text = body or ""
    # (header start, offset right after the bar, section number)
    marks = [(m.start(), m.end(), m.group(1)) for m in _SEC_RE.finditer(text)]

    sections: List[tuple] = []
    for idx, (_start, tail_from, num) in enumerate(marks):
        stop = marks[idx + 1][0] if idx + 1 < len(marks) else len(text)
        title = next(
            (ln.strip()[:24] for ln in text[tail_from:stop].splitlines() if ln.strip()),
            "",
        )
        sections.append((num, title))
    return sections
def _norm_sec(s: Any) -> str:
    """Normalize a section reference to a zero-padded section number.

    The first run of digits in ``str(s)`` is parsed as an integer and
    formatted with at least two digits, so ``'1'``, ``'01'``, ``1`` and
    ``'第1章'`` all become ``'01'``. Parsing through ``int`` also
    canonicalizes numbers >= 10 (``'010'`` -> ``'10'``), so equivalent
    spellings always compare equal.

    Args:
        s: Any value; it is converted with ``str`` first.

    Returns:
        The normalized number, or the stripped string form of ``s`` when it
        contains no digits.
    """
    text = str(s)
    found = re.search(r'\d+', text)
    if found is None:
        return text.strip()
    return f"{int(found.group()):02d}"
- # ===========================================================================
- # 入参容错 (弱模型常把 list 传成 JSON 字符串 / 包一层 {"procedures": [...]})
- # ===========================================================================
def _coerce_procedures(procedures: Any) -> List[dict]:
    """Best-effort conversion of the ``procedures`` argument into a list.

    Accepted shapes, in any nesting order up to a small fixed depth:
      * a list (returned unchanged);
      * a JSON string that decodes to one of the accepted shapes;
      * a wrapper dict holding the payload under ``"procedures"`` or,
        failing that, ``"list"``.

    Args:
        procedures: The raw argument as supplied by the caller.

    Returns:
        The unwrapped list, or ``[]`` when the input cannot be decoded or
        does not resolve to a list. Elements are not validated here.
    """
    value = procedures
    # Bounded so that pathological nesting cannot loop forever; four hops
    # cover e.g. str -> dict -> str -> list.
    for _ in range(4):
        if isinstance(value, str):
            try:
                value = json.loads(value)
            except (ValueError, RecursionError):
                # json.JSONDecodeError is a ValueError subclass.
                return []
        elif isinstance(value, dict):
            value = value.get("procedures", value.get("list", []))
        else:
            break
    return value if isinstance(value, list) else []
- # ===========================================================================
- # 据计划生成 workflow.json 骨架
- # ===========================================================================
def _build_skeleton(summary: str, procedures: List[dict]) -> dict:
    """Build the workflow skeleton dict from a submitted plan.

    Source metadata is read from ``_PLAN_CTX["source"]``. Every dict entry of
    ``procedures`` becomes one procedure with id ``p1``, ``p2``, ... (numbered
    over the dict entries only); non-dict entries are ignored instead of
    raising. Every dict entry of a procedure's ``steps`` becomes one step whose
    id ``sN`` uses its 1-based position in the submitted list, so a skipped
    non-dict step leaves a gap in the numbering.

    Field values are converted with ``str`` before stripping, so a non-string
    value (number, list, ...) is stringified instead of raising.

    Args:
        summary: Plan summary; its first 120 characters are used as the
            source excerpt when the context provides none.
        procedures: Planned procedures (``name``, ``category``,
            ``final_product``/``purpose``, ``steps``).

    Returns:
        A dict with ``"source"`` and ``"procedures"`` keys. Each step has an
        empty ``directive`` and empty input/output ``value``/``anchor`` fields
        that are meant to be filled in later.
    """
    def _text(value: Any) -> str:
        # Falsy values (None, "", 0, []) collapse to the empty string.
        return str(value or "").strip()

    src = _PLAN_CTX.get("source") or {}
    skeleton: Dict[str, Any] = {
        "source": {
            "platform": src.get("platform", ""),
            "author": src.get("author", ""),
            "date": src.get("date", ""),
            "url": src.get("url", ""),
            "title": src.get("title", ""),
            "excerpt": src.get("excerpt", "") or (summary or "")[:120],
        },
        "procedures": [],
    }

    usable = [proc for proc in procedures if isinstance(proc, dict)]
    for pi, proc in enumerate(usable, 1):
        raw_steps = proc.get("steps")
        if not isinstance(raw_steps, (list, tuple)):
            raw_steps = []

        steps_out: List[dict] = []
        for si, st in enumerate(raw_steps, 1):
            if not isinstance(st, dict):
                continue
            sid = f"s{si}"
            in_label = _text(st.get("input"))
            out_label = _text(st.get("output"))
            steps_out.append({
                "id": sid,
                "kind": "step",
                "via": _text(st.get("tool")),
                "directive": "",  # left empty; filled in after the skeleton exists
                # Plan labels become the (truncated) type of the single
                # input / output placeholder.
                "inputs": ([{"type": in_label[:40], "value": "", "anchor": ""}]
                           if in_label else []),
                "outputs": ([{"id": f"{sid}o1", "type": out_label[:40],
                              "value": "", "anchor": ""}]
                            if out_label else []),
            })

        skeleton["procedures"].append({
            "id": f"p{pi}",
            "name": _text(proc.get("name")),
            "purpose": _text(proc.get("final_product") or proc.get("purpose")),
            "category": _text(proc.get("category")),
            "platform": src.get("platform", ""),
            "author": src.get("author", ""),
            "declarations": {"inputs": [], "resources": [], "returns": {}},
            "steps": steps_out,
        })
    return skeleton
- # ===========================================================================
- # 工具本体
- # ===========================================================================
# Description text passed to @tool(description=...) for `plan_procedures`.
# It tells the model to call the tool once as its first step, shows the
# expected shape of each `procedures` item, and states the validation rules
# (every 0N section must be claimed; single-step procedures produce a warning).
# This is runtime text, not a comment: keep the wording byte-for-byte.
_PLAN_DESC = (
    "【第一步必做·只调用一次】提交你对这篇教程的工序计划 (understanding)。读完原文+配图后, "
    "把它拆成若干工序、每个工序的步骤逐条展开、并声明每个工序覆盖原文哪些 0N 章节。"
    "工具会校验完整性 (有章节没被任何工序认领 → 报错让你补全; 工序只有单步 → 警告), "
    "通过后**自动据此生成 workflow.json 骨架** (工序/步骤/顺序锁定), 你之后只在骨架上填 "
    "value/directive/anchor, 不再增删工序或步骤。\n"
    "procedures 每项形如:\n"
    ' {"name":"工序名", "category":"产物创造|资产建设|自动化|分析|学习", '
    '"final_product":"终态产物", "source_sections":["01","03"], '
    '"steps":[{"tool":"工具名/human", "input":"输入数据的短标签(如 提示词)", '
    '"does":"这步做什么的一句话自由描述", "output":"产出的短标签(如 场景图)"}, ...]}\n'
    "要点: source_sections 必填 (原文每个 0N 章节都要被某个工序认领, 别漏整段); "
    "steps 逐步展开别压成单步; input/output 写**短标签**(它们会变成步骤输入/输出的 type); "
    "does 只是计划期的自由描述, **不是** workflow 的 taxonomy 动作词 (那留到 Phase 2 查 action.json 归类)。"
)
def _iter_section_refs(raw: Any) -> List[Any]:
    """Turn a procedure's ``source_sections`` value into a list of references.

    A list/tuple/set is returned as a list. A string contributes each digit
    run it contains (``"01,03"`` -> ``["01", "03"]``) or, when it has no
    digits, itself. Any other truthy scalar becomes a one-element list; falsy
    values give ``[]``.
    """
    if not raw:
        return []
    if isinstance(raw, str):
        return re.findall(r'\d+', raw) or [raw]
    if isinstance(raw, (list, tuple, set)):
        return list(raw)
    return [raw]


# Tool entry point: validate the submitted plan, then generate the workflow
# skeleton and write it (plus the plan itself) to the configured output dir.
#
# Flow:
#   1. Coerce `procedures` into a list and keep only dict entries; an empty
#      result is rejected with error="empty plan".
#   2. Collect the section numbers every procedure claims and gather
#      non-blocking warnings (no steps, a single step, step fields missing).
#   3. If the source text has 0N sections, every one of them must be claimed;
#      otherwise the call fails and lists the unclaimed sections.
#   4. Build the skeleton; when an output dir is configured, write
#      workflow.json and _scratch/understanding.json.
#   5. Return a recap of the structure, the warnings and the next steps.
#
# NOTE(review): the docstring below is left verbatim in its original language
# because tool frameworks commonly derive parameter descriptions from it —
# confirm against agent.tools before translating it.
@tool(description=_PLAN_DESC, hidden_params=["context"], groups=["core"])
async def plan_procedures(
    summary: str,
    procedures: List[Dict[str, Any]],
    context: Optional[ToolContext] = None,
) -> ToolResult:
    """提交工序计划 → 校验 → 自动生成 workflow.json 骨架。

    Args:
        summary: 2-4 句说清这篇在教什么、分几大板块。
        procedures: 工序列表 (结构见工具描述)。
    """
    warnings: List[str] = []

    # Only dict entries are usable. Dropping the others here keeps them out of
    # the skeleton builder and out of the archived plan as well.
    procs: List[dict] = []
    for idx, entry in enumerate(_coerce_procedures(procedures)):
        if isinstance(entry, dict):
            procs.append(entry)
        else:
            warnings.append(f"procedures[{idx}] 不是对象, 已跳过")

    if not procs:
        return ToolResult(
            title="计划为空",
            output="procedures 解析为空。请传一个工序数组, 每项含 name/category/"
                   "final_product/source_sections/steps (steps 逐步展开)。",
            error="empty plan",
        )

    body = _PLAN_CTX.get("body_text", "")
    secs = _source_sections(body)
    present = {num for num, _ in secs}
    sec_title = dict(secs)

    # ---- collect claimed sections + basic (non-blocking) validation ----
    claimed: set = set()
    for pi, proc in enumerate(procs, 1):
        name = str(proc.get("name") or "").strip() or f"(工序{pi})"
        steps = proc.get("steps") or []
        if not isinstance(steps, (list, tuple)):
            steps = []  # a scalar / mapping is not a step sequence
        for ref in _iter_section_refs(proc.get("source_sections")):
            claimed.add(_norm_sec(ref))
        if not steps:
            warnings.append(f"工序『{name}』没有 steps — 至少要有步骤序列")
        elif len(steps) == 1:
            warnings.append(f"工序『{name}』只有 1 个步骤 — 确认它真的无法再展开? (多数工序是多步)")
        for sj, st in enumerate(steps, 1):
            if not isinstance(st, dict):
                continue
            # str() guards against non-string values (numbers, lists, ...).
            miss = [k for k in ("tool", "input", "does", "output")
                    if not str(st.get(k) or "").strip()]
            if miss:
                warnings.append(f"工序『{name}』step{sj} 缺 {'/'.join(miss)} (四要素: 工具 tool·输入 input·动作 does·输出 output)")

    # ---- hard gate: section coverage (only when the source has 0N sections) ----
    if present:
        unclaimed = sorted(present - claimed)
        if unclaimed:
            lines = "\n".join(f" · 章节 {n} 『{sec_title.get(n,'')}』" for n in unclaimed)
            return ToolResult(
                title=f"计划漏了 {len(unclaimed)} 个章节",
                output=(
                    f"原文有 {len(present)} 个章节, 你的计划只认领了 {sorted(claimed) or '无'}。"
                    f"下面这些章节**没有任何工序认领**, 极可能被整段漏抽:\n{lines}\n\n"
                    f"请逐个判断: 它是一条独立工序 (有自己的做法/产物), 还是某工序的若干步骤? "
                    f"想清楚后**重新调用 plan_procedures**, 让每个工序的 source_sections 把这些章节都覆盖上。"
                    f"(无独立做法、纯展示的章节可以并进相邻工序的 steps, 但要在某工序的 source_sections 里出现。)"
                ),
                error=f"unclaimed sections: {','.join(unclaimed)}",
            )
        # Claimed numbers that do not exist in the source are only a warning.
        bogus = sorted(claimed - present - {""})
        if bogus:
            warnings.append(f"source_sections 里 {bogus} 在原文里找不到对应 0N 章节 (写错章节号?)")

    # ---- passed: build the skeleton and persist it ----
    skeleton = _build_skeleton(summary, procs)
    out_dir: Optional[Path] = _PLAN_CTX.get("out_dir")
    nproc = len(skeleton["procedures"])
    nstep = sum(len(p["steps"]) for p in skeleton["procedures"])
    written = []
    if out_dir:
        scratch = out_dir / "_scratch"
        scratch.mkdir(parents=True, exist_ok=True)
        wf_path = out_dir / "workflow.json"
        wf_path.write_text(json.dumps(skeleton, ensure_ascii=False, indent=2), encoding="utf-8")
        written.append(wf_path.as_posix())
        # Keep the submitted plan next to the workflow for later comparison.
        plan_path = scratch / "understanding.json"
        plan_path.write_text(json.dumps(
            {"summary": summary, "procedures": procs}, ensure_ascii=False, indent=2), encoding="utf-8")
        written.append(plan_path.as_posix())

    # ---- recap returned to the caller: structure + warnings + next steps ----
    recap = [f"✅ 计划已通过校验, 已据此生成 workflow.json 骨架: {nproc} 工序 / {nstep} 步。"]
    if present:
        recap.append(f"章节覆盖: 原文 {sorted(present)} 全部被认领 ✓")
    recap.append("")
    for p in skeleton["procedures"]:
        recap.append(f"【{p['id']}】{p['name']} ({p['category']}) — {len(p['steps'])} 步")
        for st in p["steps"]:
            ins = st["inputs"][0]["type"] if st["inputs"] else "—"
            outs = st["outputs"][0]["type"] if st["outputs"] else "—"
            recap.append(f" {st['id']} via={st['via'] or '—'}: [{ins}] → [{outs}]")
    if warnings:
        recap.append("")
        recap.append("⚠ 警告 (不阻塞, 但请核对):")
        for w in warnings:
            recap.append(f" - {w}")
    recap.append("")
    recap.append(
        "下一步 (在已生成的骨架上, 别增删工序/步骤):\n"
        " 1. 逐步用 wf-patch.py 填 inputs/outputs 的 value(文本类用 @quote 拽**完整逐字**原文)、"
        "anchor(数据流 ← / →); **提示词建成 type=提示词 的输入/输出 value(整段逐字), 别塞 directive**"
        "(directive 只放严格反推/比例 2:3 这类元指令, 多数生图步留空);\n"
        " 2. 填完后**必须跑 `python spec/tools/verify-io.py --workflow <wf> --source <原文> --ocr <ocr>`**: "
        "它校验文本 IO 的 value 逐字、生成步有没有 type=提示词 输入、提示词 value 完不完整、declarations; "
        "报问题就修(此时可重读原文, 提示词用 @quote 提全), 修完重跑直到通过, 才进 Phase 2;\n"
        " 3. type 现在是描述性短标签, Phase 2 再归一化到词表 + 补 effect/action/substance/form/intent;\n"
        " 4. 结构已按计划锁定 —— 若发现确实要加工序/步骤, 说明计划不完整, 重新调 plan_procedures。"
    )
    return ToolResult(
        title=f"计划通过: {nproc} 工序 / {nstep} 步",
        output="\n".join(recap),
        metadata={"procedures": nproc, "steps": nstep,
                  "sections_present": sorted(present), "written": written,
                  "warnings": len(warnings)},
    )
|