"""归档后删 content_agent_discovered_content_items.content_audience_profile 列。 该列是已废弃的「内容画像」打分维度的落点,V3 行里只是 pattern_match_result 里 fit_senior_50plus 的冗余镜像(2026-06-12 维度清理 C1/C2 已删代码侧)。 照 B4 做法:先把列值归档到 archive/,再 ALTER TABLE DROP COLUMN。 python scripts/drop_content_audience_profile_column.py # dry-run(默认) python scripts/drop_content_audience_profile_column.py --execute # 归档→对账→删列 python scripts/drop_content_audience_profile_column.py --restore # 回滚:加回列+回灌值 """ from __future__ import annotations import argparse import datetime as dt import json import sys from pathlib import Path ROOT = Path(__file__).resolve().parents[1] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from content_agent.integrations.database_runtime import ContentSupplyDbConfig TABLE = "content_agent_discovered_content_items" COLUMN = "content_audience_profile" ARCHIVE_ROOT = ROOT / "archive" / "v2_db_archive" def _column_exists(cursor) -> bool: cursor.execute(f"SHOW COLUMNS FROM {TABLE} LIKE %s", (COLUMN,)) return cursor.fetchone() is not None def dry_run(conn) -> int: with conn.cursor() as c: if not _column_exists(c): print(f"{TABLE}.{COLUMN} 已不存在,无需删除(幂等)。") return 0 c.execute(f"SELECT COUNT(*) AS n FROM {TABLE}") total = c.fetchone()["n"] c.execute( f"SELECT {COLUMN} AS v, COUNT(*) AS n FROM {TABLE} GROUP BY {COLUMN}" ) print(f"{TABLE}.{COLUMN}(json,nullable)共 {total} 行,值分布:") for r in c.fetchall(): print(f" {str(r['v'])[:80]} × {r['n']}") print("\ndry-run:未做任何写入。--execute 才会归档并删列。") return 0 def execute(conn) -> int: with conn.cursor() as c: if not _column_exists(c): print(f"{TABLE}.{COLUMN} 已不存在,跳过(幂等)。") return 0 c.execute(f"SELECT COUNT(*) AS n FROM {TABLE}") total = c.fetchone()["n"] c.execute(f"SELECT id, {COLUMN} FROM {TABLE}") rows = c.fetchall() if len(rows) != total: print(f"FAIL: SELECT {len(rows)} != COUNT {total},中止,未删列。") return 1 stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ") out_dir = ARCHIVE_ROOT / stamp out_dir.mkdir(parents=True, exist_ok=True) archive_file = out_dir / f"{TABLE}.{COLUMN}.json" archive_file.write_text( json.dumps( [{"id": r["id"], COLUMN: r[COLUMN]} for r in rows], ensure_ascii=False, indent=1, ), encoding="utf-8", ) print(f"归档 {len(rows)} 行 {COLUMN} 值 → {archive_file.relative_to(ROOT)}") c.execute(f"ALTER TABLE {TABLE} DROP COLUMN {COLUMN}") if _column_exists(c): conn.rollback() print("FAIL: DROP COLUMN 后列仍存在,已回滚。") return 1 conn.commit() print(f"已删列 {TABLE}.{COLUMN} ✓(回滚:--restore {archive_file.relative_to(ROOT)})") return 0 def restore(conn, archive_file: Path) -> int: rows = json.loads(archive_file.read_text(encoding="utf-8")) with conn.cursor() as c: if not _column_exists(c): c.execute(f"ALTER TABLE {TABLE} ADD COLUMN {COLUMN} JSON NULL") print(f"已加回列 {TABLE}.{COLUMN}") for r in rows: value = r[COLUMN] c.execute( f"UPDATE {TABLE} SET {COLUMN} = %s WHERE id = %s", (json.dumps(value, ensure_ascii=False) if value is not None else None, r["id"]), ) conn.commit() print(f"已回灌 {len(rows)} 行 {COLUMN} 值") return 0 def main() -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--execute", action="store_true", help="真执行:归档→删列") parser.add_argument("--restore", metavar="FILE", help="从归档 JSON 回滚(加回列+回灌)") args = parser.parse_args() conn = ContentSupplyDbConfig.from_env(ROOT / ".env").connect() try: if args.restore: return restore(conn, Path(args.restore)) if args.execute: return execute(conn) return dry_run(conn) finally: conn.close() if __name__ == "__main__": sys.exit(main())