"""V2 历史数据归档清理(2026-06-12 拍板:归档后删)。
把生产库 content_agent_* 全部 21 张表中 created_at < cutoff 的 V2 时代行
导出为 JSON 档案后删除,并删掉 decode 时代的 7 个旧列 + rule_decisions 的
画像列(B1 已砍画像链)。生产库只有一台(.env CONTENT_SUPPLY_DB_*),只跑一次。
python scripts/archive_v2_db_rows.py # dry-run(默认,零写入)
python scripts/archive_v2_db_rows.py --execute # 归档→对账→删行→删列
python scripts/archive_v2_db_rows.py --restore DIR # 从档案回灌(删行回滚通道)
任一对账断言失败:回滚当前表、立即退出,绝不进入删列 DDL。
DDL 前 SHOW COLUMNS 探测,列已不存在则跳过——重跑幂等。
"""
from __future__ import annotations
import argparse
import datetime as dt
import decimal
import json
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from content_agent.integrations.database_runtime import ContentSupplyDbConfig
from scripts.validate_content_agent_db import EXPECTED_TABLES
# Default for --before: rows with created_at strictly earlier than this are in scope.
DEFAULT_CUTOFF = "2026-06-12 00:00:00"
# Every --execute run creates its own UTC-stamped subdirectory under this path.
ARCHIVE_ROOT = ROOT.joinpath("archive", "v2_db_archive")
# Keep in sync with the narrowed schema in sql/content_agent_schema.sql and
# database_runtime.TABLE_COLUMNS.
# Each entry is (table, columns to drop, ALTER TABLE statement).  The statement is the
# per-table prefix followed by one "DROP COLUMN <col>" clause per listed column, so the
# column list is written only once per table.
DDL_STATEMENTS = [
    (table, columns, prefix + ", ".join(f"DROP COLUMN {col}" for col in columns))
    for table, columns, prefix in (
        (
            "content_agent_pattern_recall_evidence",
            [
                "decode_status",
                "decode_task_id",
                "matched_terms",
                "matched_category_paths",
                "decode_elements",
                "match_paths_request",
                "match_paths_response",
            ],
            # The old index includes decode_status; rebuild it explicitly as a
            # single-column index instead of leaving MySQL to shrink it implicitly.
            "ALTER TABLE content_agent_pattern_recall_evidence "
            "DROP INDEX idx_content_agent_pattern_recall_status, "
            "ADD INDEX idx_content_agent_pattern_recall_status (recall_status), ",
        ),
        (
            "content_agent_rule_decisions",
            ["age_50_plus_level"],
            "ALTER TABLE content_agent_rule_decisions ",
        ),
    )
]
def _json_default(value):
    """Serialize values json.dumps cannot encode natively (used as its ``default`` hook).

    Dates and datetimes become ISO-8601 strings, Decimal becomes its string form, and
    bytes are decoded as UTF-8.  Undecodable bytes are replaced rather than raising, so
    arbitrary binary values do not survive the round trip unchanged.

    Raises:
        TypeError: for any other type.
    """
    # datetime is a subclass of date, so a single date check covers both.
    if isinstance(value, dt.date):
        encoded = value.isoformat()
    elif isinstance(value, decimal.Decimal):
        encoded = str(value)
    elif isinstance(value, bytes):
        encoded = value.decode("utf-8", errors="replace")
    else:
        raise TypeError(f"unserializable: {type(value)}")
    return encoded
def _table_columns(cursor, table: str) -> list[str]:
    """Return the column names of *table*, in the order SHOW COLUMNS reports them.

    The cursor must yield mapping-style rows (each row is indexed by ``"Field"``).
    The table name is interpolated into the statement unquoted, so only pass
    identifiers that come from a trusted source.
    """
    statement = f"SHOW COLUMNS FROM {table}"
    cursor.execute(statement)
    column_rows = cursor.fetchall()
    return [info["Field"] for info in column_rows]
def dry_run(conn, cutoff: str) -> int:
    """Print, per table, what --execute would archive and delete; performs no writes.

    For every table in EXPECTED_TABLES this prints the number of rows with
    ``created_at < cutoff`` and their created_at span.  When the table has a
    ``run_id`` column it also prints up to 20 distinct run ids among those rows.

    Args:
        conn: Open DB connection whose cursors return mapping-style rows.
        cutoff: Exclusive upper bound for created_at, bound as a query parameter.

    Returns:
        Always 0 (used as the process exit code).
    """
    total = 0
    with conn.cursor() as cursor:
        for table in EXPECTED_TABLES:
            cursor.execute(
                "SELECT COUNT(*) AS n, MIN(created_at) AS a, MAX(created_at) AS b "
                f"FROM {table} WHERE created_at < %s",
                (cutoff,),
            )
            row = cursor.fetchone()
            count = row["n"]
            total += count
            run_ids = ""
            # Only look up run ids when there are rows to report and the column exists.
            if count and "run_id" in _table_columns(cursor, table):
                cursor.execute(
                    f"SELECT DISTINCT run_id FROM {table} WHERE created_at < %s LIMIT 20",
                    (cutoff,),
                )
                # str.join accepts only strings: a NULL or numeric run_id would otherwise
                # abort the preview with a TypeError.
                run_ids = " runs=" + ",".join(
                    "NULL" if r["run_id"] is None else str(r["run_id"])
                    for r in cursor.fetchall()
                )
            span = f" [{row['a']} → {row['b']}]" if count else ""
            print(f" {table}: {count} 行{span}{run_ids}")
    print(f"\ndry-run 合计 {total} 行将被归档删除(cutoff={cutoff});未做任何写入。")
    return 0
def _write_manifest(out_dir: Path, manifest: dict) -> None:
    """Persist *manifest* as manifest.json inside *out_dir*, replacing any earlier copy."""
    (out_dir / "manifest.json").write_text(
        json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8"
    )


def execute(conn, cutoff: str) -> int:
    """Archive, reconcile and delete rows older than *cutoff*, then drop the legacy columns.

    For each table in EXPECTED_TABLES, within one transaction: count the rows with
    ``created_at < cutoff``, dump them to ``<table>.json`` in a fresh UTC-stamped
    directory under ARCHIVE_ROOT, delete them, and check that the number deleted equals
    the number archived and that no matching rows remain.  Any mismatch rolls back the
    current table and returns 1 before any DDL runs; tables committed earlier in the
    same run stay deleted.

    Once every table has passed, the statements in DDL_STATEMENTS are executed.  A
    table is skipped when none of its listed columns exist any more.

    manifest.json is written as soon as the archive directory exists and rewritten
    after every commit.  restore() requires that file, so a run that aborts midway
    still leaves a usable rollback channel for the tables it already purged.

    Args:
        conn: Open DB connection whose cursors return mapping-style rows.
        cutoff: Exclusive upper bound for created_at, bound as a query parameter.

    Returns:
        0 on success, 1 when a reconciliation check fails.
    """
    stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    out_dir = ARCHIVE_ROOT / stamp
    out_dir.mkdir(parents=True, exist_ok=True)
    manifest: dict = {"cutoff": cutoff, "executed_at": stamp, "tables": {}, "ddl_applied": False}
    _write_manifest(out_dir, manifest)

    for table in EXPECTED_TABLES:
        with conn.cursor() as cursor:
            cursor.execute(f"SELECT COUNT(*) AS n FROM {table} WHERE created_at < %s", (cutoff,))
            count = cursor.fetchone()["n"]
            cursor.execute(f"SELECT * FROM {table} WHERE created_at < %s", (cutoff,))
            rows = cursor.fetchall()
            if len(rows) != count:
                conn.rollback()
                print(f"FAIL {table}: SELECT {len(rows)} 行 != COUNT {count},中止(已回滚,未删任何行)")
                return 1
            # The dump is written before the DELETE so the rows exist on disk first.
            (out_dir / f"{table}.json").write_text(
                json.dumps(rows, ensure_ascii=False, indent=1, default=_json_default),
                encoding="utf-8",
            )
            deleted = 0
            if count:
                cursor.execute(f"DELETE FROM {table} WHERE created_at < %s", (cutoff,))
                deleted = cursor.rowcount
                if deleted != count:
                    conn.rollback()
                    print(f"FAIL {table}: DELETE {deleted} != 归档 {count},已回滚,中止")
                    return 1
                cursor.execute(
                    f"SELECT COUNT(*) AS n FROM {table} WHERE created_at < %s", (cutoff,)
                )
                if cursor.fetchone()["n"] != 0:
                    conn.rollback()
                    print(f"FAIL {table}: 删后复核仍有残留,已回滚,中止")
                    return 1
        conn.commit()
        manifest["tables"][table] = {"archived": count, "deleted": deleted}
        # Record the table immediately: from this point its rows exist only in the archive.
        _write_manifest(out_dir, manifest)
        print(f" {table}: 归档 {count} 行,删除 {deleted} 行 ✓")

    # DDL runs only after every table has passed reconciliation.
    with conn.cursor() as cursor:
        for table, columns, ddl in DDL_STATEMENTS:
            existing = set(_table_columns(cursor, table))
            # NOTE(review): if only some of the listed columns remain, the prebuilt
            # statement still names all of them and is executed as-is.
            if existing.isdisjoint(columns):
                print(f" {table}: 待删列均已不存在,跳过 DDL(幂等重跑)")
                continue
            cursor.execute(ddl)
            print(f" {table}: 已删列 {', '.join(columns)} ✓")
    conn.commit()
    manifest["ddl_applied"] = True
    _write_manifest(out_dir, manifest)
    print(f"\n完成。档案与 manifest 写入 {out_dir.relative_to(ROOT)}/")
    return 0
def restore(conn, archive_dir: Path) -> int:
    """Re-insert archived rows from *archive_dir* into their tables.

    Reads manifest.json and, for every table it lists, loads ``<table>.json`` and
    inserts the rows with INSERT IGNORE.  Only columns that still exist in the live
    table are written; columns that have been dropped are reported in a warning and
    their values are left in the JSON archive.  The column set is taken from the first
    archived row.  Everything is committed once, after all tables were processed.

    Args:
        conn: Open DB connection whose cursors return mapping-style rows.
        archive_dir: Directory containing manifest.json and the per-table dumps.

    Returns:
        Always 0 (used as the process exit code).
    """
    manifest_path = archive_dir / "manifest.json"
    manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
    for table in manifest["tables"]:
        dump_path = archive_dir / f"{table}.json"
        rows = json.loads(dump_path.read_text(encoding="utf-8"))
        if not rows:
            continue
        first_row = rows[0]
        with conn.cursor() as cursor:
            live_columns = set(_table_columns(cursor, table))
            missing = set(first_row) - live_columns
            if missing:
                print(f" {table}: 警告——列 {sorted(missing)} 已被 DROP,这些值只回 JSON 档案不回库")
            # Keep the archive's column order, minus anything no longer in the table.
            columns = [name for name in first_row if name in live_columns]
            column_list = ", ".join(f"`{name}`" for name in columns)
            value_slots = ", ".join("%s" for _ in columns)
            sql = f"INSERT IGNORE INTO {table} ({column_list}) VALUES ({value_slots})"
            params = [[row.get(name) for name in columns] for row in rows]
            cursor.executemany(sql, params)
            print(f" {table}: 回灌 {cursor.rowcount}/{len(rows)} 行")
    conn.commit()
    return 0
def main() -> int:
    """Parse the command line, connect to the database and run the selected mode.

    Modes, in order of precedence: ``--restore DIR`` re-inserts rows from an archive
    directory, ``--execute`` archives and deletes, and with neither flag a read-only
    dry run is performed.  The connection is configured from ``ROOT/.env`` and is
    always closed before returning.

    Returns:
        The exit code returned by the selected mode.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # The module docstring lists one usage example per line; the default formatter
        # would re-wrap them into a single paragraph in --help output.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    # %(default)s keeps the help text in step with DEFAULT_CUTOFF.
    parser.add_argument("--before", default=DEFAULT_CUTOFF, help="cutoff,默认 %(default)s")
    parser.add_argument("--execute", action="store_true", help="真执行:归档→对账→删行→删列")
    parser.add_argument("--restore", metavar="DIR", help="从归档目录回灌(回滚通道)")
    args = parser.parse_args()

    conn = ContentSupplyDbConfig.from_env(ROOT / ".env").connect()
    try:
        if args.restore:
            return restore(conn, Path(args.restore))
        if args.execute:
            return execute(conn, args.before)
        return dry_run(conn, args.before)
    finally:
        conn.close()
# Script entry point: the return value of main() becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())