| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- """Excel -> runtime JSON converter (V2-M1C), byte-equal via overlay-onto-base.
- Business edits the config Excel; this regenerates the runtime JSON. To guarantee
- byte-equality (so `policy_bundle_hash` and runtime behaviour never drift on a
- no-op regen), the converter uses the current JSON as a STRUCTURAL TEMPLATE
- (preserving key order) and overlays Excel cell values onto the matching base
- leaves, coercing each cell to the base value's type. Only paths that already
- exist in the base are overlaid — structurally divergent / non-value Excel
- columns are skipped, and non-sheet-backed JSON sections pass through unchanged.
- python scripts/build_config_from_excel.py --check # byte-equal? exit 1 on drift
- python scripts/build_config_from_excel.py --write # regenerate JSON from Excel
- This handles VALUE edits (thresholds, scores, gate values, flags). Structural
- changes (adding/removing rows) need a base update — out of V2-M1 scope.
- """
- from __future__ import annotations
- import argparse
- import json
- import sys
- from pathlib import Path
- from typing import Any
- ROOT = Path(__file__).resolve().parents[1]
- if str(ROOT) not in sys.path:
- sys.path.insert(0, str(ROOT))
- from openpyxl import load_workbook
- from content_agent.integrations import config_store
- from scripts.check_config_json_canonical import canonical_dumps
# Excel workbooks that are read as the overlay source.
RULE_PACK_XLSX = ROOT.joinpath("tech_documents/规则包映射/规则包映射配置表.xlsx")
WALK_XLSX = ROOT.joinpath("tech_documents/游走策略/游走策略配置表.xlsx")

# Runtime JSON files: loaded as the structural base and regenerated in place.
RULE_PACK_JSON = ROOT.joinpath("product_documents/规则包/douyin_rule_packs.v1.json")
WALK_JSON = ROOT.joinpath("product_documents/抖音游走策略/douyin_walk_strategy.v1.json")

# Marker for the comment row (row 2 of every sheet); rows whose first cell
# starts with it are skipped.
SENTINEL = "备注"
# Human annotation column with no JSON home; never overlaid or synced.
SKIP_COLUMN = "注释"
class SheetSpec:
    """Map one Excel sheet onto a JSON section for the overlay.

    Two layouts are described by a spec:

    * top-level: ``base[section]`` is a list of objects, each matched to a row
      through ``obj[id_json] == row[id_excel]``.
    * nested: the children found under ``child_path`` inside every entry of
      ``base["rule_packs"]``, matched by the pair (rule pack id, child id);
      the row supplies the rule pack id in its ``parent_fk`` column.

    Attributes:
        sheet: Name of the worksheet.
        section: Top-level JSON key (``"rule_packs"`` for nested specs).
        id_excel: Excel column holding the row identifier.
        id_json: JSON key holding the identifier; defaults to ``id_excel``.
        renames: Excel column name -> dotted path inside the base object.
        parent_fk: Excel column holding ``rule_pack_id`` (nested only).
        child_path: Dotted path within a rule pack (nested only).
    """

    def __init__(
        self,
        sheet,
        *,
        section,
        id_excel,
        id_json=None,
        renames=None,
        parent_fk=None,
        child_path=None,
    ):
        self.sheet = sheet
        self.section = section
        self.id_excel = id_excel
        # The JSON identifier key falls back to the Excel column name.
        self.id_json = id_json if id_json else id_excel
        self.renames = renames if renames else {}
        self.parent_fk = parent_fk
        self.child_path = child_path
# Excel condition columns -> dotted path inside a hard-gate object.
_WHEN = {
    "field_path": "when.field",
    "operator": "when.op",
    "expected_value": "when.value",
}

RULE_PACK_SPECS = [
    # Top-level sheets: the sheet name doubles as the JSON section name.
    *(
        SheetSpec(sheet_name, section=sheet_name, id_excel=id_column)
        for sheet_name, id_column in (
            ("rule_pack_dispatch", "dispatch_id"),
            ("decision_reason_codes", "decision_reason_code"),
            ("effect_status_mapping", "mapping_id"),
            ("query_effect_aggregation", "aggregation_id"),
        )
    ),
    # Nested sheets: children of each rule pack, keyed by rule_pack_id + id.
    SheetSpec(
        "hard_gate_rules",
        section="rule_packs",
        child_path="hard_gates",
        parent_fk="rule_pack_id",
        id_excel="gate_id",
        renames={"gate_label": "label", **_WHEN},
    ),
    SheetSpec(
        "scorecard_dimensions",
        section="rule_packs",
        child_path="scorecard.dimensions",
        parent_fk="rule_pack_id",
        id_excel="dimension_key",
        id_json="key",
        renames={"dimension_label": "label"},
    ),
    SheetSpec(
        "scorecard_scoring_rules",
        section="rule_packs",
        child_path="scorecard.scoring_rules",
        parent_fk="rule_pack_id",
        id_excel="scoring_rule_id",
    ),
    SheetSpec(
        "threshold_actions",
        section="rule_packs",
        child_path="thresholds",
        parent_fk="rule_pack_id",
        id_excel="decision_reason_code",
    ),
]
# Walk sheets mirror the JSON 1:1 (identity columns; only the 注释 column is skipped).
# V3 cleanup: narrowed from 13 sections to the 3 that are still consumed. The
# other 10 were superseded by walk_graph + walk_policy, and both the JSON
# sections and their Excel sheets were removed.
_WALK = dict(
    [
        ("walk_edge_catalog", "edge_id"),
        ("walk_rule_pack_binding", "binding_id"),
        ("walk_fact_contract", "runtime_file"),
    ]
)
WALK_SPECS = [
    SheetSpec(sheet_name, section=sheet_name, id_excel=id_column)
    for sheet_name, id_column in _WALK.items()
]
def _read_rows(ws) -> list[dict[str, Any]]:
    """Read a worksheet into a list of ``{header: cell value}`` dicts.

    Row 1 supplies the headers. Every later row is returned except comment
    rows, i.e. rows whose first cell starts with ``SENTINEL``. Columns with an
    empty header are dropped.
    """
    column_names = [cell.value for cell in ws[1]]

    def is_comment_row(values) -> bool:
        # An empty first cell is treated as "" so it never matches the marker.
        return bool(values) and str(values[0] or "").startswith(SENTINEL)

    return [
        {name: value for name, value in zip(column_names, values) if name is not None}
        for values in ws.iter_rows(min_row=2, values_only=True)
        if not is_comment_row(values)
    ]
def _coerce(cell: Any, base_value: Any) -> Any:
    """Convert an Excel cell to the type of the JSON leaf it overlays.

    The type of ``base_value`` decides the conversion:

    * bool: bool cells pass through; numeric cells are true only when equal
      to 1 (so both ``1`` and ``1.0`` count); any other cell is true when its
      text is ``true`` / ``1`` / ``yes`` (case-insensitive, stripped).
    * int: integer cells are kept exactly; everything else is parsed as a
      float and rounded.
    * float: ``float(cell)``.
    * list / dict: a string cell is parsed as JSON; a string that is not
      valid JSON (e.g. comma-joined text) keeps the base; a non-string cell
      is returned as is.
    * None: empty cell -> ``None``, otherwise the cell unchanged.
    * anything else (strings): ``str(cell)``; only a ``None`` cell keeps the
      base, so an explicit empty string is preserved.

    For bool, int, float, list and dict bases a blank cell (``None``, ``""``
    or whitespace only) always keeps ``base_value``, so clearing a cell can
    never flip a flag or replace a container with junk.

    Raises:
        ValueError: if a non-blank cell cannot be parsed as a number for an
            int or float base.
    """
    blank = cell is None or (isinstance(cell, str) and not cell.strip())

    # bool must be tested before int: bool is a subclass of int.
    if isinstance(base_value, bool):
        if isinstance(cell, bool):
            return cell
        if blank:
            return base_value
        if isinstance(cell, (int, float)):
            # Excel may hand back 1 as 1.0; str(1.0) would not match "1".
            return cell == 1
        return str(cell).strip().lower() in {"true", "1", "yes"}

    if isinstance(base_value, int):
        if blank:
            return base_value
        if isinstance(cell, int):
            # Avoid the float round-trip, which is lossy above 2**53.
            return int(cell)
        return int(round(float(cell)))

    if isinstance(base_value, float):
        return base_value if blank else float(cell)

    if isinstance(base_value, (list, dict)):
        if blank:
            return base_value
        if isinstance(cell, str):
            try:
                return json.loads(cell)
            except json.JSONDecodeError:
                return base_value  # non-JSON cell (e.g. comma-joined) -> keep base
        return cell

    if base_value is None:
        return None if cell is None or cell == "" else cell

    return base_value if cell is None else str(cell)
def _set_if_exists(obj: dict[str, Any], dotted: str, cell: Any) -> None:
    """Overlay ``cell`` onto ``obj`` at ``dotted``, only if that path exists.

    The path is followed key by key through nested dicts. If any segment is
    missing, or a non-dict is met on the way, ``obj`` is left untouched. When
    the leaf exists, it is replaced by the cell coerced to the leaf's type.
    """
    *parents, leaf = dotted.split(".")
    node: Any = obj
    for key in parents:
        if isinstance(node, dict) and key in node:
            node = node[key]
        else:
            return
    if not isinstance(node, dict) or leaf not in node:
        return
    node[leaf] = _coerce(cell, node[leaf])
def _overlay_obj(obj: dict[str, Any], row: dict[str, Any], spec: SheetSpec) -> None:
    """Overlay every value column of ``row`` onto the base object ``obj``.

    The annotation column, the id column and the parent foreign-key column
    are not values and are skipped. Each remaining column is mapped through
    ``spec.renames`` (falling back to its own name) to a dotted path and
    written only if that path already exists in ``obj``.
    """
    not_values = (SKIP_COLUMN, spec.id_excel, spec.parent_fk)
    value_cells = ((name, cell) for name, cell in row.items() if name not in not_values)
    for name, cell in value_cells:
        _set_if_exists(obj, spec.renames.get(name, name), cell)
def _nested_targets(base: dict[str, Any], spec: SheetSpec) -> dict[tuple, dict[str, Any]]:
    """Index the nested children of every rule pack for a nested spec.

    For each entry of ``base["rule_packs"]`` the dotted ``spec.child_path`` is
    followed; the children found there are keyed by
    ``(pack["rule_pack_id"], child[spec.id_json])``. Packs where the path is
    missing or empty contribute nothing.
    """
    targets: dict[tuple, dict[str, Any]] = {}
    for pack in base.get("rule_packs", []):
        node: Any = pack
        for key in spec.child_path.split("."):
            if not isinstance(node, dict):
                # Path runs through a non-dict: nothing to index for this pack.
                node = None
                break
            node = node.get(key)
        if not node:
            continue
        for child in node:
            child_key = (pack.get("rule_pack_id"), child.get(spec.id_json))
            targets[child_key] = child
    return targets
def overlay_workbook(xlsx: Path, base: dict[str, Any], specs: list[SheetSpec]) -> dict[str, Any]:
    """Excel -> JSON: overlay the workbook's cell values onto ``base``.

    For every spec, the sheet's rows are matched to base objects by id (or by
    ``(parent_fk, id)`` for nested specs) and their value columns overlaid.
    Rows without a matching base object are skipped.

    Args:
        xlsx: Path of the workbook to read.
        base: Parsed JSON used as the structural template; mutated in place.
        specs: Sheet-to-section mappings to apply.

    Returns:
        The same ``base`` object, after the overlay.

    Raises:
        KeyError: if a sheet named by a spec is missing from the workbook.
    """
    wb = load_workbook(xlsx, data_only=True, read_only=True)
    try:
        for spec in specs:
            rows = _read_rows(wb[spec.sheet])
            nested = bool(spec.child_path)
            if nested:
                index = _nested_targets(base, spec)
            else:
                index = {obj.get(spec.id_json): obj for obj in base.get(spec.section, [])}
            for row in rows:
                row_id = row.get(spec.id_excel)
                key = (row.get(spec.parent_fk), row_id) if nested else row_id
                target = index.get(key)
                if target is not None:
                    _overlay_obj(target, row, spec)
    finally:
        # Read-only workbooks keep the file open until closed explicitly.
        wb.close()
    return base
def _get_path(obj: dict[str, Any], dotted: str) -> tuple[bool, Any]:
    """Look up a dotted path in nested dicts.

    Returns ``(True, value)`` when every segment exists, otherwise
    ``(False, None)``. The flag distinguishes a stored ``None`` from a
    missing path.
    """
    node: Any = obj
    for key in dotted.split("."):
        if isinstance(node, dict) and key in node:
            node = node[key]
        else:
            return False, None
    return True, node
def _cell_value(value: Any) -> Any:
    """Turn a JSON value into what is stored in an Excel cell.

    Lists and dicts are serialised as JSON text (non-ASCII kept as is); every
    other value, booleans included, is returned unchanged.
    """
    is_container = isinstance(value, (list, dict))
    return json.dumps(value, ensure_ascii=False) if is_container else value
def sync_workbook(xlsx: Path, base: dict[str, Any], specs: list[SheetSpec]) -> int:
    """JSON -> Excel: write base values into the mapped Excel cells (round-trip safe).

    Used to reconcile a drifted Excel to the authoritative JSON (JSON wins). Only
    mapped value cells are overwritten; the id, foreign-key, annotation and
    unmapped columns are left alone, as are comment rows and rows with no
    matching base object.

    Args:
        xlsx: Path of the workbook to update in place.
        base: Parsed JSON that is the source of truth.
        specs: Sheet-to-section mappings to apply.

    Returns:
        The number of cells whose value was changed. The workbook is saved
        only when this is non-zero, so a no-op sync leaves the file untouched.

    Raises:
        KeyError: if a sheet named by a spec is missing from the workbook.
    """
    wb = load_workbook(xlsx)
    changed = 0
    for spec in specs:
        ws = wb[spec.sheet]
        headers = [c.value for c in ws[1]]
        col_idx = {h: i + 1 for i, h in enumerate(headers) if h is not None}
        nested = bool(spec.child_path)
        if nested:
            index = _nested_targets(base, spec)
        else:
            index = {obj.get(spec.id_json): obj for obj in base.get(spec.section, [])}
        # Columns that are identifiers or annotations, never JSON values.
        not_values = {SKIP_COLUMN, spec.id_excel, spec.parent_fk}
        for r in range(2, ws.max_row + 1):
            first = ws.cell(row=r, column=1).value
            if first is not None and str(first).startswith(SENTINEL):
                continue  # comment row
            row = {h: ws.cell(row=r, column=ci).value for h, ci in col_idx.items()}
            row_id = row.get(spec.id_excel)
            key = (row.get(spec.parent_fk), row_id) if nested else row_id
            obj = index.get(key)
            if obj is None:
                continue
            for col, ci in col_idx.items():
                if col in not_values:
                    continue
                exists, value = _get_path(obj, spec.renames.get(col, col))
                if not exists:
                    continue  # column has no home in the JSON object
                new = _cell_value(value)
                if row[col] != new:
                    ws.cell(row=r, column=ci, value=new)
                    changed += 1
    if changed:
        # Skip the rewrite on a no-op so the workbook file is not touched.
        wb.save(xlsx)
    return changed
def build() -> dict[str, dict[str, Any]]:
    """Regenerate both config objects from their workbooks.

    Both JSON files are loaded first, then each is overlaid with its
    workbook. Returns ``{json_path: regenerated_obj}`` keyed by the JSON path
    as a string, rule pack first.
    """
    sources = (
        (RULE_PACK_JSON, RULE_PACK_XLSX, RULE_PACK_SPECS),
        (WALK_JSON, WALK_XLSX, WALK_SPECS),
    )
    bases = []
    for json_path, _xlsx, _specs in sources:
        loaded, _ = config_store.load_json(json_path)
        bases.append(loaded)
    regenerated = {}
    for (json_path, xlsx_path, specs), base in zip(sources, bases):
        regenerated[str(json_path)] = overlay_workbook(xlsx_path, base, specs)
    return regenerated
def _first_diff(expected: str, actual: str) -> str:
    """Describe the first line at which two texts differ.

    Args:
        expected: The current JSON text (shown as ``json``).
        actual: The text regenerated from Excel (shown as ``excel``).

    Returns:
        A ``line N: ...`` message for the first differing line, where a line
        that exists on one side only is shown as ``<missing>`` on the other.
        If the lines all match but the texts still differ (line endings or a
        trailing newline), a message saying so. ``<no line diff>`` only when
        the texts are identical.
    """
    from itertools import zip_longest  # local import keeps the block self-contained

    # None marks a missing line; it can never collide with real line text.
    pairs = zip_longest(expected.splitlines(), actual.splitlines())
    for number, (json_line, excel_line) in enumerate(pairs, start=1):
        if json_line != excel_line:
            el = "<missing>" if json_line is None else json_line
            al = "<missing>" if excel_line is None else excel_line
            return f"line {number}:\n json: {el}\n excel: {al}"
    if expected != actual:
        # splitlines() hides newline-only differences; do not report "no diff".
        return "<no line diff: texts differ only in line endings or trailing newline>"
    return "<no line diff>"
def main() -> int:
    """Run the CLI and return the process exit code.

    Modes:
        ``--sync-excel``: write JSON values back into both workbooks and
            print the number of changed cells; returns 0.
        ``--write``: regenerate both JSON files from Excel; returns 0.
        otherwise: compare the regenerated text with each file on disk and
            return 1 if any differs. ``--check`` is accepted but not
            consulted; checking is what happens when neither other flag is
            given.

    Files are read and written as raw UTF-8 bytes, so no newline translation
    takes place and the comparison is genuinely byte-for-byte.
    """
    args = _parse_args()
    if args.sync_excel:
        rule_pack_base, _ = config_store.load_json(RULE_PACK_JSON)
        walk_base, _ = config_store.load_json(WALK_JSON)
        n1 = sync_workbook(RULE_PACK_XLSX, rule_pack_base, RULE_PACK_SPECS)
        n2 = sync_workbook(WALK_XLSX, walk_base, WALK_SPECS)
        print(json.dumps({"status": "synced", "cells_changed": {"rule_pack": n1, "walk": n2}}, ensure_ascii=False, indent=2))
        return 0

    findings = []
    for path_str, obj in build().items():
        path = Path(path_str)
        generated = canonical_dumps(obj)
        config_path = str(path.relative_to(ROOT))
        if args.write:
            # Bytes I/O: text mode would turn "\n" into os.linesep on write.
            path.write_bytes(generated.encode("utf-8"))
            findings.append({"config_path": config_path, "written": True})
            continue
        # Bytes I/O: text mode would fold "\r\n" into "\n" and hide drift.
        current = path.read_bytes().decode("utf-8")
        ok = generated == current
        entry = {"config_path": config_path, "byte_equal": ok}
        if not ok:
            entry["first_diff"] = _first_diff(current, generated)
        findings.append(entry)

    if args.write:
        status = "written"
    elif all(f.get("byte_equal") for f in findings):
        status = "pass"
    else:
        status = "fail"
    print(json.dumps({"status": status, "findings": findings}, ensure_ascii=False, indent=2))
    return 0 if status in {"written", "pass"} else 1
def _parse_args() -> argparse.Namespace:
    """Parse the command line; all three options are plain on/off flags."""
    flags = (
        ("--check", "byte-equal check (default)"),
        ("--write", "regenerate JSON from Excel instead of checking"),
        ("--sync-excel", "JSON wins: write JSON values back into Excel (one-time reconcile)"),
    )
    parser = argparse.ArgumentParser(description=__doc__)
    for flag, help_text in flags:
        parser.add_argument(flag, action="store_true", help=help_text)
    return parser.parse_args()
# Script entry point: the return value of main() becomes the exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|