lint-case.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
  4. lint-case.py — workflow.json 轻量 lint + 自动 record 新 type 到 type_suggestions.md.
  5. 设计哲学: **不严格**.
  6. - 不分 error/warning 等级, 不卡 exit code (都返 0)
  7. - 主要副作用是 record 新 type 到 spec/taxonomy/type_suggestions.md
  8. - 检测项打 stdout 给 Agent / 用户看, 决定要不要回去修
  9. 用法:
  10. python spec/tools/lint-case.py --workflow outputs/case-{N}/workflow.json --case-id {N}
  11. python spec/tools/lint-case.py --workflow outputs/case-{N}/workflow.json --case-id {N} --no-record # 只校验不写
  12. 退出码:
  13. 0 始终 (不阻塞流程)
  14. 2 CLI 参数错误 / 文件不存在
  15. """
  16. from __future__ import annotations
  17. import argparse
  18. import json
  19. import re
  20. import sys
  21. from pathlib import Path
  22. # spec/tools/lint-case.py → procedure-dsl/
  23. DSL_ROOT = Path(__file__).resolve().parent.parent.parent
  24. TYPE_JSON = DSL_ROOT / 'spec' / 'taxonomy' / 'type.json'
  25. SUGGESTIONS = DSL_ROOT / 'spec' / 'taxonomy' / 'type_suggestions.md'
  26. # Windows 控制台 UTF-8
  27. for _s in (sys.stdout, sys.stderr):
  28. if hasattr(_s, 'reconfigure'):
  29. try:
  30. _s.reconfigure(encoding='utf-8', errors='replace')
  31. except Exception:
  32. pass
  33. def load_type_leaves() -> set[str]:
  34. """读 spec/taxonomy/type.json 的 $leaves 集合."""
  35. if not TYPE_JSON.exists():
  36. return set()
  37. return set(json.loads(TYPE_JSON.read_text(encoding='utf-8')).get('$leaves', []))
  38. # ===========================================================================
  39. # Check 1: type 完整性提示
  40. # ===========================================================================
  41. def _iter_procedures(case_data: dict):
  42. """遍历 workflow.json 的 procedures.
  43. Yields: (procedure_label, procedure_dict) — 含 steps + type_registry.
  44. """
  45. for p in case_data.get('procedures') or []:
  46. label = p.get('id') or p.get('name') or '?'
  47. yield (label, p)
  48. def check_type_completeness(case_data: dict) -> list[str]:
  49. """IO 用了 case-specific type 但 type_registry 漏写 entry → 提示.
  50. 只 hint 不 fail. Agent 看输出回去修.
  51. 多工序时, hint 前缀加 [proc_id] 让用户知道是哪个工序的问题.
  52. """
  53. leaves = load_type_leaves()
  54. hints: list[str] = []
  55. for proc_label, proc in _iter_procedures(case_data):
  56. type_reg = proc.get('type_registry') or {}
  57. for i, step in enumerate(proc.get('steps') or []):
  58. if not isinstance(step, dict):
  59. continue
  60. for kind in ('inputs', 'outputs'):
  61. for j, item in enumerate(step.get(kind) or []):
  62. if not isinstance(item, dict):
  63. continue
  64. t = item.get('type', '') or ''
  65. if not t:
  66. continue
  67. if t in leaves:
  68. continue # 字典叶子, OK
  69. if t not in type_reg:
  70. hints.append(
  71. f"[{proc_label}] step[{i}].{kind}[{j}].type={t!r} 是 case-specific "
  72. f"但 type_registry 没注册"
  73. )
  74. else:
  75. entry = type_reg[t]
  76. if isinstance(entry, dict):
  77. if not entry.get('extends'):
  78. hints.append(f"[{proc_label}] type_registry[{t!r}] 缺 extends 字段")
  79. if not entry.get('desc'):
  80. hints.append(f"[{proc_label}] type_registry[{t!r}] 缺 desc 字段 (renderer drawer 显示需要)")
  81. return hints
  82. # ===========================================================================
  83. # Check 2: value / directive 自包含性 (禁止引用占位)
  84. # ===========================================================================
  85. # value/directive 应填数据本身, 不是 anchor 的引用. 命中即「没真正回填」.
  86. META_REF = re.compile(r'[((]?\s*同\s*s[\d]|见\s*s[\d]|←\s*s[\d]|同上')
  87. def check_value_selfcontained(case_data: dict) -> list[str]:
  88. """扫每个 IO 的 value + 每个 directive, 找「引用占位」文案 (同 sX / 见 sX / ← sX ...).
  89. spec: value 逐字回填数据本身, 引用归 anchor (fields.md 数据流组 + syntax §6).
  90. 这种占位 schema/type 检查抓不到, 专门一条. 只 hint. 可用
  91. `wf-patch.py --resolve-passthrough` 自动从源回填.
  92. """
  93. hints: list[str] = []
  94. for proc_label, proc in _iter_procedures(case_data):
  95. for i, step in enumerate(proc.get('steps') or []):
  96. if not isinstance(step, dict):
  97. continue
  98. for kind in ('inputs', 'outputs'):
  99. for j, item in enumerate(step.get(kind) or []):
  100. if not isinstance(item, dict):
  101. continue
  102. v = item.get('value')
  103. if isinstance(v, str) and META_REF.search(v):
  104. hints.append(f"[{proc_label}] step[{i}].{kind}[{j}].value 是引用占位 {v[:24]!r} — 应逐字回填数据本身")
  105. for di, pair in enumerate(step.get('instruction') or []):
  106. if isinstance(pair, list) and len(pair) == 2 and pair[0] == 'directive':
  107. t = pair[1]
  108. if isinstance(t, str) and META_REF.search(t):
  109. hints.append(f"[{proc_label}] step[{i}].instruction[{di}](directive) 是引用占位 {t[:24]!r} — 应填实际 prompt 原文")
  110. return hints
  111. # ===========================================================================
  112. # Side effect: record 新 type 到 type_suggestions.md
  113. # ===========================================================================
  114. def record_new_types(case_data: dict, suggestions_path: Path = SUGGESTIONS) -> list[str]:
  115. """把 case_data.type_registry 里的 case-specific type append 到 suggestions.
  116. 幂等: 同一 (type_name, case_id) 二元组只 append 一次. Dedup 靠 grep 现有文件,
  117. 抽 `(来自 case-{N})` + 类型名 二元组.
  118. Returns:
  119. 本次新写入的条目 list (空 list = 没新东西要 record).
  120. """
  121. # 合并所有 procedures.type_registry
  122. type_reg: dict = {}
  123. for p in case_data.get('procedures', []):
  124. type_reg.update(p.get('type_registry') or {})
  125. if not type_reg:
  126. return []
  127. leaves = load_type_leaves()
  128. case_id = case_data.get('case_id') or '?'
  129. text = suggestions_path.read_text(encoding='utf-8') if suggestions_path.exists() else ''
  130. # 已 record 过的 (type_name, case_id) — 用 regex 抓 markdown list entry
  131. existing = set(re.findall(
  132. r'^- `([^`]+)`:.*?\(来自 case-([^,)\s]+)', text, re.M
  133. ))
  134. new_lines: list[str] = []
  135. for tname, entry in type_reg.items():
  136. if not isinstance(entry, dict):
  137. continue
  138. if tname in leaves:
  139. continue # 已是字典叶子, 不是新 type (Agent 误把 stdlib type 加进 case_data.type_registry)
  140. if (tname, str(case_id)) in existing:
  141. continue
  142. ext = entry.get('extends', '?')
  143. desc = entry.get('desc') or '(无 desc)'
  144. new_lines.append(f'- `{tname}`: {desc} (来自 case-{case_id}, extends `{ext}`)')
  145. if new_lines:
  146. # 确保 suggestions 文件存在 (没有就建个空骨架)
  147. if not suggestions_path.exists():
  148. suggestions_path.write_text(
  149. '# Type 字典扩展建议\n\n## 累积条目\n\n', encoding='utf-8'
  150. )
  151. # append 末尾
  152. with suggestions_path.open('a', encoding='utf-8') as f:
  153. f.write('\n' + '\n'.join(new_lines) + '\n')
  154. return new_lines
  155. # ===========================================================================
  156. # main
  157. # ===========================================================================
  158. def main() -> None:
  159. ap = argparse.ArgumentParser(
  160. prog='lint-case.py',
  161. description='workflow 轻量 lint + 自动 record 新 type 到 type_suggestions.md',
  162. )
  163. ap.add_argument('--workflow', type=Path, required=True,
  164. help='workflow.json (含 procedures 数组). lint 内部读 procedures + type_registry')
  165. ap.add_argument('--case-id', type=str, default=None,
  166. help='record suggestions 用的 case_id. 不传就 fallback workflow.case_id 或 ?')
  167. ap.add_argument('--no-record', action='store_true',
  168. help='只校验, 不写 suggestions')
  169. args = ap.parse_args()
  170. target_path = args.workflow
  171. if not target_path.exists():
  172. print(f'lint-case: 文件不存在 {target_path}', file=sys.stderr)
  173. sys.exit(2)
  174. try:
  175. case_data = json.loads(target_path.read_text(encoding='utf-8'))
  176. except json.JSONDecodeError as e:
  177. print(f'lint-case: {target_path} 不是合法 JSON: {e}', file=sys.stderr)
  178. sys.exit(2)
  179. # workflow 模式: 注入 case_id (suggestions record 需要)
  180. if args.case_id is not None and 'case_id' not in case_data:
  181. try:
  182. case_data['case_id'] = int(args.case_id)
  183. except ValueError:
  184. case_data['case_id'] = args.case_id
  185. case_id = case_data.get('case_id', '?')
  186. print(f'[lint] case-{case_id} ({target_path.name})')
  187. # check: type 完整性提示
  188. hints = check_type_completeness(case_data)
  189. if hints:
  190. print(f' · type 完整性: {len(hints)} 个提示')
  191. for h in hints:
  192. print(f' - {h}')
  193. else:
  194. print(' · type 完整性: OK')
  195. # check: value / directive 自包含性 (引用占位)
  196. vhints = check_value_selfcontained(case_data)
  197. if vhints:
  198. print(f' · value 自包含: {len(vhints)} 个引用占位 (跑 wf-patch.py --resolve-passthrough 自动回填)')
  199. for h in vhints:
  200. print(f' - {h}')
  201. else:
  202. print(' · value 自包含: OK')
  203. # side effect: record 新 type
  204. if not args.no_record:
  205. new_lines = record_new_types(case_data)
  206. if new_lines:
  207. print(f' · 已 record {len(new_lines)} 条新 type 到 {SUGGESTIONS.name}:')
  208. for ln in new_lines:
  209. print(f' {ln}')
  210. else:
  211. # 合并所有 procedure type_registry
  212. merged_reg: dict = {}
  213. for p in case_data.get('procedures', []):
  214. merged_reg.update(p.get('type_registry') or {})
  215. if not merged_reg:
  216. print(' · 无新 type 可 record (type_registry 为空 — 全部 type 命中字典叶子)')
  217. else:
  218. print(' · 无新 type 可 record (type_registry 里的项已全部 record 过)')
  219. # 不卡 exit code
  220. sys.exit(0)
  221. if __name__ == '__main__':
  222. main()