validate.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. #!/usr/bin/env python3
  2. """procedure skill 唯一校验器.
  3. agent 直接 Write 出 workflow.json 后跑本脚本, 一次性报出全部问题:
  4. ✗ 错误 (必须修, exit 1): 结构缺字段 / 词表不命中 / 数据流断链 / 编号重复 / value 是引用占位
  5. ⚠ 警告 (建议核对, 不拦截): 类型不一致 / 生成步缺提示词输入 / value 未在原文逐字命中
  6. 用法:
  7. python procedure/tools/validate.py --workflow outputs/case-N/workflow.json \
  8. [--source input/case-N.json] [--ocr outputs/case-N/_scratch/ocr.txt]
  9. 退出码: 0 = 无错误 / 1 = 有错误 / 2 = CLI 或文件问题
  10. """
  11. import argparse
  12. import json
  13. import re
  14. import sys
  15. from difflib import SequenceMatcher
  16. from pathlib import Path
  17. if hasattr(sys.stdout, "reconfigure"):
  18. sys.stdout.reconfigure(encoding="utf-8")
  19. sys.stderr.reconfigure(encoding="utf-8")
  20. # 词表常量与 wf-patch.py 同源 (同目录 vocab.py)
  21. sys.path.insert(0, str(Path(__file__).resolve().parent))
  22. from vocab import EFFECT_LEAVES, ACTION_LEAF2PATH, TYPE_LEAVES # noqa: E402
  23. CATEGORIES = {"产物创造", "资产建设", "自动化", "分析", "学习"}
  24. KINDS = {"step", "block", "nested"}
  25. STEP_ID_RE = re.compile(r"^s\d+(\.\d+)*$")
  26. SOURCE_REQUIRED = ["platform", "author", "date", "title", "excerpt"]
  27. PROC_REQUIRED = ["id", "name", "purpose", "category", "declarations", "steps"]
  28. STEP_REQUIRED = ["id", "kind", "via", "inputs", "outputs", "intent"]
  29. IO_REQUIRED = ["type", "value", "anchor"]
  30. INTENT_TOKEN_KINDS = {"effect", "via", "act", "in-type", "out-type"}
  31. def norm_ws(s):
  32. return re.sub(r"\s+", "", s or "")
  33. def _norm_map(raw):
  34. """去空白的归一文本 + 每个归一字符在原文里的下标 (用于把命中片段映射回原文逐字)."""
  35. chars, idx = [], []
  36. for i, ch in enumerate(raw):
  37. if not ch.isspace():
  38. chars.append(ch)
  39. idx.append(i)
  40. return "".join(chars), idx
  41. def _best_span(nv, doc_norm, doc_idx, doc_raw):
  42. """在一个文档里找与 nv 最相似的连续片段. 返回 (相似度, 原文逐字片段) 或 None."""
  43. sm = SequenceMatcher(None, nv, doc_norm, autojunk=False)
  44. m = sm.find_longest_match(0, len(nv), 0, len(doc_norm))
  45. if m.size < min(15, max(8, len(nv) // 4)): # 连最长公共串都很短 → 内容不在这个文档里
  46. return None
  47. # 以最长公共串为锚, 框出 nv 在文档里应当对应的窗口 (前后各留 1/5 余量)
  48. slack = max(10, len(nv) // 5)
  49. w_start = max(0, m.b - m.a - slack)
  50. w_end = min(len(doc_norm), m.b + (len(nv) - m.a) + slack)
  51. sm2 = SequenceMatcher(None, nv, doc_norm[w_start:w_end], autojunk=False)
  52. blocks = [b for b in sm2.get_matching_blocks() if b.size > 0]
  53. if not blocks:
  54. return None
  55. t_start = w_start + blocks[0].b # 收紧到首尾命中块之间
  56. t_end = w_start + blocks[-1].b + blocks[-1].size
  57. ratio = SequenceMatcher(None, nv, doc_norm[t_start:t_end], autojunk=False).ratio()
  58. return ratio, doc_raw[doc_idx[t_start]: doc_idx[t_end - 1] + 1]
  59. def fix_verbatim(wf, docs):
  60. """把未逐字命中的文本 value 替换成原文里最相似的连续片段 (相似度够高才动).
  61. docs = [(norm, idx_map, raw), ...] — 原文各字段 + OCR 各算一个文档, 片段不跨文档拼接.
  62. 返回 (fixed, skipped): fixed=[(path, ratio, 旧字数, 新字数)], skipped=[(path, 最高相似度)].
  63. """
  64. fixed, skipped = [], []
  65. for proc in wf.get("procedures") or []:
  66. pid = proc.get("id") or "p?"
  67. for s in proc.get("steps") or []:
  68. if not isinstance(s, dict):
  69. continue
  70. for arr in ("inputs", "outputs"):
  71. for i, io in enumerate(s.get(arr) or []):
  72. if not isinstance(io, dict):
  73. continue
  74. v = io.get("value")
  75. if not isinstance(v, str) or v.lstrip().startswith("<"):
  76. continue
  77. nv = norm_ws(v)
  78. if len(nv) < 40 or any(nv in d[0] for d in docs):
  79. continue # 短文本 / 已逐字命中
  80. best = None
  81. for dn, di, dr in docs:
  82. r = _best_span(nv, dn, di, dr)
  83. if r and (best is None or r[0] > best[0]):
  84. best = r
  85. path = f"{pid}.{s.get('id', 's?')}.{arr}[{i}]"
  86. new_n = norm_ws(best[1]) if best else ""
  87. if best and best[0] >= 0.60 and 0.4 * len(nv) <= len(new_n) <= 2.5 * len(nv):
  88. io["value"] = best[1]
  89. fixed.append((path, best[0], len(nv), len(new_n)))
  90. else:
  91. skipped.append((path, best[0] if best else 0.0))
  92. return fixed, skipped
  93. def collect_strings(obj, out):
  94. if isinstance(obj, str):
  95. out.append(obj)
  96. elif isinstance(obj, list):
  97. for x in obj:
  98. collect_strings(x, out)
  99. elif isinstance(obj, dict):
  100. for x in obj.values():
  101. collect_strings(x, out)
  102. class Report:
  103. def __init__(self):
  104. self.errors = []
  105. self.warns = []
  106. def err(self, path, msg):
  107. self.errors.append((path, msg))
  108. def warn(self, path, msg):
  109. self.warns.append((path, msg))
  110. def check_io(rep, io, path, is_output, proc_output_ids):
  111. if not isinstance(io, dict):
  112. rep.err(path, "IO 项必须是对象")
  113. return
  114. for k in IO_REQUIRED:
  115. if k not in io:
  116. rep.err(path, f"缺字段 {k}")
  117. if is_output and not io.get("id"):
  118. rep.err(path, "输出缺 id (如 s2o1)")
  119. value = io.get("value", "")
  120. if isinstance(value, str):
  121. if not value.strip():
  122. rep.err(path, "value 为空 — 文字填原文逐字内容, 媒体填 <整段描述>")
  123. elif re.match(r"^\s*(←|\(同|(同|见\s*s\d)", value):
  124. rep.err(path, f"value 是引用占位 ({value[:20]!r}) — 引用写 anchor, value 抄真实内容")
  125. anchor = io.get("anchor")
  126. if isinstance(anchor, str) and anchor.strip():
  127. a = anchor.strip()
  128. if is_output:
  129. if not a.startswith("→"):
  130. rep.err(path, f"输出 anchor 应以 → 开头, 现为 {a!r}")
  131. else:
  132. if not a.startswith("←"):
  133. rep.err(path, f"输入 anchor 应以 ← 开头, 现为 {a!r}")
  134. else:
  135. ref = a.lstrip("←").strip()
  136. ref_id = re.sub(r"\[[^\]]*\]$", "", ref).strip()
  137. if re.match(r"^s\d", ref_id) and ref_id not in proc_output_ids:
  138. rep.err(path, f"anchor 引用了不存在的输出编号 {ref_id!r}")
  139. if io.get("inferred") is True and not io.get("inferred_reason"):
  140. rep.err(path, "inferred=true 必须带 inferred_reason")
  141. def check_intent(rep, intent, path):
  142. if not isinstance(intent, str) or not intent.strip():
  143. rep.err(path, "缺 intent")
  144. return
  145. if "→" in intent:
  146. rep.err(path, "intent 写成了公式 (含 →) — 要一句通顺人话")
  147. for kind in re.findall(r"\{([^:{}]+):", intent):
  148. if kind not in INTENT_TOKEN_KINDS:
  149. rep.err(path, f"intent 标记类别 {{{kind}:}} 非法 — 只能用 {sorted(INTENT_TOKEN_KINDS)}")
  150. plain = re.sub(r"\{[^:{}]+:([^{}]*)\}", r"\1", intent)
  151. if len(plain) > 40:
  152. rep.warn(path, f"intent 偏长 ({len(plain)} 字, 建议 ≤25)")
  153. def validate(wf, vocab, source_text):
  154. rep = Report()
  155. src = wf.get("source")
  156. if not isinstance(src, dict):
  157. rep.err("source", "缺 source 块")
  158. else:
  159. for k in SOURCE_REQUIRED:
  160. if not src.get(k):
  161. rep.err("source", f"缺字段 {k}")
  162. procs = wf.get("procedures")
  163. if not isinstance(procs, list) or not procs:
  164. rep.err("procedures", "procedures 必须是非空数组")
  165. return rep
  166. for proc in procs:
  167. pid = proc.get("id") or "p?"
  168. for k in PROC_REQUIRED:
  169. if k not in proc:
  170. rep.err(pid, f"缺字段 {k}")
  171. if proc.get("category") and proc["category"] not in CATEGORIES:
  172. rep.err(pid, f"category={proc['category']!r} 不合法 — 只能是 {sorted(CATEGORIES)}")
  173. decl = proc.get("declarations")
  174. if isinstance(decl, dict):
  175. for k in ("inputs", "resources", "returns"):
  176. if k not in decl:
  177. rep.err(f"{pid}.declarations", f"缺字段 {k}")
  178. # type_registry: 每条要 extends 到标准类型叶子 + desc
  179. registry = proc.get("type_registry") or {}
  180. for tname, entry in registry.items():
  181. tpath = f"{pid}.type_registry.{tname}"
  182. if not isinstance(entry, dict) or not entry.get("extends"):
  183. rep.err(tpath, "缺 extends")
  184. elif entry["extends"] not in vocab["type_leaves"]:
  185. rep.err(tpath, f"extends={entry['extends']!r} 不是 type 词表叶子")
  186. if not isinstance(entry, dict) or not entry.get("desc"):
  187. rep.err(tpath, "缺 desc")
  188. known_types = vocab["type_leaves"] | set(registry)
  189. steps = proc.get("steps") or []
  190. block_ids = {s.get("id") for s in steps if isinstance(s, dict) and s.get("kind") == "block"}
  191. output_ids = set()
  192. out_type_by_id = {}
  193. for s in steps:
  194. if not isinstance(s, dict):
  195. continue
  196. for o in s.get("outputs") or []:
  197. if isinstance(o, dict) and o.get("id"):
  198. spath = f"{pid}.{s.get('id', 's?')}"
  199. if o["id"] in output_ids:
  200. rep.err(spath, f"输出编号 {o['id']!r} 重复")
  201. output_ids.add(o["id"])
  202. out_type_by_id[o["id"]] = o.get("type")
  203. for s in steps:
  204. if not isinstance(s, dict):
  205. rep.err(pid, "steps 含非对象项")
  206. continue
  207. sid = s.get("id") or "s?"
  208. spath = f"{pid}.{sid}"
  209. for k in STEP_REQUIRED:
  210. if k not in s:
  211. rep.err(spath, f"缺字段 {k}")
  212. if s.get("id") and not STEP_ID_RE.match(s["id"]):
  213. rep.err(spath, f"step id {s['id']!r} 不合规 (s1 / s5.1)")
  214. kind = s.get("kind")
  215. if kind not in KINDS:
  216. rep.err(spath, f"kind={kind!r} 不合法 — 只能是 {sorted(KINDS)}")
  217. if kind == "nested":
  218. if not s.get("group"):
  219. rep.err(spath, "nested 步缺 group")
  220. elif s["group"] not in block_ids:
  221. rep.err(spath, f"group={s['group']!r} 不是已存在的 block id")
  222. # render schema: directive 若存在必须是字符串 (人工/控制步没有 prompt 就省略字段或写空串)
  223. if "directive" in s and not isinstance(s["directive"], str):
  224. rep.err(spath, f"directive={s['directive']!r} 必须是字符串 — 没有就删掉该字段, 不要写 null")
  225. if kind in ("step", "nested"):
  226. # render 硬门禁对齐: via 非空、inputs/outputs 非空数组 (缺 IO 按工艺推断补, 标 inferred)
  227. if not str(s.get("via") or "").strip():
  228. rep.err(spath, "via 为空 — 步骤要写用的工具 (human / 工具名 / 占位 (AI 生图工具))")
  229. for arr, label in (("inputs", "输入"), ("outputs", "输出")):
  230. if not s.get(arr):
  231. rep.err(spath, f"{arr} 为空数组 — 步骤必有{label}; 原文没明写就按工艺推断补, "
  232. f"标 inferred:true + inferred_reason, 不要为过校验而删 IO")
  233. # effect / action 必须命中词表
  234. eff = s.get("effect")
  235. if not eff:
  236. rep.err(spath, "缺 effect")
  237. elif eff not in vocab["effect_leaves"]:
  238. rep.err(spath, f"effect={eff!r} 不是合法叶子 — 9 选 1: {sorted(vocab['effect_leaves'])}")
  239. act = s.get("action")
  240. if not act:
  241. rep.err(spath, "缺 action")
  242. elif act not in vocab["action_leaves"] and act not in vocab["action_paths"]:
  243. rep.err(spath, f"action={act!r} 不在动作词表 (叶子名或 根/…/叶 全路径)")
  244. # substance / form 必须显式处理 (填值或 null)
  245. for k in ("substance", "form"):
  246. if k not in s:
  247. rep.err(spath, f"缺 {k} (没有就显式设 null)")
  248. check_intent(rep, s.get("intent"), spath)
  249. for arr, is_out in (("inputs", False), ("outputs", True)):
  250. for i, io in enumerate(s.get(arr) or []):
  251. iopath = f"{spath}.{arr}[{i}]"
  252. check_io(rep, io, iopath, is_out, output_ids)
  253. if not isinstance(io, dict):
  254. continue
  255. t = io.get("type")
  256. if t and t not in known_types:
  257. rep.err(iopath, f"type={t!r} 不在词表也没在 type_registry 挂靠")
  258. # 输入类型与来源输出类型一致性
  259. if not is_out and isinstance(io.get("anchor"), str):
  260. ref_id = re.sub(r"\[[^\]]*\]$", "", io["anchor"].lstrip("←").strip()).strip()
  261. src_t = out_type_by_id.get(ref_id)
  262. if src_t and t and src_t != t:
  263. rep.warn(iopath, f"输入 type={t!r} 与来源 {ref_id} 的 type={src_t!r} 不一致")
  264. # 生成步建议有提示词输入 + directive
  265. if kind in ("step", "nested") and isinstance(s.get("action"), str) and s["action"].split("/")[0] == "生成":
  266. in_types = {io.get("type") for io in s.get("inputs") or [] if isinstance(io, dict)}
  267. if not in_types & {"提示词", "负向提示词", "描述"}:
  268. rep.warn(spath, "生成步没有 提示词/描述 类输入 — 确认是否漏建")
  269. # value 逐字核对 (有 --source 才跑)
  270. if source_text:
  271. for s in steps:
  272. if not isinstance(s, dict):
  273. continue
  274. for arr in ("inputs", "outputs"):
  275. for i, io in enumerate(s.get(arr) or []):
  276. if not isinstance(io, dict):
  277. continue
  278. v = io.get("value")
  279. if not isinstance(v, str) or v.lstrip().startswith("<"):
  280. continue # 媒体类描述不要求逐字
  281. nv = norm_ws(v)
  282. if len(nv) >= 40 and nv not in source_text:
  283. rep.warn(f"{pid}.{s.get('id','s?')}.{arr}[{i}]",
  284. f"value ({len(nv)} 字) 未在原文/OCR 逐字命中 — 确认没缩写/改写"
  285. f" (可加 --fix-verbatim 自动替换为原文片段)")
  286. return rep
  287. def main():
  288. ap = argparse.ArgumentParser()
  289. ap.add_argument("--workflow", required=True)
  290. ap.add_argument("--source", help="原文 case json — 给了才跑 value 逐字核对")
  291. ap.add_argument("--ocr", help="配图 OCR 文本, 并入逐字核对语料")
  292. ap.add_argument("--fix-verbatim", action="store_true",
  293. help="把未逐字命中的文本 value 自动替换为原文里最相似的连续片段 "
  294. "(相似度 ≥60%% 才动, 改不动的留警告; 需要 --source)")
  295. args = ap.parse_args()
  296. try:
  297. wf = json.loads(Path(args.workflow).read_text(encoding="utf-8"))
  298. except FileNotFoundError:
  299. print(f"✗ 文件不存在: {args.workflow}")
  300. return 2
  301. except json.JSONDecodeError as e:
  302. print(f"✗ JSON 解析失败: {e}")
  303. return 1
  304. vocab = {
  305. "effect_leaves": EFFECT_LEAVES,
  306. "action_leaves": set(ACTION_LEAF2PATH),
  307. "action_paths": set(ACTION_LEAF2PATH.values()),
  308. "type_leaves": TYPE_LEAVES,
  309. }
  310. source_text = ""
  311. docs = [] # 逐字段文档 [(norm, idx_map, raw)], --fix-verbatim 用 (片段不跨字段拼)
  312. if args.source:
  313. chunks = []
  314. collect_strings(json.loads(Path(args.source).read_text(encoding="utf-8")), chunks)
  315. source_text = norm_ws("".join(chunks))
  316. docs = [(*_norm_map(c), c) for c in chunks if len(norm_ws(c)) >= 40]
  317. if args.ocr and Path(args.ocr).exists():
  318. ocr_raw = Path(args.ocr).read_text(encoding="utf-8")
  319. source_text += norm_ws(ocr_raw)
  320. docs.append((*_norm_map(ocr_raw), ocr_raw))
  321. if args.fix_verbatim:
  322. if not docs:
  323. print("✗ --fix-verbatim 需要 --source (原文语料)")
  324. return 2
  325. fixed, skipped = fix_verbatim(wf, docs)
  326. if fixed:
  327. Path(args.workflow).write_text(
  328. json.dumps(wf, ensure_ascii=False, indent=2), encoding="utf-8")
  329. for p, r, a, b in fixed:
  330. print(f" ✦ [{p}] value 已替换为原文逐字片段 (相似度 {r:.0%}, {a}→{b} 字)")
  331. for p, r in skipped:
  332. print(f" ⚠ [{p}] 找不到足够相似的原文连续片段 (最高 {r:.0%}) — 可能是多段拼接/自创内容, 人工处理")
  333. rep = validate(wf, vocab, source_text)
  334. for path, msg in rep.errors:
  335. print(f" ✗ [{path}] {msg}")
  336. for path, msg in rep.warns:
  337. print(f" ⚠ [{path}] {msg}")
  338. print(f"[validate] {args.workflow}: {len(rep.errors)} 错误, {len(rep.warns)} 警告")
  339. return 1 if rep.errors else 0
  340. if __name__ == "__main__":
  341. sys.exit(main())