| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- lint-case.py — workflow.json 轻量 lint + 自动 record 新 type 到 type_suggestions.md.
- 设计哲学: **不严格**.
- - 不分 error/warning 等级, 不卡 exit code (都返 0)
- - 主要副作用是 record 新 type 到 spec/taxonomy/type_suggestions.md
- - 检测项打 stdout 给 Agent / 用户看, 决定要不要回去修
- 用法:
- python spec/tools/lint-case.py --workflow outputs/case-{N}/workflow.json --case-id {N}
- python spec/tools/lint-case.py --workflow outputs/case-{N}/workflow.json --case-id {N} --no-record # 只校验不写
- 退出码:
- 0 lint 正常跑完 (不论有多少提示, 都不阻塞流程)
- 2 CLI 参数错误 / 文件不存在
- """
- from __future__ import annotations
- import argparse
- import json
- import re
- import sys
- from pathlib import Path
# This file lives at spec/tools/lint-case.py, so three levels up is procedure-dsl/.
DSL_ROOT = Path(__file__).resolve().parent.parent.parent
TYPE_JSON = DSL_ROOT.joinpath('spec', 'taxonomy', 'type.json')
SUGGESTIONS = DSL_ROOT.joinpath('spec', 'taxonomy', 'type_suggestions.md')

# Switch stdout/stderr to UTF-8 (needed on the Windows console); characters
# that still cannot be encoded are replaced instead of raising.
for _s in (sys.stdout, sys.stderr):
    if not hasattr(_s, 'reconfigure'):
        continue
    try:
        _s.reconfigure(encoding='utf-8', errors='replace')
    except Exception:
        # Best effort: a stream that refuses to be reconfigured is left as is.
        pass
def load_type_leaves() -> set[str]:
    """Return the ``$leaves`` entries of spec/taxonomy/type.json as a set.

    Returns:
        The set built from the ``$leaves`` list of the JSON document at
        ``TYPE_JSON``. The set is empty when the file does not exist or when
        the document has no ``$leaves`` key.
    """
    if TYPE_JSON.exists():
        taxonomy = json.loads(TYPE_JSON.read_text(encoding='utf-8'))
        return set(taxonomy.get('$leaves', []))
    return set()
- # ===========================================================================
- # Check 1: type 完整性提示
- # ===========================================================================
- def _iter_procedures(case_data: dict):
- """遍历 workflow.json 的 procedures.
- Yields: (procedure_label, procedure_dict) — 含 steps + type_registry.
- """
- for p in case_data.get('procedures') or []:
- label = p.get('id') or p.get('name') or '?'
- yield (label, p)
def check_type_completeness(case_data: dict) -> list[str]:
    """Hint about IO types that are case-specific but incompletely registered.

    For every input/output item of every step, a ``type`` that is not one of
    the taxonomy leaves must have an entry in the procedure's
    ``type_registry``, and a dict entry should carry ``extends`` and ``desc``.
    Nothing here fails the run; the hints are meant to be read and acted on.

    Args:
        case_data: Parsed workflow.json content.

    Returns:
        Hint messages, each prefixed with ``[procedure label]`` so that
        multi-procedure workflows show where the problem is. A missing
        registration is reported once per IO location; a registry entry
        lacking ``extends`` / ``desc`` is reported once per procedure, however
        many IO items reference that type.
    """
    leaves = load_type_leaves()
    hints: list[str] = []
    for proc_label, proc in _iter_procedures(case_data):
        type_reg = proc.get('type_registry')
        if not isinstance(type_reg, dict):
            # A missing or malformed registry behaves like an empty one.
            type_reg = {}
        # Registry entries already inspected for this procedure, so that a
        # type used by many IO items does not repeat the same field hints.
        inspected: set[str] = set()
        for i, step in enumerate(proc.get('steps') or []):
            if not isinstance(step, dict):
                continue
            for kind in ('inputs', 'outputs'):
                for j, item in enumerate(step.get(kind) or []):
                    if not isinstance(item, dict):
                        continue
                    t = item.get('type', '') or ''
                    if not t:
                        continue
                    if not isinstance(t, str):
                        # Lists/dicts are unhashable and would crash the
                        # membership tests below; report them instead.
                        hints.append(
                            f"[{proc_label}] step[{i}].{kind}[{j}].type={t!r} 不是字符串"
                        )
                        continue
                    if t in leaves:
                        continue  # taxonomy leaf, nothing to register
                    if t not in type_reg:
                        hints.append(
                            f"[{proc_label}] step[{i}].{kind}[{j}].type={t!r} 是 case-specific "
                            f"但 type_registry 没注册"
                        )
                        continue
                    if t in inspected:
                        continue
                    inspected.add(t)
                    entry = type_reg[t]
                    if not isinstance(entry, dict):
                        continue
                    if not entry.get('extends'):
                        hints.append(f"[{proc_label}] type_registry[{t!r}] 缺 extends 字段")
                    if not entry.get('desc'):
                        hints.append(f"[{proc_label}] type_registry[{t!r}] 缺 desc 字段 (renderer drawer 显示需要)")
    return hints
- # ===========================================================================
- # Check 2: value / directive 自包含性 (禁止引用占位)
- # ===========================================================================
# A value/directive must hold the data itself, not a pointer to another
# anchor. A match means the field was never really filled in.
META_REF = re.compile(r'[((]?\s*同\s*s[\d]|见\s*s[\d]|←\s*s[\d]|同上')


def check_value_selfcontained(case_data: dict) -> list[str]:
    """Find IO values and directives that are reference placeholders.

    Scans the ``value`` of every input/output item and the text of every
    ``['directive', text]`` pair in ``instruction``, looking for wording that
    ``META_REF`` matches (same as sX / see sX / <- sX / same as above, written
    in Chinese). Per the spec (fields.md data-flow group, syntax section 6) a
    value is the verbatim data and references belong to the anchor. Schema and
    type checks cannot catch these placeholders, hence this dedicated check.
    It only hints; ``wf-patch.py --resolve-passthrough`` is the tool named for
    back-filling them from the source.

    Args:
        case_data: Parsed workflow.json content.

    Returns:
        One hint per offending value or directive, prefixed with the
        procedure label and quoting the first 24 characters of the text.
    """
    found: list[str] = []
    for label, procedure in _iter_procedures(case_data):
        for step_no, step in enumerate(procedure.get('steps') or []):
            if not isinstance(step, dict):
                continue

            # IO values on both sides of the step.
            for side in ('inputs', 'outputs'):
                for slot, io_item in enumerate(step.get(side) or []):
                    if not isinstance(io_item, dict):
                        continue
                    value = io_item.get('value')
                    if not isinstance(value, str):
                        continue
                    if META_REF.search(value):
                        found.append(f"[{label}] step[{step_no}].{side}[{slot}].value 是引用占位 {value[:24]!r} — 应逐字回填数据本身")

            # Directive texts inside the instruction list.
            for pos, pair in enumerate(step.get('instruction') or []):
                is_directive = (
                    isinstance(pair, list)
                    and len(pair) == 2
                    and pair[0] == 'directive'
                )
                if not is_directive:
                    continue
                text = pair[1]
                if isinstance(text, str) and META_REF.search(text):
                    found.append(f"[{label}] step[{step_no}].instruction[{pos}](directive) 是引用占位 {text[:24]!r} — 应填实际 prompt 原文")
    return found
- # ===========================================================================
- # Side effect: record 新 type 到 type_suggestions.md
- # ===========================================================================
def record_new_types(case_data: dict, suggestions_path: Path = SUGGESTIONS) -> list[str]:
    """Append the workflow's case-specific types to the suggestions file.

    The ``type_registry`` dicts of all procedures are merged (a later
    procedure wins when a name repeats). Each dict entry whose name is not a
    taxonomy leaf becomes one markdown list line. The operation is idempotent
    per ``(type name, case id)`` pair: pairs already present in the file,
    recognised by the ``(来自 case-N`` marker on a list line, are skipped.

    The description is collapsed onto a single line before writing. The
    de-duplication regex works line by line, so a multi-line description
    would make the entry unrecognisable and it would be appended again on
    every run.

    Args:
        case_data: Parsed workflow.json content; ``case_id`` is read from it
            and falls back to ``'?'`` when absent, null or empty.
        suggestions_path: Markdown file to append to. It is created with a
            small skeleton, parent directories included, when missing.

    Returns:
        The lines written by this call; an empty list when there was nothing
        new to record.

    Raises:
        OSError: If the suggestions file cannot be read or written.
    """
    # Merge the registries of all procedures, tolerating null/malformed parts
    # the same way _iter_procedures-style traversal does elsewhere.
    type_reg: dict = {}
    for proc in case_data.get('procedures') or []:
        if not isinstance(proc, dict):
            continue
        registry = proc.get('type_registry')
        if isinstance(registry, dict):
            type_reg.update(registry)
    if not type_reg:
        return []

    leaves = load_type_leaves()
    case_id = case_data.get('case_id')
    if case_id is None or case_id == '':
        # Only a truly missing id falls back; a legitimate id of 0 is kept.
        case_id = '?'

    text = suggestions_path.read_text(encoding='utf-8') if suggestions_path.exists() else ''
    # (type_name, case_id) pairs already recorded, parsed from list entries.
    existing = set(re.findall(
        r'^- `([^`]+)`:.*?\(来自 case-([^,)\s]+)', text, re.M
    ))

    new_lines: list[str] = []
    for tname, entry in type_reg.items():
        if not isinstance(entry, dict):
            continue
        if tname in leaves:
            continue  # already a taxonomy leaf, wrongly listed in the registry
        if (tname, str(case_id)) in existing:
            continue
        ext = entry.get('extends') or '?'
        # Collapse all whitespace runs (including newlines) to single spaces.
        desc = ' '.join(str(entry.get('desc') or '').split()) or '(无 desc)'
        new_lines.append(f'- `{tname}`: {desc} (来自 case-{case_id}, extends `{ext}`)')

    if new_lines:
        if not suggestions_path.exists():
            suggestions_path.parent.mkdir(parents=True, exist_ok=True)
            suggestions_path.write_text(
                '# Type 字典扩展建议\n\n## 累积条目\n\n', encoding='utf-8'
            )
        with suggestions_path.open('a', encoding='utf-8') as f:
            f.write('\n' + '\n'.join(new_lines) + '\n')
    return new_lines
- # ===========================================================================
- # main
- # ===========================================================================
def main() -> None:
    """Command-line entry point: lint one workflow.json and record new types.

    Steps: parse the arguments, load the workflow, print the hints of the
    type-completeness and value-self-containment checks, then (unless
    ``--no-record`` is given) append new case-specific types to the
    suggestions file.

    Exit codes:
        0: the lint ran, whatever it reported. Hints never block, and a
           failure to write the suggestions file is only reported on stderr.
        2: bad CLI arguments, or the workflow file is missing, unreadable,
           not valid JSON, or not a JSON object at top level.
    """
    ap = argparse.ArgumentParser(
        prog='lint-case.py',
        description='workflow 轻量 lint + 自动 record 新 type 到 type_suggestions.md',
    )
    ap.add_argument('--workflow', type=Path, required=True,
                    help='workflow.json (含 procedures 数组). lint 内部读 procedures + type_registry')
    ap.add_argument('--case-id', type=str, default=None,
                    help='record suggestions 用的 case_id. 不传就 fallback workflow.case_id 或 ?')
    ap.add_argument('--no-record', action='store_true',
                    help='只校验, 不写 suggestions')
    args = ap.parse_args()

    target_path = args.workflow
    if not target_path.exists():
        print(f'lint-case: 文件不存在 {target_path}', file=sys.stderr)
        sys.exit(2)
    try:
        case_data = json.loads(target_path.read_text(encoding='utf-8'))
    except json.JSONDecodeError as e:
        print(f'lint-case: {target_path} 不是合法 JSON: {e}', file=sys.stderr)
        sys.exit(2)
    except (OSError, UnicodeDecodeError) as e:
        # A directory, a permission problem or non-UTF-8 bytes: report it as
        # an input error (exit 2) instead of dying with a traceback.
        print(f'lint-case: 无法读取 {target_path}: {e}', file=sys.stderr)
        sys.exit(2)
    if not isinstance(case_data, dict):
        print(f'lint-case: {target_path} 顶层不是 JSON object', file=sys.stderr)
        sys.exit(2)

    # Inject the CLI case id when the workflow has none; a null case_id counts
    # as "none". NOTE(review): a case_id already present in the workflow still
    # takes precedence over --case-id; confirm that is intended, since the
    # help text reads as if the CLI value should win.
    if args.case_id is not None and case_data.get('case_id') is None:
        try:
            case_data['case_id'] = int(args.case_id)
        except ValueError:
            case_data['case_id'] = args.case_id
    case_id = case_data.get('case_id')
    if case_id is None:
        case_id = '?'
    print(f'[lint] case-{case_id} ({target_path.name})')

    # Check: type completeness hints.
    hints = check_type_completeness(case_data)
    if hints:
        print(f' · type 完整性: {len(hints)} 个提示')
        for h in hints:
            print(f' - {h}')
    else:
        print(' · type 完整性: OK')

    # Check: value / directive self-containment (reference placeholders).
    vhints = check_value_selfcontained(case_data)
    if vhints:
        print(f' · value 自包含: {len(vhints)} 个引用占位 (跑 wf-patch.py --resolve-passthrough 自动回填)')
        for h in vhints:
            print(f' - {h}')
    else:
        print(' · value 自包含: OK')

    # Side effect: record new types.
    if not args.no_record:
        try:
            new_lines = record_new_types(case_data)
        except OSError as e:
            # Recording is a convenience; an unwritable file must not turn
            # the run into a failure.
            print(f'lint-case: 写 {SUGGESTIONS.name} 失败: {e}', file=sys.stderr)
        else:
            if new_lines:
                print(f' · 已 record {len(new_lines)} 条新 type 到 {SUGGESTIONS.name}:')
                for ln in new_lines:
                    print(f' {ln}')
            else:
                # Tell "no registry at all" apart from "nothing left to add".
                has_registry = any(
                    isinstance(p, dict) and p.get('type_registry')
                    for p in case_data.get('procedures') or []
                )
                if not has_registry:
                    print(' · 无新 type 可 record (type_registry 为空 — 全部 type 命中字典叶子)')
                else:
                    print(' · 无新 type 可 record (type_registry 里的项已全部 record 过)')

    # Never gate on the exit code.
    sys.exit(0)


if __name__ == '__main__':
    main()
|