drop_content_audience_profile_column.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. """归档后删 content_agent_discovered_content_items.content_audience_profile 列。
  2. 该列是已废弃的「内容画像」打分维度的落点,V3 行里只是 pattern_match_result
  3. 里 fit_senior_50plus 的冗余镜像(2026-06-12 维度清理 C1/C2 已删代码侧)。
  4. 照 B4 做法:先把列值归档到 archive/,再 ALTER TABLE DROP COLUMN。
  5. python scripts/drop_content_audience_profile_column.py # dry-run(默认)
  6. python scripts/drop_content_audience_profile_column.py --execute # 归档→对账→删列
  7. python scripts/drop_content_audience_profile_column.py --restore <file> # 回滚:加回列+回灌值
  8. """
  9. from __future__ import annotations
  10. import argparse
  11. import datetime as dt
  12. import json
  13. import sys
  14. from pathlib import Path
  # Repository root: the directory one level above the folder holding this script.
  ROOT = Path(__file__).resolve().parents[1]
  # Put the root at the front of sys.path (once) so that imports resolved
  # relative to it work when the script is launched directly.
  _root_entry = str(ROOT)
  if _root_entry not in sys.path:
      sys.path.insert(0, _root_entry)
  18. from content_agent.integrations.database_runtime import ContentSupplyDbConfig
  # Table and column this script archives and drops.
  TABLE = "content_agent_discovered_content_items"
  COLUMN = "content_audience_profile"
  # Parent directory of the per-run, timestamp-named archive folders.
  ARCHIVE_ROOT = ROOT.joinpath("archive", "v2_db_archive")
  def _column_exists(cursor) -> bool:
      """Report whether ``TABLE`` currently has a column named ``COLUMN``.

      Args:
          cursor: An open DB-API cursor that uses the ``%s`` parameter style.

      Returns:
          True if ``SHOW COLUMNS`` yields a row for the column, False otherwise.
      """
      # Compare ``Field`` exactly instead of using ``LIKE``: inside a LIKE
      # pattern every underscore in the column name is a single-character
      # wildcard, so a differently named column of the same shape would match.
      cursor.execute(f"SHOW COLUMNS FROM {TABLE} WHERE Field = %s", (COLUMN,))
      return cursor.fetchone() is not None
  def dry_run(conn) -> int:
      """Print the column's value distribution without writing anything.

      Args:
          conn: Open database connection whose cursors return rows that can
              be indexed by column name.

      Returns:
          Always 0, whether the column still exists or is already gone.
      """
      count_sql = f"SELECT COUNT(*) AS n FROM {TABLE}"
      distribution_sql = (
          f"SELECT {COLUMN} AS v, COUNT(*) AS n FROM {TABLE} GROUP BY {COLUMN}"
      )
      with conn.cursor() as cur:
          if _column_exists(cur):
              cur.execute(count_sql)
              row_count = cur.fetchone()["n"]
              cur.execute(distribution_sql)
              print(f"{TABLE}.{COLUMN}(json,nullable)共 {row_count} 行,值分布:")
              for bucket in cur.fetchall():
                  # Cut each value to 80 characters so a bucket stays on one line.
                  preview = str(bucket["v"])[:80]
                  print(f" {preview} × {bucket['n']}")
              print("\ndry-run:未做任何写入。--execute 才会归档并删列。")
          else:
              # Nothing to report once the column has been dropped.
              print(f"{TABLE}.{COLUMN} 已不存在,无需删除(幂等)。")
      return 0
  def execute(conn) -> int:
      """Archive every value of the column to a JSON file, then drop the column.

      The archive is written to
      ``ARCHIVE_ROOT/<UTC timestamp>/<TABLE>.<COLUMN>.json`` as a list of
      ``{"id": ..., COLUMN: ...}`` objects. It is read back and compared with
      the queried data before the column is dropped.

      Args:
          conn: Open database connection whose cursors return rows that can
              be indexed by column name.

      Returns:
          0 when the column was dropped or was already absent, 1 when one of
          the consistency checks failed.
      """
      with conn.cursor() as cur:
          if not _column_exists(cur):
              print(f"{TABLE}.{COLUMN} 已不存在,跳过(幂等)。")
              return 0

          cur.execute(f"SELECT COUNT(*) AS n FROM {TABLE}")
          total = cur.fetchone()["n"]

          # Ask the server for the JSON as text and decode it here, so the
          # archive holds real JSON values no matter how the driver maps JSON
          # columns. restore() re-encodes archive values with json.dumps; a
          # raw JSON text string in the archive would come back as a JSON
          # string scalar instead of the original document.
          cur.execute(f"SELECT id, CAST({COLUMN} AS CHAR) AS {COLUMN} FROM {TABLE}")
          rows = cur.fetchall()
          if len(rows) != total:
              print(f"FAIL: SELECT {len(rows)} != COUNT {total},中止,未删列。")
              return 1

          payload = [
              {
                  "id": row["id"],
                  # SQL NULL stays None; everything else is JSON text.
                  COLUMN: None if row[COLUMN] is None else json.loads(row[COLUMN]),
              }
              for row in rows
          ]

          stamp = dt.datetime.now(dt.timezone.utc).strftime("%Y%m%dT%H%M%SZ")
          out_dir = ARCHIVE_ROOT / stamp
          out_dir.mkdir(parents=True, exist_ok=True)
          archive_file = out_dir / f"{TABLE}.{COLUMN}.json"
          archive_file.write_text(
              json.dumps(payload, ensure_ascii=False, indent=1),
              encoding="utf-8",
          )

          # Reconcile the file on disk with the queried data before the
          # destructive step: the archive is the only copy once the column
          # is gone.
          if json.loads(archive_file.read_text(encoding="utf-8")) != payload:
              print("FAIL: 归档文件回读与查询结果不一致,中止,未删列。")
              return 1
          print(f"归档 {len(rows)} 行 {COLUMN} 值 → {archive_file.relative_to(ROOT)}")

          cur.execute(f"ALTER TABLE {TABLE} DROP COLUMN {COLUMN}")
          if _column_exists(cur):
              # NOTE(review): MySQL-family servers commit DDL implicitly, so this
              # rollback probably cannot undo the ALTER; kept as a defensive step.
              conn.rollback()
              print("FAIL: DROP COLUMN 后列仍存在,已回滚。")
              return 1
          conn.commit()
          print(f"已删列 {TABLE}.{COLUMN} ✓(回滚:--restore {archive_file.relative_to(ROOT)})")
          return 0
  def restore(conn, archive_file: Path) -> int:
      """Re-create the column if needed and write the archived values back.

      NOTE(review): every non-null archived value is re-encoded with
      ``json.dumps`` before it is written, so the archive is expected to hold
      decoded JSON values. A value stored in the archive as a raw JSON text
      string would be written back as a JSON string scalar — confirm what the
      archive contains before relying on this.

      Args:
          conn: Open database connection.
          archive_file: Path to a JSON file holding a list of objects, each
              with an ``"id"`` key and a ``COLUMN`` key.

      Returns:
          Always 0; errors surface as exceptions.
      """
      archived = json.loads(archive_file.read_text(encoding="utf-8"))
      update_sql = f"UPDATE {TABLE} SET {COLUMN} = %s WHERE id = %s"
      with conn.cursor() as cur:
          column_missing = not _column_exists(cur)
          if column_missing:
              cur.execute(f"ALTER TABLE {TABLE} ADD COLUMN {COLUMN} JSON NULL")
              print(f"已加回列 {TABLE}.{COLUMN}")
          for entry in archived:
              stored = entry[COLUMN]
              # None maps to SQL NULL; anything else is sent as JSON text.
              encoded = None if stored is None else json.dumps(stored, ensure_ascii=False)
              cur.execute(update_sql, (encoded, entry["id"]))
          conn.commit()
      print(f"已回灌 {len(archived)} 行 {COLUMN} 值")
      return 0
  def main() -> int:
      """Parse the command line and run the selected mode.

      With no flag the script performs a dry run. ``--execute`` archives the
      column and drops it; ``--restore FILE`` re-adds the column from an
      archive. The two flags are mutually exclusive.

      Returns:
          The status returned by the selected mode, used as the exit code.
      """
      parser = argparse.ArgumentParser(
          description=__doc__,
          # Keep the usage examples of the module docstring on separate lines
          # instead of letting argparse reflow them into one paragraph.
          formatter_class=argparse.RawDescriptionHelpFormatter,
      )
      # Passing both flags used to run the restore and silently ignore
      # --execute; reject the ambiguous combination up front instead.
      mode = parser.add_mutually_exclusive_group()
      mode.add_argument("--execute", action="store_true", help="真执行:归档→删列")
      mode.add_argument("--restore", metavar="FILE", help="从归档 JSON 回滚(加回列+回灌)")
      args = parser.parse_args()

      conn = ContentSupplyDbConfig.from_env(ROOT / ".env").connect()
      try:
          if args.restore:
              return restore(conn, Path(args.restore))
          if args.execute:
              return execute(conn)
          return dry_run(conn)
      finally:
          # Release the connection on every path, including exceptions.
          conn.close()
  if __name__ == "__main__":
      # Hand main()'s status to the interpreter as the process exit code.
      raise SystemExit(main())