| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- """V2 历史数据归档清理(2026-06-12 拍板:归档后删)。
- 把生产库 content_agent_* 全部 21 张表中 created_at < cutoff 的 V2 时代行
- 导出为 JSON 档案后删除,并删掉 decode 时代的 7 个旧列 + rule_decisions 的
- 画像列(B1 已砍画像链)。生产库只有一台(.env CONTENT_SUPPLY_DB_*),只跑一次。
- python scripts/archive_v2_db_rows.py # dry-run(默认,零写入)
- python scripts/archive_v2_db_rows.py --execute # 归档→对账→删行→删列
- python scripts/archive_v2_db_rows.py --restore <dir> # 从档案回灌(删行回滚通道)
- 任一对账断言失败:回滚当前表、立即退出,绝不进入删列 DDL。
- DDL 前 SHOW COLUMNS 探测,列已不存在则跳过——重跑幂等。
- """
- from __future__ import annotations
- import argparse
- import datetime as dt
- import decimal
- import json
- import sys
- from pathlib import Path
- ROOT = Path(__file__).resolve().parents[1]
- if str(ROOT) not in sys.path:
- sys.path.insert(0, str(ROOT))
- from content_agent.integrations.database_runtime import ContentSupplyDbConfig
- from scripts.validate_content_agent_db import EXPECTED_TABLES
- DEFAULT_CUTOFF = "2026-06-12 00:00:00"
- ARCHIVE_ROOT = ROOT / "archive" / "v2_db_archive"
# Keep in sync with the narrowed definitions in sql/content_agent_schema.sql and
# database_runtime.TABLE_COLUMNS.

# Single source of truth for the decode-era columns: the same list is used both
# for the "does anything still exist?" probe and to build the ALTER statement,
# so the two can never drift apart.
_RECALL_EVIDENCE_DROPPED_COLUMNS = [
    "decode_status",
    "decode_task_id",
    "matched_terms",
    "matched_category_paths",
    "decode_elements",
    "match_paths_request",
    "match_paths_response",
]

# Each entry is (table, columns to drop, ALTER statement that drops them).
DDL_STATEMENTS = [
    (
        "content_agent_pattern_recall_evidence",
        _RECALL_EVIDENCE_DROPPED_COLUMNS,
        # The old index includes decode_status; rebuild it explicitly as a
        # single-column index instead of leaving MySQL to shrink it implicitly.
        "ALTER TABLE content_agent_pattern_recall_evidence "
        "DROP INDEX idx_content_agent_pattern_recall_status, "
        "ADD INDEX idx_content_agent_pattern_recall_status (recall_status), "
        + ", ".join(f"DROP COLUMN {col}" for col in _RECALL_EVIDENCE_DROPPED_COLUMNS),
    ),
    (
        "content_agent_rule_decisions",
        ["age_50_plus_level"],
        "ALTER TABLE content_agent_rule_decisions DROP COLUMN age_50_plus_level",
    ),
]
- def _json_default(value):
- if isinstance(value, (dt.datetime, dt.date)):
- return value.isoformat()
- if isinstance(value, decimal.Decimal):
- return str(value)
- if isinstance(value, bytes):
- return value.decode("utf-8", errors="replace")
- raise TypeError(f"unserializable: {type(value)}")
- def _table_columns(cursor, table: str) -> list[str]:
- cursor.execute(f"SHOW COLUMNS FROM {table}")
- return [row["Field"] for row in cursor.fetchall()]
def dry_run(conn, cutoff: str) -> int:
    """Print, per table, how many rows are older than *cutoff*; write nothing.

    For every table in ``EXPECTED_TABLES`` this reports the row count and the
    min/max ``created_at`` of the matching rows. When the table has a
    ``run_id`` column, up to 20 distinct run ids are listed as well.

    Args:
        conn: open database connection whose cursors return mapping-style rows.
        cutoff: rows with ``created_at`` strictly before this value are counted.

    Returns:
        Always 0.
    """
    total = 0
    with conn.cursor() as cursor:
        for table in EXPECTED_TABLES:
            cursor.execute(
                f"SELECT COUNT(*) AS n, MIN(created_at) AS a, MAX(created_at) AS b "
                f"FROM {table} WHERE created_at < %s",
                (cutoff,),
            )
            row = cursor.fetchone()
            count = row["n"]
            total += count
            run_ids = ""
            if count and "run_id" in _table_columns(cursor, table):
                cursor.execute(
                    f"SELECT DISTINCT run_id FROM {table} WHERE created_at < %s LIMIT 20",
                    (cutoff,),
                )
                # str.join only accepts strings; a NULL or numeric run_id must
                # not crash a read-only preview.
                run_ids = " runs=" + ",".join(
                    "NULL" if r["run_id"] is None else str(r["run_id"])
                    for r in cursor.fetchall()
                )
            span = f" [{row['a']} → {row['b']}]" if count else ""
            print(f" {table}: {count} 行{span}{run_ids}")
    print(f"\ndry-run 合计 {total} 行将被归档删除(cutoff={cutoff});未做任何写入。")
    return 0
def _write_manifest(out_dir: Path, manifest: dict) -> None:
    """Write *manifest* to ``manifest.json`` in *out_dir*, replacing any earlier copy."""
    (out_dir / "manifest.json").write_text(
        json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8"
    )


def execute(conn, cutoff: str) -> int:
    """Archive rows older than *cutoff* to JSON, delete them, then drop old columns.

    A timestamped directory is created under ``ARCHIVE_ROOT``. For each table in
    ``EXPECTED_TABLES`` the matching rows are selected, cross-checked against
    ``COUNT(*)``, dumped to ``<table>.json``, deleted, and re-counted; the
    table's transaction is committed only if every check passes. Once all
    tables have passed, each statement in ``DDL_STATEMENTS`` is executed unless
    none of its listed columns exist any more.

    ``manifest.json`` is written up front and rewritten after every committed
    table and after the DDL step. ``restore`` needs the manifest, so it must
    exist even when a later table aborts the run after earlier tables were
    already deleted and committed.

    Args:
        conn: open database connection whose cursors return mapping-style rows.
        cutoff: rows with ``created_at`` strictly before this value are processed.

    Returns:
        0 on success; 1 as soon as a reconciliation check fails (the current
        table is rolled back and the DDL step is never reached).
    """
    stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    out_dir = ARCHIVE_ROOT / stamp
    out_dir.mkdir(parents=True, exist_ok=True)
    manifest: dict = {"cutoff": cutoff, "executed_at": stamp, "tables": {}, "ddl_applied": False}
    _write_manifest(out_dir, manifest)

    for table in EXPECTED_TABLES:
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT COUNT(*) AS n FROM {table} WHERE created_at < %s", (cutoff,))
            count = cursor.fetchone()["n"]
            cursor.execute(f"SELECT * FROM {table} WHERE created_at < %s", (cutoff,))
            rows = cursor.fetchall()
            if len(rows) != count:
                conn.rollback()
                print(f"FAIL {table}: SELECT {len(rows)} 行 != COUNT {count},中止(已回滚,未删任何行)")
                return 1
            # The archive file is written before the DELETE so the data is on
            # disk before any row is removed.
            (out_dir / f"{table}.json").write_text(
                json.dumps(rows, ensure_ascii=False, indent=1, default=_json_default),
                encoding="utf-8",
            )
            deleted = 0
            if count:
                cursor.execute(f"DELETE FROM {table} WHERE created_at < %s", (cutoff,))
                deleted = cursor.rowcount
                if deleted != count:
                    conn.rollback()
                    print(f"FAIL {table}: DELETE {deleted} != 归档 {count},已回滚,中止")
                    return 1
                cursor.execute(
                    f"SELECT COUNT(*) AS n FROM {table} WHERE created_at < %s", (cutoff,)
                )
                if cursor.fetchone()["n"] != 0:
                    conn.rollback()
                    print(f"FAIL {table}: 删后复核仍有残留,已回滚,中止")
                    return 1
        conn.commit()
        manifest["tables"][table] = {"archived": count, "deleted": deleted}
        # Persist immediately: this table's deletion is now permanent, so the
        # manifest on disk must list it even if a later table aborts the run.
        _write_manifest(out_dir, manifest)
        print(f" {table}: 归档 {count} 行,删除 {deleted} 行 ✓")

    # DDL runs only after every table has passed.
    with conn.cursor() as cursor:
        for table, columns, ddl in DDL_STATEMENTS:
            existing = set(_table_columns(cursor, table))
            if not (set(columns) & existing):
                print(f" {table}: 待删列均已不存在,跳过 DDL(幂等重跑)")
                continue
            cursor.execute(ddl)
            print(f" {table}: 已删列 {', '.join(columns)} ✓")
    conn.commit()
    manifest["ddl_applied"] = True
    _write_manifest(out_dir, manifest)
    print(f"\n完成。档案与 manifest 写入 {out_dir.relative_to(ROOT)}/")
    return 0
def restore(conn, archive_dir: Path) -> int:
    """Re-insert archived rows from *archive_dir* back into the database.

    The tables to process come from ``manifest.json``; each table's rows come
    from ``<table>.json`` in the same directory. Rows are inserted with
    ``INSERT IGNORE``. Only columns that still exist in the live table are
    written; a warning is printed for archived columns that are gone.
    The column set is taken from the first archived row of each table.

    Args:
        conn: open database connection whose cursors return mapping-style rows.
        archive_dir: directory holding ``manifest.json`` and the table dumps.

    Returns:
        Always 0.
    """
    manifest_text = (archive_dir / "manifest.json").read_text(encoding="utf-8")
    manifest = json.loads(manifest_text)
    for table in manifest["tables"]:
        archived = json.loads((archive_dir / f"{table}.json").read_text(encoding="utf-8"))
        if archived:
            with conn.cursor() as cursor:
                live_columns = set(_table_columns(cursor, table))
                first_record = archived[0]
                missing = set(first_record) - live_columns
                if missing:
                    print(f" {table}: 警告——列 {sorted(missing)} 已被 DROP,这些值只回 JSON 档案不回库")
                kept = [name for name in first_record if name in live_columns]
                column_list = ", ".join(f"`{name}`" for name in kept)
                value_slots = ", ".join(["%s"] * len(kept))
                statement = f"INSERT IGNORE INTO {table} ({column_list}) VALUES ({value_slots})"
                parameters = [[record.get(name) for name in kept] for record in archived]
                cursor.executemany(statement, parameters)
                print(f" {table}: 回灌 {cursor.rowcount}/{len(archived)} 行")
            # NOTE(review): per-table commit is reconstructed from the mangled
            # source, mirroring execute() — confirm against the original file.
            conn.commit()
    return 0
def main() -> int:
    """Parse the command line and run the selected mode.

    With ``--restore DIR`` the archive in DIR is re-inserted; with ``--execute``
    the archive/delete/DDL run is performed; with neither, a read-only dry run
    is done. ``--execute`` and ``--restore`` are mutually exclusive: passing
    both used to run the restore and silently ignore ``--execute``, which is
    too ambiguous for a destructive tool, so argparse now rejects it.

    The connection is built from ``ROOT / ".env"`` and is always closed.

    Returns:
        The exit code of the mode that ran.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--before", default=DEFAULT_CUTOFF, help="cutoff,默认 2026-06-12 00:00:00")
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument("--execute", action="store_true", help="真执行:归档→对账→删行→删列")
    mode.add_argument("--restore", metavar="DIR", help="从归档目录回灌(回滚通道)")
    args = parser.parse_args()

    conn = ContentSupplyDbConfig.from_env(ROOT / ".env").connect()
    try:
        if args.restore:
            return restore(conn, Path(args.restore))
        if args.execute:
            return execute(conn, args.before)
        return dry_run(conn, args.before)
    finally:
        conn.close()


if __name__ == "__main__":
    sys.exit(main())
|