"""V2 历史数据归档清理(2026-06-12 拍板:归档后删)。 把生产库 content_agent_* 全部 21 张表中 created_at < cutoff 的 V2 时代行 导出为 JSON 档案后删除,并删掉 decode 时代的 7 个旧列 + rule_decisions 的 画像列(B1 已砍画像链)。生产库只有一台(.env CONTENT_SUPPLY_DB_*),只跑一次。 python scripts/archive_v2_db_rows.py # dry-run(默认,零写入) python scripts/archive_v2_db_rows.py --execute # 归档→对账→删行→删列 python scripts/archive_v2_db_rows.py --restore # 从档案回灌(删行回滚通道) 任一对账断言失败:回滚当前表、立即退出,绝不进入删列 DDL。 DDL 前 SHOW COLUMNS 探测,列已不存在则跳过——重跑幂等。 """ from __future__ import annotations import argparse import datetime as dt import decimal import json import sys from pathlib import Path ROOT = Path(__file__).resolve().parents[1] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from content_agent.integrations.database_runtime import ContentSupplyDbConfig from scripts.validate_content_agent_db import EXPECTED_TABLES DEFAULT_CUTOFF = "2026-06-12 00:00:00" ARCHIVE_ROOT = ROOT / "archive" / "v2_db_archive" # 与 sql/content_agent_schema.sql、database_runtime.TABLE_COLUMNS 的收窄保持一致。 DDL_STATEMENTS = [ ( "content_agent_pattern_recall_evidence", ["decode_status", "decode_task_id", "matched_terms", "matched_category_paths", "decode_elements", "match_paths_request", "match_paths_response"], # 旧索引含 decode_status,显式重建为单列,不留给 MySQL 隐式收缩。 "ALTER TABLE content_agent_pattern_recall_evidence " "DROP INDEX idx_content_agent_pattern_recall_status, " "ADD INDEX idx_content_agent_pattern_recall_status (recall_status), " + ", ".join( f"DROP COLUMN {col}" for col in ["decode_status", "decode_task_id", "matched_terms", "matched_category_paths", "decode_elements", "match_paths_request", "match_paths_response"] ), ), ( "content_agent_rule_decisions", ["age_50_plus_level"], "ALTER TABLE content_agent_rule_decisions DROP COLUMN age_50_plus_level", ), ] def _json_default(value): if isinstance(value, (dt.datetime, dt.date)): return value.isoformat() if isinstance(value, decimal.Decimal): return str(value) if isinstance(value, bytes): return value.decode("utf-8", errors="replace") raise TypeError(f"unserializable: {type(value)}") def _table_columns(cursor, table: str) -> list[str]: cursor.execute(f"SHOW COLUMNS FROM {table}") return [row["Field"] for row in cursor.fetchall()] def dry_run(conn, cutoff: str) -> int: total = 0 with conn.cursor() as cursor: for table in EXPECTED_TABLES: cursor.execute( f"SELECT COUNT(*) AS n, MIN(created_at) AS a, MAX(created_at) AS b " f"FROM {table} WHERE created_at < %s", (cutoff,), ) row = cursor.fetchone() count = row["n"] total += count run_ids = "" if count and "run_id" in _table_columns(cursor, table): cursor.execute( f"SELECT DISTINCT run_id FROM {table} WHERE created_at < %s LIMIT 20", (cutoff,), ) run_ids = " runs=" + ",".join(r["run_id"] for r in cursor.fetchall()) span = f" [{row['a']} → {row['b']}]" if count else "" print(f" {table}: {count} 行{span}{run_ids}") print(f"\ndry-run 合计 {total} 行将被归档删除(cutoff={cutoff});未做任何写入。") return 0 def execute(conn, cutoff: str) -> int: stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ") out_dir = ARCHIVE_ROOT / stamp out_dir.mkdir(parents=True, exist_ok=True) manifest: dict = {"cutoff": cutoff, "executed_at": stamp, "tables": {}, "ddl_applied": False} for table in EXPECTED_TABLES: with conn.cursor() as cursor: cursor.execute(f"SELECT COUNT(*) AS n FROM {table} WHERE created_at < %s", (cutoff,)) count = cursor.fetchone()["n"] cursor.execute(f"SELECT * FROM {table} WHERE created_at < %s", (cutoff,)) rows = cursor.fetchall() if len(rows) != count: conn.rollback() print(f"FAIL {table}: SELECT {len(rows)} 行 != COUNT {count},中止(已回滚,未删任何行)") return 1 (out_dir / f"{table}.json").write_text( json.dumps(rows, ensure_ascii=False, indent=1, default=_json_default), encoding="utf-8", ) deleted = 0 if count: cursor.execute(f"DELETE FROM {table} WHERE created_at < %s", (cutoff,)) deleted = cursor.rowcount if deleted != count: conn.rollback() print(f"FAIL {table}: DELETE {deleted} != 归档 {count},已回滚,中止") return 1 cursor.execute( f"SELECT COUNT(*) AS n FROM {table} WHERE created_at < %s", (cutoff,) ) if cursor.fetchone()["n"] != 0: conn.rollback() print(f"FAIL {table}: 删后复核仍有残留,已回滚,中止") return 1 conn.commit() manifest["tables"][table] = {"archived": count, "deleted": deleted} print(f" {table}: 归档 {count} 行,删除 {deleted} 行 ✓") # 全表通过后才做 DDL。 with conn.cursor() as cursor: for table, columns, ddl in DDL_STATEMENTS: existing = set(_table_columns(cursor, table)) if not (set(columns) & existing): print(f" {table}: 待删列均已不存在,跳过 DDL(幂等重跑)") continue cursor.execute(ddl) print(f" {table}: 已删列 {', '.join(columns)} ✓") conn.commit() manifest["ddl_applied"] = True (out_dir / "manifest.json").write_text( json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8" ) print(f"\n完成。档案与 manifest 写入 {out_dir.relative_to(ROOT)}/") return 0 def restore(conn, archive_dir: Path) -> int: manifest = json.loads((archive_dir / "manifest.json").read_text(encoding="utf-8")) for table in manifest["tables"]: rows = json.loads((archive_dir / f"{table}.json").read_text(encoding="utf-8")) if not rows: continue with conn.cursor() as cursor: existing = set(_table_columns(cursor, table)) dropped = set(rows[0]) - existing if dropped: print(f" {table}: 警告——列 {sorted(dropped)} 已被 DROP,这些值只回 JSON 档案不回库") columns = [col for col in rows[0] if col in existing] placeholders = ", ".join(["%s"] * len(columns)) sql = ( f"INSERT IGNORE INTO {table} (" + ", ".join(f"`{c}`" for c in columns) + ")" f" VALUES ({placeholders})" ) cursor.executemany(sql, [[row.get(c) for c in columns] for row in rows]) print(f" {table}: 回灌 {cursor.rowcount}/{len(rows)} 行") conn.commit() return 0 def main() -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--before", default=DEFAULT_CUTOFF, help="cutoff,默认 2026-06-12 00:00:00") parser.add_argument("--execute", action="store_true", help="真执行:归档→对账→删行→删列") parser.add_argument("--restore", metavar="DIR", help="从归档目录回灌(回滚通道)") args = parser.parse_args() conn = ContentSupplyDbConfig.from_env(ROOT / ".env").connect() try: if args.restore: return restore(conn, Path(args.restore)) if args.execute: return execute(conn, args.before) return dry_run(conn, args.before) finally: conn.close() if __name__ == "__main__": sys.exit(main())