#!/usr/bin/env python3
# migrate_v5_version_and_images.py
"""
Database migration v5: multi-tenant version field + resource images.

Changes in this migration:
  1. Add a `version` column (VARCHAR(32) DEFAULT 'v0') to the 6 entity tables:
     knowledge, resource, requirement, capability, tool, strategy
  2. Add an `images` column (JSONB DEFAULT '[]') to the `resource` table

Notes (the approach adopted after the pitfalls hit in v4):
  - A combined `NOT NULL DEFAULT` breaks; split it into two steps:
    first ADD COLUMN ... DEFAULT, then SET NOT NULL
  - SET NOT NULL may fail because of lock conflicts with idle-in-transaction
    sessions; that failure is not fatal, since the DEFAULT already applies
    to new rows
  - Kill idle-in-transaction sessions before running DDL so that ALTER is
    not blocked by an AccessShareLock

Idempotent: running it repeatedly does not damage existing data.
"""
import os
import sys
import time

import psycopg2
from dotenv import load_dotenv
from psycopg2.extras import RealDictCursor
  19. _script_dir = os.path.dirname(os.path.abspath(__file__))
  20. _project_root = os.path.normpath(os.path.join(_script_dir, '..', '..', '..'))
  21. load_dotenv(os.path.join(_project_root, '.env'))
  22. def log(msg):
  23. print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True)
  24. def get_connection():
  25. conn = psycopg2.connect(
  26. host=os.getenv('KNOWHUB_DB'),
  27. port=int(os.getenv('KNOWHUB_PORT', 5432)),
  28. user=os.getenv('KNOWHUB_USER'),
  29. password=os.getenv('KNOWHUB_PASSWORD'),
  30. database=os.getenv('KNOWHUB_DB_NAME'),
  31. connect_timeout=10
  32. )
  33. conn.autocommit = True
  34. return conn
  35. def column_exists(cursor, table, column):
  36. cursor.execute(
  37. "SELECT 1 FROM information_schema.columns WHERE table_name = %s AND column_name = %s",
  38. (table, column))
  39. return cursor.fetchone() is not None
  40. def is_not_null(cursor, table, column):
  41. cursor.execute(
  42. "SELECT is_nullable FROM information_schema.columns WHERE table_name=%s AND column_name=%s",
  43. (table, column))
  44. row = cursor.fetchone()
  45. return row and row['is_nullable'] == 'NO'
  46. def kill_idle_in_tx(cursor):
  47. """跑 DDL 前清掉所有 idle in transaction 会话(会阻塞 DDL 等锁)"""
  48. cursor.execute("""SELECT pid FROM pg_stat_activity
  49. WHERE state='idle in transaction' AND pid != pg_backend_pid()
  50. AND datname=current_database()""")
  51. pids = [r['pid'] for r in cursor.fetchall()]
  52. if pids:
  53. log(f" killing {len(pids)} idle-in-tx: {pids}")
  54. for p in pids:
  55. cursor.execute("SELECT pg_terminate_backend(%s)", (p,))
  56. time.sleep(1)
  57. return len(pids)
  58. # ─── 主流程 ───────────────────────────────────────────────────────────────────
  59. ENTITIES = ['knowledge', 'resource', 'requirement', 'capability', 'tool', 'strategy']
  60. def main():
  61. log("=" * 60)
  62. log("KnowHub 迁移 v5: version + resource.images")
  63. log("=" * 60)
  64. conn = get_connection()
  65. cursor = conn.cursor(cursor_factory=RealDictCursor)
  66. cursor.execute("SET statement_timeout = '30s'")
  67. # Step 0: 清理可能阻塞 DDL 的会话
  68. log("\n[0/3] 清理 idle-in-transaction 会话...")
  69. kill_idle_in_tx(cursor)
  70. # Step 1: 给每个实体表加 version 列
  71. log("\n[1/3] 添加 version 列(DEFAULT 'v0')...")
  72. for t in ENTITIES:
  73. if column_exists(cursor, t, 'version'):
  74. log(f" {t}.version 已存在,跳过")
  75. continue
  76. try:
  77. cursor.execute(f"ALTER TABLE {t} ADD COLUMN version VARCHAR(32) DEFAULT 'v0'")
  78. log(f" ✓ {t}.version 已添加")
  79. except Exception as e:
  80. log(f" ✗ {t}.version 失败: {type(e).__name__}: {str(e)[:150]}")
  81. # Step 2: 给 resource 加 images 列
  82. log("\n[2/3] resource.images(JSONB DEFAULT '[]')...")
  83. if column_exists(cursor, 'resource', 'images'):
  84. log(" resource.images 已存在,跳过")
  85. else:
  86. try:
  87. cursor.execute("ALTER TABLE resource ADD COLUMN images JSONB DEFAULT '[]'")
  88. log(" ✓ resource.images 已添加")
  89. except Exception as e:
  90. log(f" ✗ resource.images 失败: {type(e).__name__}: {str(e)[:150]}")
  91. # Step 3: 尝试给 version 列设 NOT NULL(非关键,失败可接受)
  92. log("\n[3/3] 尝试给 version 列加 NOT NULL(失败不致命)...")
  93. kill_idle_in_tx(cursor)
  94. for t in ENTITIES:
  95. if is_not_null(cursor, t, 'version'):
  96. log(f" {t}.version 已 NOT NULL")
  97. continue
  98. try:
  99. cursor.execute(f"ALTER TABLE {t} ALTER COLUMN version SET NOT NULL")
  100. log(f" ✓ {t}.version SET NOT NULL")
  101. except Exception as e:
  102. log(f" ⚠️ {t}.version SET NOT NULL 未完成: {type(e).__name__}: {str(e)[:120]}")
  103. # 最终验证
  104. log("\n" + "=" * 60)
  105. log("最终状态:")
  106. log("=" * 60)
  107. for t in ENTITIES:
  108. cursor.execute("""SELECT column_default, is_nullable FROM information_schema.columns
  109. WHERE table_name=%s AND column_name='version'""", (t,))
  110. row = cursor.fetchone()
  111. cursor.execute(f"SELECT COUNT(*) AS c FROM {t}")
  112. total = cursor.fetchone()['c']
  113. if row:
  114. log(f" {t}: version default={row['column_default']!r}, nullable={row['is_nullable']}, rows={total}")
  115. else:
  116. log(f" {t}: version MISSING, rows={total}")
  117. cursor.execute("""SELECT column_default FROM information_schema.columns
  118. WHERE table_name='resource' AND column_name='images'""")
  119. row = cursor.fetchone()
  120. log(f" resource.images: {'default=' + repr(row['column_default']) if row else 'MISSING'}")
  121. log("\n迁移完成")
  122. cursor.close()
  123. conn.close()
  124. if __name__ == '__main__':
  125. main()