| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393 |
- #!/usr/bin/env python3
- """procedure skill 唯一校验器.
- agent 直接 Write 出 workflow.json 后跑本脚本, 一次性报出全部问题:
- ✗ 错误 (必须修, exit 1): 结构缺字段 / 词表不命中 / 数据流断链 / 编号重复 / value 是引用占位
- ⚠ 警告 (建议核对, 不拦截): 类型不一致 / 生成步缺提示词输入 / value 未在原文逐字命中
- 用法:
- python procedure/tools/validate.py --workflow outputs/case-N/workflow.json \
- [--source input/case-N.json] [--ocr outputs/case-N/_scratch/ocr.txt]
- 退出码: 0 = 无错误 / 1 = 有错误 / 2 = CLI 或文件问题
- """
- import argparse
- import json
- import re
- import sys
- from difflib import SequenceMatcher
- from pathlib import Path
# Force UTF-8 on both standard streams so the CJK messages and the ✗ / ⚠ markers
# print on consoles whose default encoding is not UTF-8. Each stream is probed on
# its own: either one may have been replaced by an object without ``reconfigure``.
for _stream in (sys.stdout, sys.stderr):
    if hasattr(_stream, "reconfigure"):
        _stream.reconfigure(encoding="utf-8")
- # 词表常量与 wf-patch.py 同源 (同目录 vocab.py)
- sys.path.insert(0, str(Path(__file__).resolve().parent))
- from vocab import EFFECT_LEAVES, ACTION_LEAF2PATH, TYPE_LEAVES # noqa: E402
# Closed set of values accepted for a procedure's ``category``.
CATEGORIES = {
    "产物创造",
    "资产建设",
    "自动化",
    "分析",
    "学习",
}

# Closed set of values accepted for a step's ``kind``.
KINDS = {
    "step",
    "block",
    "nested",
}

# Step ids: "s" + digits, optionally followed by dotted sub-numbers (s1, s5.1).
STEP_ID_RE = re.compile(r"^s\d+(\.\d+)*$")

# Keys checked on the ``source`` object, each procedure, each step and each IO item.
SOURCE_REQUIRED = [
    "platform",
    "author",
    "date",
    "title",
    "excerpt",
]
PROC_REQUIRED = [
    "id",
    "name",
    "purpose",
    "category",
    "declarations",
    "steps",
]
STEP_REQUIRED = [
    "id",
    "kind",
    "via",
    "inputs",
    "outputs",
    "intent",
]
IO_REQUIRED = [
    "type",
    "value",
    "anchor",
]

# Marker categories allowed inside an intent's ``{kind:text}`` tokens.
INTENT_TOKEN_KINDS = {
    "effect",
    "via",
    "act",
    "in-type",
    "out-type",
}
def norm_ws(s):
    """Return *s* with every run of whitespace removed; a falsy *s* yields ``""``."""
    text = s if s else ""
    return re.sub(r"\s+", "", text)
def _norm_map(raw):
    """Strip whitespace from *raw* and remember where each kept character came from.

    Returns ``(normalized, positions)`` where ``positions[k]`` is the index in
    *raw* of ``normalized[k]``; this lets a match found in the normalized text
    be mapped back to a verbatim slice of the original.
    """
    kept = [(pos, ch) for pos, ch in enumerate(raw) if not ch.isspace()]
    normalized = "".join(ch for _, ch in kept)
    positions = [pos for pos, _ in kept]
    return normalized, positions
def _best_span(nv, doc_norm, doc_idx, doc_raw):
    """Find the contiguous stretch of one document that most resembles *nv*.

    Args:
        nv: the whitespace-free text to look for.
        doc_norm: the whitespace-free text of the document.
        doc_idx: for every character of *doc_norm*, its index in *doc_raw*.
        doc_raw: the document's original text.

    Returns:
        ``(similarity, verbatim slice of doc_raw)``, or ``None`` when the
        document shares no sufficiently long common substring with *nv*.
    """
    def matcher(haystack):
        return SequenceMatcher(None, nv, haystack, autojunk=False)

    needle_len = len(nv)
    doc_len = len(doc_norm)

    anchor = matcher(doc_norm).find_longest_match(0, needle_len, 0, doc_len)
    # A very short longest common substring means the content is not in this document.
    min_anchor = min(15, max(8, needle_len // 4))
    if anchor.size < min_anchor:
        return None

    # Around the anchor, frame the window nv should occupy in the document,
    # with one fifth of nv's length (at least 10 chars) of slack on each side.
    margin = max(10, needle_len // 5)
    win_lo = max(0, anchor.b - anchor.a - margin)
    win_hi = min(doc_len, anchor.b + (needle_len - anchor.a) + margin)

    window_blocks = matcher(doc_norm[win_lo:win_hi]).get_matching_blocks()
    hits = [blk for blk in window_blocks if blk.size > 0]
    if not hits:
        return None

    # Tighten the window to the stretch between the first and last matching block.
    first, last = hits[0], hits[-1]
    span_lo = win_lo + first.b
    span_hi = win_lo + last.b + last.size
    score = matcher(doc_norm[span_lo:span_hi]).ratio()
    return score, doc_raw[doc_idx[span_lo]: doc_idx[span_hi - 1] + 1]
def fix_verbatim(wf, docs):
    """Replace text values that are not verbatim with the closest source span.

    Only string values whose whitespace-free form has at least 40 characters,
    that do not start with "<" (media descriptions), and that occur verbatim in
    none of the documents are considered. A value is rewritten in place when the
    best candidate span has similarity >= 0.60 and its whitespace-free length is
    between 0.4x and 2.5x that of the value; otherwise it is reported as skipped.

    Malformed structure (non-object workflow / procedure / step / IO item,
    non-list containers) is silently passed over instead of raising: reporting
    structural problems is not this function's job.

    Args:
        wf: the parsed workflow document; modified in place.
        docs: ``[(normalized_text, index_map, raw_text), ...]``, one entry per
            source field or OCR text. Spans are never stitched across entries.

    Returns:
        ``(fixed, skipped)`` with ``fixed = [(path, ratio, old_len, new_len)]``
        and ``skipped = [(path, best_ratio)]``.
    """
    def as_list(candidate):
        # Anything that is not a JSON array is treated as "no items".
        return candidate if isinstance(candidate, list) else []

    fixed, skipped = [], []
    if not isinstance(wf, dict):
        return fixed, skipped

    for proc in as_list(wf.get("procedures")):
        if not isinstance(proc, dict):
            continue
        pid = proc.get("id") or "p?"
        for s in as_list(proc.get("steps")):
            if not isinstance(s, dict):
                continue
            for arr in ("inputs", "outputs"):
                for i, io in enumerate(as_list(s.get(arr))):
                    if not isinstance(io, dict):
                        continue
                    v = io.get("value")
                    if not isinstance(v, str) or v.lstrip().startswith("<"):
                        continue
                    nv = norm_ws(v)
                    if len(nv) < 40 or any(nv in d[0] for d in docs):
                        continue  # short text / already verbatim

                    # Keep the highest-scoring candidate over all documents.
                    best = None
                    for dn, di, dr in docs:
                        r = _best_span(nv, dn, di, dr)
                        if r and (best is None or r[0] > best[0]):
                            best = r

                    path = f"{pid}.{s.get('id', 's?')}.{arr}[{i}]"
                    new_n = norm_ws(best[1]) if best else ""
                    if best and best[0] >= 0.60 and 0.4 * len(nv) <= len(new_n) <= 2.5 * len(nv):
                        io["value"] = best[1]
                        fixed.append((path, best[0], len(nv), len(new_n)))
                    else:
                        skipped.append((path, best[0] if best else 0.0))
    return fixed, skipped
def collect_strings(obj, out):
    """Append every string found in a nested list/dict structure to *out*.

    Lists are walked in order and dicts by their values; anything that is not
    a string, list or dict is ignored.
    """
    if isinstance(obj, str):
        out.append(obj)
        return
    if isinstance(obj, list):
        children = obj
    elif isinstance(obj, dict):
        children = obj.values()
    else:
        return
    for child in children:
        collect_strings(child, out)
class Report:
    """Collects validation findings as ``(path, message)`` tuples."""

    def __init__(self):
        self.errors, self.warns = [], []

    def err(self, path, msg):
        """Record an error found at *path*."""
        finding = (path, msg)
        self.errors.append(finding)

    def warn(self, path, msg):
        """Record a warning found at *path*."""
        finding = (path, msg)
        self.warns.append(finding)
# A value that only points at another IO item instead of carrying real content:
# a leading "←", "(同" written with an ASCII or a full-width parenthesis, or
# "见 s<digit>". (The previous inline pattern had an unbalanced group and raised
# re.error for every non-empty string value.)
PLACEHOLDER_VALUE_RE = re.compile(r"^\s*(←|[(（]同|见\s*s\d)")


def check_io(rep, io, path, is_output, proc_output_ids):
    """Validate one input or output item of a step.

    Args:
        rep: report object; findings are recorded through ``rep.err``.
        io: the IO item to check (expected to be a dict).
        path: location label used in the recorded findings.
        is_output: True for an output item, False for an input item.
        proc_output_ids: ids of all outputs declared in the same procedure;
            an input anchor that names an ``s<digit>…`` id must be in it.
    """
    if not isinstance(io, dict):
        rep.err(path, "IO 项必须是对象")
        return

    for k in IO_REQUIRED:
        if k not in io:
            rep.err(path, f"缺字段 {k}")
    if is_output and not io.get("id"):
        rep.err(path, "输出缺 id (如 s2o1)")

    # String values must be non-blank and must not be a reference placeholder.
    value = io.get("value", "")
    if isinstance(value, str):
        if not value.strip():
            rep.err(path, "value 为空 — 文字填原文逐字内容, 媒体填 <整段描述>")
        elif PLACEHOLDER_VALUE_RE.match(value):
            rep.err(path, f"value 是引用占位 ({value[:20]!r}) — 引用写 anchor, value 抄真实内容")

    # Anchors carry a direction arrow: "→" on outputs, "←" on inputs.
    anchor = io.get("anchor")
    if isinstance(anchor, str) and anchor.strip():
        a = anchor.strip()
        if is_output:
            if not a.startswith("→"):
                rep.err(path, f"输出 anchor 应以 → 开头, 现为 {a!r}")
        else:
            if not a.startswith("←"):
                rep.err(path, f"输入 anchor 应以 ← 开头, 现为 {a!r}")
            else:
                # Drop the arrow and a trailing "[...]" suffix to get the referenced id.
                ref = a.lstrip("←").strip()
                ref_id = re.sub(r"\[[^\]]*\]$", "", ref).strip()
                if re.match(r"^s\d", ref_id) and ref_id not in proc_output_ids:
                    rep.err(path, f"anchor 引用了不存在的输出编号 {ref_id!r}")

    if io.get("inferred") is True and not io.get("inferred_reason"):
        rep.err(path, "inferred=true 必须带 inferred_reason")
def check_intent(rep, intent, path):
    """Validate a step's ``intent`` sentence.

    Records an error when the intent is missing or blank, when it contains the
    "→" arrow, or when a ``{kind:...}`` token uses a kind outside
    INTENT_TOKEN_KINDS. Records a warning when the intent, with tokens reduced
    to their text, is longer than 40 characters.
    """
    has_text = isinstance(intent, str) and bool(intent.strip())
    if not has_text:
        rep.err(path, "缺 intent")
        return

    if "→" in intent:
        rep.err(path, "intent 写成了公式 (含 →) — 要一句通顺人话")

    token_kinds = re.findall(r"\{([^:{}]+):", intent)
    illegal = [name for name in token_kinds if name not in INTENT_TOKEN_KINDS]
    for kind in illegal:
        rep.err(path, f"intent 标记类别 {{{kind}:}} 非法 — 只能用 {sorted(INTENT_TOKEN_KINDS)}")

    # Measure the sentence as read: every {kind:text} token collapses to its text.
    plain = re.sub(r"\{[^:{}]+:([^{}]*)\}", r"\1", intent)
    plain_len = len(plain)
    if plain_len > 40:
        rep.warn(path, f"intent 偏长 ({plain_len} 字, 建议 ≤25)")
def validate(wf, vocab, source_text):
    """Run every structural, vocabulary and data-flow check on a workflow.

    Malformed containers (a non-object workflow, procedure entry or
    type_registry) are reported as errors instead of raising.

    NOTE(review): this body was reconstructed from a copy of the file that had
    lost its indentation. The effect / action / substance / form checks are
    assumed to apply only to ``step`` and ``nested`` steps, and the verbatim
    check is assumed to run once per procedure — confirm against the original.

    Args:
        wf: the parsed workflow document.
        vocab: dict with the keys ``effect_leaves``, ``action_leaves``,
            ``action_paths`` and ``type_leaves``.
        source_text: whitespace-free corpus for the verbatim check; a falsy
            value disables that check.

    Returns:
        A Report holding the collected errors and warnings.
    """
    rep = Report()
    if not isinstance(wf, dict):
        rep.err("workflow", "workflow 顶层必须是对象")
        return rep

    src = wf.get("source")
    if not isinstance(src, dict):
        rep.err("source", "缺 source 块")
    else:
        for k in SOURCE_REQUIRED:
            if not src.get(k):
                rep.err("source", f"缺字段 {k}")

    procs = wf.get("procedures")
    if not isinstance(procs, list) or not procs:
        rep.err("procedures", "procedures 必须是非空数组")
        return rep

    for pi, proc in enumerate(procs):
        if not isinstance(proc, dict):
            rep.err(f"procedures[{pi}]", "procedures 含非对象项")
            continue
        pid = proc.get("id") or "p?"
        for k in PROC_REQUIRED:
            if k not in proc:
                rep.err(pid, f"缺字段 {k}")
        if proc.get("category") and proc["category"] not in CATEGORIES:
            rep.err(pid, f"category={proc['category']!r} 不合法 — 只能是 {sorted(CATEGORIES)}")
        decl = proc.get("declarations")
        if isinstance(decl, dict):
            for k in ("inputs", "resources", "returns"):
                if k not in decl:
                    rep.err(f"{pid}.declarations", f"缺字段 {k}")

        # type_registry: every entry must extend a standard type leaf and carry a desc.
        registry = proc.get("type_registry") or {}
        if not isinstance(registry, dict):
            rep.err(f"{pid}.type_registry", "type_registry 必须是对象")
            registry = {}
        for tname, entry in registry.items():
            tpath = f"{pid}.type_registry.{tname}"
            if not isinstance(entry, dict) or not entry.get("extends"):
                rep.err(tpath, "缺 extends")
            elif entry["extends"] not in vocab["type_leaves"]:
                rep.err(tpath, f"extends={entry['extends']!r} 不是 type 词表叶子")
            if not isinstance(entry, dict) or not entry.get("desc"):
                rep.err(tpath, "缺 desc")
        known_types = vocab["type_leaves"] | set(registry)

        steps = proc.get("steps") or []
        block_ids = {s.get("id") for s in steps if isinstance(s, dict) and s.get("kind") == "block"}

        # First pass: collect every output id (flagging duplicates) and its type,
        # so that inputs can reference outputs regardless of step order.
        output_ids = set()
        out_type_by_id = {}
        for s in steps:
            if not isinstance(s, dict):
                continue
            for o in s.get("outputs") or []:
                if isinstance(o, dict) and o.get("id"):
                    spath = f"{pid}.{s.get('id', 's?')}"
                    if o["id"] in output_ids:
                        rep.err(spath, f"输出编号 {o['id']!r} 重复")
                    output_ids.add(o["id"])
                    out_type_by_id[o["id"]] = o.get("type")

        # Second pass: per-step checks.
        for s in steps:
            if not isinstance(s, dict):
                rep.err(pid, "steps 含非对象项")
                continue
            sid = s.get("id") or "s?"
            spath = f"{pid}.{sid}"
            for k in STEP_REQUIRED:
                if k not in s:
                    rep.err(spath, f"缺字段 {k}")
            # A non-string id can never match the pattern; report it rather than crash.
            if s.get("id") and not (isinstance(s["id"], str) and STEP_ID_RE.match(s["id"])):
                rep.err(spath, f"step id {s['id']!r} 不合规 (s1 / s5.1)")
            kind = s.get("kind")
            if kind not in KINDS:
                rep.err(spath, f"kind={kind!r} 不合法 — 只能是 {sorted(KINDS)}")
            if kind == "nested":
                if not s.get("group"):
                    rep.err(spath, "nested 步缺 group")
                elif s["group"] not in block_ids:
                    rep.err(spath, f"group={s['group']!r} 不是已存在的 block id")

            # When present, directive must be a string (omit the field or use ""
            # for steps that have no prompt).
            if "directive" in s and not isinstance(s["directive"], str):
                rep.err(spath, f"directive={s['directive']!r} 必须是字符串 — 没有就删掉该字段, 不要写 null")

            if kind in ("step", "nested"):
                # via must be non-blank; inputs / outputs must be non-empty.
                if not str(s.get("via") or "").strip():
                    rep.err(spath, "via 为空 — 步骤要写用的工具 (human / 工具名 / 占位 (AI 生图工具))")
                for arr, label in (("inputs", "输入"), ("outputs", "输出")):
                    if not s.get(arr):
                        rep.err(spath, f"{arr} 为空数组 — 步骤必有{label}; 原文没明写就按工艺推断补, "
                                       f"标 inferred:true + inferred_reason, 不要为过校验而删 IO")

                # effect / action must hit the vocabulary.
                eff = s.get("effect")
                if not eff:
                    rep.err(spath, "缺 effect")
                elif eff not in vocab["effect_leaves"]:
                    rep.err(spath, f"effect={eff!r} 不是合法叶子 — 9 选 1: {sorted(vocab['effect_leaves'])}")
                act = s.get("action")
                if not act:
                    rep.err(spath, "缺 action")
                elif act not in vocab["action_leaves"] and act not in vocab["action_paths"]:
                    rep.err(spath, f"action={act!r} 不在动作词表 (叶子名或 根/…/叶 全路径)")

                # substance / form must be present explicitly (a value or null).
                for k in ("substance", "form"):
                    if k not in s:
                        rep.err(spath, f"缺 {k} (没有就显式设 null)")

            check_intent(rep, s.get("intent"), spath)

            for arr, is_out in (("inputs", False), ("outputs", True)):
                for i, io in enumerate(s.get(arr) or []):
                    iopath = f"{spath}.{arr}[{i}]"
                    check_io(rep, io, iopath, is_out, output_ids)
                    if not isinstance(io, dict):
                        continue
                    t = io.get("type")
                    if t and t not in known_types:
                        rep.err(iopath, f"type={t!r} 不在词表也没在 type_registry 挂靠")
                    # An input's type should agree with the type of the output it references.
                    if not is_out and isinstance(io.get("anchor"), str):
                        # Trim whitespace before dropping the arrow, as check_io does;
                        # otherwise an anchor with leading spaces never resolves.
                        ref = io["anchor"].strip().lstrip("←").strip()
                        ref_id = re.sub(r"\[[^\]]*\]$", "", ref).strip()
                        src_t = out_type_by_id.get(ref_id)
                        if src_t and t and src_t != t:
                            rep.warn(iopath, f"输入 type={t!r} 与来源 {ref_id} 的 type={src_t!r} 不一致")

            # A generation step is expected to have a prompt / description input.
            if kind in ("step", "nested") and isinstance(s.get("action"), str) and s["action"].split("/")[0] == "生成":
                in_types = {io.get("type") for io in s.get("inputs") or [] if isinstance(io, dict)}
                if not in_types & {"提示词", "负向提示词", "描述"}:
                    rep.warn(spath, "生成步没有 提示词/描述 类输入 — 确认是否漏建")

        # Verbatim check of values against the corpus (only when a corpus was given).
        if source_text:
            for s in steps:
                if not isinstance(s, dict):
                    continue
                for arr in ("inputs", "outputs"):
                    for i, io in enumerate(s.get(arr) or []):
                        if not isinstance(io, dict):
                            continue
                        v = io.get("value")
                        if not isinstance(v, str) or v.lstrip().startswith("<"):
                            continue  # media descriptions need not be verbatim
                        nv = norm_ws(v)
                        if len(nv) >= 40 and nv not in source_text:
                            rep.warn(f"{pid}.{s.get('id','s?')}.{arr}[{i}]",
                                     f"value ({len(nv)} 字) 未在原文/OCR 逐字命中 — 确认没缩写/改写"
                                     f" (可加 --fix-verbatim 自动替换为原文片段)")
    return rep
def main():
    """CLI entry point: load the workflow, optionally repair values, validate, print.

    Returns the process exit code: 0 when validation found no errors, 1 when it
    found errors (or the workflow is not a parseable JSON object), 2 for CLI or
    file problems (missing workflow file, unreadable / invalid --source file,
    --fix-verbatim without a usable corpus).

    NOTE(review): this body was reconstructed from a copy of the file that had
    lost its indentation. The OCR text is merged into the corpus whether or not
    --source was given — confirm against the original.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--workflow", required=True)
    ap.add_argument("--source", help="原文 case json — 给了才跑 value 逐字核对")
    ap.add_argument("--ocr", help="配图 OCR 文本, 并入逐字核对语料")
    ap.add_argument("--fix-verbatim", action="store_true",
                    help="把未逐字命中的文本 value 自动替换为原文里最相似的连续片段 "
                         "(相似度 ≥60%% 才动, 改不动的留警告; 需要 --source)")
    args = ap.parse_args()

    try:
        wf = json.loads(Path(args.workflow).read_text(encoding="utf-8"))
    except FileNotFoundError:
        print(f"✗ 文件不存在: {args.workflow}")
        return 2
    except json.JSONDecodeError as e:
        print(f"✗ JSON 解析失败: {e}")
        return 1
    # The checks below all go through dict access on the top-level value.
    if not isinstance(wf, dict):
        print("✗ workflow 顶层必须是 JSON 对象")
        return 1

    vocab = {
        "effect_leaves": EFFECT_LEAVES,
        "action_leaves": set(ACTION_LEAF2PATH),
        "action_paths": set(ACTION_LEAF2PATH.values()),
        "type_leaves": TYPE_LEAVES,
    }

    source_text = ""
    docs = []  # per-field documents [(norm, idx_map, raw)] for --fix-verbatim (no cross-field stitching)
    if args.source:
        # A bad --source is a file problem (exit 2), not a workflow error.
        try:
            source_doc = json.loads(Path(args.source).read_text(encoding="utf-8"))
        except FileNotFoundError:
            print(f"✗ 文件不存在: {args.source}")
            return 2
        except json.JSONDecodeError as e:
            print(f"✗ JSON 解析失败: {args.source}: {e}")
            return 2
        chunks = []
        collect_strings(source_doc, chunks)
        source_text = norm_ws("".join(chunks))
        docs = [(*_norm_map(c), c) for c in chunks if len(norm_ws(c)) >= 40]
    if args.ocr and Path(args.ocr).exists():
        ocr_raw = Path(args.ocr).read_text(encoding="utf-8")
        source_text += norm_ws(ocr_raw)
        docs.append((*_norm_map(ocr_raw), ocr_raw))

    if args.fix_verbatim:
        if not docs:
            print("✗ --fix-verbatim 需要 --source (原文语料)")
            return 2
        fixed, skipped = fix_verbatim(wf, docs)
        if fixed:
            # Persist the repaired values before validating them.
            Path(args.workflow).write_text(
                json.dumps(wf, ensure_ascii=False, indent=2), encoding="utf-8")
        for p, r, a, b in fixed:
            print(f" ✦ [{p}] value 已替换为原文逐字片段 (相似度 {r:.0%}, {a}→{b} 字)")
        for p, r in skipped:
            print(f" ⚠ [{p}] 找不到足够相似的原文连续片段 (最高 {r:.0%}) — 可能是多段拼接/自创内容, 人工处理")

    rep = validate(wf, vocab, source_text)
    for path, msg in rep.errors:
        print(f" ✗ [{path}] {msg}")
    for path, msg in rep.warns:
        print(f" ⚠ [{path}] {msg}")
    print(f"[validate] {args.workflow}: {len(rep.errors)} 错误, {len(rep.warns)} 警告")
    return 1 if rep.errors else 0
# Script entry: run the CLI and hand its return value to the interpreter as the exit status.
if __name__ == "__main__":
    exit_code = main()
    sys.exit(exit_code)
|