plan_tool.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """plan_tool.py — procedure-dsl 的「计划」内置工具 (技能本地, 仅 Cyber/run_cyber 引擎)。
  4. 为什么是工具而不是"写 understanding.md":
  5. 弱模型读了"先想清楚工序"的 prompt 也照样跳过、直接堆骨架。把这一步做成**工具**,
  6. LLM 必须**调用一次**把【工序分解 + 每工序步骤逐条展开 + 每工序覆盖原文哪些章节】
  7. 作为结构化参数交上来 —— 工具当场校验完整性 (混合门禁), 通过就**自动据计划生成
  8. workflow.json 骨架**, 结果回灌进对话锚定后续填充。于是:
  9. · understanding 重新独立成一步 (但由工具承载, 不靠自觉)
  10. · 工序由计划显式决定 (章节认领是声明式, 不再靠词汇模糊匹配)
  11. · workflow.json 结构严格按计划生成 (工序数/步骤数/顺序锁定, LLM 只填值)
  12. 设计边界 (遵守"runner 零业务知识"): 本模块是**技能本地**, 不进 agent/tools/builtin/,
  13. run_cyber 仅 `import plan_tool` 触发 @tool 注册 + 调 set_plan_context 注入原文/路径。
  14. Claude SDK 路 (run_procedure_dsl) 没有 repo 工具系统, 那条路 understanding 仍走 prompt。
  15. """
  16. # ⚠ 不能用 `from __future__ import annotations`: 它把类型注解变成字符串, 而框架
  17. # SchemaGenerator 靠运行时 get_origin() 内省真实类型 — 拿字符串会把 procedures 这种
  18. # List[Dict] 退化成 "string", LLM 就不知道该传数组了。
  19. import json
  20. import re
  21. from pathlib import Path
  22. from typing import Any, Dict, List, Optional
  23. from agent.tools import tool, ToolResult, ToolContext
  24. # ===========================================================================
  25. # 运行时上下文 (run_cyber.main() 在跑 runner 前注入; 工具闭包读它, 避免让 LLM 传路径)
  26. # ===========================================================================
  27. _PLAN_CTX: Dict[str, Any] = {}
  28. def set_plan_context(*, body_text: str = "", out_dir: Optional[Path] = None,
  29. case_id: Any = None, source: Optional[dict] = None,
  30. ocr: str = "", spec_name: str = "spec") -> None:
  31. """run_cyber 在执行前调用, 把原文正文 / 配图 OCR / 输出目录 / source 元信息交给工具。"""
  32. _PLAN_CTX.clear()
  33. _PLAN_CTX.update({
  34. "body_text": body_text or "",
  35. "ocr": ocr or "",
  36. "out_dir": Path(out_dir) if out_dir else None,
  37. "case_id": case_id,
  38. "source": source or {},
  39. "spec_name": spec_name or "spec",
  40. })
  41. # ===========================================================================
  42. # 原文章节解析 (与 lint-case.py 同口径: 行首 0N| 标号, 排除 "图 0N|" 配图说明)
  43. # ===========================================================================
  44. # 与 lint-case.py 同口径: 行首两位标号 NN| (01..99), 行首要求天然排除 "图 0N|" 配图说明
  45. _SEC_RE = re.compile(r'(?m)^\s*(\d{2})\s*[||]')
  46. def _source_sections(body: str) -> List[tuple]:
  47. """返回 [(章节号, 标题)],如 [('01','从一个案例开始'), ('02','结构化 Prompt 框架'), ...]。"""
  48. marks = [(m.start(), m.group(1)) for m in _SEC_RE.finditer(body or "")]
  49. out: List[tuple] = []
  50. for i, (pos, num) in enumerate(marks):
  51. end = marks[i + 1][0] if i + 1 < len(marks) else len(body)
  52. seg = body[pos:end]
  53. after = re.split(r'[||]', seg, 1)
  54. tail = after[-1] if len(after) > 1 else seg
  55. title = ""
  56. for ln in tail.splitlines():
  57. ln = ln.strip()
  58. if ln:
  59. title = ln[:24]
  60. break
  61. out.append((num, title))
  62. return out
  63. def _norm_sec(s: Any) -> str:
  64. """把 '1' / '01' / '第1章' 之类归一成两位章节号 '01'。无数字返回原串。"""
  65. m = re.search(r'\d+', str(s))
  66. return f"0{int(m.group())}"[-2:] if m and int(m.group()) < 10 else (m.group() if m else str(s).strip())
  67. # ===========================================================================
  68. # 入参容错 (弱模型常把 list 传成 JSON 字符串 / 包一层 {"procedures": [...]})
  69. # ===========================================================================
  70. def _coerce_procedures(procedures: Any) -> List[dict]:
  71. if isinstance(procedures, str):
  72. try:
  73. procedures = json.loads(procedures)
  74. except Exception:
  75. return []
  76. if isinstance(procedures, dict):
  77. procedures = procedures.get("procedures", procedures.get("list", []))
  78. return procedures if isinstance(procedures, list) else []
  79. # ===========================================================================
  80. # 据计划生成 workflow.json 骨架
  81. # ===========================================================================
  82. def _io_labels(st: dict, key: str) -> List[str]:
  83. """取步骤的 input/output 标签, 字符串或数组都收 (生成步典型是 [提示词, 参考图] 两个输入)。"""
  84. v = st.get(key)
  85. if v is None:
  86. v = st.get(key + "s")
  87. if isinstance(v, str):
  88. v = [v]
  89. if not isinstance(v, list):
  90. return []
  91. return [str(x).strip()[:40] for x in v if str(x).strip()]
  92. def _build_skeleton(summary: str, procedures: List[dict]) -> dict:
  93. src = _PLAN_CTX.get("source") or {}
  94. skeleton: Dict[str, Any] = {
  95. "source": {
  96. "platform": src.get("platform", ""),
  97. "author": src.get("author", ""),
  98. "date": src.get("date", ""),
  99. "url": src.get("url", ""),
  100. "title": src.get("title", ""),
  101. "excerpt": src.get("excerpt", "") or (summary or "")[:120],
  102. },
  103. "procedures": [],
  104. }
  105. for pi, proc in enumerate(procedures, 1):
  106. pid = f"p{pi}"
  107. steps_out: List[dict] = []
  108. for si, st in enumerate(proc.get("steps") or [], 1):
  109. if not isinstance(st, dict):
  110. continue
  111. sid = f"s{si}"
  112. ins = _io_labels(st, "input")
  113. outs = _io_labels(st, "output")
  114. tool = (st.get("tool") or "").strip()
  115. loop = (st.get("loop") or "").strip()[:40]
  116. if loop:
  117. # 循环步 → block 父 + nested 子 (README「有循环/并行怎么切」的标准形)。
  118. # block: 输入=被遍历的集合, 输出=结果列表; 子步带真实工具和逐项 IO。
  119. steps_out.append({
  120. "id": sid, "kind": "block", "via": "-", "directive": "",
  121. "inputs": [{"type": loop, "value": "", "anchor": ""}],
  122. "outputs": [{"id": f"{sid}o1",
  123. "type": (outs[0] + "列表")[:40] if outs else "结果列表",
  124. "value": "", "anchor": ""}],
  125. })
  126. steps_out.append({
  127. "id": f"{sid}.1", "kind": "nested", "group": sid, "via": tool,
  128. "directive": "",
  129. "inputs": [{"type": t, "value": "", "anchor": ""} for t in ins],
  130. "outputs": [{"id": f"{sid}.1o{oi}", "type": t, "value": "", "anchor": ""}
  131. for oi, t in enumerate(outs, 1)],
  132. })
  133. continue
  134. steps_out.append({
  135. "id": sid,
  136. "kind": "step",
  137. "via": tool,
  138. "directive": "", # 待 LLM 用 wf-patch 填真实 prompt
  139. "inputs": [{"type": t, "value": "", "anchor": ""} for t in ins],
  140. "outputs": [{"id": f"{sid}o{oi}", "type": t, "value": "", "anchor": ""}
  141. for oi, t in enumerate(outs, 1)],
  142. })
  143. skeleton["procedures"].append({
  144. "id": pid,
  145. "name": (proc.get("name") or "").strip(),
  146. "purpose": (proc.get("final_product") or proc.get("purpose") or "").strip(),
  147. "category": (proc.get("category") or "").strip(),
  148. "platform": src.get("platform", ""),
  149. "author": src.get("author", ""),
  150. "declarations": {"inputs": [], "resources": [], "returns": {}},
  151. "steps": steps_out,
  152. })
  153. return skeleton
  154. # ===========================================================================
  155. # 工具本体
  156. # ===========================================================================
  157. _PLAN_DESC = (
  158. "【第一步必做】提交你对这篇教程的工序计划 (understanding)。读完原文+配图后, "
  159. "把它拆成若干工序、每个工序的步骤逐条展开、并声明每个工序覆盖原文哪些 0N 章节。"
  160. "工具会校验完整性 (有章节没被任何工序认领 → 报错, 修改计划后**重新调用**; 工序只有单步 → 警告), "
  161. "通过后**自动据此生成 workflow.json 骨架** (工序划分/步骤序列锁定), 你之后在骨架上填 "
  162. "value/directive/anchor, 不再增删工序或步骤。\n"
  163. "procedures 每项形如:\n"
  164. ' {"name":"工序名", "category":"产物创造|资产建设|自动化|分析|学习", '
  165. '"final_product":"终态产物", "source_sections":["01","03"], '
  166. '"steps":[{"tool":"工具名/human", "input":["提示词","参考图"], '
  167. '"does":"这步做什么的一句话自由描述", "output":"场景图"}, ...]}\n'
  168. "要点: source_sections 必填 (原文每个 0N 章节都要被某个工序认领, 别漏整段); "
  169. "steps 逐步展开别压成单步; input/output 写**短标签**(字符串或数组——生成步通常要 [提示词, 参考图] "
  170. "两个输入, 都列上, 它们会变成步骤输入/输出的 type); "
  171. '对一批数据逐个重复的步骤加 "loop":"被遍历数据的短标签(如 分镜脚本)", 骨架会自动展开成循环块+子步; '
  172. "does 只是计划期的自由描述, **不是** workflow 的 taxonomy 动作词 (那留到 Phase 2 查 action.json 归类)。"
  173. )
  174. @tool(description=_PLAN_DESC, hidden_params=["context"], groups=["core"])
  175. async def plan_procedures(
  176. summary: str,
  177. procedures: List[Dict[str, Any]],
  178. context: Optional[ToolContext] = None,
  179. ) -> ToolResult:
  180. """提交工序计划 → 校验 → 自动生成 workflow.json 骨架。
  181. Args:
  182. summary: 2-4 句说清这篇在教什么、分几大板块。
  183. procedures: 工序列表 (结构见工具描述)。
  184. """
  185. procs = _coerce_procedures(procedures)
  186. if not procs:
  187. return ToolResult(
  188. title="计划为空",
  189. output="procedures 解析为空。请传一个工序数组, 每项含 name/category/"
  190. "final_product/source_sections/steps (steps 逐步展开)。",
  191. error="empty plan",
  192. )
  193. body = _PLAN_CTX.get("body_text", "")
  194. secs = _source_sections(body)
  195. present = {num for num, _ in secs}
  196. sec_title = dict(secs)
  197. # ---- 收集声明 + 基础校验 ----
  198. claimed: set = set()
  199. warnings: List[str] = []
  200. io_missing: List[str] = [] # 缺 input/output 标签是硬门禁: 骨架会带空 IO 出生, 下游全瞎
  201. for pi, proc in enumerate(procs, 1):
  202. if not isinstance(proc, dict):
  203. warnings.append(f"procedures[{pi-1}] 不是对象, 已跳过")
  204. continue
  205. name = (proc.get("name") or "").strip() or f"(工序{pi})"
  206. steps = proc.get("steps") or []
  207. for s in (proc.get("source_sections") or []):
  208. claimed.add(_norm_sec(s))
  209. if not steps:
  210. warnings.append(f"工序『{name}』没有 steps — 至少要有步骤序列")
  211. elif len(steps) == 1:
  212. warnings.append(f"工序『{name}』只有 1 个步骤 — 确认它真的无法再展开? (多数工序是多步)")
  213. for sj, st in enumerate(steps, 1):
  214. if not isinstance(st, dict):
  215. continue
  216. miss = [k for k in ("tool", "does") if not str(st.get(k) or "").strip()]
  217. if miss:
  218. warnings.append(f"工序『{name}』step{sj} 缺 {'/'.join(miss)} (四要素: 工具 tool·输入 input·动作 does·输出 output)")
  219. io_miss = [k for k in ("input", "output") if not _io_labels(st, k)]
  220. if io_miss:
  221. io_missing.append(f"工序『{name}』step{sj} 缺 {'/'.join(io_miss)}")
  222. # ---- 硬门禁: 每步必有 input + output 标签 (步骤=对输入操作产生新产物, 缺标签=骨架洞) ----
  223. if io_missing:
  224. lines = "\n".join(f" · {m}" for m in io_missing)
  225. return ToolResult(
  226. title=f"{len(io_missing)} 个步骤缺 input/output 标签",
  227. output=(
  228. f"以下步骤缺输入或输出的短标签, 骨架会带着空 IO 生成, 后续校验/渲染全过不去:\n{lines}\n\n"
  229. f"每步都要写 input(用到什么数据)和 output(产出什么) 的短标签 (字符串或数组)。"
  230. f"原文没明写的, 按工艺推断该有什么(如生图步 input 必有 提示词)。"
  231. f"补全后**重新调用 plan_procedures**。"
  232. ),
  233. error=f"steps missing input/output: {len(io_missing)}",
  234. )
  235. # ---- 硬门禁: 章节覆盖 (只在原文确有 0N 分章时启用) ----
  236. if present:
  237. unclaimed = sorted(present - claimed)
  238. if unclaimed:
  239. lines = "\n".join(f" · 章节 {n} 『{sec_title.get(n,'')}』" for n in unclaimed)
  240. return ToolResult(
  241. title=f"计划漏了 {len(unclaimed)} 个章节",
  242. output=(
  243. f"原文有 {len(present)} 个章节, 你的计划只认领了 {sorted(claimed) or '无'}。"
  244. f"下面这些章节**没有任何工序认领**, 极可能被整段漏抽:\n{lines}\n\n"
  245. f"请逐个判断: 它是一条独立工序 (有自己的做法/产物), 还是某工序的若干步骤? "
  246. f"想清楚后**重新调用 plan_procedures**, 让每个工序的 source_sections 把这些章节都覆盖上。"
  247. f"(无独立做法、纯展示的章节可以并进相邻工序的 steps, 但要在某工序的 source_sections 里出现。)"
  248. ),
  249. error=f"unclaimed sections: {','.join(unclaimed)}",
  250. )
  251. bogus = sorted(claimed - present - {""})
  252. if bogus:
  253. warnings.append(f"source_sections 里 {bogus} 在原文里找不到对应 0N 章节 (写错章节号?)")
  254. # ---- 通过: 生成骨架 + 落盘 ----
  255. skeleton = _build_skeleton(summary, procs)
  256. out_dir: Optional[Path] = _PLAN_CTX.get("out_dir")
  257. nproc = len(skeleton["procedures"])
  258. nstep = sum(len(p["steps"]) for p in skeleton["procedures"])
  259. written = []
  260. if out_dir:
  261. scratch = out_dir / "_scratch"
  262. scratch.mkdir(parents=True, exist_ok=True)
  263. wf_path = out_dir / "workflow.json"
  264. wf_path.write_text(json.dumps(skeleton, ensure_ascii=False, indent=2), encoding="utf-8")
  265. written.append(wf_path.as_posix())
  266. # 计划原文留档 (供审计 workflow 是否仍按计划; 也方便你回看)
  267. plan_path = scratch / "understanding.json"
  268. plan_path.write_text(json.dumps(
  269. {"summary": summary, "procedures": procs}, ensure_ascii=False, indent=2), encoding="utf-8")
  270. written.append(plan_path.as_posix())
  271. # ---- 回灌进对话: 计划摘要 + 骨架结构 + 指针 (规则细节在 README, 这里不复述) ----
  272. spec_name = _PLAN_CTX.get("spec_name", "spec")
  273. recap = [f"✅ 计划已通过校验, 已据此生成 workflow.json 骨架: {nproc} 工序 / {nstep} 步。"]
  274. if present:
  275. recap.append(f"章节覆盖: 原文 {sorted(present)} 全部被认领 ✓")
  276. else:
  277. recap.append("ℹ 原文未检测到 `NN|` 分章标号, 章节覆盖门禁未启用 — 自行核对没有整段漏抽。")
  278. recap.append("")
  279. for p in skeleton["procedures"]:
  280. recap.append(f"【{p['id']}】{p['name']} ({p['category']}) — {len(p['steps'])} 步")
  281. for st in p["steps"]:
  282. ins = "+".join(io["type"] for io in st["inputs"]) or "—"
  283. outs = "+".join(io["type"] for io in st["outputs"]) or "—"
  284. tag = f" [{st['kind']}]" if st.get("kind") != "step" else ""
  285. recap.append(f" {st['id']}{tag} via={st['via'] or '—'}: [{ins}] → [{outs}]")
  286. if warnings:
  287. recap.append("")
  288. recap.append("⚠ 警告 (不阻塞, 但请核对):")
  289. for w in warnings:
  290. recap.append(f" - {w}")
  291. recap.append("")
  292. recap.append(
  293. "锁定范围 = 工序划分与步骤序列; **在已有步骤里补输入/输出是允许且常见的**"
  294. "(如生成步补第二个输入, wf-patch 路径不存在默认 upsert)。"
  295. f"接下来按 {spec_name}/README.md「第二阶段 · 2.0 填内容」执行: "
  296. "wf-patch 填 value(@quote)/anchor → verify-io.py 校验通过 → 进 2.1 归类。"
  297. "若工序/步骤划分本身不对, 修改计划后重新调用 plan_procedures (整体重生成)。"
  298. )
  299. return ToolResult(
  300. title=f"计划通过: {nproc} 工序 / {nstep} 步",
  301. output="\n".join(recap),
  302. metadata={"procedures": nproc, "steps": nstep,
  303. "sections_present": sorted(present), "written": written,
  304. "warnings": len(warnings)},
  305. )