plan_tool.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """plan_tool.py — procedure-dsl 的「计划」内置工具 (技能本地, 仅 Cyber/run_cyber 引擎)。
  4. 为什么是工具而不是"写 understanding.md":
  5. 弱模型读了"先想清楚工序"的 prompt 也照样跳过、直接堆骨架。把这一步做成**工具**,
  6. LLM 必须**调用一次**把【工序分解 + 每工序步骤逐条展开 + 每工序覆盖原文哪些章节】
  7. 作为结构化参数交上来 —— 工具当场校验完整性 (混合门禁), 通过就**自动据计划生成
  8. workflow.json 骨架**, 结果回灌进对话锚定后续填充。于是:
  9. · understanding 重新独立成一步 (但由工具承载, 不靠自觉)
  10. · 工序由计划显式决定 (章节认领是声明式, 不再靠词汇模糊匹配)
  11. · workflow.json 结构严格按计划生成 (工序数/步骤数/顺序锁定, LLM 只填值)
  12. 设计边界 (遵守"runner 零业务知识"): 本模块是**技能本地**, 不进 agent/tools/builtin/,
  13. run_cyber 仅 `import plan_tool` 触发 @tool 注册 + 调 set_plan_context 注入原文/路径。
  14. Claude SDK 路 (run_procedure_dsl) 没有 repo 工具系统, 那条路 understanding 仍走 prompt。
  15. """
  16. # ⚠ 不能用 `from __future__ import annotations`: 它把类型注解变成字符串, 而框架
  17. # SchemaGenerator 靠运行时 get_origin() 内省真实类型 — 拿字符串会把 procedures 这种
  18. # List[Dict] 退化成 "string", LLM 就不知道该传数组了。
  19. import json
  20. import re
  21. from pathlib import Path
  22. from typing import Any, Dict, List, Optional
  23. from agent.tools import tool, ToolResult, ToolContext
  24. # ===========================================================================
  25. # 运行时上下文 (run_cyber.main() 在跑 runner 前注入; 工具闭包读它, 避免让 LLM 传路径)
  26. # ===========================================================================
  27. _PLAN_CTX: Dict[str, Any] = {}
  28. def set_plan_context(*, body_text: str = "", out_dir: Optional[Path] = None,
  29. case_id: Any = None, source: Optional[dict] = None,
  30. ocr: str = "") -> None:
  31. """run_cyber 在执行前调用, 把原文正文 / 配图 OCR / 输出目录 / source 元信息交给工具。"""
  32. _PLAN_CTX.clear()
  33. _PLAN_CTX.update({
  34. "body_text": body_text or "",
  35. "ocr": ocr or "",
  36. "out_dir": Path(out_dir) if out_dir else None,
  37. "case_id": case_id,
  38. "source": source or {},
  39. })
  40. # ===========================================================================
  41. # 原文章节解析 (与 lint-case.py 同口径: 行首 0N| 标号, 排除 "图 0N|" 配图说明)
  42. # ===========================================================================
  43. _SEC_RE = re.compile(r'(?m)^\s*(0\d)\s*[||]')
  44. def _source_sections(body: str) -> List[tuple]:
  45. """返回 [(章节号, 标题)],如 [('01','从一个案例开始'), ('02','结构化 Prompt 框架'), ...]。"""
  46. marks = [(m.start(), m.group(1)) for m in _SEC_RE.finditer(body or "")]
  47. out: List[tuple] = []
  48. for i, (pos, num) in enumerate(marks):
  49. end = marks[i + 1][0] if i + 1 < len(marks) else len(body)
  50. seg = body[pos:end]
  51. after = re.split(r'[||]', seg, 1)
  52. tail = after[-1] if len(after) > 1 else seg
  53. title = ""
  54. for ln in tail.splitlines():
  55. ln = ln.strip()
  56. if ln:
  57. title = ln[:24]
  58. break
  59. out.append((num, title))
  60. return out
  61. def _norm_sec(s: Any) -> str:
  62. """把 '1' / '01' / '第1章' 之类归一成两位章节号 '01'。无数字返回原串。"""
  63. m = re.search(r'\d+', str(s))
  64. return f"0{int(m.group())}"[-2:] if m and int(m.group()) < 10 else (m.group() if m else str(s).strip())
  65. # ===========================================================================
  66. # 入参容错 (弱模型常把 list 传成 JSON 字符串 / 包一层 {"procedures": [...]})
  67. # ===========================================================================
  68. def _coerce_procedures(procedures: Any) -> List[dict]:
  69. if isinstance(procedures, str):
  70. try:
  71. procedures = json.loads(procedures)
  72. except Exception:
  73. return []
  74. if isinstance(procedures, dict):
  75. procedures = procedures.get("procedures", procedures.get("list", []))
  76. return procedures if isinstance(procedures, list) else []
  77. # ===========================================================================
  78. # 据计划生成 workflow.json 骨架
  79. # ===========================================================================
  80. def _build_skeleton(summary: str, procedures: List[dict]) -> dict:
  81. src = _PLAN_CTX.get("source") or {}
  82. skeleton: Dict[str, Any] = {
  83. "source": {
  84. "platform": src.get("platform", ""),
  85. "author": src.get("author", ""),
  86. "date": src.get("date", ""),
  87. "url": src.get("url", ""),
  88. "title": src.get("title", ""),
  89. "excerpt": src.get("excerpt", "") or (summary or "")[:120],
  90. },
  91. "procedures": [],
  92. }
  93. for pi, proc in enumerate(procedures, 1):
  94. pid = f"p{pi}"
  95. steps_out: List[dict] = []
  96. for si, st in enumerate(proc.get("steps") or [], 1):
  97. if not isinstance(st, dict):
  98. continue
  99. sid = f"s{si}"
  100. in_label = (st.get("input") or "").strip()
  101. out_label = (st.get("output") or "").strip()
  102. step: Dict[str, Any] = {
  103. "id": sid,
  104. "kind": "step",
  105. "via": (st.get("tool") or "").strip(),
  106. "directive": "", # 待 LLM 用 wf-patch 填真实 prompt
  107. "inputs": ([{"type": in_label[:40], "value": "", "anchor": ""}]
  108. if in_label else []),
  109. "outputs": ([{"id": f"{sid}o1", "type": out_label[:40], "value": "", "anchor": ""}]
  110. if out_label else []),
  111. }
  112. steps_out.append(step)
  113. skeleton["procedures"].append({
  114. "id": pid,
  115. "name": (proc.get("name") or "").strip(),
  116. "purpose": (proc.get("final_product") or proc.get("purpose") or "").strip(),
  117. "category": (proc.get("category") or "").strip(),
  118. "platform": src.get("platform", ""),
  119. "author": src.get("author", ""),
  120. "declarations": {"inputs": [], "resources": [], "returns": {}},
  121. "steps": steps_out,
  122. })
  123. return skeleton
  124. # ===========================================================================
  125. # 工具本体
  126. # ===========================================================================
  127. _PLAN_DESC = (
  128. "【第一步必做·只调用一次】提交你对这篇教程的工序计划 (understanding)。读完原文+配图后, "
  129. "把它拆成若干工序、每个工序的步骤逐条展开、并声明每个工序覆盖原文哪些 0N 章节。"
  130. "工具会校验完整性 (有章节没被任何工序认领 → 报错让你补全; 工序只有单步 → 警告), "
  131. "通过后**自动据此生成 workflow.json 骨架** (工序/步骤/顺序锁定), 你之后只在骨架上填 "
  132. "value/directive/anchor, 不再增删工序或步骤。\n"
  133. "procedures 每项形如:\n"
  134. ' {"name":"工序名", "category":"产物创造|资产建设|自动化|分析|学习", '
  135. '"final_product":"终态产物", "source_sections":["01","03"], '
  136. '"steps":[{"tool":"工具名/human", "input":"输入数据的短标签(如 提示词)", '
  137. '"does":"这步做什么的一句话自由描述", "output":"产出的短标签(如 场景图)"}, ...]}\n'
  138. "要点: source_sections 必填 (原文每个 0N 章节都要被某个工序认领, 别漏整段); "
  139. "steps 逐步展开别压成单步; input/output 写**短标签**(它们会变成步骤输入/输出的 type); "
  140. "does 只是计划期的自由描述, **不是** workflow 的 taxonomy 动作词 (那留到 Phase 2 查 action.json 归类)。"
  141. )
  142. @tool(description=_PLAN_DESC, hidden_params=["context"], groups=["core"])
  143. async def plan_procedures(
  144. summary: str,
  145. procedures: List[Dict[str, Any]],
  146. context: Optional[ToolContext] = None,
  147. ) -> ToolResult:
  148. """提交工序计划 → 校验 → 自动生成 workflow.json 骨架。
  149. Args:
  150. summary: 2-4 句说清这篇在教什么、分几大板块。
  151. procedures: 工序列表 (结构见工具描述)。
  152. """
  153. procs = _coerce_procedures(procedures)
  154. if not procs:
  155. return ToolResult(
  156. title="计划为空",
  157. output="procedures 解析为空。请传一个工序数组, 每项含 name/category/"
  158. "final_product/source_sections/steps (steps 逐步展开)。",
  159. error="empty plan",
  160. )
  161. body = _PLAN_CTX.get("body_text", "")
  162. secs = _source_sections(body)
  163. present = {num for num, _ in secs}
  164. sec_title = dict(secs)
  165. # ---- 收集声明 + 基础校验 ----
  166. claimed: set = set()
  167. warnings: List[str] = []
  168. for pi, proc in enumerate(procs, 1):
  169. if not isinstance(proc, dict):
  170. warnings.append(f"procedures[{pi-1}] 不是对象, 已跳过")
  171. continue
  172. name = (proc.get("name") or "").strip() or f"(工序{pi})"
  173. steps = proc.get("steps") or []
  174. for s in (proc.get("source_sections") or []):
  175. claimed.add(_norm_sec(s))
  176. if not steps:
  177. warnings.append(f"工序『{name}』没有 steps — 至少要有步骤序列")
  178. elif len(steps) == 1:
  179. warnings.append(f"工序『{name}』只有 1 个步骤 — 确认它真的无法再展开? (多数工序是多步)")
  180. for sj, st in enumerate(steps, 1):
  181. if not isinstance(st, dict):
  182. continue
  183. miss = [k for k in ("tool", "input", "does", "output") if not (st.get(k) or "").strip()]
  184. if miss:
  185. warnings.append(f"工序『{name}』step{sj} 缺 {'/'.join(miss)} (四要素: 工具 tool·输入 input·动作 does·输出 output)")
  186. # ---- 硬门禁: 章节覆盖 (只在原文确有 0N 分章时启用) ----
  187. if present:
  188. unclaimed = sorted(present - claimed)
  189. if unclaimed:
  190. lines = "\n".join(f" · 章节 {n} 『{sec_title.get(n,'')}』" for n in unclaimed)
  191. return ToolResult(
  192. title=f"计划漏了 {len(unclaimed)} 个章节",
  193. output=(
  194. f"原文有 {len(present)} 个章节, 你的计划只认领了 {sorted(claimed) or '无'}。"
  195. f"下面这些章节**没有任何工序认领**, 极可能被整段漏抽:\n{lines}\n\n"
  196. f"请逐个判断: 它是一条独立工序 (有自己的做法/产物), 还是某工序的若干步骤? "
  197. f"想清楚后**重新调用 plan_procedures**, 让每个工序的 source_sections 把这些章节都覆盖上。"
  198. f"(无独立做法、纯展示的章节可以并进相邻工序的 steps, 但要在某工序的 source_sections 里出现。)"
  199. ),
  200. error=f"unclaimed sections: {','.join(unclaimed)}",
  201. )
  202. bogus = sorted(claimed - present - {""})
  203. if bogus:
  204. warnings.append(f"source_sections 里 {bogus} 在原文里找不到对应 0N 章节 (写错章节号?)")
  205. # ---- 通过: 生成骨架 + 落盘 ----
  206. skeleton = _build_skeleton(summary, procs)
  207. out_dir: Optional[Path] = _PLAN_CTX.get("out_dir")
  208. nproc = len(skeleton["procedures"])
  209. nstep = sum(len(p["steps"]) for p in skeleton["procedures"])
  210. written = []
  211. if out_dir:
  212. scratch = out_dir / "_scratch"
  213. scratch.mkdir(parents=True, exist_ok=True)
  214. wf_path = out_dir / "workflow.json"
  215. wf_path.write_text(json.dumps(skeleton, ensure_ascii=False, indent=2), encoding="utf-8")
  216. written.append(wf_path.as_posix())
  217. # 计划原文留档 (供审计 workflow 是否仍按计划; 也方便你回看)
  218. plan_path = scratch / "understanding.json"
  219. plan_path.write_text(json.dumps(
  220. {"summary": summary, "procedures": procs}, ensure_ascii=False, indent=2), encoding="utf-8")
  221. written.append(plan_path.as_posix())
  222. # ---- 回灌进对话: 计划摘要 + 骨架结构 + 下一步指令 ----
  223. recap = [f"✅ 计划已通过校验, 已据此生成 workflow.json 骨架: {nproc} 工序 / {nstep} 步。"]
  224. if present:
  225. recap.append(f"章节覆盖: 原文 {sorted(present)} 全部被认领 ✓")
  226. recap.append("")
  227. for p in skeleton["procedures"]:
  228. recap.append(f"【{p['id']}】{p['name']} ({p['category']}) — {len(p['steps'])} 步")
  229. for st in p["steps"]:
  230. ins = st["inputs"][0]["type"] if st["inputs"] else "—"
  231. outs = st["outputs"][0]["type"] if st["outputs"] else "—"
  232. recap.append(f" {st['id']} via={st['via'] or '—'}: [{ins}] → [{outs}]")
  233. if warnings:
  234. recap.append("")
  235. recap.append("⚠ 警告 (不阻塞, 但请核对):")
  236. for w in warnings:
  237. recap.append(f" - {w}")
  238. recap.append("")
  239. recap.append(
  240. "下一步 (在已生成的骨架上, 别增删工序/步骤):\n"
  241. " 1. 逐步用 wf-patch.py 填 inputs/outputs 的 value(文本类用 @quote 拽**完整逐字**原文)、"
  242. "anchor(数据流 ← / →); **提示词建成 type=提示词 的输入/输出 value(整段逐字), 别塞 directive**"
  243. "(directive 只放严格反推/比例 2:3 这类元指令, 多数生图步留空);\n"
  244. " 2. 填完后**必须跑 `python spec/tools/verify-io.py --workflow <wf> --source <原文> --ocr <ocr>`**: "
  245. "它校验文本 IO 的 value 逐字、生成步有没有 type=提示词 输入、提示词 value 完不完整、declarations; "
  246. "报问题就修(此时可重读原文, 提示词用 @quote 提全), 修完重跑直到通过, 才进 Phase 2;\n"
  247. " 3. type 现在是描述性短标签, Phase 2 再归一化到词表 + 补 effect/action/substance/form/intent;\n"
  248. " 4. 结构已按计划锁定 —— 若发现确实要加工序/步骤, 说明计划不完整, 重新调 plan_procedures。"
  249. )
  250. return ToolResult(
  251. title=f"计划通过: {nproc} 工序 / {nstep} 步",
  252. output="\n".join(recap),
  253. metadata={"procedures": nproc, "steps": nstep,
  254. "sections_present": sorted(present), "written": written,
  255. "warnings": len(warnings)},
  256. )