lint-case.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. lint-case.py — workflow.json 轻量 lint + 自动 record 新 type 到 type_suggestions.md.
  5. 设计哲学: **不严格**.
  6. - 不分 error/warning 等级, 不卡 exit code (都返 0)
  7. - 主要副作用是 record 新 type 到 spec/taxonomy/type_suggestions.md
  8. - 检测项打 stdout 给 Agent / 用户看, 决定要不要回去修
  9. 用法:
  10. python spec/tools/lint-case.py --workflow outputs/case-{N}/workflow.json --case-id {N}
  11. python spec/tools/lint-case.py --workflow outputs/case-{N}/workflow.json --case-id {N} --no-record # 只校验不写
  12. 退出码:
  13. 0 始终 (不阻塞流程)
  14. 2 CLI 参数错误 / 文件不存在
  15. """
  16. from __future__ import annotations
  17. import argparse
  18. import json
  19. import re
  20. import sys
  21. from pathlib import Path
  22. # spec/tools/lint-case.py → procedure-dsl/
  23. DSL_ROOT = Path(__file__).resolve().parent.parent.parent
  24. TYPE_JSON = DSL_ROOT / 'spec' / 'taxonomy' / 'type.json'
  25. SUGGESTIONS = DSL_ROOT / 'spec' / 'taxonomy' / 'type_suggestions.md'
  26. # Windows 控制台 UTF-8
  27. for _s in (sys.stdout, sys.stderr):
  28. if hasattr(_s, 'reconfigure'):
  29. try:
  30. _s.reconfigure(encoding='utf-8', errors='replace')
  31. except Exception:
  32. pass
  33. def load_type_leaves() -> set[str]:
  34. """读 spec/taxonomy/type.json 的 $leaves 集合."""
  35. if not TYPE_JSON.exists():
  36. return set()
  37. return set(json.loads(TYPE_JSON.read_text(encoding='utf-8')).get('$leaves', []))
  38. # ===========================================================================
  39. # Check 1: type 完整性提示
  40. # ===========================================================================
  41. def _iter_procedures(case_data: dict):
  42. """遍历 workflow.json 的 procedures.
  43. Yields: (procedure_label, procedure_dict) — 含 steps + type_registry.
  44. """
  45. for p in case_data.get('procedures') or []:
  46. label = p.get('id') or p.get('name') or '?'
  47. yield (label, p)
  48. def check_type_completeness(case_data: dict) -> list[str]:
  49. """IO 用了 case-specific type 但 type_registry 漏写 entry → 提示.
  50. 只 hint 不 fail. Agent 看输出回去修.
  51. 多工序时, hint 前缀加 [proc_id] 让用户知道是哪个工序的问题.
  52. """
  53. leaves = load_type_leaves()
  54. hints: list[str] = []
  55. for proc_label, proc in _iter_procedures(case_data):
  56. type_reg = proc.get('type_registry') or {}
  57. for i, step in enumerate(proc.get('steps') or []):
  58. if not isinstance(step, dict):
  59. continue
  60. for kind in ('inputs', 'outputs'):
  61. for j, item in enumerate(step.get(kind) or []):
  62. if not isinstance(item, dict):
  63. continue
  64. t = item.get('type', '') or ''
  65. if not t:
  66. continue
  67. if t in leaves:
  68. continue # 字典叶子, OK
  69. if t not in type_reg:
  70. hints.append(
  71. f"[{proc_label}] step[{i}].{kind}[{j}].type={t!r} 是 case-specific "
  72. f"但 type_registry 没注册"
  73. )
  74. else:
  75. entry = type_reg[t]
  76. if isinstance(entry, dict):
  77. if not entry.get('extends'):
  78. hints.append(f"[{proc_label}] type_registry[{t!r}] 缺 extends 字段")
  79. if not entry.get('desc'):
  80. hints.append(f"[{proc_label}] type_registry[{t!r}] 缺 desc 字段 (renderer drawer 显示需要)")
  81. return hints
  82. # ===========================================================================
  83. # Check 2: value / directive 自包含性 (禁止引用占位)
  84. # ===========================================================================
  85. # value/directive 应填数据本身, 不是 anchor 的引用. 命中即「没真正回填」.
  86. META_REF = re.compile(r'[((]?\s*同\s*s[\d]|见\s*s[\d]|←\s*s[\d]|同上')
  87. def check_value_selfcontained(case_data: dict) -> list[str]:
  88. """扫每个 IO 的 value + 每个 directive, 找「引用占位」文案 (同 sX / 见 sX / ← sX ...).
  89. spec: value 逐字回填数据本身, 引用归 anchor (workflow-format.md §2 数据流).
  90. 这种占位 schema/type 检查抓不到, 专门一条. 只 hint. 可用
  91. `wf-patch.py --resolve-passthrough` 自动从源回填.
  92. """
  93. hints: list[str] = []
  94. for proc_label, proc in _iter_procedures(case_data):
  95. for i, step in enumerate(proc.get('steps') or []):
  96. if not isinstance(step, dict):
  97. continue
  98. for kind in ('inputs', 'outputs'):
  99. for j, item in enumerate(step.get(kind) or []):
  100. if not isinstance(item, dict):
  101. continue
  102. v = item.get('value')
  103. if isinstance(v, str) and META_REF.search(v):
  104. hints.append(f"[{proc_label}] step[{i}].{kind}[{j}].value 是引用占位 {v[:24]!r} — 应逐字回填数据本身")
  105. t = step.get('directive')
  106. if isinstance(t, str) and META_REF.search(t):
  107. hints.append(f"[{proc_label}] step[{i}].directive 是引用占位 {t[:24]!r} — 应填实际 prompt 原文")
  108. return hints
  109. # ===========================================================================
  110. # Check 3: value 占位 / directive 缺失 (提示用 quote-source 回填真内容)
  111. # ===========================================================================
  112. # 纯 <...> 占位 (value 该填真实内容, <...> 仅限无文字的图/视频)
  113. PLACEHOLDER_RE = re.compile(r'^\s*<[^>]*>\s*$')
  114. # 模态分类关键词 (TEXT 优先于 MEDIA, 因 "配音文案" 这类既含媒体词又是文本)
  115. _TEXT_KW = ('提示词', '描述', '参数', '评', '大纲', '脚本', '文案', '歌词', '字幕',
  116. '标题', '正文', '词', '知识', '工作流', '对标', '规格', '批处理', '模板', '版式',
  117. '数据', '分析', '报告', '记录', '方案', '思路', '设定', '依据', '标准', '清单', '列表', '文本', '文字')
  118. _MEDIA_KW = ('图', '视频', '音频', '帧', '片段', '截图', '蒙版', '音效', '配音', 'BGM',
  119. '数字人', '滤镜', '海报', '封面')
  120. def _type_modality(type_name: str, type_reg: dict) -> str:
  121. """按类型名(case-specific 类型先经 type_registry.extends 解析到 stdlib 叶子)判模态.
  122. 返回 'media' (图/视频/音频 — 可 <描述>) / 'text' (提示词/数据/报告 — 必须真实文本) / 'unknown'.
  123. media 用关键词可靠识别; 非 media 一律按"需真实文本"对待 (data/text 占多数, 宁严勿漏).
  124. """
  125. base, seen = type_name, set()
  126. while base in (type_reg or {}) and base not in seen:
  127. seen.add(base)
  128. ent = type_reg[base]
  129. ext = ent.get('extends') if isinstance(ent, dict) else None
  130. if not ext:
  131. break
  132. base = ext
  133. nm = base or type_name or ''
  134. if any(k in nm for k in _TEXT_KW):
  135. return 'text'
  136. if any(k in nm for k in _MEDIA_KW):
  137. return 'media'
  138. return 'unknown'
  139. def check_placeholder_content(case_data: dict) -> list[str]:
  140. """逐 IO 按模态审计 value + 工具步骤 directive → 提示用 quote-source 回填真内容.
  141. 规则 (phase1 §value): 文本类 IO(提示词/数据/报告)的 value 必须是从原文匹配到的真实内容,
  142. 不能写 <…> 占位; 原文确实没有 → 标 inferred:true + inferred_reason 显式说明 (本检查放行).
  143. 媒体类 IO(图/视频/音频)允许 <具体描述>. 工具步骤(via 是具体工具)必须带原文那段 prompt 当 directive.
  144. 弱模型常把所有 value 写成 <…> 占位、整个漏 directive (实测 test-7 全踩), 这条逐 IO 抓, 只 hint.
  145. """
  146. hints: list[str] = []
  147. for proc_label, proc in _iter_procedures(case_data):
  148. type_reg = proc.get('type_registry') or {}
  149. for i, step in enumerate(proc.get('steps') or []):
  150. if not isinstance(step, dict):
  151. continue
  152. for kind in ('inputs', 'outputs'):
  153. for j, item in enumerate(step.get(kind) or []):
  154. if not isinstance(item, dict):
  155. continue
  156. if item.get('inferred'): # 已显式标 inferred 说明 → 放行
  157. continue
  158. v = item.get('value')
  159. if not isinstance(v, str):
  160. continue
  161. if re.search(r'原文(未提供|未给出|没有|无)', v):
  162. continue # 显式标「原文未提供」→ 放行 (LLM 确认原文确无)
  163. if not PLACEHOLDER_RE.match(v):
  164. continue # value 不是 <…> 占位 (已填真内容)
  165. t = item.get('type', '') or ''
  166. mod = _type_modality(t, type_reg)
  167. if mod == 'media':
  168. continue # 图/视频/音频 用 <描述> 合理
  169. label = '文本类' if mod == 'text' else '非媒体(疑似数据/文本)'
  170. # 输出占位 = 步骤产出物没回填; 原文/OCR 里通常紧跟在 prompt 后展示了它
  171. extra = (';这是步骤**产出物**, 原文/配图 OCR 里常紧跟 prompt 展示了它, '
  172. '用 quote-source --from/--to 把那段产出也捞进 value') if kind == 'outputs' else ''
  173. hints.append(
  174. f"[{proc_label}] step[{i}].{kind}[{j}] type={t!r}({label}) value={v.strip()!r} 仍是占位 "
  175. f"—— 你即便已 quote 到原文也**必须把真实内容替换进 value**(别只填 directive){extra}; "
  176. f"原文确无则标 inferred:true + inferred_reason; 若其实是无文字图/视频, 让类型/描述体现"
  177. )
  178. via = (step.get('via') or '').strip()
  179. directive = (step.get('directive') or '').strip()
  180. if step.get('kind', 'step') == 'step' and via and via not in ('human', '-') and not directive:
  181. hints.append(
  182. f"[{proc_label}] step[{i}](via={via!r}) directive 空 — 若原文有给工具的提示词/指令, "
  183. f"用 quote-source 捞原文那段填进 directive"
  184. )
  185. # substance/form 缺失 (Phase 2 该提炼实质/形式; 纯技术步可显式设 null, 但别整个漏掉 key)
  186. if step.get('kind', 'step') in ('step', 'nested'):
  187. miss = [f for f in ('substance', 'form') if f not in step]
  188. if miss:
  189. hints.append(
  190. f"[{proc_label}] step[{i}] 缺 {'/'.join(miss)} — Phase 2 漏做了实质/形式提炼; "
  191. f"读懂这步内容提炼元素点填上(纯技术步可显式设 null, 但别漏掉字段)"
  192. )
  193. # intent 缺失 (Phase 2 每步都要填目的列, 一句话概括)
  194. if step.get('kind', 'step') in ('step', 'block', 'nested') and not (step.get('intent') or '').strip():
  195. hints.append(
  196. f"[{proc_label}] step[{i}] 缺 intent — Phase 2 每步都要填目的列(一句话概括这步在做什么, ≤25 字)"
  197. )
  198. return hints
  199. # ===========================================================================
  200. # Check 4: 章节覆盖 (结构强制 — 需 --source) + value 逐字 (值强制 — 需 --source)
  201. # ===========================================================================
  202. #
  203. # 弱模型在 Phase 1 骨架阶段走两条最省力的路, 都靠"看原文"才抓得到:
  204. # (结构) 只挑两个最显眼的工序就收工, 整段章节(框架/附加案例/总结)漏抽
  205. # (值) 挑中的 value 也打字缩写成标题纲要, 不是逐字原文 (能过 render 门禁因为不是 <占位>)
  206. # 这两条 check 都需要原文 (--source input/case-N.json [--ocr ocr.txt]) 才能比对.
  207. # 比对噪声: 空白 + 各式引号 (原文 “”、骨架常写成 「」/"",内容一致只是引号风格不同, 不该算缩写)
  208. _QUOTE_NOISE = dict.fromkeys(map(ord, '「」『』“”‘’"\'"''), None)
  209. def _norm(s: str) -> str:
  210. """归一化用于子串比对: 去所有空白 (原文常把一个词拆到两行) + 抹掉引号风格差异."""
  211. return re.sub(r'\s+', '', s or '').translate(_QUOTE_NOISE)
  212. def _load_source_corpus(source_path: Path | None, ocr_path: Path | None) -> tuple[str, str]:
  213. """读原文语料: 返回 (raw_text, normed). raw 用来切章节, normed 用来子串比对.
  214. source = input/case-N.json 的 title + body_text; ocr = 配图 OCR 文本 (可选).
  215. """
  216. parts: list[str] = []
  217. if source_path and source_path.exists():
  218. try:
  219. sd = json.loads(source_path.read_text(encoding='utf-8'))
  220. parts.append(sd.get('title', '') or '')
  221. parts.append(sd.get('body_text', '') or sd.get('content', '') or '')
  222. except Exception:
  223. parts.append(source_path.read_text(encoding='utf-8'))
  224. if ocr_path and ocr_path.exists():
  225. parts.append(ocr_path.read_text(encoding='utf-8'))
  226. raw = '\n'.join(parts)
  227. return raw, _norm(raw)
  228. def _sections(body: str) -> list[tuple[str, str, str]]:
  229. """切原文章节: 按行首 `0N |` 标号 (排除 `图 0N |` 配图说明). 返回 [(号, 标题, 正文段)]."""
  230. marks = [(m.start(), m.group(1)) for m in re.finditer(r'(?m)^\s*(0\d)\s*[||]', body)]
  231. out: list[tuple[str, str, str]] = []
  232. for idx, (pos, num) in enumerate(marks):
  233. end = marks[idx + 1][0] if idx + 1 < len(marks) else len(body)
  234. seg = body[pos:end]
  235. after = re.split(r'[||]', seg, 1)
  236. tail = after[-1] if len(after) > 1 else seg
  237. title = ''
  238. for line in tail.splitlines():
  239. line = line.strip()
  240. if line:
  241. title = line[:24]
  242. break
  243. out.append((num, title, seg))
  244. return out
  245. # 章节正文里的"要点标记": 思路X / 第X层 / 第X步 / 案例X / 冒号短标签 (人物特征:…)
  246. _POINT_MARKER = re.compile(
  247. r'(?m)^\s*(思路[一二三四五]|第[一二三四五六七八九十]+[层步]|案例[一二三四五六七八九十]+)')
  248. _POINT_COLON = re.compile(r'(?m)^\s*([^\n::((]{2,12})\s*[::]')
  249. def _section_points(seg: str) -> list[str]:
  250. """抽一节正文的要点短语 (用来量化它被骨架覆盖了多少)."""
  251. pts: list[str] = []
  252. for m in _POINT_MARKER.finditer(seg):
  253. line = seg[m.start():].splitlines()[0].strip()
  254. pts.append(line[:16])
  255. for m in _POINT_COLON.finditer(seg):
  256. lab = m.group(1).strip()
  257. # 纯序号标记 (第X步/第X层/思路X/案例X) 是结构序号不是内容要点, 骨架改写成动作后必然对不上 → 跳过
  258. if re.fullmatch(r'(思路[一二三四五]|第[一二三四五六七八九十]+[层步]|案例[一二三四五六七八九十]+)', lab):
  259. continue
  260. if re.search(r'[一-龥]', lab):
  261. pts.append(lab)
  262. seen: set[str] = set()
  263. out: list[str] = []
  264. for p in pts:
  265. if p and p not in seen:
  266. seen.add(p)
  267. out.append(p)
  268. return out
  269. def _point_covered(point: str, wf_norm: str) -> bool:
  270. """要点是否被骨架覆盖: 去掉结构前缀后, 任一 4-gram 命中 workflow 文本即算覆盖 (从宽)."""
  271. core = re.sub(r'^(思路[一二三四五]|第[一二三四五六七八九十]+[层步]|案例[一二三四五六七八九十]+)', '', point)
  272. core = _norm(core) or _norm(point)
  273. if len(core) < 4:
  274. return core in wf_norm
  275. return any(core[k:k + 4] in wf_norm for k in range(len(core) - 3))
  276. def check_section_coverage(case_data: dict, source_raw: str, wf_norm: str) -> list[str]:
  277. """结构强制: 逐章节算骨架覆盖率, 整段漏抽的章节 (<40%) 报出来 + 给缺失要点样例."""
  278. hints: list[str] = []
  279. secs = _sections(source_raw)
  280. if not secs:
  281. return hints
  282. for num, title, seg in secs:
  283. pts = _section_points(seg)
  284. if len(pts) < 2:
  285. continue # 没足够要点 (纯过渡/口号段), 不评判
  286. missed = [p for p in pts if not _point_covered(p, wf_norm)]
  287. ratio = 1 - len(missed) / len(pts)
  288. if ratio < 0.40:
  289. sample = '、'.join(missed[:5])
  290. hints.append(
  291. f"章节『{num} {title}』覆盖率 {ratio:.0%} ({len(pts) - len(missed)}/{len(pts)} 要点) "
  292. f"—— 疑似整段漏抽; 缺: {sample}{' …' if len(missed) > 5 else ''}; "
  293. f"回去为它补 procedure/step (每个 0N 章节至少对应一个工序或子步骤)"
  294. )
  295. return hints
  296. def _longest_run(v_norm: str, source_norm: str) -> int:
  297. """value 在原文里能连续命中的最长子串长度. 逐字原文应是原文一整段连续文本;
  298. 拼接/缩写出来的(把分散的小标题用、串起来)最长连续命中会很短."""
  299. n = len(v_norm)
  300. best = 0
  301. for i in range(n):
  302. if n - i <= best:
  303. break # 剩余长度已不可能超过 best
  304. lo, hi = 0, n - i
  305. while lo < hi: # 二分该起点能命中的最长长度
  306. mid = (lo + hi + 1) // 2
  307. if v_norm[i:i + mid] in source_norm:
  308. lo = mid
  309. else:
  310. hi = mid - 1
  311. if lo > best:
  312. best = lo
  313. return best
  314. def check_value_verbatim(case_data: dict, source_norm: str) -> list[str]:
  315. """值强制: 文本类 value 必须是原文里的「一整段连续文本」. 最长连续命中 <80% 判缩写/改写/截断.
  316. 跳过: 占位<…>(归 check3)、inferred、原文未提供、未 resolve 的 @quote、媒体类、短值(<12字).
  317. 用最长连续命中而非逐子句覆盖: 后者会被「人物、产品、环境」这种"原文小标题拼盘"骗过
  318. (每个词单独在原文里, 但整体不是任何一段原文 — 真正的逐字细节全被丢了).
  319. """
  320. hints: list[str] = []
  321. if not source_norm:
  322. return hints
  323. for proc_label, proc in _iter_procedures(case_data):
  324. type_reg = proc.get('type_registry') or {}
  325. for i, step in enumerate(proc.get('steps') or []):
  326. if not isinstance(step, dict):
  327. continue
  328. for kind in ('inputs', 'outputs'):
  329. for j, item in enumerate(step.get(kind) or []):
  330. if not isinstance(item, dict):
  331. continue
  332. if item.get('inferred'):
  333. continue
  334. v = item.get('value')
  335. if not isinstance(v, str) or not v.strip():
  336. continue
  337. if PLACEHOLDER_RE.match(v):
  338. continue # 占位 → check3 管
  339. if v.startswith('@quote'):
  340. continue # 未回填的 quote, resolve 后才比
  341. if re.search(r'原文(未提供|未给出|没有|无)', v):
  342. continue
  343. if _type_modality(item.get('type', '') or '', type_reg) == 'media':
  344. continue # 媒体描述不要求逐字
  345. vn = _norm(v)
  346. if len(vn) < 12:
  347. continue # 短标签不查 (无所谓缩写)
  348. run = _longest_run(vn, source_norm)
  349. ratio = run / len(vn)
  350. # 绝对护栏: 连续命中 ≥80 字 = 铁证级真引用 (没人会"凑巧"逐字打 80 字),
  351. # 哪怕整体比例因中途一处微小偏差掉到 80% 也放行, 只抓"短值缩写"。
  352. if run >= 80:
  353. continue
  354. if ratio < 0.80:
  355. hints.append(
  356. f"[{proc_label}] step[{i}].{kind}[{j}] value 最长连续命中原文仅 {run}/{len(vn)} 字"
  357. f"({ratio:.0%}) —— 整体不是一整段原文(疑似开头逐字后就缩写/改写); value={v[:40]!r}…; "
  358. f"用 @quote|起锚|止锚 + wf-patch --resolve-quotes 把整段原文逐字拽进来(原文那段提示词约 350 字)"
  359. )
  360. return hints
  361. # ===========================================================================
  362. # Side effect: record 新 type 到 type_suggestions.md
  363. # ===========================================================================
  364. def record_new_types(case_data: dict, suggestions_path: Path = SUGGESTIONS) -> list[str]:
  365. """把 case_data.type_registry 里的 case-specific type append 到 suggestions.
  366. 幂等: 同一 (type_name, case_id) 二元组只 append 一次. Dedup 靠 grep 现有文件,
  367. 抽 `(来自 case-{N})` + 类型名 二元组.
  368. Returns:
  369. 本次新写入的条目 list (空 list = 没新东西要 record).
  370. """
  371. # 合并所有 procedures.type_registry
  372. type_reg: dict = {}
  373. for p in case_data.get('procedures', []):
  374. type_reg.update(p.get('type_registry') or {})
  375. if not type_reg:
  376. return []
  377. leaves = load_type_leaves()
  378. case_id = case_data.get('case_id') or '?'
  379. text = suggestions_path.read_text(encoding='utf-8') if suggestions_path.exists() else ''
  380. # 已 record 过的 (type_name, case_id) — 用 regex 抓 markdown list entry
  381. existing = set(re.findall(
  382. r'^- `([^`]+)`:.*?\(来自 case-([^,)\s]+)', text, re.M
  383. ))
  384. new_lines: list[str] = []
  385. for tname, entry in type_reg.items():
  386. if not isinstance(entry, dict):
  387. continue
  388. if tname in leaves:
  389. continue # 已是字典叶子, 不是新 type (Agent 误把 stdlib type 加进 case_data.type_registry)
  390. if (tname, str(case_id)) in existing:
  391. continue
  392. ext = entry.get('extends', '?')
  393. desc = entry.get('desc') or '(无 desc)'
  394. new_lines.append(f'- `{tname}`: {desc} (来自 case-{case_id}, extends `{ext}`)')
  395. if new_lines:
  396. # 确保 suggestions 文件存在 (没有就建个空骨架)
  397. if not suggestions_path.exists():
  398. suggestions_path.write_text(
  399. '# Type 字典扩展建议\n\n## 累积条目\n\n', encoding='utf-8'
  400. )
  401. # append 末尾
  402. with suggestions_path.open('a', encoding='utf-8') as f:
  403. f.write('\n' + '\n'.join(new_lines) + '\n')
  404. return new_lines
  405. # ===========================================================================
  406. # main
  407. # ===========================================================================
  408. def main() -> None:
  409. ap = argparse.ArgumentParser(
  410. prog='lint-case.py',
  411. description='workflow 轻量 lint + 自动 record 新 type 到 type_suggestions.md',
  412. )
  413. ap.add_argument('--workflow', type=Path, required=True,
  414. help='workflow.json (含 procedures 数组). lint 内部读 procedures + type_registry')
  415. ap.add_argument('--case-id', type=str, default=None,
  416. help='record suggestions 用的 case_id. 不传就 fallback workflow.case_id 或 ?')
  417. ap.add_argument('--source', type=Path, default=None,
  418. help='原文 input/case-N.json. 传了才启用「章节覆盖」+「value 逐字」两条结构/值强制校验')
  419. ap.add_argument('--ocr', type=Path, default=None,
  420. help='配图 OCR 文本 (可选). 并入原文语料, 让逐字校验也认配图里的文字')
  421. ap.add_argument('--no-record', action='store_true',
  422. help='只校验, 不写 suggestions')
  423. args = ap.parse_args()
  424. target_path = args.workflow
  425. if not target_path.exists():
  426. print(f'lint-case: 文件不存在 {target_path}', file=sys.stderr)
  427. sys.exit(2)
  428. try:
  429. case_data = json.loads(target_path.read_text(encoding='utf-8'))
  430. except json.JSONDecodeError as e:
  431. print(f'lint-case: {target_path} 不是合法 JSON: {e}', file=sys.stderr)
  432. sys.exit(2)
  433. # workflow 模式: 注入 case_id (suggestions record 需要)
  434. if args.case_id is not None and 'case_id' not in case_data:
  435. try:
  436. case_data['case_id'] = int(args.case_id)
  437. except ValueError:
  438. case_data['case_id'] = args.case_id
  439. case_id = case_data.get('case_id', '?')
  440. print(f'[lint] case-{case_id} ({target_path.name})')
  441. # check: type 完整性提示
  442. hints = check_type_completeness(case_data)
  443. if hints:
  444. print(f' · type 完整性: {len(hints)} 个提示')
  445. for h in hints:
  446. print(f' - {h}')
  447. else:
  448. print(' · type 完整性: OK')
  449. # check: value / directive 自包含性 (引用占位)
  450. vhints = check_value_selfcontained(case_data)
  451. if vhints:
  452. print(f' · value 自包含: {len(vhints)} 个引用占位 (跑 wf-patch.py --resolve-passthrough 自动回填)')
  453. for h in vhints:
  454. print(f' - {h}')
  455. else:
  456. print(' · value 自包含: OK')
  457. # check: value 占位 / directive 缺失 (用 quote-source 回填真内容)
  458. chints = check_placeholder_content(case_data)
  459. if chints:
  460. print(f' · value/directive 真实性: {len(chints)} 处占位/缺失 (用 quote-source.py 从原文/配图 OCR 捞真内容回填)')
  461. for h in chints:
  462. print(f' - {h}')
  463. else:
  464. print(' · value/directive 真实性: OK')
  465. # check: 章节覆盖 + value 逐字 (结构/值强制 — 需 --source)
  466. if args.source is not None:
  467. source_raw, source_norm = _load_source_corpus(args.source, args.ocr)
  468. wf_norm = _norm(json.dumps(case_data, ensure_ascii=False))
  469. shints = check_section_coverage(case_data, source_raw, wf_norm)
  470. if shints:
  471. print(f' · 章节覆盖(结构强制): {len(shints)} 个章节疑似漏抽 —— 骨架要覆盖原文每个章节')
  472. for h in shints:
  473. print(f' - {h}')
  474. else:
  475. print(' · 章节覆盖(结构强制): OK (原文各章节都有对应工序)')
  476. bhints = check_value_verbatim(case_data, source_norm)
  477. if bhints:
  478. print(f' · value 逐字(值强制): {len(bhints)} 处疑似缩写/改写 —— 文本类 value 要逐字搬原文(用 @quote)')
  479. for h in bhints:
  480. print(f' - {h}')
  481. else:
  482. print(' · value 逐字(值强制): OK (文本类 value 都逐字命中原文)')
  483. else:
  484. print(' · 章节覆盖 + value 逐字: skipped (传 --source input/case-N.json [--ocr ocr.txt] 启用结构/值强制)')
  485. # side effect: record 新 type
  486. if not args.no_record:
  487. new_lines = record_new_types(case_data)
  488. if new_lines:
  489. print(f' · 已 record {len(new_lines)} 条新 type 到 {SUGGESTIONS.name}:')
  490. for ln in new_lines:
  491. print(f' {ln}')
  492. else:
  493. # 合并所有 procedure type_registry
  494. merged_reg: dict = {}
  495. for p in case_data.get('procedures', []):
  496. merged_reg.update(p.get('type_registry') or {})
  497. if not merged_reg:
  498. print(' · 无新 type 可 record (type_registry 为空 — 全部 type 命中字典叶子)')
  499. else:
  500. print(' · 无新 type 可 record (type_registry 里的项已全部 record 过)')
  501. # 不卡 exit code
  502. sys.exit(0)
  503. if __name__ == '__main__':
  504. main()