#!/usr/bin/env python3
"""The single validator for the procedure skill.

Run this script right after the agent writes workflow.json; it reports every
problem in one pass:
  ✗ errors   (must fix, exit 1): missing structural fields / vocabulary miss /
             broken data-flow links / duplicate ids / value is a reference
             placeholder
  ⚠ warnings (worth checking, non-blocking): type mismatch / generation step
             without a prompt input / value not found verbatim in the source

Usage:
  python procedure/tools/validate.py --workflow outputs/case-N/workflow.json \
      [--source input/case-N.json] [--ocr outputs/case-N/_scratch/ocr.txt]

Exit codes: 0 = no errors / 1 = errors found / 2 = CLI or file problem
"""
import argparse
import json
import re
import sys
from difflib import SequenceMatcher
from pathlib import Path

# Force UTF-8 on each stream that supports it; the streams are checked
# independently because either one may have been replaced.
for _stream in (sys.stdout, sys.stderr):
    if hasattr(_stream, "reconfigure"):
        _stream.reconfigure(encoding="utf-8")

# Vocabulary constants share their origin with wf-patch.py (vocab.py, same directory).
sys.path.insert(0, str(Path(__file__).resolve().parent))
from vocab import EFFECT_LEAVES, ACTION_LEAF2PATH, TYPE_LEAVES  # noqa: E402

CATEGORIES = {"产物创造", "资产建设", "自动化", "分析", "学习"}
KINDS = {"step", "block", "nested"}
STEP_ID_RE = re.compile(r"^s\d+(\.\d+)*$")
SOURCE_REQUIRED = ["platform", "author", "date", "title", "excerpt"]
PROC_REQUIRED = ["id", "name", "purpose", "category", "declarations", "steps"]
STEP_REQUIRED = ["id", "kind", "via", "inputs", "outputs", "intent"]
IO_REQUIRED = ["type", "value", "anchor"]
INTENT_TOKEN_KINDS = {"effect", "via", "act", "in-type", "out-type"}

# A value that starts like a cross-reference ("←", "(同" / "（同", "见 s<digit>")
# instead of real content. Both the ASCII and the full-width parenthesis count.
_REF_PLACEHOLDER_RE = re.compile(r"^\s*(?:←|[\(（]同|见\s*s\d)")
# Trailing "[...]" selector on an anchor reference.
_ANCHOR_SUFFIX_RE = re.compile(r"\[[^\]]*\]$")


def _hashable(value):
    """Return True when *value* can be used as a set member / dict key."""
    try:
        hash(value)
    except TypeError:
        return False
    return True


def _in_set(value, members):
    """Membership test that treats unhashable values (list/dict) as absent."""
    return _hashable(value) and value in members


def _as_list(value):
    """Return *value* when it is a list, otherwise an empty list."""
    return value if isinstance(value, list) else []


def norm_ws(s):
    """Return *s* with all whitespace removed (None/empty becomes "")."""
    return re.sub(r"\s+", "", s or "")


def _norm_map(raw):
    """Return (whitespace-free text, index map back into *raw*).

    The index map holds, for each kept character, its position in *raw*, so a
    match found in the normalized text can be mapped back to the verbatim
    original span.
    """
    chars, idx = [], []
    for i, ch in enumerate(raw):
        if not ch.isspace():
            chars.append(ch)
            idx.append(i)
    return "".join(chars), idx


def _best_span(nv, doc_norm, doc_idx, doc_raw):
    """Find the contiguous span of one document most similar to *nv*.

    Args:
        nv: whitespace-normalized value to look for.
        doc_norm: whitespace-normalized document text.
        doc_idx: index map from doc_norm positions to doc_raw positions.
        doc_raw: original document text.

    Returns:
        (similarity ratio, verbatim span of doc_raw), or None when even the
        longest common substring is too short to anchor a match.
    """
    sm = SequenceMatcher(None, nv, doc_norm, autojunk=False)
    m = sm.find_longest_match(0, len(nv), 0, len(doc_norm))
    # Longest common substring is very short -> the content is not in this document.
    if m.size < min(15, max(8, len(nv) // 4)):
        return None
    # Anchor on the longest common substring and frame the window of the
    # document that nv should correspond to (1/5 slack on each side).
    slack = max(10, len(nv) // 5)
    w_start = max(0, m.b - m.a - slack)
    w_end = min(len(doc_norm), m.b + (len(nv) - m.a) + slack)
    sm2 = SequenceMatcher(None, nv, doc_norm[w_start:w_end], autojunk=False)
    blocks = [b for b in sm2.get_matching_blocks() if b.size > 0]
    if not blocks:
        return None
    # Tighten the window to the range between the first and last matching block.
    t_start = w_start + blocks[0].b
    t_end = w_start + blocks[-1].b + blocks[-1].size
    ratio = SequenceMatcher(None, nv, doc_norm[t_start:t_end], autojunk=False).ratio()
    return ratio, doc_raw[doc_idx[t_start]: doc_idx[t_end - 1] + 1]


def fix_verbatim(wf, docs):
    """Replace non-verbatim text values with the most similar source span.

    A value is only replaced when the similarity is high enough and the
    replacement length stays within 0.4x-2.5x of the original. *wf* is
    modified in place.

    Args:
        wf: parsed workflow object.
        docs: [(norm, idx_map, raw), ...] - each source field and the OCR text
            count as one document; spans are never stitched across documents.

    Returns:
        (fixed, skipped) where fixed = [(path, ratio, old length, new length)]
        and skipped = [(path, best ratio)].
    """
    fixed, skipped = [], []
    if not isinstance(wf, dict):
        return fixed, skipped  # malformed top level is reported by validate()
    for proc in _as_list(wf.get("procedures")):
        if not isinstance(proc, dict):
            continue
        pid = proc.get("id") or "p?"
        for s in _as_list(proc.get("steps")):
            if not isinstance(s, dict):
                continue
            for arr in ("inputs", "outputs"):
                for i, io in enumerate(_as_list(s.get(arr))):
                    if not isinstance(io, dict):
                        continue
                    v = io.get("value")
                    # Values starting with "<" are media descriptions, not verbatim text.
                    if not isinstance(v, str) or v.lstrip().startswith("<"):
                        continue
                    nv = norm_ws(v)
                    # Short text, or already a verbatim hit in some document.
                    if len(nv) < 40 or any(nv in d[0] for d in docs):
                        continue
                    best = None
                    for dn, di, dr in docs:
                        r = _best_span(nv, dn, di, dr)
                        if r and (best is None or r[0] > best[0]):
                            best = r
                    path = f"{pid}.{s.get('id', 's?')}.{arr}[{i}]"
                    new_n = norm_ws(best[1]) if best else ""
                    if best and best[0] >= 0.60 and 0.4 * len(nv) <= len(new_n) <= 2.5 * len(nv):
                        io["value"] = best[1]
                        fixed.append((path, best[0], len(nv), len(new_n)))
                    else:
                        skipped.append((path, best[0] if best else 0.0))
    return fixed, skipped


def collect_strings(obj, out):
    """Append every string found in a nested list/dict structure to *out*."""
    if isinstance(obj, str):
        out.append(obj)
    elif isinstance(obj, list):
        for x in obj:
            collect_strings(x, out)
    elif isinstance(obj, dict):
        for x in obj.values():
            collect_strings(x, out)


class Report:
    """Collects (path, message) pairs for errors and warnings."""

    def __init__(self):
        self.errors = []
        self.warns = []

    def err(self, path, msg):
        """Record a blocking error."""
        self.errors.append((path, msg))

    def warn(self, path, msg):
        """Record a non-blocking warning."""
        self.warns.append((path, msg))


def check_io(rep, io, path, is_output, proc_output_ids):
    """Validate one input/output item and record findings on *rep*.

    Args:
        rep: Report to record errors on.
        io: the IO item (expected to be a dict).
        path: display path used in messages.
        is_output: True for an output item, False for an input item.
        proc_output_ids: set of output ids declared in the enclosing procedure.
    """
    if not isinstance(io, dict):
        rep.err(path, "IO 项必须是对象")
        return
    for k in IO_REQUIRED:
        if k not in io:
            rep.err(path, f"缺字段 {k}")
    if is_output and not io.get("id"):
        rep.err(path, "输出缺 id (如 s2o1)")

    value = io.get("value", "")
    if isinstance(value, str):
        if not value.strip():
            rep.err(path, "value 为空 — 文字填原文逐字内容, 媒体填 <整段描述>")
        elif _REF_PLACEHOLDER_RE.match(value):
            rep.err(path, f"value 是引用占位 ({value[:20]!r}) — 引用写 anchor, value 抄真实内容")

    anchor = io.get("anchor")
    if isinstance(anchor, str) and anchor.strip():
        a = anchor.strip()
        if is_output:
            if not a.startswith("→"):
                rep.err(path, f"输出 anchor 应以 → 开头, 现为 {a!r}")
        elif not a.startswith("←"):
            rep.err(path, f"输入 anchor 应以 ← 开头, 现为 {a!r}")
        else:
            ref = a.lstrip("←").strip()
            ref_id = _ANCHOR_SUFFIX_RE.sub("", ref).strip()
            # Only references that look like step output ids are resolved.
            if re.match(r"^s\d", ref_id) and ref_id not in proc_output_ids:
                rep.err(path, f"anchor 引用了不存在的输出编号 {ref_id!r}")

    if io.get("inferred") is True and not io.get("inferred_reason"):
        rep.err(path, "inferred=true 必须带 inferred_reason")


def check_intent(rep, intent, path):
    """Validate a step's intent sentence and record findings on *rep*."""
    if not isinstance(intent, str) or not intent.strip():
        rep.err(path, "缺 intent")
        return
    if "→" in intent:
        rep.err(path, "intent 写成了公式 (含 →) — 要一句通顺人话")
    for kind in re.findall(r"\{([^:{}]+):", intent):
        if kind not in INTENT_TOKEN_KINDS:
            rep.err(path, f"intent 标记类别 {{{kind}:}} 非法 — 只能用 {sorted(INTENT_TOKEN_KINDS)}")
    # Measure length on the plain sentence, with {kind:text} markers reduced to text.
    plain = re.sub(r"\{[^:{}]+:([^{}]*)\}", r"\1", intent)
    if len(plain) > 40:
        rep.warn(path, f"intent 偏长 ({len(plain)} 字, 建议 ≤25)")


def validate(wf, vocab, source_text):
    """Validate a parsed workflow and return a Report.

    Args:
        wf: parsed workflow.json content.
        vocab: dict with the sets "effect_leaves", "action_leaves",
            "action_paths" and "type_leaves".
        source_text: whitespace-normalized source (+ OCR) text; when empty the
            verbatim check on values is skipped.

    Returns:
        Report holding every error and warning found.
    """
    rep = Report()
    if not isinstance(wf, dict):
        rep.err("workflow", "workflow 顶层必须是对象")
        return rep

    src = wf.get("source")
    if not isinstance(src, dict):
        rep.err("source", "缺 source 块")
    else:
        for k in SOURCE_REQUIRED:
            if not src.get(k):
                rep.err("source", f"缺字段 {k}")

    procs = wf.get("procedures")
    if not isinstance(procs, list) or not procs:
        rep.err("procedures", "procedures 必须是非空数组")
        return rep

    for pi, proc in enumerate(procs):
        if not isinstance(proc, dict):
            rep.err(f"procedures[{pi}]", "procedure 必须是对象")
            continue
        pid = proc.get("id") or "p?"
        for k in PROC_REQUIRED:
            if k not in proc:
                rep.err(pid, f"缺字段 {k}")
        if proc.get("category") and not _in_set(proc["category"], CATEGORIES):
            rep.err(pid, f"category={proc['category']!r} 不合法 — 只能是 {sorted(CATEGORIES)}")
        decl = proc.get("declarations")
        if isinstance(decl, dict):
            for k in ("inputs", "resources", "returns"):
                if k not in decl:
                    rep.err(f"{pid}.declarations", f"缺字段 {k}")

        # type_registry: every entry must extend a standard type leaf and carry a desc.
        registry = proc.get("type_registry") or {}
        if not isinstance(registry, dict):
            rep.err(f"{pid}.type_registry", "type_registry 必须是对象")
            registry = {}
        for tname, entry in registry.items():
            tpath = f"{pid}.type_registry.{tname}"
            if not isinstance(entry, dict) or not entry.get("extends"):
                rep.err(tpath, "缺 extends")
            elif not _in_set(entry["extends"], vocab["type_leaves"]):
                rep.err(tpath, f"extends={entry['extends']!r} 不是 type 词表叶子")
            if not isinstance(entry, dict) or not entry.get("desc"):
                rep.err(tpath, "缺 desc")
        known_types = vocab["type_leaves"] | set(registry)

        raw_steps = proc.get("steps")
        if raw_steps is not None and not isinstance(raw_steps, list):
            rep.err(pid, "steps 必须是数组")
        steps = _as_list(raw_steps)
        block_ids = {
            s.get("id") for s in steps
            if isinstance(s, dict) and s.get("kind") == "block" and _hashable(s.get("id"))
        }

        # First pass: register every output id so anchors may reference any step.
        output_ids = set()
        out_type_by_id = {}
        for s in steps:
            if not isinstance(s, dict):
                continue
            for o in _as_list(s.get("outputs")):
                if not (isinstance(o, dict) and o.get("id")):
                    continue
                oid = o["id"]
                spath = f"{pid}.{s.get('id', 's?')}"
                if not _hashable(oid):
                    rep.err(spath, f"输出编号 {oid!r} 不合法 — 必须是字符串 (如 s2o1)")
                    continue
                if oid in output_ids:
                    rep.err(spath, f"输出编号 {o['id']!r} 重复")
                output_ids.add(oid)
                out_type_by_id[oid] = o.get("type")

        # Second pass: per-step checks.
        for s in steps:
            if not isinstance(s, dict):
                rep.err(pid, "steps 含非对象项")
                continue
            sid = s.get("id") or "s?"
            spath = f"{pid}.{sid}"
            for k in STEP_REQUIRED:
                if k not in s:
                    rep.err(spath, f"缺字段 {k}")
            if s.get("id") and not (isinstance(s["id"], str) and STEP_ID_RE.match(s["id"])):
                rep.err(spath, f"step id {s['id']!r} 不合规 (s1 / s5.1)")
            for arr in ("inputs", "outputs"):
                if s.get(arr) is not None and not isinstance(s[arr], list):
                    rep.err(spath, f"{arr} 必须是数组")

            kind = s.get("kind")
            if not _in_set(kind, KINDS):
                rep.err(spath, f"kind={kind!r} 不合法 — 只能是 {sorted(KINDS)}")
            if kind == "nested":
                if not s.get("group"):
                    rep.err(spath, "nested 步缺 group")
                elif not _in_set(s["group"], block_ids):
                    rep.err(spath, f"group={s['group']!r} 不是已存在的 block id")

            # Render schema: directive, when present, must be a string (manual /
            # control steps without a prompt omit the field or use "").
            if "directive" in s and not isinstance(s["directive"], str):
                rep.err(spath, f"directive={s['directive']!r} 必须是字符串 — 没有就删掉该字段, 不要写 null")

            if kind in ("step", "nested"):
                # Aligned with the render hard gate: via must be non-empty and
                # inputs/outputs must be non-empty arrays (missing IO is to be
                # inferred from the process and marked inferred).
                if not str(s.get("via") or "").strip():
                    rep.err(spath, "via 为空 — 步骤要写用的工具 (human / 工具名 / 占位 (AI 生图工具))")
                for arr, label in (("inputs", "输入"), ("outputs", "输出")):
                    if not s.get(arr):
                        rep.err(spath, f"{arr} 为空数组 — 步骤必有{label}; 原文没明写就按工艺推断补, "
                                       f"标 inferred:true + inferred_reason, 不要为过校验而删 IO")

            # NOTE(review): the effect / action / substance / form checks below
            # are applied to every step kind, including "block" - confirm that
            # this is the intended scope.
            # effect / action must hit the vocabulary.
            eff = s.get("effect")
            if not eff:
                rep.err(spath, "缺 effect")
            elif not _in_set(eff, vocab["effect_leaves"]):
                rep.err(spath, f"effect={eff!r} 不是合法叶子 — 9 选 1: {sorted(vocab['effect_leaves'])}")
            act = s.get("action")
            if not act:
                rep.err(spath, "缺 action")
            elif not _in_set(act, vocab["action_leaves"]) and not _in_set(act, vocab["action_paths"]):
                rep.err(spath, f"action={act!r} 不在动作词表 (叶子名或 根/…/叶 全路径)")
            # substance / form must be handled explicitly (a value or null).
            for k in ("substance", "form"):
                if k not in s:
                    rep.err(spath, f"缺 {k} (没有就显式设 null)")

            check_intent(rep, s.get("intent"), spath)

            for arr, is_out in (("inputs", False), ("outputs", True)):
                for i, io in enumerate(_as_list(s.get(arr))):
                    iopath = f"{spath}.{arr}[{i}]"
                    check_io(rep, io, iopath, is_out, output_ids)
                    if not isinstance(io, dict):
                        continue
                    t = io.get("type")
                    if t and not _in_set(t, known_types):
                        rep.err(iopath, f"type={t!r} 不在词表也没在 type_registry 挂靠")
                    # An input's type should agree with the output it is anchored to.
                    if not is_out and isinstance(io.get("anchor"), str):
                        ref_id = _ANCHOR_SUFFIX_RE.sub("", io["anchor"].lstrip("←").strip()).strip()
                        src_t = out_type_by_id.get(ref_id)
                        if src_t and t and src_t != t:
                            rep.warn(iopath, f"输入 type={t!r} 与来源 {ref_id} 的 type={src_t!r} 不一致")

            # Generation steps are expected to have a prompt-like input.
            if (kind in ("step", "nested") and isinstance(s.get("action"), str)
                    and s["action"].split("/")[0] == "生成"):
                in_types = {
                    io.get("type") for io in _as_list(s.get("inputs"))
                    if isinstance(io, dict) and _hashable(io.get("type"))
                }
                if not in_types & {"提示词", "负向提示词", "描述"}:
                    rep.warn(spath, "生成步没有 提示词/描述 类输入 — 确认是否漏建")

        # Verbatim check of values (only runs when source text was supplied).
        if source_text:
            for s in steps:
                if not isinstance(s, dict):
                    continue
                for arr in ("inputs", "outputs"):
                    for i, io in enumerate(_as_list(s.get(arr))):
                        if not isinstance(io, dict):
                            continue
                        v = io.get("value")
                        # Media descriptions ("<...>") are not required to be verbatim.
                        if not isinstance(v, str) or v.lstrip().startswith("<"):
                            continue
                        nv = norm_ws(v)
                        if len(nv) >= 40 and nv not in source_text:
                            rep.warn(f"{pid}.{s.get('id','s?')}.{arr}[{i}]",
                                     f"value ({len(nv)} 字) 未在原文/OCR 逐字命中 — 确认没缩写/改写"
                                     f" (可加 --fix-verbatim 自动替换为原文片段)")
    return rep


def main():
    """CLI entry point; returns the process exit code (0 / 1 / 2)."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--workflow", required=True)
    ap.add_argument("--source", help="原文 case json — 给了才跑 value 逐字核对")
    ap.add_argument("--ocr", help="配图 OCR 文本, 并入逐字核对语料")
    ap.add_argument("--fix-verbatim", action="store_true",
                    help="把未逐字命中的文本 value 自动替换为原文里最相似的连续片段 "
                         "(相似度 ≥60%% 才动, 改不动的留警告; 需要 --source)")
    args = ap.parse_args()

    try:
        wf = json.loads(Path(args.workflow).read_text(encoding="utf-8"))
    except FileNotFoundError:
        print(f"✗ 文件不存在: {args.workflow}")
        return 2
    except json.JSONDecodeError as e:
        print(f"✗ JSON 解析失败: {e}")
        return 1

    vocab = {
        "effect_leaves": EFFECT_LEAVES,
        "action_leaves": set(ACTION_LEAF2PATH),
        "action_paths": set(ACTION_LEAF2PATH.values()),
        "type_leaves": TYPE_LEAVES,
    }

    source_text = ""
    # Per-field documents [(norm, idx_map, raw)] used by --fix-verbatim
    # (spans are never stitched across fields).
    docs = []
    if args.source:
        try:
            source_obj = json.loads(Path(args.source).read_text(encoding="utf-8"))
        except FileNotFoundError:
            print(f"✗ 文件不存在: {args.source}")
            return 2
        except json.JSONDecodeError as e:
            print(f"✗ JSON 解析失败: {e}")
            return 2
        chunks = []
        collect_strings(source_obj, chunks)
        source_text = norm_ws("".join(chunks))
        docs = [(*_norm_map(c), c) for c in chunks if len(norm_ws(c)) >= 40]
    # A missing OCR file is tolerated: OCR is an optional extra corpus.
    if args.ocr and Path(args.ocr).exists():
        ocr_raw = Path(args.ocr).read_text(encoding="utf-8")
        source_text += norm_ws(ocr_raw)
        docs.append((*_norm_map(ocr_raw), ocr_raw))

    if args.fix_verbatim:
        if not docs:
            print("✗ --fix-verbatim 需要 --source (原文语料)")
            return 2
        fixed, skipped = fix_verbatim(wf, docs)
        if fixed:
            # Persist the replacements before validating the updated workflow.
            Path(args.workflow).write_text(
                json.dumps(wf, ensure_ascii=False, indent=2), encoding="utf-8")
        for p, r, a, b in fixed:
            print(f" ✦ [{p}] value 已替换为原文逐字片段 (相似度 {r:.0%}, {a}→{b} 字)")
        for p, r in skipped:
            print(f" ⚠ [{p}] 找不到足够相似的原文连续片段 (最高 {r:.0%}) — 可能是多段拼接/自创内容, 人工处理")

    rep = validate(wf, vocab, source_text)
    for path, msg in rep.errors:
        print(f" ✗ [{path}] {msg}")
    for path, msg in rep.warns:
        print(f" ⚠ [{path}] {msg}")
    print(f"[validate] {args.workflow}: {len(rep.errors)} 错误, {len(rep.warns)} 警告")
    return 1 if rep.errors else 0


if __name__ == "__main__":
    sys.exit(main())