archive_v2_db_rows.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. """V2 历史数据归档清理(2026-06-12 拍板:归档后删)。
  2. 把生产库 content_agent_* 全部 21 张表中 created_at < cutoff 的 V2 时代行
  3. 导出为 JSON 档案后删除,并删掉 decode 时代的 7 个旧列 + rule_decisions 的
  4. 画像列(B1 已砍画像链)。生产库只有一台(.env CONTENT_SUPPLY_DB_*),只跑一次。
  5. python scripts/archive_v2_db_rows.py # dry-run(默认,零写入)
  6. python scripts/archive_v2_db_rows.py --execute # 归档→对账→删行→删列
  7. python scripts/archive_v2_db_rows.py --restore <dir> # 从档案回灌(删行回滚通道)
  8. 任一对账断言失败:回滚当前表、立即退出,绝不进入删列 DDL。
  9. DDL 前 SHOW COLUMNS 探测,列已不存在则跳过——重跑幂等。
  10. """
  11. from __future__ import annotations
  12. import argparse
  13. import datetime as dt
  14. import decimal
  15. import json
  16. import sys
  17. from pathlib import Path
# Repository root: the directory one level above the folder that contains this
# script (``parents[1]`` of the resolved script path).
ROOT = Path(__file__).resolve().parents[1]
# Put the repository root at the front of sys.path so the project imports that
# follow this bootstrap resolve when the file is run directly as a script.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
  21. from content_agent.integrations.database_runtime import ContentSupplyDbConfig
  22. from scripts.validate_content_agent_db import EXPECTED_TABLES
# Default cutoff timestamp: rows whose created_at is earlier than this are the
# ones archived and deleted, unless --before supplies another value.
DEFAULT_CUTOFF = "2026-06-12 00:00:00"
# Parent directory under which each --execute run creates its own timestamped
# archive folder.
ARCHIVE_ROOT = ROOT / "archive" / "v2_db_archive"
  25. # 与 sql/content_agent_schema.sql、database_runtime.TABLE_COLUMNS 的收窄保持一致。
  26. DDL_STATEMENTS = [
  27. (
  28. "content_agent_pattern_recall_evidence",
  29. ["decode_status", "decode_task_id", "matched_terms", "matched_category_paths",
  30. "decode_elements", "match_paths_request", "match_paths_response"],
  31. # 旧索引含 decode_status,显式重建为单列,不留给 MySQL 隐式收缩。
  32. "ALTER TABLE content_agent_pattern_recall_evidence "
  33. "DROP INDEX idx_content_agent_pattern_recall_status, "
  34. "ADD INDEX idx_content_agent_pattern_recall_status (recall_status), "
  35. + ", ".join(
  36. f"DROP COLUMN {col}"
  37. for col in ["decode_status", "decode_task_id", "matched_terms",
  38. "matched_category_paths", "decode_elements",
  39. "match_paths_request", "match_paths_response"]
  40. ),
  41. ),
  42. (
  43. "content_agent_rule_decisions",
  44. ["age_50_plus_level"],
  45. "ALTER TABLE content_agent_rule_decisions DROP COLUMN age_50_plus_level",
  46. ),
  47. ]
  48. def _json_default(value):
  49. if isinstance(value, (dt.datetime, dt.date)):
  50. return value.isoformat()
  51. if isinstance(value, decimal.Decimal):
  52. return str(value)
  53. if isinstance(value, bytes):
  54. return value.decode("utf-8", errors="replace")
  55. raise TypeError(f"unserializable: {type(value)}")
  56. def _table_columns(cursor, table: str) -> list[str]:
  57. cursor.execute(f"SHOW COLUMNS FROM {table}")
  58. return [row["Field"] for row in cursor.fetchall()]
  59. def dry_run(conn, cutoff: str) -> int:
  60. total = 0
  61. with conn.cursor() as cursor:
  62. for table in EXPECTED_TABLES:
  63. cursor.execute(
  64. f"SELECT COUNT(*) AS n, MIN(created_at) AS a, MAX(created_at) AS b "
  65. f"FROM {table} WHERE created_at < %s",
  66. (cutoff,),
  67. )
  68. row = cursor.fetchone()
  69. count = row["n"]
  70. total += count
  71. run_ids = ""
  72. if count and "run_id" in _table_columns(cursor, table):
  73. cursor.execute(
  74. f"SELECT DISTINCT run_id FROM {table} WHERE created_at < %s LIMIT 20",
  75. (cutoff,),
  76. )
  77. run_ids = " runs=" + ",".join(r["run_id"] for r in cursor.fetchall())
  78. span = f" [{row['a']} → {row['b']}]" if count else ""
  79. print(f" {table}: {count} 行{span}{run_ids}")
  80. print(f"\ndry-run 合计 {total} 行将被归档删除(cutoff={cutoff});未做任何写入。")
  81. return 0
  82. def execute(conn, cutoff: str) -> int:
  83. stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ")
  84. out_dir = ARCHIVE_ROOT / stamp
  85. out_dir.mkdir(parents=True, exist_ok=True)
  86. manifest: dict = {"cutoff": cutoff, "executed_at": stamp, "tables": {}, "ddl_applied": False}
  87. for table in EXPECTED_TABLES:
  88. with conn.cursor() as cursor:
  89. cursor.execute(f"SELECT COUNT(*) AS n FROM {table} WHERE created_at < %s", (cutoff,))
  90. count = cursor.fetchone()["n"]
  91. cursor.execute(f"SELECT * FROM {table} WHERE created_at < %s", (cutoff,))
  92. rows = cursor.fetchall()
  93. if len(rows) != count:
  94. conn.rollback()
  95. print(f"FAIL {table}: SELECT {len(rows)} 行 != COUNT {count},中止(已回滚,未删任何行)")
  96. return 1
  97. (out_dir / f"{table}.json").write_text(
  98. json.dumps(rows, ensure_ascii=False, indent=1, default=_json_default),
  99. encoding="utf-8",
  100. )
  101. deleted = 0
  102. if count:
  103. cursor.execute(f"DELETE FROM {table} WHERE created_at < %s", (cutoff,))
  104. deleted = cursor.rowcount
  105. if deleted != count:
  106. conn.rollback()
  107. print(f"FAIL {table}: DELETE {deleted} != 归档 {count},已回滚,中止")
  108. return 1
  109. cursor.execute(
  110. f"SELECT COUNT(*) AS n FROM {table} WHERE created_at < %s", (cutoff,)
  111. )
  112. if cursor.fetchone()["n"] != 0:
  113. conn.rollback()
  114. print(f"FAIL {table}: 删后复核仍有残留,已回滚,中止")
  115. return 1
  116. conn.commit()
  117. manifest["tables"][table] = {"archived": count, "deleted": deleted}
  118. print(f" {table}: 归档 {count} 行,删除 {deleted} 行 ✓")
  119. # 全表通过后才做 DDL。
  120. with conn.cursor() as cursor:
  121. for table, columns, ddl in DDL_STATEMENTS:
  122. existing = set(_table_columns(cursor, table))
  123. if not (set(columns) & existing):
  124. print(f" {table}: 待删列均已不存在,跳过 DDL(幂等重跑)")
  125. continue
  126. cursor.execute(ddl)
  127. print(f" {table}: 已删列 {', '.join(columns)} ✓")
  128. conn.commit()
  129. manifest["ddl_applied"] = True
  130. (out_dir / "manifest.json").write_text(
  131. json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8"
  132. )
  133. print(f"\n完成。档案与 manifest 写入 {out_dir.relative_to(ROOT)}/")
  134. return 0
  135. def restore(conn, archive_dir: Path) -> int:
  136. manifest = json.loads((archive_dir / "manifest.json").read_text(encoding="utf-8"))
  137. for table in manifest["tables"]:
  138. rows = json.loads((archive_dir / f"{table}.json").read_text(encoding="utf-8"))
  139. if not rows:
  140. continue
  141. with conn.cursor() as cursor:
  142. existing = set(_table_columns(cursor, table))
  143. dropped = set(rows[0]) - existing
  144. if dropped:
  145. print(f" {table}: 警告——列 {sorted(dropped)} 已被 DROP,这些值只回 JSON 档案不回库")
  146. columns = [col for col in rows[0] if col in existing]
  147. placeholders = ", ".join(["%s"] * len(columns))
  148. sql = (
  149. f"INSERT IGNORE INTO {table} (" + ", ".join(f"`{c}`" for c in columns) + ")"
  150. f" VALUES ({placeholders})"
  151. )
  152. cursor.executemany(sql, [[row.get(c) for c in columns] for row in rows])
  153. print(f" {table}: 回灌 {cursor.rowcount}/{len(rows)} 行")
  154. conn.commit()
  155. return 0
  156. def main() -> int:
  157. parser = argparse.ArgumentParser(description=__doc__)
  158. parser.add_argument("--before", default=DEFAULT_CUTOFF, help="cutoff,默认 2026-06-12 00:00:00")
  159. parser.add_argument("--execute", action="store_true", help="真执行:归档→对账→删行→删列")
  160. parser.add_argument("--restore", metavar="DIR", help="从归档目录回灌(回滚通道)")
  161. args = parser.parse_args()
  162. conn = ContentSupplyDbConfig.from_env(ROOT / ".env").connect()
  163. try:
  164. if args.restore:
  165. return restore(conn, Path(args.restore))
  166. if args.execute:
  167. return execute(conn, args.before)
  168. return dry_run(conn, args.before)
  169. finally:
  170. conn.close()
  171. if __name__ == "__main__":
  172. sys.exit(main())