"""Excel -> runtime JSON converter (V2-M1C), byte-equal via overlay-onto-base. Business edits the config Excel; this regenerates the runtime JSON. To guarantee byte-equality (so `policy_bundle_hash` and runtime behaviour never drift on a no-op regen), the converter uses the current JSON as a STRUCTURAL TEMPLATE (preserving key order) and overlays Excel cell values onto the matching base leaves, coercing each cell to the base value's type. Only paths that already exist in the base are overlaid — structurally divergent / non-value Excel columns are skipped, and non-sheet-backed JSON sections pass through unchanged. python scripts/build_config_from_excel.py --check # byte-equal? exit 1 on drift python scripts/build_config_from_excel.py --write # regenerate JSON from Excel This handles VALUE edits (thresholds, scores, gate values, flags). Structural changes (adding/removing rows) need a base update — out of V2-M1 scope. """ from __future__ import annotations import argparse import json import sys from pathlib import Path from typing import Any ROOT = Path(__file__).resolve().parents[1] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from openpyxl import load_workbook from content_agent.integrations import config_store from scripts.check_config_json_canonical import canonical_dumps RULE_PACK_XLSX = ROOT / "tech_documents/规则包映射/规则包映射配置表.xlsx" WALK_XLSX = ROOT / "tech_documents/游走策略/游走策略配置表.xlsx" RULE_PACK_JSON = ROOT / "product_documents/规则包/douyin_rule_packs.v1.json" WALK_JSON = ROOT / "product_documents/抖音游走策略/douyin_walk_strategy.v1.json" SENTINEL = "备注" # row 2 of every sheet; skipped SKIP_COLUMN = "注释" # human annotation column with no JSON home # A SheetSpec maps one Excel sheet onto a JSON section by overlay. # top-level: base[section] is a list of objects indexed by id_json (== row[id_excel]). # nested: base["rule_packs"][*][child_path...] grouped by parent_fk (rule_pack_id). class SheetSpec: def __init__(self, sheet, *, section, id_excel, id_json=None, renames=None, parent_fk=None, child_path=None): self.sheet = sheet self.section = section # top-level JSON key (for nested: "rule_packs") self.id_excel = id_excel self.id_json = id_json or id_excel self.renames = renames or {} # excel column -> dotted base path self.parent_fk = parent_fk # excel column holding rule_pack_id (nested only) self.child_path = child_path # dotted path within a rule_pack (nested only) _WHEN = {"field_path": "when.field", "operator": "when.op", "expected_value": "when.value"} RULE_PACK_SPECS = [ SheetSpec("rule_pack_dispatch", section="rule_pack_dispatch", id_excel="dispatch_id"), SheetSpec("decision_reason_codes", section="decision_reason_codes", id_excel="decision_reason_code"), SheetSpec("effect_status_mapping", section="effect_status_mapping", id_excel="mapping_id"), SheetSpec("query_effect_aggregation", section="query_effect_aggregation", id_excel="aggregation_id"), SheetSpec("hard_gate_rules", section="rule_packs", child_path="hard_gates", parent_fk="rule_pack_id", id_excel="gate_id", renames={"gate_label": "label", **_WHEN}), SheetSpec("scorecard_dimensions", section="rule_packs", child_path="scorecard.dimensions", parent_fk="rule_pack_id", id_excel="dimension_key", id_json="key", renames={"dimension_label": "label"}), SheetSpec("scorecard_scoring_rules", section="rule_packs", child_path="scorecard.scoring_rules", parent_fk="rule_pack_id", id_excel="scoring_rule_id"), SheetSpec("threshold_actions", section="rule_packs", child_path="thresholds", parent_fk="rule_pack_id", id_excel="decision_reason_code"), ] # Walk sheets mirror JSON 1:1 (identity columns; only 注释 skipped). # V3 清理: 13 段收窄到 3 个仍被消费的段(其余 10 段已被 walk_graph+walk_policy 取代, # JSON 段与对应 Excel sheet 一并删除)。 _WALK = { "walk_edge_catalog": "edge_id", "walk_rule_pack_binding": "binding_id", "walk_fact_contract": "runtime_file", } WALK_SPECS = [SheetSpec(sheet, section=sheet, id_excel=idc) for sheet, idc in _WALK.items()] def _read_rows(ws) -> list[dict[str, Any]]: headers = [c.value for c in ws[1]] rows = [] for excel_row in ws.iter_rows(min_row=2, values_only=True): if excel_row and str(excel_row[0] or "").startswith(SENTINEL): continue # sentinel comment row rows.append({h: v for h, v in zip(headers, excel_row) if h is not None}) return rows def _coerce(cell: Any, base_value: Any) -> Any: if isinstance(base_value, bool): return str(cell).strip().lower() in {"true", "1", "yes"} if not isinstance(cell, bool) else cell if isinstance(base_value, int): return base_value if cell is None or cell == "" else int(round(float(cell))) if isinstance(base_value, float): return base_value if cell is None or cell == "" else float(cell) if isinstance(base_value, (list, dict)): if isinstance(cell, str) and cell.strip(): try: return json.loads(cell) except json.JSONDecodeError: return base_value # non-JSON cell (e.g. comma-joined) -> keep base return base_value if cell is None or cell == "" else cell if base_value is None: return None if cell is None or cell == "" else cell return base_value if cell is None else str(cell) def _set_if_exists(obj: dict[str, Any], dotted: str, cell: Any) -> None: parts = dotted.split(".") cursor = obj for part in parts[:-1]: if not isinstance(cursor, dict) or part not in cursor: return cursor = cursor[part] leaf = parts[-1] if isinstance(cursor, dict) and leaf in cursor: cursor[leaf] = _coerce(cell, cursor[leaf]) def _overlay_obj(obj: dict[str, Any], row: dict[str, Any], spec: SheetSpec) -> None: for column, value in row.items(): if column == SKIP_COLUMN or column == spec.id_excel or column == spec.parent_fk: continue path = spec.renames.get(column, column) _set_if_exists(obj, path, value) def _nested_targets(base: dict[str, Any], spec: SheetSpec) -> dict[tuple, dict[str, Any]]: index: dict[tuple, dict[str, Any]] = {} for pack in base.get("rule_packs", []): cursor: Any = pack for part in spec.child_path.split("."): cursor = cursor.get(part) if isinstance(cursor, dict) else None for child in cursor or []: index[(pack.get("rule_pack_id"), child.get(spec.id_json))] = child return index def overlay_workbook(xlsx: Path, base: dict[str, Any], specs: list[SheetSpec]) -> dict[str, Any]: wb = load_workbook(xlsx, data_only=True, read_only=True) for spec in specs: rows = _read_rows(wb[spec.sheet]) if spec.child_path: index = _nested_targets(base, spec) for row in rows: obj = index.get((row.get(spec.parent_fk), row.get(spec.id_excel))) if obj is not None: _overlay_obj(obj, row, spec) else: index = {obj.get(spec.id_json): obj for obj in base.get(spec.section, [])} for row in rows: obj = index.get(row.get(spec.id_excel)) if obj is not None: _overlay_obj(obj, row, spec) return base def _get_path(obj: dict[str, Any], dotted: str) -> tuple[bool, Any]: cursor: Any = obj for part in dotted.split("."): if not isinstance(cursor, dict) or part not in cursor: return False, None cursor = cursor[part] return True, cursor def _cell_value(value: Any) -> Any: if isinstance(value, bool): return value if isinstance(value, (list, dict)): return json.dumps(value, ensure_ascii=False) return value def sync_workbook(xlsx: Path, base: dict[str, Any], specs: list[SheetSpec]) -> int: """JSON -> Excel: write base values into the mapped Excel cells (round-trip safe). Used to reconcile a drifted Excel to the authoritative JSON (JSON wins). Only mapped value cells are overwritten; id / FK / 注释 / unmapped columns are left. """ wb = load_workbook(xlsx) changed = 0 for spec in specs: ws = wb[spec.sheet] headers = [c.value for c in ws[1]] col_idx = {h: i + 1 for i, h in enumerate(headers) if h is not None} if spec.child_path: index = _nested_targets(base, spec) else: index = {obj.get(spec.id_json): obj for obj in base.get(spec.section, [])} for r in range(2, ws.max_row + 1): first = ws.cell(row=r, column=1).value if first is not None and str(first).startswith(SENTINEL): continue row = {h: ws.cell(row=r, column=col_idx[h]).value for h in col_idx} if spec.child_path: obj = index.get((row.get(spec.parent_fk), row.get(spec.id_excel))) else: obj = index.get(row.get(spec.id_excel)) if obj is None: continue for col, ci in col_idx.items(): if col in {SKIP_COLUMN, spec.id_excel, spec.parent_fk}: continue exists, value = _get_path(obj, spec.renames.get(col, col)) if not exists: continue new = _cell_value(value) if ws.cell(row=r, column=ci).value != new: ws.cell(row=r, column=ci, value=new) changed += 1 wb.save(xlsx) return changed def build() -> dict[str, dict[str, Any]]: """Return {json_path: regenerated_obj} for both config files.""" rule_pack_base, _ = config_store.load_json(RULE_PACK_JSON) walk_base, _ = config_store.load_json(WALK_JSON) return { str(RULE_PACK_JSON): overlay_workbook(RULE_PACK_XLSX, rule_pack_base, RULE_PACK_SPECS), str(WALK_JSON): overlay_workbook(WALK_XLSX, walk_base, WALK_SPECS), } def _first_diff(expected: str, actual: str) -> str: e, a = expected.splitlines(), actual.splitlines() for i in range(max(len(e), len(a))): el = e[i] if i < len(e) else "" al = a[i] if i < len(a) else "" if el != al: return f"line {i + 1}:\n json: {el}\n excel: {al}" return "" def main() -> int: args = _parse_args() if args.sync_excel: rule_pack_base, _ = config_store.load_json(RULE_PACK_JSON) walk_base, _ = config_store.load_json(WALK_JSON) n1 = sync_workbook(RULE_PACK_XLSX, rule_pack_base, RULE_PACK_SPECS) n2 = sync_workbook(WALK_XLSX, walk_base, WALK_SPECS) print(json.dumps({"status": "synced", "cells_changed": {"rule_pack": n1, "walk": n2}}, ensure_ascii=False, indent=2)) return 0 built = build() findings = [] for path_str, obj in built.items(): path = Path(path_str) generated = canonical_dumps(obj) current = path.read_text(encoding="utf-8") if args.write: path.write_text(generated, encoding="utf-8") findings.append({"config_path": str(path.relative_to(ROOT)), "written": True}) else: ok = generated == current entry = {"config_path": str(path.relative_to(ROOT)), "byte_equal": ok} if not ok: entry["first_diff"] = _first_diff(current, generated) findings.append(entry) status = "written" if args.write else ("pass" if all(f.get("byte_equal") for f in findings) else "fail") print(json.dumps({"status": status, "findings": findings}, ensure_ascii=False, indent=2)) return 0 if status in {"written", "pass"} else 1 def _parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--check", action="store_true", help="byte-equal check (default)") parser.add_argument("--write", action="store_true", help="regenerate JSON from Excel instead of checking") parser.add_argument("--sync-excel", action="store_true", help="JSON wins: write JSON values back into Excel (one-time reconcile)") return parser.parse_args() if __name__ == "__main__": sys.exit(main())