lint-case.py 41 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. lint-case.py — workflow.json 轻量 lint + 自动 record 新 type 到 type_suggestions.md.
  5. 设计哲学: **不严格**.
  6. - 不分 error/warning 等级, 不卡 exit code (都返 0)
  7. - 主要副作用是 record 新 type 到 spec/taxonomy/type_suggestions.md
  8. - 检测项打 stdout 给 Agent / 用户看, 决定要不要回去修
  9. 用法:
  10. python spec/tools/lint-case.py --workflow outputs/case-{N}/workflow.json --case-id {N}
  11. python spec/tools/lint-case.py --workflow outputs/case-{N}/workflow.json --case-id {N} --no-record # 只校验不写
  12. python spec/tools/lint-case.py --workflow ... --case-id {N} --json # 机器可读输出 (runner 完成度判据消费)
  13. 退出码:
  14. 0 始终 (不阻塞流程)
  15. 2 CLI 参数错误 / 文件不存在
  16. """
  17. from __future__ import annotations
  18. import argparse
  19. import json
  20. import re
  21. import sys
  22. from pathlib import Path
  23. # spec/tools/lint-case.py → procedure-dsl/
  24. DSL_ROOT = Path(__file__).resolve().parent.parent.parent
  25. TYPE_JSON = DSL_ROOT / 'spec' / 'taxonomy' / 'type.json'
  26. SUGGESTIONS = DSL_ROOT / 'spec' / 'taxonomy' / 'type_suggestions.md'
  27. # Windows 控制台 UTF-8
  28. for _s in (sys.stdout, sys.stderr):
  29. if hasattr(_s, 'reconfigure'):
  30. try:
  31. _s.reconfigure(encoding='utf-8', errors='replace')
  32. except Exception:
  33. pass
  34. def load_type_leaves() -> set[str]:
  35. """读 spec/taxonomy/type.json 的 $leaves 集合."""
  36. if not TYPE_JSON.exists():
  37. return set()
  38. return set(json.loads(TYPE_JSON.read_text(encoding='utf-8')).get('$leaves', []))
  39. # ===========================================================================
  40. # Check 1: type 完整性提示
  41. # ===========================================================================
  42. def _iter_procedures(case_data: dict):
  43. """遍历 workflow.json 的 procedures.
  44. Yields: (procedure_label, procedure_dict) — 含 steps + type_registry.
  45. """
  46. for p in case_data.get('procedures') or []:
  47. label = p.get('id') or p.get('name') or '?'
  48. yield (label, p)
  49. def check_type_completeness(case_data: dict) -> list[str]:
  50. """IO 用了 case-specific type 但 type_registry 漏写 entry → 提示.
  51. 只 hint 不 fail. Agent 看输出回去修.
  52. 多工序时, hint 前缀加 [proc_id] 让用户知道是哪个工序的问题.
  53. """
  54. leaves = load_type_leaves()
  55. hints: list[str] = []
  56. for proc_label, proc in _iter_procedures(case_data):
  57. type_reg = proc.get('type_registry') or {}
  58. for i, step in enumerate(proc.get('steps') or []):
  59. if not isinstance(step, dict):
  60. continue
  61. for kind in ('inputs', 'outputs'):
  62. for j, item in enumerate(step.get(kind) or []):
  63. if not isinstance(item, dict):
  64. continue
  65. t = item.get('type', '') or ''
  66. if not t:
  67. continue
  68. if t in leaves:
  69. continue # 字典叶子, OK
  70. if t not in type_reg:
  71. hints.append(
  72. f"[{proc_label}] step[{i}].{kind}[{j}].type={t!r} 是 case-specific "
  73. f"但 type_registry 没注册"
  74. )
  75. else:
  76. entry = type_reg[t]
  77. if isinstance(entry, dict):
  78. if not entry.get('extends'):
  79. hints.append(f"[{proc_label}] type_registry[{t!r}] 缺 extends 字段")
  80. if not entry.get('desc'):
  81. hints.append(f"[{proc_label}] type_registry[{t!r}] 缺 desc 字段 (renderer drawer 显示需要)")
  82. return hints
  83. # ===========================================================================
  84. # Check 2: value / directive 自包含性 (禁止引用占位)
  85. # ===========================================================================
  86. # value/directive 应填数据本身, 不是 anchor 的引用. 命中即「没真正回填」.
  87. META_REF = re.compile(r'[((]?\s*同\s*s[\d]|见\s*s[\d]|←\s*s[\d]|同上')
  88. def check_value_selfcontained(case_data: dict) -> list[str]:
  89. """扫每个 IO 的 value + 每个 directive, 找「引用占位」文案 (同 sX / 见 sX / ← sX ...).
  90. spec: value 逐字回填数据本身, 引用归 anchor (README「第二阶段 · 2.0.2 连数据流」).
  91. 这种占位 schema/type 检查抓不到, 专门一条. 只 hint. 可用
  92. `wf-patch.py --resolve-passthrough` 自动从源回填.
  93. """
  94. hints: list[str] = []
  95. for proc_label, proc in _iter_procedures(case_data):
  96. for i, step in enumerate(proc.get('steps') or []):
  97. if not isinstance(step, dict):
  98. continue
  99. for kind in ('inputs', 'outputs'):
  100. for j, item in enumerate(step.get(kind) or []):
  101. if not isinstance(item, dict):
  102. continue
  103. v = item.get('value')
  104. if isinstance(v, str) and META_REF.search(v):
  105. hints.append(f"[{proc_label}] step[{i}].{kind}[{j}].value 是引用占位 {v[:24]!r} — 应逐字回填数据本身")
  106. t = step.get('directive')
  107. if isinstance(t, str) and META_REF.search(t):
  108. hints.append(f"[{proc_label}] step[{i}].directive 是引用占位 {t[:24]!r} — 应填实际 prompt 原文")
  109. return hints
  110. # ===========================================================================
  111. # Check 2b: anchor 闭合 (透传输入回填了没 + anchor 格式对不对)
  112. # ===========================================================================
  113. # JSON 路径式 anchor (错): ← p1.s1.outputs[0] / ← s3.inputs[1] / ...outputs[0].id;
  114. # 正确写法是输出**编号** ← s1o1 (或 ← 工序输入 / ← sNoM[i])。只认 .outputs[/.inputs[ 这种
  115. # 明确的路径序列化, 不误伤 ← s2.正向提示词 这类按名引用、← s5o1[-1] 这类带索引的合法编号。
  116. _ANCHOR_JSONPATH = re.compile(r'\.(?:outputs|inputs)\[')
  117. def check_anchor_closure(case_data: dict) -> list[str]:
  118. """透传输入(带 ← anchor)有没有真把 value/type 回填 + anchor 是不是写成了 JSON 路径。
  119. case-2-test-1 暴露的静默丢数据: 模型给输入设了 anchor 却——
  120. A. 漏跑 `wf-patch --resolve-passthrough` → value/type 一直空, verify/lint 当时都没拦;
  121. B. anchor 写成 JSON 路径 `← p1.s1.outputs[0]` 而非编号 `← s1o1` → resolve 永远匹配不上。
  122. 两者都是**确定性**判断(空/非空、是不是路径), 适合做硬门禁(render 前也跑, 逼回填)。
  123. 只查输入(透传方向 ←); 按名引用 `← s2.正向提示词`、带索引 `← s5o1[-1]` 都放行。
  124. """
  125. issues: list[str] = []
  126. for proc_label, proc in _iter_procedures(case_data):
  127. for i, step in enumerate(proc.get('steps') or []):
  128. if not isinstance(step, dict):
  129. continue
  130. for j, io in enumerate(step.get('inputs') or []):
  131. if not isinstance(io, dict) or io.get('inferred'):
  132. continue
  133. anchor = (io.get('anchor') or '').strip()
  134. if not anchor.startswith('←'):
  135. continue
  136. ref = anchor[1:].strip()
  137. # B. JSON 路径式 anchor (resolve 永远匹配不上)
  138. if _ANCHOR_JSONPATH.search(ref):
  139. issues.append(
  140. f"[{proc_label}] step[{i}].inputs[{j}] anchor={anchor[:32]!r} 是 JSON 路径写法 — "
  141. f"数据流来源要用**输出编号**(如 ← s1o1), 不是 ← p1.s1.outputs[0]; "
  142. f"否则 --resolve-passthrough 匹配不到、value 永远空")
  143. continue
  144. # 外部/工序输入 (← 工序输入 / ← 输入) 是参数, 上游无 step 输出可抄, value 可空 → 不强制
  145. ref_base = ref.split('[')[0].strip()
  146. if ref_base in ('工序输入', '输入') or ref_base.startswith('工序输入'):
  147. continue
  148. # A. 引用了上游 step 输出却 value/type 空 = 透传没回填
  149. v, t = io.get('value'), io.get('type')
  150. v_empty = v is None or (isinstance(v, str) and not v.strip())
  151. t_empty = t is None or (isinstance(t, str) and not t.strip())
  152. miss = [n for n, e in (('value', v_empty), ('type', t_empty)) if e]
  153. if miss:
  154. issues.append(
  155. f"[{proc_label}] step[{i}].inputs[{j}] 有 ← anchor({anchor[:20]!r}) 但 {'/'.join(miss)} 空 — "
  156. f"透传没回填: 跑 `wf-patch.py --resolve-passthrough` 顺编号自动抄上游内容, "
  157. f"或确认 anchor 指向的输出本身非空")
  158. return issues
  159. def check_skeleton_filled(case_data: dict) -> list[str]:
  160. """Phase 1 干骨架(via/value/anchor 空)必须由 Phase 2.0 填满, 这些空字段不该活到 render。
  161. case-2-test-2 暴露的「填充整体没做」: 步骤 via 空、输入 value 和 anchor 都空——schema 把它们
  162. 声明成无 minLength 的 string, 空串合法; 占位门禁只抓 <占位>; anchor 门禁只查带 ← 的输入,
  163. 于是「连 ← 都没有、value 也空」的纯骨架残留一路漏到成品。这条专补这个洞:
  164. - kind=step/nested 的 via 空 = 步骤没工具(控制块 kind=block 用 via='-' 合法, 不算);
  165. - kind=step/nested 的 inputs / outputs **数组为空** = 步骤没有输入或没有产物
  166. (README: 步骤=对已有数据执行操作产生新产物 — 没 IO 的"步骤"是骨架洞, 不是步骤;
  167. ⚠ 只查每条 IO 的质量会教模型"删掉报错的条目"过关, 所以必须同时查条目存在性);
  168. - IO 的 type 空 = 没标签 (Phase 1 就该有粗略标签);
  169. - 输入 value 和 anchor 都空 = 既无内容(字面量)也无来源(数据流);
  170. - 输出 value 空 = 这步没产物。输出的 → anchor 是去处不是内容, 所以输出**必须有 value**
  171. (文本类逐字内容 / 媒体类 <描述>), 没有"用 anchor 顶替"的退路。
  172. 都是确定性判断, 适合做硬门禁。inferred IO 豁免 (条目级); 整条 IO 确实不存在时
  173. 用 inferred:true 的条目显式补上, 不允许留空数组。
  174. """
  175. issues: list[str] = []
  176. for proc_label, proc in _iter_procedures(case_data):
  177. for i, step in enumerate(proc.get('steps') or []):
  178. if not isinstance(step, dict):
  179. continue
  180. kind = step.get('kind', 'step')
  181. via = (step.get('via') or '').strip()
  182. if kind in ('step', 'nested') and not via:
  183. issues.append(
  184. f"[{proc_label}] step[{i}](id={step.get('id')}) via 空 — 步骤要写用的工具"
  185. f"(如 nano_banana / human / 剪映); 只有控制块 kind=block 才用 via='-'")
  186. if kind in ('step', 'nested'):
  187. for io_kind, label in (('inputs', '输入'), ('outputs', '输出')):
  188. if not step.get(io_kind):
  189. issues.append(
  190. f"[{proc_label}] step[{i}](id={step.get('id')}) {io_kind} 为空数组 — "
  191. f"步骤必有{label}(对已有数据操作→产生新产物); 用 wf-patch 补上这条 IO "
  192. f"(原文没明写就按工艺推断, 标 inferred:true + inferred_reason), **不要**为过校验而删 IO")
  193. for io_kind in ('inputs', 'outputs'):
  194. for j, io in enumerate(step.get(io_kind) or []):
  195. if isinstance(io, dict) and not str(io.get('type') or '').strip():
  196. issues.append(
  197. f"[{proc_label}] step[{i}].{io_kind}[{j}] type 空 — 每个 IO 都要有类型标签"
  198. f"(Phase 1 粗略标签即可, Phase 2 归一到词表)")
  199. for j, io in enumerate(step.get('inputs') or []):
  200. if not isinstance(io, dict) or io.get('inferred'):
  201. continue
  202. v, a = io.get('value'), (io.get('anchor') or '').strip()
  203. v_empty = v is None or (isinstance(v, str) and not v.strip())
  204. if v_empty and not a:
  205. issues.append(
  206. f"[{proc_label}] step[{i}].inputs[{j}] type={io.get('type', '')!r} 的 value 和 anchor 都空 — "
  207. f"输入要么填字面量 value(@quote 拽原文), 要么用 anchor ← 上游编号 引数据流; 二者必有其一")
  208. for j, io in enumerate(step.get('outputs') or []):
  209. if not isinstance(io, dict) or io.get('inferred'):
  210. continue
  211. v = io.get('value')
  212. if v is None or (isinstance(v, str) and not v.strip()):
  213. issues.append(
  214. f"[{proc_label}] step[{i}].outputs[{j}] type={io.get('type', '')!r} 的 value 空 — "
  215. f"输出是这步的产物, 必须有值: 文本类填逐字内容、媒体类填 <描述>; "
  216. f"原文确无则用 <占位>(原文未提供) 或标 inferred:true")
  217. return issues
  218. def check_dataflow_connected(case_data: dict) -> list[str]:
  219. """多步工序却一个 anchor 都没有 = 2.0.2 连数据流整段没做。
  220. 弱模型常把内容当字面量塞进每个 IO、步骤间不连任何 ← / →; 渲染出来"来源/去处"全空,
  221. 去处也无从反推(反推需要输入 ← 作来源)。一个真·工序是一条数据流水线, ≥2 步必有跨步传递,
  222. 所以"≥2 步 & 0 anchor"是确定性的"流程没连"信号 (单步工序豁免; 任何非空 anchor 都算已连)。
  223. """
  224. issues: list[str] = []
  225. for proc_label, proc in _iter_procedures(case_data):
  226. steps = [s for s in (proc.get('steps') or []) if isinstance(s, dict)]
  227. if len(steps) < 2:
  228. continue
  229. n_anchor = sum(
  230. 1 for s in steps for k in ('inputs', 'outputs')
  231. for io in (s.get(k) or [])
  232. if isinstance(io, dict) and (io.get('anchor') or '').strip())
  233. if n_anchor == 0:
  234. issues.append(
  235. f"[{proc_label}] {len(steps)} 个步骤却 0 个 anchor — 数据流(2.0.2 连来源/去处)整段没做: "
  236. f"下游输入用 `← 上游输出编号`(如 ← s1o1)引数据、别把内容当字面量重抄; "
  237. f"至少把工序内的传递链连起来, 否则 HTML 里来源/去处全空")
  238. return issues
  239. # ===========================================================================
  240. # Check 3: value 占位 / directive 缺失 (提示用 quote-source 回填真内容)
  241. # ===========================================================================
  242. # 纯 <...> 占位 (value 该填真实内容, <...> 仅限无文字的图/视频)
  243. PLACEHOLDER_RE = re.compile(r'^\s*<[^>]*>\s*$')
  244. # 「原文确无该信息」逃生标记 → 占位/逐字检查放行 (等同 inferred)。常见措辞都要认:
  245. # 原文未提供 / 原文确无 / 原文中没有 / 原帖里无 …。render-case.py 的 _NOSRC_RE 与此保持一致。
  246. NOSRC_RE = re.compile(r'原[文帖].{0,2}(未提供|未给出|未写|没有|没写|确无|无)')
  247. # 模态分类关键词 (TEXT 优先于 MEDIA, 因 "配音文案" 这类既含媒体词又是文本)
  248. _TEXT_KW = ('提示词', '描述', '参数', '评', '大纲', '脚本', '文案', '歌词', '字幕',
  249. '标题', '正文', '词', '知识', '工作流', '对标', '规格', '批处理', '模板', '版式',
  250. '数据', '分析', '报告', '记录', '方案', '思路', '设定', '依据', '标准', '清单', '列表', '文本', '文字')
  251. _MEDIA_KW = ('图', '视频', '音频', '帧', '片段', '截图', '蒙版', '音效', '配音', 'BGM',
  252. '数字人', '滤镜', '海报', '封面')
  253. def _type_modality(type_name: str, type_reg: dict) -> str:
  254. """按类型名(case-specific 类型先经 type_registry.extends 解析到 stdlib 叶子)判模态.
  255. 返回 'media' (图/视频/音频 — 可 <描述>) / 'text' (提示词/数据/报告 — 必须真实文本) / 'unknown'.
  256. media 用关键词可靠识别; 非 media 一律按"需真实文本"对待 (data/text 占多数, 宁严勿漏).
  257. """
  258. base, seen = type_name, set()
  259. while base in (type_reg or {}) and base not in seen:
  260. seen.add(base)
  261. ent = type_reg[base]
  262. ext = ent.get('extends') if isinstance(ent, dict) else None
  263. if not ext:
  264. break
  265. base = ext
  266. nm = base or type_name or ''
  267. if any(k in nm for k in _TEXT_KW):
  268. return 'text'
  269. if any(k in nm for k in _MEDIA_KW):
  270. return 'media'
  271. return 'unknown'
  272. def check_placeholder_content(case_data: dict) -> list[str]:
  273. """逐 IO 按模态审计 value + 工具步骤 directive → 提示用 quote-source 回填真内容.
  274. 规则 (README「第二阶段 · 2.0.1 填 value」): 文本类 IO(提示词/数据/报告)的 value 必须是从原文匹配到的真实内容,
  275. 不能写 <…> 占位; 原文确实没有 → 标 inferred:true + inferred_reason 显式说明 (本检查放行).
  276. 媒体类 IO(图/视频/音频)允许 <具体描述>. 工具步骤(via 是具体工具)必须带原文那段 prompt 当 directive.
  277. 弱模型常把所有 value 写成 <…> 占位、整个漏 directive (实测 test-7 全踩), 这条逐 IO 抓, 只 hint.
  278. """
  279. hints: list[str] = []
  280. for proc_label, proc in _iter_procedures(case_data):
  281. type_reg = proc.get('type_registry') or {}
  282. for i, step in enumerate(proc.get('steps') or []):
  283. if not isinstance(step, dict):
  284. continue
  285. for kind in ('inputs', 'outputs'):
  286. for j, item in enumerate(step.get(kind) or []):
  287. if not isinstance(item, dict):
  288. continue
  289. if item.get('inferred'): # 已显式标 inferred 说明 → 放行
  290. continue
  291. v = item.get('value')
  292. if not isinstance(v, str):
  293. continue
  294. if NOSRC_RE.search(v):
  295. continue # 显式标「原文未提供/确无」→ 放行 (LLM 确认原文确无)
  296. if not PLACEHOLDER_RE.match(v):
  297. continue # value 不是 <…> 占位 (已填真内容)
  298. t = item.get('type', '') or ''
  299. mod = _type_modality(t, type_reg)
  300. if mod == 'media':
  301. continue # 图/视频/音频 用 <描述> 合理
  302. label = '文本类' if mod == 'text' else '非媒体(疑似数据/文本)'
  303. # 输出占位 = 步骤产出物没回填; 原文/OCR 里通常紧跟在 prompt 后展示了它
  304. extra = (';这是步骤**产出物**, 原文/配图 OCR 里常紧跟 prompt 展示了它, '
  305. '用 quote-source --from/--to 把那段产出也捞进 value') if kind == 'outputs' else ''
  306. hints.append(
  307. f"[{proc_label}] step[{i}].{kind}[{j}] type={t!r}({label}) value={v.strip()!r} 仍是占位 "
  308. f"—— 你即便已 quote 到原文也**必须把真实内容替换进 value**(别只填 directive){extra}; "
  309. f"原文确无则标 inferred:true + inferred_reason; 若其实是无文字图/视频, 让类型/描述体现"
  310. )
  311. via = (step.get('via') or '').strip()
  312. directive = (step.get('directive') or '').strip()
  313. if step.get('kind', 'step') == 'step' and via and via not in ('human', '-') and not directive:
  314. hints.append(
  315. f"[{proc_label}] step[{i}](via={via!r}) directive 空 — 若原文有给工具的提示词/指令, "
  316. f"用 quote-source 捞原文那段填进 directive"
  317. )
  318. # substance/form 缺失 (Phase 2 该提炼实质/形式; 纯技术步可显式设 null, 但别整个漏掉 key)
  319. if step.get('kind', 'step') in ('step', 'nested'):
  320. miss = [f for f in ('substance', 'form') if f not in step]
  321. if miss:
  322. hints.append(
  323. f"[{proc_label}] step[{i}] 缺 {'/'.join(miss)} — Phase 2 漏做了实质/形式提炼; "
  324. f"读懂这步内容提炼元素点填上(纯技术步可显式设 null, 但别漏掉字段)"
  325. )
  326. # intent 缺失 (Phase 2 每步都要填目的列, 一句话概括)
  327. if step.get('kind', 'step') in ('step', 'block', 'nested') and not (step.get('intent') or '').strip():
  328. hints.append(
  329. f"[{proc_label}] step[{i}] 缺 intent — Phase 2 每步都要填目的列(一句话概括这步在做什么, ≤25 字)"
  330. )
  331. return hints
  332. # ===========================================================================
  333. # Check 3a: 未解析的 @quote 标记残留
  334. # ===========================================================================
  335. def check_unresolved_quotes(case_data: dict) -> list[str]:
  336. """value/directive 里残留的 `@quote|起锚|止锚` 标记 → 报出 (确定性, render 前必须清零).
  337. @quote 是喂给 wf-patch --resolve-quotes 的**中间态**: 锚点没匹配上时 wf-patch 只 ⚠ 警告、
  338. 标记原样留下。它不是 <占位>(占位门禁不认)、逐字检查又显式跳过它(resolve 后才比) ——
  339. 之前没有任何检查管它, 会一路漏进 HTML 当正文显示。修法: 改锚点(两段独特短串, 来自原文/OCR
  340. 逐字)重跑 --resolve-quotes, 或放弃标记直接填真实内容。
  341. """
  342. issues: list[str] = []
  343. for proc_label, proc in _iter_procedures(case_data):
  344. for i, step in enumerate(proc.get('steps') or []):
  345. if not isinstance(step, dict):
  346. continue
  347. for kind in ('inputs', 'outputs'):
  348. for j, io in enumerate(step.get(kind) or []):
  349. if not isinstance(io, dict):
  350. continue
  351. v = io.get('value')
  352. if isinstance(v, str) and v.lstrip().startswith('@quote'):
  353. issues.append(
  354. f"[{proc_label}] step[{i}].{kind}[{j}] value 是未解析的 @quote 标记 "
  355. f"({v.strip()[:48]!r}…) — 锚点没匹配上原文/OCR; 改锚点重跑 "
  356. f"`wf-patch --resolve-quotes --source <原文> [--ocr <ocr.txt>]`, 或直接填真实内容")
  357. d = step.get('directive')
  358. if isinstance(d, str) and d.lstrip().startswith('@quote'):
  359. issues.append(
  360. f"[{proc_label}] step[{i}].directive 是未解析的 @quote 标记 ({d.strip()[:48]!r}…) — "
  361. f"改锚点重跑 --resolve-quotes, 或直接填真实内容")
  362. return issues
  363. # ===========================================================================
  364. # Check 3b: 归类完成度 (Phase 2.1 做完没 — effect/action 填了、intent 是标记格式)
  365. # ===========================================================================
  366. # intent 标记格式 (README「目的列」): 合法标记类别 effect/via/act/in-type/out-type
  367. _INTENT_MARKERS = ('in-type:', 'out-type:', 'act:', 'via:', 'effect:')
  368. def check_classification_done(case_data: dict) -> list[str]:
  369. """Phase 2.1 归类是否做完: 每个非控制块步骤要有 effect+action; 有 IO 的步骤 intent 要用标记格式.
  370. runner 的自动续跑兜底用这条判「还差什么」(via --json), 规则只活在这里, 别在 runner 里复刻.
  371. """
  372. missing: list[str] = []
  373. intent_bad: list[str] = []
  374. for proc_label, proc in _iter_procedures(case_data):
  375. for s in proc.get('steps') or []:
  376. if not isinstance(s, dict) or s.get('kind') == 'block':
  377. continue # 控制块不要求 effect/action
  378. sid = f"{proc_label}.{s.get('id')}"
  379. if not (s.get('effect') or '').strip() or not (s.get('action') or '').strip():
  380. missing.append(sid)
  381. has_io = bool(s.get('inputs') or s.get('outputs'))
  382. intent = (s.get('intent') or '').strip()
  383. if has_io and (not intent or '{' not in intent
  384. or not any(m in intent for m in _INTENT_MARKERS)):
  385. intent_bad.append(sid)
  386. hints: list[str] = []
  387. if missing:
  388. hints.append(f"{len(missing)} 个步骤缺 effect/action (Phase 2.1 没做完): "
  389. f"{', '.join(missing[:8])}{' …' if len(missing) > 8 else ''}")
  390. if intent_bad:
  391. hints.append(f"{len(intent_bad)} 个步骤的 intent 没用标记格式 (README「目的列」: 写成带 "
  392. f"{{in-type:X}}/{{out-type:Y}}/{{act:Z}} 的句子): "
  393. f"{', '.join(intent_bad[:8])}{' …' if len(intent_bad) > 8 else ''}")
  394. return hints
  395. # ===========================================================================
  396. # Check 4: 章节覆盖 (结构强制 — 需 --source) + value 逐字 (值强制 — 需 --source)
  397. # ===========================================================================
  398. #
  399. # 弱模型在 Phase 1 骨架阶段走两条最省力的路, 都靠"看原文"才抓得到:
  400. # (结构) 只挑两个最显眼的工序就收工, 整段章节(框架/附加案例/总结)漏抽
  401. # (值) 挑中的 value 也打字缩写成标题纲要, 不是逐字原文 (能过 render 门禁因为不是 <占位>)
  402. # 这两条 check 都需要原文 (--source input/case-N.json [--ocr ocr.txt]) 才能比对.
  403. # 比对噪声: 空白 + 各式引号 (原文 “”、骨架常写成 「」/"",内容一致只是引号风格不同, 不该算缩写)
  404. _QUOTE_NOISE = dict.fromkeys(map(ord, '「」『』“”‘’"\'"''), None)
  405. def _norm(s: str) -> str:
  406. """归一化用于子串比对: 去所有空白 (原文常把一个词拆到两行) + 抹掉引号风格差异."""
  407. return re.sub(r'\s+', '', s or '').translate(_QUOTE_NOISE)
  408. def _load_source_corpus(source_path: Path | None, ocr_path: Path | None) -> tuple[str, str]:
  409. """读原文语料: 返回 (raw_text, normed). raw 用来切章节, normed 用来子串比对.
  410. source = input/case-N.json 的 title + body_text; ocr = 配图 OCR 文本 (可选).
  411. """
  412. parts: list[str] = []
  413. if source_path and source_path.exists():
  414. try:
  415. sd = json.loads(source_path.read_text(encoding='utf-8'))
  416. parts.append(sd.get('title', '') or '')
  417. parts.append(sd.get('body_text', '') or sd.get('content', '') or '')
  418. except Exception:
  419. parts.append(source_path.read_text(encoding='utf-8'))
  420. if ocr_path and ocr_path.exists():
  421. parts.append(ocr_path.read_text(encoding='utf-8'))
  422. raw = '\n'.join(parts)
  423. return raw, _norm(raw)
  424. def _sections(body: str) -> list[tuple[str, str, str]]:
  425. """切原文章节: 按行首 `NN |` 两位标号 (01..99; 行首要求天然排除 `图 0N |` 配图说明). 返回 [(号, 标题, 正文段)]."""
  426. marks = [(m.start(), m.group(1)) for m in re.finditer(r'(?m)^\s*(\d{2})\s*[||]', body)]
  427. out: list[tuple[str, str, str]] = []
  428. for idx, (pos, num) in enumerate(marks):
  429. end = marks[idx + 1][0] if idx + 1 < len(marks) else len(body)
  430. seg = body[pos:end]
  431. after = re.split(r'[||]', seg, 1)
  432. tail = after[-1] if len(after) > 1 else seg
  433. title = ''
  434. for line in tail.splitlines():
  435. line = line.strip()
  436. if line:
  437. title = line[:24]
  438. break
  439. out.append((num, title, seg))
  440. return out
  441. # 章节正文里的"要点标记": 思路X / 第X层 / 第X步 / 案例X / 冒号短标签 (人物特征:…)
  442. _POINT_MARKER = re.compile(
  443. r'(?m)^\s*(思路[一二三四五]|第[一二三四五六七八九十]+[层步]|案例[一二三四五六七八九十]+)')
  444. _POINT_COLON = re.compile(r'(?m)^\s*([^\n::((]{2,12})\s*[::]')
  445. def _section_points(seg: str) -> list[str]:
  446. """抽一节正文的要点短语 (用来量化它被骨架覆盖了多少)."""
  447. pts: list[str] = []
  448. for m in _POINT_MARKER.finditer(seg):
  449. line = seg[m.start():].splitlines()[0].strip()
  450. pts.append(line[:16])
  451. for m in _POINT_COLON.finditer(seg):
  452. lab = m.group(1).strip()
  453. # 纯序号标记 (第X步/第X层/思路X/案例X) 是结构序号不是内容要点, 骨架改写成动作后必然对不上 → 跳过
  454. if re.fullmatch(r'(思路[一二三四五]|第[一二三四五六七八九十]+[层步]|案例[一二三四五六七八九十]+)', lab):
  455. continue
  456. if re.search(r'[一-龥]', lab):
  457. pts.append(lab)
  458. seen: set[str] = set()
  459. out: list[str] = []
  460. for p in pts:
  461. if p and p not in seen:
  462. seen.add(p)
  463. out.append(p)
  464. return out
  465. def _point_covered(point: str, wf_norm: str) -> bool:
  466. """要点是否被骨架覆盖: 去掉结构前缀后, 任一 4-gram 命中 workflow 文本即算覆盖 (从宽)."""
  467. core = re.sub(r'^(思路[一二三四五]|第[一二三四五六七八九十]+[层步]|案例[一二三四五六七八九十]+)', '', point)
  468. core = _norm(core) or _norm(point)
  469. if len(core) < 4:
  470. return core in wf_norm
  471. return any(core[k:k + 4] in wf_norm for k in range(len(core) - 3))
  472. def check_section_coverage(case_data: dict, source_raw: str, wf_norm: str) -> list[str]:
  473. """结构强制: 逐章节算骨架覆盖率, 整段漏抽的章节 (<40%) 报出来 + 给缺失要点样例."""
  474. hints: list[str] = []
  475. secs = _sections(source_raw)
  476. if not secs:
  477. return hints
  478. for num, title, seg in secs:
  479. pts = _section_points(seg)
  480. if len(pts) < 2:
  481. continue # 没足够要点 (纯过渡/口号段), 不评判
  482. missed = [p for p in pts if not _point_covered(p, wf_norm)]
  483. ratio = 1 - len(missed) / len(pts)
  484. if ratio < 0.40:
  485. sample = '、'.join(missed[:5])
  486. hints.append(
  487. f"章节『{num} {title}』覆盖率 {ratio:.0%} ({len(pts) - len(missed)}/{len(pts)} 要点) "
  488. f"—— 疑似整段漏抽; 缺: {sample}{' …' if len(missed) > 5 else ''}; "
  489. f"回去为它补 procedure/step (每个 0N 章节至少对应一个工序或子步骤)"
  490. )
  491. return hints
  492. def _longest_run(v_norm: str, source_norm: str) -> int:
  493. """value 在原文里能连续命中的最长子串长度. 逐字原文应是原文一整段连续文本;
  494. 拼接/缩写出来的(把分散的小标题用、串起来)最长连续命中会很短."""
  495. n = len(v_norm)
  496. best = 0
  497. for i in range(n):
  498. if n - i <= best:
  499. break # 剩余长度已不可能超过 best
  500. lo, hi = 0, n - i
  501. while lo < hi: # 二分该起点能命中的最长长度
  502. mid = (lo + hi + 1) // 2
  503. if v_norm[i:i + mid] in source_norm:
  504. lo = mid
  505. else:
  506. hi = mid - 1
  507. if lo > best:
  508. best = lo
  509. return best
  510. def check_value_verbatim(case_data: dict, source_norm: str) -> list[str]:
  511. """值强制: 文本类 value 必须是原文里的「一整段连续文本」. 最长连续命中 <80% 判缩写/改写/截断.
  512. 跳过: 占位<…>(归 check3)、inferred、原文未提供、未 resolve 的 @quote、媒体类、短值(<12字).
  513. 用最长连续命中而非逐子句覆盖: 后者会被「人物、产品、环境」这种"原文小标题拼盘"骗过
  514. (每个词单独在原文里, 但整体不是任何一段原文 — 真正的逐字细节全被丢了).
  515. """
  516. hints: list[str] = []
  517. if not source_norm:
  518. return hints
  519. for proc_label, proc in _iter_procedures(case_data):
  520. type_reg = proc.get('type_registry') or {}
  521. for i, step in enumerate(proc.get('steps') or []):
  522. if not isinstance(step, dict):
  523. continue
  524. for kind in ('inputs', 'outputs'):
  525. for j, item in enumerate(step.get(kind) or []):
  526. if not isinstance(item, dict):
  527. continue
  528. if item.get('inferred'):
  529. continue
  530. v = item.get('value')
  531. if not isinstance(v, str) or not v.strip():
  532. continue
  533. if PLACEHOLDER_RE.match(v):
  534. continue # 占位 → check3 管
  535. if v.startswith('@quote'):
  536. continue # 未回填的 quote, resolve 后才比
  537. if NOSRC_RE.search(v):
  538. continue
  539. if _type_modality(item.get('type', '') or '', type_reg) == 'media':
  540. continue # 媒体描述不要求逐字
  541. vn = _norm(v)
  542. if len(vn) < 12:
  543. continue # 短标签不查 (无所谓缩写)
  544. run = _longest_run(vn, source_norm)
  545. ratio = run / len(vn)
  546. # 绝对护栏: 连续命中 ≥80 字 = 铁证级真引用 (没人会"凑巧"逐字打 80 字),
  547. # 哪怕整体比例因中途一处微小偏差掉到 80% 也放行, 只抓"短值缩写"。
  548. if run >= 80:
  549. continue
  550. if ratio < 0.80:
  551. hints.append(
  552. f"[{proc_label}] step[{i}].{kind}[{j}] value 最长连续命中原文仅 {run}/{len(vn)} 字"
  553. f"({ratio:.0%}) —— 整体不是一整段原文(疑似开头逐字后就缩写/改写); value={v[:40]!r}…; "
  554. f"用 @quote|起锚|止锚 + wf-patch --resolve-quotes 把整段原文逐字拽进来(原文那段提示词约 350 字)"
  555. )
  556. return hints
  557. # ===========================================================================
  558. # Side effect: record 新 type 到 type_suggestions.md
  559. # ===========================================================================
  560. def record_new_types(case_data: dict, suggestions_path: Path = SUGGESTIONS) -> list[str]:
  561. """把 case_data.type_registry 里的 case-specific type append 到 suggestions.
  562. 幂等: 同一 (type_name, case_id) 二元组只 append 一次. Dedup 靠 grep 现有文件,
  563. 抽 `(来自 case-{N})` + 类型名 二元组.
  564. Returns:
  565. 本次新写入的条目 list (空 list = 没新东西要 record).
  566. """
  567. # 合并所有 procedures.type_registry
  568. type_reg: dict = {}
  569. for p in case_data.get('procedures', []):
  570. type_reg.update(p.get('type_registry') or {})
  571. if not type_reg:
  572. return []
  573. leaves = load_type_leaves()
  574. case_id = case_data.get('case_id') or '?'
  575. text = suggestions_path.read_text(encoding='utf-8') if suggestions_path.exists() else ''
  576. # 已 record 过的 (type_name, case_id) — 用 regex 抓 markdown list entry
  577. existing = set(re.findall(
  578. r'^- `([^`]+)`:.*?\(来自 case-([^,)\s]+)', text, re.M
  579. ))
  580. new_lines: list[str] = []
  581. for tname, entry in type_reg.items():
  582. if not isinstance(entry, dict):
  583. continue
  584. if tname in leaves:
  585. continue # 已是字典叶子, 不是新 type (Agent 误把 stdlib type 加进 case_data.type_registry)
  586. if (tname, str(case_id)) in existing:
  587. continue
  588. ext = entry.get('extends', '?')
  589. desc = entry.get('desc') or '(无 desc)'
  590. new_lines.append(f'- `{tname}`: {desc} (来自 case-{case_id}, extends `{ext}`)')
  591. if new_lines:
  592. # 确保 suggestions 文件存在 (没有就建个空骨架)
  593. if not suggestions_path.exists():
  594. suggestions_path.write_text(
  595. '# Type 字典扩展建议\n\n## 累积条目\n\n', encoding='utf-8'
  596. )
  597. # append 末尾
  598. with suggestions_path.open('a', encoding='utf-8') as f:
  599. f.write('\n' + '\n'.join(new_lines) + '\n')
  600. return new_lines
  601. # ===========================================================================
  602. # main
  603. # ===========================================================================
  604. def main() -> None:
  605. ap = argparse.ArgumentParser(
  606. prog='lint-case.py',
  607. description='workflow 轻量 lint + 自动 record 新 type 到 type_suggestions.md',
  608. )
  609. ap.add_argument('--workflow', type=Path, required=True,
  610. help='workflow.json (含 procedures 数组). lint 内部读 procedures + type_registry')
  611. ap.add_argument('--case-id', type=str, default=None,
  612. help='record suggestions 用的 case_id. 不传就 fallback workflow.case_id 或 ?')
  613. ap.add_argument('--source', type=Path, default=None,
  614. help='原文 input/case-N.json. 传了才启用「章节覆盖」+「value 逐字」两条结构/值强制校验')
  615. ap.add_argument('--ocr', type=Path, default=None,
  616. help='配图 OCR 文本 (可选). 并入原文语料, 让逐字校验也认配图里的文字')
  617. ap.add_argument('--no-record', action='store_true',
  618. help='只校验, 不写 suggestions')
  619. ap.add_argument('--json', action='store_true',
  620. help='输出机器可读 JSON ({"checks": {名: [提示...]}}); runner 的完成度判据消费它')
  621. args = ap.parse_args()
  622. target_path = args.workflow
  623. if not target_path.exists():
  624. print(f'lint-case: 文件不存在 {target_path}', file=sys.stderr)
  625. sys.exit(2)
  626. try:
  627. case_data = json.loads(target_path.read_text(encoding='utf-8'))
  628. except json.JSONDecodeError as e:
  629. print(f'lint-case: {target_path} 不是合法 JSON: {e}', file=sys.stderr)
  630. sys.exit(2)
  631. # workflow 模式: 注入 case_id (suggestions record 需要)
  632. if args.case_id is not None and 'case_id' not in case_data:
  633. try:
  634. case_data['case_id'] = int(args.case_id)
  635. except ValueError:
  636. case_data['case_id'] = args.case_id
  637. case_id = case_data.get('case_id', '?')
  638. # 全部检查跑进 results (检查名 → 提示列表); --json 和人读输出共用这一份
  639. results: dict[str, list[str]] = {
  640. 'type_completeness': check_type_completeness(case_data),
  641. 'value_selfcontained': check_value_selfcontained(case_data),
  642. 'placeholder_content': check_placeholder_content(case_data),
  643. 'unresolved_quotes': check_unresolved_quotes(case_data),
  644. 'classification_done': check_classification_done(case_data),
  645. }
  646. source_checked = args.source is not None
  647. if source_checked:
  648. source_raw, source_norm = _load_source_corpus(args.source, args.ocr)
  649. wf_norm = _norm(json.dumps(case_data, ensure_ascii=False))
  650. results['section_coverage'] = check_section_coverage(case_data, source_raw, wf_norm)
  651. results['value_verbatim'] = check_value_verbatim(case_data, source_norm)
  652. # side effect: record 新 type
  653. recorded: list[str] = []
  654. if not args.no_record:
  655. recorded = record_new_types(case_data)
  656. if args.json:
  657. print(json.dumps({'case_id': case_id, 'checks': results,
  658. 'source_checked': source_checked,
  659. 'recorded': len(recorded)}, ensure_ascii=False, indent=1))
  660. sys.exit(0)
  661. print(f'[lint] case-{case_id} ({target_path.name})')
  662. _HEADERS = {
  663. 'type_completeness': ('type 完整性', '个提示'),
  664. 'value_selfcontained': ('value 自包含', '个引用占位 (跑 wf-patch.py --resolve-passthrough 自动回填)'),
  665. 'placeholder_content': ('value/directive 真实性', '处占位/缺失 (用 quote-source.py 从原文/配图 OCR 捞真内容回填)'),
  666. 'unresolved_quotes': ('@quote 残留', '处未解析标记 (改锚点重跑 --resolve-quotes 或直接填真实内容)'),
  667. 'classification_done': ('归类完成度', '项未完成 (Phase 2.1: effect/action 对词表填, intent 用标记格式)'),
  668. 'section_coverage': ('章节覆盖(结构强制)', '个章节疑似漏抽 —— 骨架要覆盖原文每个章节'),
  669. 'value_verbatim': ('value 逐字(值强制)', '处疑似缩写/改写 —— 文本类 value 要逐字搬原文(用 @quote)'),
  670. }
  671. for name, hints in results.items():
  672. title, suffix = _HEADERS[name]
  673. if hints:
  674. print(f' · {title}: {len(hints)} {suffix}')
  675. for h in hints:
  676. print(f' - {h}')
  677. else:
  678. print(f' · {title}: OK')
  679. if not source_checked:
  680. print(' · 章节覆盖 + value 逐字: skipped (传 --source input/case-N.json [--ocr ocr.txt] 启用结构/值强制)')
  681. if not args.no_record:
  682. if recorded:
  683. print(f' · 已 record {len(recorded)} 条新 type 到 {SUGGESTIONS.name}:')
  684. for ln in recorded:
  685. print(f' {ln}')
  686. else:
  687. merged_reg: dict = {}
  688. for p in case_data.get('procedures', []):
  689. merged_reg.update(p.get('type_registry') or {})
  690. if not merged_reg:
  691. print(' · 无新 type 可 record (type_registry 为空 — 全部 type 命中字典叶子)')
  692. else:
  693. print(' · 无新 type 可 record (type_registry 里的项已全部 record 过)')
  694. # 不卡 exit code
  695. sys.exit(0)
  696. if __name__ == '__main__':
  697. main()