| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- """归档后删 content_agent_discovered_content_items.content_audience_profile 列。
- 该列是已废弃的「内容画像」打分维度的落点,V3 行里只是 pattern_match_result
- 里 fit_senior_50plus 的冗余镜像(2026-06-12 维度清理 C1/C2 已删代码侧)。
- 照 B4 做法:先把列值归档到 archive/,再 ALTER TABLE DROP COLUMN。
- python scripts/drop_content_audience_profile_column.py # dry-run(默认)
- python scripts/drop_content_audience_profile_column.py --execute # 归档→对账→删列
- python scripts/drop_content_audience_profile_column.py --restore <file> # 回滚:加回列+回灌值
- """
- from __future__ import annotations
- import argparse
- import datetime as dt
- import json
- import sys
- from pathlib import Path
- ROOT = Path(__file__).resolve().parents[1]
- if str(ROOT) not in sys.path:
- sys.path.insert(0, str(ROOT))
- from content_agent.integrations.database_runtime import ContentSupplyDbConfig
- TABLE = "content_agent_discovered_content_items"
- COLUMN = "content_audience_profile"
- ARCHIVE_ROOT = ROOT / "archive" / "v2_db_archive"
def _column_exists(cursor) -> bool:
    """Report whether ``TABLE`` currently has a column matching ``COLUMN``.

    Issues ``SHOW COLUMNS FROM <TABLE> LIKE %s`` with ``COLUMN`` bound as the
    pattern and checks whether any row comes back.

    Args:
        cursor: An open DB cursor exposing ``execute`` and ``fetchone``.

    Returns:
        True if the lookup produced a row, False otherwise.
    """
    lookup_sql = f"SHOW COLUMNS FROM {TABLE} LIKE %s"
    cursor.execute(lookup_sql, (COLUMN,))
    first_match = cursor.fetchone()
    return first_match is not None
def dry_run(conn) -> int:
    """Print what ``--execute`` would touch, without writing anything.

    If the column is already gone, says so and returns. Otherwise prints the
    table's total row count followed by one line per distinct column value
    (stringified and truncated to 80 characters) with its occurrence count.

    Args:
        conn: An open DB connection whose ``cursor()`` is a context manager
            and whose rows are addressable by column name.

    Returns:
        Always 0.
    """
    count_sql = f"SELECT COUNT(*) AS n FROM {TABLE}"
    distribution_sql = (
        f"SELECT {COLUMN} AS v, COUNT(*) AS n FROM {TABLE} GROUP BY {COLUMN}"
    )
    with conn.cursor() as cur:
        if not _column_exists(cur):
            print(f"{TABLE}.{COLUMN} 已不存在,无需删除(幂等)。")
            return 0

        cur.execute(count_sql)
        row_total = cur.fetchone()["n"]

        cur.execute(distribution_sql)
        print(f"{TABLE}.{COLUMN}(json,nullable)共 {row_total} 行,值分布:")
        for bucket in cur.fetchall():
            # Long JSON values are cut to 80 chars to keep the report readable.
            preview = str(bucket["v"])[:80]
            print(f" {preview} × {bucket['n']}")

    print("\ndry-run:未做任何写入。--execute 才会归档并删列。")
    return 0
def execute(conn) -> int:
    """Archive every row's column value to a JSON file, then drop the column.

    Steps, in order:
      1. Return early if the column no longer exists.
      2. Read ``COUNT(*)`` and then all ``(id, COLUMN)`` rows; abort if the
         two row counts disagree.
      3. Write the rows to
         ``ARCHIVE_ROOT/<UTC timestamp>/<TABLE>.<COLUMN>.json``.
      4. Run ``ALTER TABLE ... DROP COLUMN`` and confirm the column is gone.

    Args:
        conn: An open DB connection whose ``cursor()`` is a context manager
            and whose rows are addressable by column name.

    Returns:
        0 when the column was dropped or was already absent; 1 when the row
        counts disagree or the column is still present after the drop.
    """
    with conn.cursor() as cur:
        if not _column_exists(cur):
            print(f"{TABLE}.{COLUMN} 已不存在,跳过(幂等)。")
            return 0

        # Reconcile: the full SELECT must return exactly COUNT(*) rows.
        cur.execute(f"SELECT COUNT(*) AS n FROM {TABLE}")
        expected = cur.fetchone()["n"]
        cur.execute(f"SELECT id, {COLUMN} FROM {TABLE}")
        fetched = cur.fetchall()
        fetched_count = len(fetched)
        if fetched_count != expected:
            print(f"FAIL: SELECT {fetched_count} != COUNT {expected},中止,未删列。")
            return 1

        # Each run archives into its own UTC-timestamped directory.
        now_utc = dt.datetime.now(dt.timezone.utc)
        target_dir = ARCHIVE_ROOT / now_utc.strftime("%Y%m%dT%H%M%SZ")
        target_dir.mkdir(parents=True, exist_ok=True)
        archive_file = target_dir / f"{TABLE}.{COLUMN}.json"

        records = [{"id": row["id"], COLUMN: row[COLUMN]} for row in fetched]
        payload = json.dumps(records, ensure_ascii=False, indent=1)
        archive_file.write_text(payload, encoding="utf-8")
        shown_path = archive_file.relative_to(ROOT)
        print(f"归档 {fetched_count} 行 {COLUMN} 值 → {shown_path}")

        # The archive is on disk before the destructive statement runs.
        cur.execute(f"ALTER TABLE {TABLE} DROP COLUMN {COLUMN}")
        if _column_exists(cur):
            # NOTE(review): on MySQL, DDL such as ALTER TABLE commits
            # implicitly, so this rollback presumably cannot undo a drop —
            # confirm against the target server.
            conn.rollback()
            print("FAIL: DROP COLUMN 后列仍存在,已回滚。")
            return 1

        conn.commit()
        print(f"已删列 {TABLE}.{COLUMN} ✓(回滚:--restore {shown_path})")
        return 0
def _archived_value_to_json_text(value):
    """Return the text to bind into the JSON column for one archived value.

    ``None`` stays ``None`` (SQL NULL). A string that already parses as JSON
    is passed through unchanged; anything else is serialised with
    ``json.dumps``.
    """
    if value is None:
        return None
    if isinstance(value, str):
        # Drivers that return JSON columns as raw text make the archive hold
        # JSON *text* strings. Dumping such a string again would store a
        # quoted JSON string scalar instead of the original document.
        try:
            json.loads(value)
        except ValueError:
            pass  # Plain string scalar: serialise it below.
        else:
            return value
    return json.dumps(value, ensure_ascii=False)


def restore(conn, archive_file: Path) -> int:
    """Roll back a drop: re-add the column if missing and refill its values.

    Reads the archive (a JSON list of ``{"id": ..., COLUMN: ...}`` objects, as
    written by ``execute``), adds the column back as ``JSON NULL`` when it does
    not exist, then issues one ``UPDATE ... WHERE id = %s`` per archived row
    and commits.

    Archived string values that are themselves valid JSON text are written
    back verbatim rather than being JSON-encoded a second time.

    Args:
        conn: An open DB connection whose ``cursor()`` is a context manager.
        archive_file: Path to the archive JSON produced by ``--execute``.

    Returns:
        Always 0. File, JSON and database errors propagate as exceptions.
    """
    rows = json.loads(archive_file.read_text(encoding="utf-8"))
    update_sql = f"UPDATE {TABLE} SET {COLUMN} = %s WHERE id = %s"
    with conn.cursor() as c:
        if not _column_exists(c):
            c.execute(f"ALTER TABLE {TABLE} ADD COLUMN {COLUMN} JSON NULL")
            print(f"已加回列 {TABLE}.{COLUMN}")
        for r in rows:
            c.execute(
                update_sql,
                (_archived_value_to_json_text(r[COLUMN]), r["id"]),
            )
    conn.commit()
    print(f"已回灌 {len(rows)} 行 {COLUMN} 值")
    return 0
def main() -> int:
    """Parse the command line, open a DB connection and run one mode.

    Mode precedence: ``--restore FILE`` first, then ``--execute``, otherwise
    the read-only dry run. The connection is built from ``ROOT / ".env"`` and
    is always closed before returning.

    Returns:
        The exit status reported by the selected mode.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument("--execute", action="store_true", help="真执行:归档→删列")
    cli.add_argument("--restore", metavar="FILE", help="从归档 JSON 回滚(加回列+回灌)")
    options = cli.parse_args()

    connection = ContentSupplyDbConfig.from_env(ROOT / ".env").connect()
    try:
        if options.restore:
            status = restore(connection, Path(options.restore))
        elif options.execute:
            status = execute(connection)
        else:
            status = dry_run(connection)
    finally:
        connection.close()
    return status
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())
|