#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ wf-patch.py — workflow.json 的安全批量字段设置器. 为什么有这个工具: workflow.json 由各 phase **直接 Write 骨架 + 逐字段填充** 演化. 但「给几十个 IO 逐个加 anchor」「给每个 step 填 effect/action/type」这类批量字段赋值, 用 Edit 一处一处改太碎, 手写整段 JSON 又极易踩转义 / 控制字符坑 (把文件搞坏). wf-patch 卡在中间: **你只负责语义决策 (path=value), 工具负责安全落盘 + 合法性校验**. - 安全 IO: 工具自己 json.load -> 改 -> json.dump(ensure_ascii=False), 你永远不手写 JSON. - 写入即校验 (fail-fast): 每条赋值立刻对照字典树 / type_registry / anchor 格式校验, **任何一条非法 -> 报具体哪条错, 整批不写** (不产出悄悄错的文件). lint 仍做全局兜底. 用法: # 单条 / 多条 --set (path=value, 只在第一个 '=' 处切, value 可含 '=' 和空格) python spec/tools/wf-patch.py --workflow outputs/case-N/workflow.json \ --set 'p1.s1.inputs[0].anchor=← s0.主角图' \ --set 'p1.s2.effect=主体生成' \ --set 'p1.s2.action=生成/图像生成/文生图' # 或一次性喂一份 patch 清单 (适合 1.3 加 anchor / 2A 填字段这种几十处批量) python spec/tools/wf-patch.py --workflow outputs/case-N/workflow.json --patch _scratch/anchors.json # anchors.json = [{"path": "p1.s1.inputs[0].anchor", "value": "← s0.x"}, ...] # 只校验不写 python spec/tools/wf-patch.py --workflow ... --set '...' --dry-run # 删字段 (取代手 Edit 删; 字段不存在则幂等跳过) python spec/tools/wf-patch.py --workflow ... --unset 'p1.declarations.inputs[0].inferred' # 只校验不写 python spec/tools/wf-patch.py --workflow ... --set '...' --dry-run 路径语法 (proc / step 按 id 寻址, 不是下标; 只有真列表才用 [i]; 嵌套步 id 带点 s2.1 也支持): p1.s2.effect step 标量字段 (effect/via/action/feature/control/kind/intent/group) p1.s1.inputs[0].anchor IO 字段 (anchor/type/substance/form/name/value) p1.s2.focus step 的 focus 数组 (逗号分隔: focus=via,action,out-type-0) p1.purpose procedure 头部字段 (name/purpose/category/platform/author) p1.declarations.inputs[0].desc declarations 内任意字段 (通用下钻) source.url case-level 原帖信息 (platform/author/date/url/title/excerpt) p1.type_registry.场景图.extends 注册 case-specific 类型 (会自动建 type_registry 段) value 特殊取值: __null__ -> JSON null (用于 substance/form/url 可空) 仍用 Write / Edit 的只剩 (尽量别碰生 JSON): - workflow.json 骨架的首次创建 (Phase 1.2 从 template Write) - instruction (列表套列表, 手动 Edit; 透传 directive 用 --resolve-passthrough) 改字段/删字段/改 source 现在都走本工具, 不要再 Read→Edit 改 workflow.json (会反复重读、烧 token). 退出码: 0 全部校验通过并写入 (--dry-run 时为校验通过) 1 有校验失败 (整批未写) / 路径解析失败 2 CLI 参数错误 / 文件不存在 / JSON 损坏 """ from __future__ import annotations import argparse import json import re import subprocess import sys from pathlib import Path # spec/tools/wf-patch.py -> procedure-dsl/ DSL_ROOT = Path(__file__).resolve().parent.parent.parent TAX_DIR = DSL_ROOT / 'spec' / 'taxonomy' LOOKUP = DSL_ROOT / 'spec' / 'tools' / 'taxonomy-lookup.py' # Windows 控制台 UTF-8 for _s in (sys.stdout, sys.stderr): if hasattr(_s, 'reconfigure'): try: _s.reconfigure(encoding='utf-8', errors='replace') except Exception: pass # 受控词 (与 syntax.md §3 / action.json $control 对齐) FEATURE_VOCAB = {'随机', '幂等', '人工', '本地', '写外部', '读外部', '-'} KIND_VOCAB = {'step', 'block', 'nested', 'atom'} # value/directive 里的「引用占位」文案 — 这些是 anchor 的活, value 应填数据本身. # 命中即视为「未真正回填」(--resolve-passthrough 会尝试填, lint 会报警). META_REF = re.compile(r'[((]?\s*同\s*s[\d]|见\s*s[\d]|←\s*s[\d]|同上') class PathError(Exception): """路径无法解析到 workflow.json 里的目标位置.""" # =========================================================================== # 字典树加载: leaf 集 + {leaf: 全路径} + 全叶路径集 (与 lint 同款叶子派生) # =========================================================================== def _load_tree(name: str): """读 spec/taxonomy/{name}.json. 返回 (leaves:set, leaf2path:dict, control:list).""" f = TAX_DIR / f'{name}.json' if not f.exists(): return set(), {}, [] d = json.loads(f.read_text(encoding='utf-8')) leaf2path: dict[str, str] = {} def walk(node: dict, prefix: list[str]): nm = node.get('分类名称') if not nm: return p = prefix + [nm] kids = node.get('子分类') or [] if not kids: # 无子分类 = 叶子 leaf2path[nm] = '/'.join(p) for c in kids: walk(c, p) for top in d.get('最终分类树') or []: walk(top, []) leaves = set(d.get('$leaves') or leaf2path.keys()) return leaves, leaf2path, (d.get('$control') or []) EFFECT_LEAVES, EFFECT_PATHS, _ = _load_tree('effect') ACTION_LEAVES, ACTION_PATHS, ACTION_CONTROL = _load_tree('action') TYPE_LEAVES, TYPE_PATHS, _ = _load_tree('type') CONTROL_VOCAB = set(ACTION_CONTROL) | {'-'} # substance/form 校验结果缓存 (subprocess 较慢) _taxo_cache: dict[tuple[str, str], bool] = {} def _taxo_valid(dim: str, path: str) -> bool: """调 taxonomy-lookup.py --validate, exit 0 = 合法. 结果缓存.""" key = (dim, path) if key in _taxo_cache: return _taxo_cache[key] try: import os env = os.environ.copy() env['PYTHONIOENCODING'] = 'utf-8' r = subprocess.run( [sys.executable, str(LOOKUP), '--dim', dim, '--validate', path], capture_output=True, text=True, encoding='utf-8', errors='replace', env=env, ) ok = (r.returncode == 0) except Exception: ok = False # 校验器跑不起来时, 保守判非法 _taxo_cache[key] = ok return ok def _closest(name: str, leaves) -> str: """给个最接近的叶子名做提示 (子串/前缀朴素匹配, 仅供报错文案).""" cands = [lf for lf in leaves if name and (name in lf or lf in name)] return (' 最接近: ' + '/'.join(cands[:3])) if cands else '' # =========================================================================== # 字段校验 -> (ok, normalized_value, err_msg) # =========================================================================== def validate_field(field: str, value, proc: dict, pending_types: set[str] = None): # null 哨兵 (substance/form/url 可空) if value == '__null__': if field in ('substance', 'form', 'url'): return True, None, '' return False, value, f'__null__ 只对 substance/form/url 有意义, {field} 不可为 null' # focus 是数组: 逗号分隔 → list ('via,action,out-type-0'); 空串 → [] if field == 'focus': items = [t.strip() for t in str(value).split(',') if t.strip()] return True, items, '' if field == 'effect': if value in EFFECT_LEAVES: return True, value, '' # 给了全路径 -> 归一到叶名 (schema 存叶名) for leaf, path in EFFECT_PATHS.items(): if value == path: return True, leaf, '' return False, value, f'effect={value!r} 不是 effect.json 叶子(存叶名).{_closest(value, EFFECT_LEAVES)}' if field == 'action': # action 存全路径; 给叶名自动展开, 给全叶路径原样接受 if value in ACTION_PATHS: # 是叶名 return True, ACTION_PATHS[value], '' if value in ACTION_PATHS.values(): # 是合法叶路径 return True, value, '' return False, value, (f'action={value!r} 不是合法动作叶子/叶路径 ' f'(形如 生成/图像生成/文生图).{_closest(value.split("/")[-1], ACTION_LEAVES)}') if field == 'type': if value in TYPE_LEAVES: return True, value, '' reg = proc.get('type_registry') or {} if value in reg: return True, value, '' if pending_types and value in pending_types: return True, value, '' return False, value, (f'type={value!r} 不是 type.json 叶子, 也没在本工序 type_registry 注册. ' f'先 --set {proc.get("id")}.type_registry.{value}.extends=<叶子> 再用.{_closest(value, TYPE_LEAVES)}') if field == 'extends': # type_registry entry 的 extends 必须桥到 stdlib 叶子 if value in TYPE_LEAVES: return True, value, '' return False, value, f'type_registry extends={value!r} 必须是 type.json 叶子.{_closest(value, TYPE_LEAVES)}' if field == 'substance': if isinstance(value, str): if '+' in value: paths = [p.strip() for p in value.split('+') if p.strip()] else: paths = [value.strip()] elif isinstance(value, list): paths = [str(p).strip() for p in value if str(p).strip()] else: return False, value, 'substance 必须是字符串或数组' invalid_paths = [] for p in paths: if not _taxo_valid('实质', p): invalid_paths.append(p) if invalid_paths: return False, value, f'以下 substance 路径不在实质词表: {invalid_paths}' norm_val = paths if (isinstance(value, list) or (isinstance(value, str) and '+' in value)) else paths[0] return True, norm_val, '' if field == 'form': if isinstance(value, str): if '+' in value: paths = [p.strip() for p in value.split('+') if p.strip()] else: paths = [value.strip()] elif isinstance(value, list): paths = [str(p).strip() for p in value if str(p).strip()] else: return False, value, 'form 必须是字符串或数组' invalid_paths = [] for p in paths: if not _taxo_valid('形式', p): invalid_paths.append(p) if invalid_paths: return False, value, f'以下 form 路径不在形式词表: {invalid_paths}' norm_val = paths if (isinstance(value, list) or (isinstance(value, str) and '+' in value)) else paths[0] return True, norm_val, '' if field == 'anchor': if re.match(r'^\s*(←|→)', str(value)): return True, value, '' return False, value, f'anchor={value!r} 须以 ← (输入引用) 或 → (输出去向) 开头' if field == 'feature': if value in FEATURE_VOCAB: return True, value, '' return False, value, f'feature={value!r} 不在受控词 {sorted(FEATURE_VOCAB)}' if field == 'control': if value in CONTROL_VOCAB: return True, value, '' return False, value, f'control={value!r} 不在受控词 {sorted(CONTROL_VOCAB)}' if field == 'kind': if value in KIND_VOCAB: return True, value, '' return False, value, f'kind={value!r} 不在 {sorted(KIND_VOCAB)}' # 自由文本字段 (name/value/intent/via/purpose/category/platform/author/desc/group...) return True, value, '' # =========================================================================== # 路径解析 -> (parent_container, key, proc, field_name) # =========================================================================== _SEG = re.compile(r'^([^\[]+)(?:\[(\d+)\])?$') def _split_seg(seg: str): m = _SEG.match(seg) if not m: raise PathError(f'非法路径段 {seg!r}') return m.group(1), (int(m.group(2)) if m.group(2) is not None else None) def _descend(container, segs): """沿 segs 走进 container, 返回 (parent, last_key). 中间节点必须已存在. segs 每段可带 [i] 下标. last_key 是 dict 键 (str) 或列表下标 (int); 设置即 parent[last_key]=value, 删除即 del parent[last_key]. 用于 source.* / declarations.* 等通用路径 (proc/step 的 id 寻址不走这里). """ cur = container for i, seg in enumerate(segs): name, idx = _split_seg(seg) last = (i == len(segs) - 1) if last and idx is None: if not isinstance(cur, dict): raise PathError(f'{name!r} 的父级不是对象') return cur, name if not isinstance(cur, dict) or name not in cur: raise PathError(f'路径段 {name!r} 不存在, 无法下钻') nxt = cur[name] if idx is not None: if not isinstance(nxt, list) or idx >= len(nxt): raise PathError(f'{name}[{idx}] 越界或非列表') if last: return nxt, idx cur = nxt[idx] else: cur = nxt raise PathError('路径为空') def locate(data: dict, path: str): """把 path 解析到目标. 返回 (parent, key, proc, field_name). 设置即 parent[key] = value. proc 给校验提供 type_registry 上下文. proc / step 按 id 寻址 (不是下标); inputs/outputs 用 [i] 下标. step id 可能带点 (嵌套步 s2.1) — 用最长前缀匹配消歧 (s2.1 优先于 s2). """ if '.' not in path: raise PathError(f'路径太短 {path!r}, 至少 .<字段> 或 source.<字段>') proc_id, remainder = path.split('.', 1) # --- source.* 分支 (case-level 原帖信息, 无 proc 上下文) --- if proc_id == 'source': src = data.setdefault('source', {}) parent, key = _descend(src, remainder.split('.')) return parent, key, None, (key if isinstance(key, str) else '') proc = next((p for p in (data.get('procedures') or []) if p.get('id') == proc_id), None) if proc is None: ids = [p.get('id') for p in (data.get('procedures') or [])] raise PathError(f'找不到 procedure id={proc_id!r} (现有: {ids})') # --- type_registry 分支 (允许自动建段/条目) --- if remainder == 'type_registry' or remainder.startswith('type_registry.'): parts = remainder.split('.') if len(parts) == 3: reg = proc.setdefault('type_registry', {}) entry = reg.setdefault(parts[1], {}) return entry, parts[2], proc, parts[2] raise PathError('type_registry 路径形如 p1.type_registry.<类型名>.') # --- step 分支 (最长前缀匹配 step id, 兼容带点的嵌套步 id) --- matched = None for s in (proc.get('steps') or []): sid = s.get('id') if not sid: continue if remainder == sid: raise PathError(f'step 路径要带字段, 形如 {proc_id}.{sid}.effect') if remainder.startswith(sid + '.') and (matched is None or len(sid) > len(matched['id'])): matched = s if matched is not None: sid = matched['id'] field_part = remainder[len(sid) + 1:] # 'sid.' 之后 fsegs = field_part.split('.') name2, idx2 = _split_seg(fsegs[0]) if name2 in ('inputs', 'outputs'): if idx2 is None: raise PathError(f'{name2} 要带下标, 形如 {name2}[0]') lst = matched.get(name2) if not isinstance(lst, list) or idx2 >= len(lst): raise PathError(f'{proc_id}.{sid}.{name2}[{idx2}] 越界 (该 step 有 {len(lst or [])} 个 {name2})') if len(fsegs) != 2: raise PathError(f'IO 路径形如 {proc_id}.{sid}.{name2}[{idx2}].anchor') return lst[idx2], fsegs[1], proc, fsegs[1] else: if len(fsegs) != 1: raise PathError(f'step 标量字段形如 {proc_id}.{sid}.{name2}') return matched, name2, proc, name2 # --- proc 内其余路径: 头部字段 / declarations.* / return_row.* 等, 走通用下钻 --- parent, key = _descend(proc, remainder.split('.')) return parent, key, proc, (key if isinstance(key, str) else '') # =========================================================================== # 透传回填: anchor 为纯 ← sN.varname 的 IO, 从源 output 抄 value (逐字回填) # =========================================================================== def _is_fillable(value) -> bool: """该 value 算「还没真正回填」吗 — 空 / 占位符 / 引用文案.""" if value in (None, '', '-'): return True return bool(META_REF.search(str(value))) def _parse_passthrough(anchor, step_ids: list[str]): """把 anchor 解析成纯透传源 (src_step, src_name); 非干净透传返回 None. 只认 `← sN.varname` 形式 (sN 按已知 step id 最长前缀匹配, 兼容 s2.1); `← 工序输入` / `← s6 (链, 上一张)` / 带容器索引等不算 (无法确定唯一源 value). varname 末尾的 [i] / (...) 注释会被剥掉再查. """ m = re.match(r'^\s*←\s*(.+)$', str(anchor or '')) if not m: return None body = m.group(1).strip() for sid in sorted(step_ids, key=len, reverse=True): if body.startswith(sid + '.'): name = body[len(sid) + 1:].strip() name = re.sub(r'\s*[\[((].*$', '', name).strip() # 剥掉 [i] / (注释) return (sid, name) if name else None return None def _extract_ref(text, step_ids: list[str]): """从 directive/文案里抽 (src_step, src_name) 引用; 抽不出返回 None. 认「同 sN.name」「(同 sN.name 全文)」「见 sN.name」等. sN 按已知 step id 最长前缀匹配 (兼容 s2.1). """ m = re.search(r'[同见]\s*([^\s)),,。]+)', str(text or '')) if not m: return None body = m.group(1) for sid in sorted(step_ids, key=len, reverse=True): if body.startswith(sid + '.'): name = re.sub(r'\s*[\[((].*$', '', body[len(sid) + 1:]).strip() return (sid, name) if name else None return None def resolve_passthrough(data: dict): """把 anchor 为纯透传、value/directive 仍空或占位的位置, 用源 output 的 value 逐字填上. 覆盖两类: (a) IO 的 value (anchor=← sN.varname); (b) instruction 的 directive (文案里「同 sN.varname」). 迭代到不动点 (处理链式透传). 返回 (filled_msgs, warn_msgs). """ out_index = {} # (step_id, name) -> output item (读 value) step_ids: list[str] = [] for p in data.get('procedures') or []: for s in p.get('steps') or []: sid = s.get('id') if sid: step_ids.append(sid) for o in s.get('outputs') or []: if isinstance(o, dict) and o.get('name'): out_index[(sid, o['name'])] = o def _src_value(ref): """源存在且自己已填好 → 返回其 value; 否则 None.""" src = out_index.get(ref) if src is None or _is_fillable(src.get('value')): return None return src['value'] filled: list[str] = [] changed, rounds = True, 0 while changed and rounds < 20: changed, rounds = False, rounds + 1 for p in data.get('procedures') or []: for s in p.get('steps') or []: # (a) IO value for kind in ('inputs', 'outputs'): for idx, io in enumerate(s.get(kind) or []): if not isinstance(io, dict) or not _is_fillable(io.get('value')): continue pt = _parse_passthrough(io.get('anchor'), step_ids) val = _src_value(pt) if pt else None if val is None: continue io['value'] = val filled.append( f"{p.get('id')}.{s.get('id')}.{kind}[{idx}].value " f"← 复制自 {pt[0]}.{pt[1]} ({len(str(val))} 字)" ) changed = True # (b) instruction directive (喂给工具的 prompt = 引用的 output 原文) for di, pair in enumerate(s.get('instruction') or []): if not (isinstance(pair, list) and len(pair) == 2 and pair[0] == 'directive'): continue if not _is_fillable(pair[1]): continue ref = _extract_ref(pair[1], step_ids) val = _src_value(ref) if ref else None if val is None: continue pair[1] = val filled.append( f"{p.get('id')}.{s.get('id')}.instruction[{di}](directive) " f"← 复制自 {ref[0]}.{ref[1]} ({len(str(val))} 字)" ) changed = True # 仍填不动的透传 (源找不到) → warn warns: list[str] = [] for p in data.get('procedures') or []: for s in p.get('steps') or []: for kind in ('inputs', 'outputs'): for idx, io in enumerate(s.get(kind) or []): if not isinstance(io, dict) or not _is_fillable(io.get('value')): continue pt = _parse_passthrough(io.get('anchor'), step_ids) if pt and out_index.get(pt) is None: warns.append( f"{p.get('id')}.{s.get('id')}.{kind}[{idx}] anchor 指向 " f"{pt[0]}.{pt[1]} 但找不到该 output (检查 anchor / 变量名)" ) return filled, warns # =========================================================================== # 应用 # =========================================================================== def load_patches(args) -> list[tuple[str, str]]: """汇总 --set、--patch 与 --set-file 成 [(path, value), ...].""" def _norm(v): if isinstance(v, str): # 将中文全角双角/单引号自动归一化为标准半角引号,更利于 AI 生图引擎和 Prompt 语法识别 v = v.replace('“', '"').replace('”', '"').replace('‘', "'").replace('’', "'") return v out: list[tuple[str, str]] = [] for s in args.set or []: if '=' not in s: raise SystemExit(f'wf-patch: --set 缺 "=" : {s!r} (形如 path=value)') path, value = s.split('=', 1) # 只切第一个 '=' out.append((path.strip(), _norm(value))) # 🟢 新增:从外部文件读取值注入 for sf in getattr(args, 'set_file', None) or []: if '=' not in sf: raise SystemExit(f'wf-patch: --set-file 缺 "=" : {sf!r} (形如 path=file_path)') path, fpath_str = sf.split('=', 1) fpath = Path(fpath_str.strip()) if not fpath.exists(): raise SystemExit(f'wf-patch: --set-file 指定的文件不存在: {fpath_str}') try: value = fpath.read_text(encoding='utf-8') except Exception as e: raise SystemExit(f'wf-patch: 无法读取 --set-file 指定的文件 {fpath_str}: {e}') out.append((path.strip(), _norm(value))) if args.patch: if not args.patch.exists(): raise SystemExit(f'wf-patch: --patch 文件不存在 {args.patch}') try: items = json.loads(args.patch.read_text(encoding='utf-8')) except json.JSONDecodeError as e: raise SystemExit(f'wf-patch: --patch 不是合法 JSON: {e}') for it in items: out.append((it['path'], _norm(it['value']))) return out def main() -> None: ap = argparse.ArgumentParser( prog='wf-patch.py', description='workflow.json 安全批量字段设置器 (写入即校验, 任何一条非法整批不写)', ) ap.add_argument('--workflow', type=Path, required=True, help='目标 workflow.json') ap.add_argument('--set', action='append', metavar='PATH=VALUE', help='单条赋值, 可重复. 只在第一个 = 处切; value 可含 = 和空格 (记得整体加引号)') ap.add_argument('--patch', type=Path, default=None, help='批量赋值清单 .json: [{"path":..,"value":..}, ...]') ap.add_argument('--set-file', action='append', metavar='PATH=FILE_PATH', default=None, help='从外部文件读取内容注入指定字段. e.g. p1.s1.outputs[0].value=_scratch/prompt.txt') ap.add_argument('--unset', action='append', metavar='PATH', default=None, help='删字段, 可重复. e.g. p1.declarations.inputs[0].inferred (字段不存在则跳过). 取代手 Edit 删字段') ap.add_argument('--resolve-passthrough', action='store_true', help='把 anchor 为纯透传 (← sN.varname)、value 仍空/占位的 IO, 顺 anchor 从源 output 逐字抄 value. 可单独跑, 也可跟在 --set/--patch 后 (先赋值再解析). 迭代处理链式透传') ap.add_argument('--dry-run', action='store_true', help='只校验/预演, 不写') args = ap.parse_args() wf = args.workflow if not wf.exists(): print(f'wf-patch: 文件不存在 {wf}', file=sys.stderr) sys.exit(2) try: data = json.loads(wf.read_text(encoding='utf-8')) except json.JSONDecodeError as e: print(f'wf-patch: {wf} 不是合法 JSON: {e}', file=sys.stderr) sys.exit(2) patches = load_patches(args) unsets = args.unset or [] if not patches and not unsets and not args.resolve_passthrough: print('wf-patch: 没有 --set / --patch / --unset / --resolve-passthrough, 啥也没干', file=sys.stderr) sys.exit(2) # 解析 + 校验; 任何一条失败 -> 整批不写 pending_types = set() for path, _ in patches: m = re.match(r'^p\d+\.type_registry\.([^.]+)\.(extends|desc)$', path) if m: pending_types.add(m.group(1)) plan = [] # set: (parent, key, normalized_value, path, display) del_plan = [] # unset: (parent, key, path) skipped = [] # unset 跳过 (字段本就不在) errors = [] # (path, msg) for path, value in patches: try: parent, key, proc, field = locate(data, path) except PathError as e: errors.append((path, str(e))) continue ok, norm, msg = validate_field(field, value, proc, pending_types) if not ok: errors.append((path, msg)) continue plan.append((parent, key, norm, path, norm if norm is not None else 'null')) for path in unsets: try: parent, key, _proc, _field = locate(data, path) except PathError as e: errors.append((path, str(e))) continue present = (isinstance(parent, dict) and key in parent) or \ (isinstance(parent, list) and isinstance(key, int) and key < len(parent)) (del_plan if present else skipped).append((parent, key, path) if present else path) if patches or unsets: print(f'[wf-patch] {wf.name} — set {len(plan)}/{len(patches)} 通过, ' f'unset {len(del_plan)} 删/{len(skipped)} 跳过, {len(errors)} 失败') for _p, _k, _n, path, disp in plan: print(f' ✓ set {path} = {disp}') for _p, _k, path in del_plan: print(f' ✓ unset {path}') for path in skipped: print(f' · skip {path} (字段本就不存在)') for path, msg in errors: print(f' ✗ {path} — {msg}') if errors: print(f'\n有 {len(errors)} 条失败, 整批未写入 (修正后重跑).', file=sys.stderr) sys.exit(1) # 应用到内存 data (set 先 unset 后; resolve 要看到这些改动). 是否持久化由 dry-run 决定. for parent, key, norm, _, _ in plan: parent[key] = norm for parent, key, _path in sorted(del_plan, key=lambda d: -d[1] if isinstance(d[1], int) else 0): if isinstance(parent, list): parent.pop(key) else: del parent[key] # 透传回填 filled, warns = [], [] if args.resolve_passthrough: filled, warns = resolve_passthrough(data) print(f'[resolve-passthrough] 回填 {len(filled)} 处透传 value, {len(warns)} 处填不动') for m in filled: print(f' ✓ {m}') for w in warns: print(f' ⚠ {w}') n_changes = len(plan) + len(del_plan) + len(filled) if args.dry_run: print(f'\n--dry-run: 预演 {n_changes} 处改动, 未写入.') sys.exit(0) if n_changes == 0: print('\n没有改动 (透传 value 都已填好 / 无可赋值), 未写文件.') sys.exit(0) # 落盘 (安全序列化, 你从不手写 JSON) wf.write_text(json.dumps(data, ensure_ascii=False, indent=2) + '\n', encoding='utf-8') print(f'\n已写入 {n_changes} 处到 {wf.name}.') sys.exit(0) if __name__ == '__main__': main()