# build_config_from_excel.py
"""Excel -> runtime JSON converter (V2-M1C), byte-equal via overlay-onto-base.

Business edits the config Excel; this regenerates the runtime JSON. To guarantee
byte-equality (so `policy_bundle_hash` and runtime behaviour never drift on a
no-op regen), the converter uses the current JSON as a STRUCTURAL TEMPLATE
(preserving key order) and overlays Excel cell values onto the matching base
leaves, coercing each cell to the base value's type. Only paths that already
exist in the base are overlaid — structurally divergent / non-value Excel
columns are skipped, and non-sheet-backed JSON sections pass through unchanged.

    python scripts/build_config_from_excel.py --check       # byte-equal? exit 1 on drift
    python scripts/build_config_from_excel.py --write       # regenerate JSON from Excel
    python scripts/build_config_from_excel.py --sync-excel  # JSON wins: write JSON values back into Excel

This handles VALUE edits (thresholds, scores, gate values, flags). Structural
changes (adding/removing rows) need a base update — out of V2-M1 scope.
"""
  14. from __future__ import annotations
  15. import argparse
  16. import json
  17. import sys
  18. from pathlib import Path
  19. from typing import Any
# Two levels up from this file's resolved path (the directory that contains
# this file's own directory).
ROOT = Path(__file__).resolve().parents[1]
# Make ROOT importable; presumably so the `content_agent` / `scripts` imports
# that follow resolve when this file is run directly as a script.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
  23. from openpyxl import load_workbook
  24. from content_agent.integrations import config_store
  25. from scripts.check_config_json_canonical import canonical_dumps
# Excel workbooks that are read (and, with --sync-excel, written back).
RULE_PACK_XLSX = ROOT / "tech_documents/规则包映射/规则包映射配置表.xlsx"
WALK_XLSX = ROOT / "tech_documents/游走策略/游走策略配置表.xlsx"
# Runtime JSON files: loaded as the overlay base and rewritten by --write.
RULE_PACK_JSON = ROOT / "product_documents/规则包/douyin_rule_packs.v1.json"
WALK_JSON = ROOT / "product_documents/抖音游走策略/douyin_walk_strategy.v1.json"
SENTINEL = "备注"  # row 2 of every sheet; skipped (any data row whose first cell starts with this)
SKIP_COLUMN = "注释"  # human annotation column with no JSON home
  32. # A SheetSpec maps one Excel sheet onto a JSON section by overlay.
  33. # top-level: base[section] is a list of objects indexed by id_json (== row[id_excel]).
  34. # nested: base["rule_packs"][*][child_path...] grouped by parent_fk (rule_pack_id).
  35. class SheetSpec:
  36. def __init__(self, sheet, *, section, id_excel, id_json=None, renames=None, parent_fk=None, child_path=None):
  37. self.sheet = sheet
  38. self.section = section # top-level JSON key (for nested: "rule_packs")
  39. self.id_excel = id_excel
  40. self.id_json = id_json or id_excel
  41. self.renames = renames or {} # excel column -> dotted base path
  42. self.parent_fk = parent_fk # excel column holding rule_pack_id (nested only)
  43. self.child_path = child_path # dotted path within a rule_pack (nested only)
# Excel column -> dotted JSON path for the condition columns of a hard-gate row.
_WHEN = {"field_path": "when.field", "operator": "when.op", "expected_value": "when.value"}
# Sheets of the rule-pack workbook. The first four overlay top-level JSON
# lists; the remaining four carry child_path + parent_fk and overlay lists
# nested inside each rule pack.
RULE_PACK_SPECS = [
    SheetSpec("rule_pack_dispatch", section="rule_pack_dispatch", id_excel="dispatch_id"),
    SheetSpec("decision_reason_codes", section="decision_reason_codes", id_excel="decision_reason_code"),
    SheetSpec("effect_status_mapping", section="effect_status_mapping", id_excel="mapping_id"),
    SheetSpec("query_effect_aggregation", section="query_effect_aggregation", id_excel="aggregation_id"),
    SheetSpec("hard_gate_rules", section="rule_packs", child_path="hard_gates", parent_fk="rule_pack_id",
              id_excel="gate_id", renames={"gate_label": "label", **_WHEN}),
    SheetSpec("scorecard_dimensions", section="rule_packs", child_path="scorecard.dimensions", parent_fk="rule_pack_id",
              id_excel="dimension_key", id_json="key", renames={"dimension_label": "label"}),
    SheetSpec("scorecard_scoring_rules", section="rule_packs", child_path="scorecard.scoring_rules", parent_fk="rule_pack_id",
              id_excel="scoring_rule_id"),
    SheetSpec("threshold_actions", section="rule_packs", child_path="thresholds", parent_fk="rule_pack_id",
              id_excel="decision_reason_code"),
]
# Walk sheets mirror JSON 1:1 (identity columns; only 注释 skipped).
# V3 cleanup: narrowed from 13 sections to the 3 that are still consumed (the
# other 10 were superseded by walk_graph + walk_policy; their JSON sections and
# the corresponding Excel sheets were deleted together).
# Maps sheet name (== JSON section name) -> id column.
_WALK = {
    "walk_edge_catalog": "edge_id",
    "walk_rule_pack_binding": "binding_id",
    "walk_fact_contract": "runtime_file",
}
# One top-level spec per walk sheet; the sheet name doubles as the section key.
WALK_SPECS = [SheetSpec(sheet, section=sheet, id_excel=idc) for sheet, idc in _WALK.items()]
  68. def _read_rows(ws) -> list[dict[str, Any]]:
  69. headers = [c.value for c in ws[1]]
  70. rows = []
  71. for excel_row in ws.iter_rows(min_row=2, values_only=True):
  72. if excel_row and str(excel_row[0] or "").startswith(SENTINEL):
  73. continue # sentinel comment row
  74. rows.append({h: v for h, v in zip(headers, excel_row) if h is not None})
  75. return rows
  76. def _coerce(cell: Any, base_value: Any) -> Any:
  77. if isinstance(base_value, bool):
  78. return str(cell).strip().lower() in {"true", "1", "yes"} if not isinstance(cell, bool) else cell
  79. if isinstance(base_value, int):
  80. return base_value if cell is None or cell == "" else int(round(float(cell)))
  81. if isinstance(base_value, float):
  82. return base_value if cell is None or cell == "" else float(cell)
  83. if isinstance(base_value, (list, dict)):
  84. if isinstance(cell, str) and cell.strip():
  85. try:
  86. return json.loads(cell)
  87. except json.JSONDecodeError:
  88. return base_value # non-JSON cell (e.g. comma-joined) -> keep base
  89. return base_value if cell is None or cell == "" else cell
  90. if base_value is None:
  91. return None if cell is None or cell == "" else cell
  92. return base_value if cell is None else str(cell)
  93. def _set_if_exists(obj: dict[str, Any], dotted: str, cell: Any) -> None:
  94. parts = dotted.split(".")
  95. cursor = obj
  96. for part in parts[:-1]:
  97. if not isinstance(cursor, dict) or part not in cursor:
  98. return
  99. cursor = cursor[part]
  100. leaf = parts[-1]
  101. if isinstance(cursor, dict) and leaf in cursor:
  102. cursor[leaf] = _coerce(cell, cursor[leaf])
  103. def _overlay_obj(obj: dict[str, Any], row: dict[str, Any], spec: SheetSpec) -> None:
  104. for column, value in row.items():
  105. if column == SKIP_COLUMN or column == spec.id_excel or column == spec.parent_fk:
  106. continue
  107. path = spec.renames.get(column, column)
  108. _set_if_exists(obj, path, value)
  109. def _nested_targets(base: dict[str, Any], spec: SheetSpec) -> dict[tuple, dict[str, Any]]:
  110. index: dict[tuple, dict[str, Any]] = {}
  111. for pack in base.get("rule_packs", []):
  112. cursor: Any = pack
  113. for part in spec.child_path.split("."):
  114. cursor = cursor.get(part) if isinstance(cursor, dict) else None
  115. for child in cursor or []:
  116. index[(pack.get("rule_pack_id"), child.get(spec.id_json))] = child
  117. return index
  118. def overlay_workbook(xlsx: Path, base: dict[str, Any], specs: list[SheetSpec]) -> dict[str, Any]:
  119. wb = load_workbook(xlsx, data_only=True, read_only=True)
  120. for spec in specs:
  121. rows = _read_rows(wb[spec.sheet])
  122. if spec.child_path:
  123. index = _nested_targets(base, spec)
  124. for row in rows:
  125. obj = index.get((row.get(spec.parent_fk), row.get(spec.id_excel)))
  126. if obj is not None:
  127. _overlay_obj(obj, row, spec)
  128. else:
  129. index = {obj.get(spec.id_json): obj for obj in base.get(spec.section, [])}
  130. for row in rows:
  131. obj = index.get(row.get(spec.id_excel))
  132. if obj is not None:
  133. _overlay_obj(obj, row, spec)
  134. return base
  135. def _get_path(obj: dict[str, Any], dotted: str) -> tuple[bool, Any]:
  136. cursor: Any = obj
  137. for part in dotted.split("."):
  138. if not isinstance(cursor, dict) or part not in cursor:
  139. return False, None
  140. cursor = cursor[part]
  141. return True, cursor
  142. def _cell_value(value: Any) -> Any:
  143. if isinstance(value, bool):
  144. return value
  145. if isinstance(value, (list, dict)):
  146. return json.dumps(value, ensure_ascii=False)
  147. return value
  148. def sync_workbook(xlsx: Path, base: dict[str, Any], specs: list[SheetSpec]) -> int:
  149. """JSON -> Excel: write base values into the mapped Excel cells (round-trip safe).
  150. Used to reconcile a drifted Excel to the authoritative JSON (JSON wins). Only
  151. mapped value cells are overwritten; id / FK / 注释 / unmapped columns are left.
  152. """
  153. wb = load_workbook(xlsx)
  154. changed = 0
  155. for spec in specs:
  156. ws = wb[spec.sheet]
  157. headers = [c.value for c in ws[1]]
  158. col_idx = {h: i + 1 for i, h in enumerate(headers) if h is not None}
  159. if spec.child_path:
  160. index = _nested_targets(base, spec)
  161. else:
  162. index = {obj.get(spec.id_json): obj for obj in base.get(spec.section, [])}
  163. for r in range(2, ws.max_row + 1):
  164. first = ws.cell(row=r, column=1).value
  165. if first is not None and str(first).startswith(SENTINEL):
  166. continue
  167. row = {h: ws.cell(row=r, column=col_idx[h]).value for h in col_idx}
  168. if spec.child_path:
  169. obj = index.get((row.get(spec.parent_fk), row.get(spec.id_excel)))
  170. else:
  171. obj = index.get(row.get(spec.id_excel))
  172. if obj is None:
  173. continue
  174. for col, ci in col_idx.items():
  175. if col in {SKIP_COLUMN, spec.id_excel, spec.parent_fk}:
  176. continue
  177. exists, value = _get_path(obj, spec.renames.get(col, col))
  178. if not exists:
  179. continue
  180. new = _cell_value(value)
  181. if ws.cell(row=r, column=ci).value != new:
  182. ws.cell(row=r, column=ci, value=new)
  183. changed += 1
  184. wb.save(xlsx)
  185. return changed
  186. def build() -> dict[str, dict[str, Any]]:
  187. """Return {json_path: regenerated_obj} for both config files."""
  188. rule_pack_base, _ = config_store.load_json(RULE_PACK_JSON)
  189. walk_base, _ = config_store.load_json(WALK_JSON)
  190. return {
  191. str(RULE_PACK_JSON): overlay_workbook(RULE_PACK_XLSX, rule_pack_base, RULE_PACK_SPECS),
  192. str(WALK_JSON): overlay_workbook(WALK_XLSX, walk_base, WALK_SPECS),
  193. }
  194. def _first_diff(expected: str, actual: str) -> str:
  195. e, a = expected.splitlines(), actual.splitlines()
  196. for i in range(max(len(e), len(a))):
  197. el = e[i] if i < len(e) else "<missing>"
  198. al = a[i] if i < len(a) else "<missing>"
  199. if el != al:
  200. return f"line {i + 1}:\n json: {el}\n excel: {al}"
  201. return "<no line diff>"
  202. def main() -> int:
  203. args = _parse_args()
  204. if args.sync_excel:
  205. rule_pack_base, _ = config_store.load_json(RULE_PACK_JSON)
  206. walk_base, _ = config_store.load_json(WALK_JSON)
  207. n1 = sync_workbook(RULE_PACK_XLSX, rule_pack_base, RULE_PACK_SPECS)
  208. n2 = sync_workbook(WALK_XLSX, walk_base, WALK_SPECS)
  209. print(json.dumps({"status": "synced", "cells_changed": {"rule_pack": n1, "walk": n2}}, ensure_ascii=False, indent=2))
  210. return 0
  211. built = build()
  212. findings = []
  213. for path_str, obj in built.items():
  214. path = Path(path_str)
  215. generated = canonical_dumps(obj)
  216. current = path.read_text(encoding="utf-8")
  217. if args.write:
  218. path.write_text(generated, encoding="utf-8")
  219. findings.append({"config_path": str(path.relative_to(ROOT)), "written": True})
  220. else:
  221. ok = generated == current
  222. entry = {"config_path": str(path.relative_to(ROOT)), "byte_equal": ok}
  223. if not ok:
  224. entry["first_diff"] = _first_diff(current, generated)
  225. findings.append(entry)
  226. status = "written" if args.write else ("pass" if all(f.get("byte_equal") for f in findings) else "fail")
  227. print(json.dumps({"status": status, "findings": findings}, ensure_ascii=False, indent=2))
  228. return 0 if status in {"written", "pass"} else 1
  229. def _parse_args() -> argparse.Namespace:
  230. parser = argparse.ArgumentParser(description=__doc__)
  231. parser.add_argument("--check", action="store_true", help="byte-equal check (default)")
  232. parser.add_argument("--write", action="store_true", help="regenerate JSON from Excel instead of checking")
  233. parser.add_argument("--sync-excel", action="store_true", help="JSON wins: write JSON values back into Excel (one-time reconcile)")
  234. return parser.parse_args()
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())