| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- #!/usr/bin/env python3
- """
- 数据库迁移 v5:多租户版本字段 + resource 图片
- 本次变更:
- 1. 给 6 张实体表加 version 列(VARCHAR(32) DEFAULT 'v0'):
- knowledge, resource, requirement, capability, tool, strategy
- 2. 给 resource 表加 images 列(JSONB DEFAULT '[]')
- 注意:经 v4 踩坑后的做法——
- - 合并 `NOT NULL DEFAULT` 会挂;拆两步:先 ADD COLUMN DEFAULT,再 SET NOT NULL
- - SET NOT NULL 可能因 idle-in-tx 锁冲突失败;失败不致命,DEFAULT 对新行已生效
- - 跑 DDL 前先 kill idle-in-tx,防止 ALTER 被 AccessShareLock 阻塞
- 幂等:反复执行不破坏已有数据。
- """
- import os
- import time
- import psycopg2
- from psycopg2.extras import RealDictCursor
- from dotenv import load_dotenv
# Locate the project root (three directory levels above this script) and load
# its .env file so the KNOWHUB_* settings are visible through os.getenv().
_script_dir = os.path.dirname(os.path.abspath(__file__))
_project_root = os.path.normpath(
    os.path.join(_script_dir, os.pardir, os.pardir, os.pardir)
)
load_dotenv(os.path.join(_project_root, '.env'))
def log(msg):
    """Print ``msg`` prefixed with the current wall-clock time as ``[HH:MM:SS]``.

    Output is flushed immediately so progress is visible while DDL is running.
    """
    stamp = time.strftime('%H:%M:%S')
    print(f"[{stamp}] {msg}", flush=True)
def get_connection():
    """Open a psycopg2 connection using the KNOWHUB_* environment variables.

    The connection is switched to autocommit so that every statement issued by
    the migration takes effect on its own, without an explicit commit.

    Returns:
        The open psycopg2 connection.
    """
    settings = {
        'host': os.getenv('KNOWHUB_DB'),
        # Falls back to 5432 when KNOWHUB_PORT is not set.
        'port': int(os.getenv('KNOWHUB_PORT', 5432)),
        'user': os.getenv('KNOWHUB_USER'),
        'password': os.getenv('KNOWHUB_PASSWORD'),
        'database': os.getenv('KNOWHUB_DB_NAME'),
        'connect_timeout': 10,
    }
    connection = psycopg2.connect(**settings)
    connection.autocommit = True
    return connection
def column_exists(cursor, table, column):
    """Return True if ``table`` has a column named ``column``.

    The lookup is limited to the schemas on the current search_path. Without
    that filter a same-named table in an unrelated schema could make the
    migration believe the column is already present and skip the ALTER.

    Args:
        cursor: An open DB-API cursor.
        table: Unqualified table name.
        column: Column name to look for.

    Returns:
        bool: True when a matching row exists in information_schema.columns.
    """
    cursor.execute(
        "SELECT 1 FROM information_schema.columns "
        "WHERE table_schema = ANY (current_schemas(false)) "
        "AND table_name = %s AND column_name = %s",
        (table, column))
    return cursor.fetchone() is not None
def is_not_null(cursor, table, column):
    """Return True if ``table.column`` exists and is declared NOT NULL.

    The lookup is limited to the schemas on the current search_path so that a
    same-named table in another schema cannot influence the answer.

    Args:
        cursor: An open cursor whose rows support access by column name
            (the result is read as ``row['is_nullable']``).
        table: Unqualified table name.
        column: Column name to inspect.

    Returns:
        bool: True only when the column exists and is_nullable is 'NO';
        False when it is nullable or when no such column is found.
    """
    cursor.execute(
        "SELECT is_nullable FROM information_schema.columns "
        "WHERE table_schema = ANY (current_schemas(false)) "
        "AND table_name=%s AND column_name=%s",
        (table, column))
    row = cursor.fetchone()
    # Always hand back a real bool, including for a missing column.
    return row is not None and row['is_nullable'] == 'NO'
def kill_idle_in_tx(cursor):
    """Terminate every other 'idle in transaction' session on this database.

    Such sessions are cleared before DDL runs because they would otherwise
    leave the DDL waiting on locks.

    Args:
        cursor: An open cursor whose rows support access by column name.

    Returns:
        int: Number of sessions that were asked to terminate.
    """
    cursor.execute("""SELECT pid FROM pg_stat_activity
        WHERE state='idle in transaction' AND pid != pg_backend_pid()
        AND datname=current_database()""")
    victims = []
    for record in cursor.fetchall():
        victims.append(record['pid'])

    if not victims:
        return 0

    log(f" killing {len(victims)} idle-in-tx: {victims}")
    for backend_pid in victims:
        cursor.execute("SELECT pg_terminate_backend(%s)", (backend_pid,))
    # Pause once after issuing the terminate calls before the caller continues.
    time.sleep(1)
    return len(victims)
# ─── Main flow ────────────────────────────────────────────────────────────────

# Entity tables that receive the `version` column.
ENTITIES = ['knowledge', 'resource', 'requirement', 'capability', 'tool', 'strategy']


def _apply_schema_changes(cursor):
    """Run steps 0-3: clear blockers, add the columns, then try SET NOT NULL."""
    # Step 0: clear sessions that could block the DDL.
    log("\n[0/3] 清理 idle-in-transaction 会话...")
    kill_idle_in_tx(cursor)

    # Step 1: add the version column to every entity table.
    log("\n[1/3] 添加 version 列(DEFAULT 'v0')...")
    for t in ENTITIES:
        if column_exists(cursor, t, 'version'):
            log(f" {t}.version 已存在,跳过")
            continue
        try:
            cursor.execute(f"ALTER TABLE {t} ADD COLUMN version VARCHAR(32) DEFAULT 'v0'")
            log(f" ✓ {t}.version 已添加")
        except Exception as e:
            # A failure on one table is logged and the remaining tables are still tried.
            log(f" ✗ {t}.version 失败: {type(e).__name__}: {str(e)[:150]}")

    # Step 2: add the images column to resource.
    log("\n[2/3] resource.images(JSONB DEFAULT '[]')...")
    if column_exists(cursor, 'resource', 'images'):
        log(" resource.images 已存在,跳过")
    else:
        try:
            cursor.execute("ALTER TABLE resource ADD COLUMN images JSONB DEFAULT '[]'")
            log(" ✓ resource.images 已添加")
        except Exception as e:
            log(f" ✗ resource.images 失败: {type(e).__name__}: {str(e)[:150]}")

    # Step 3: try to make version NOT NULL (non-critical; failure is acceptable).
    log("\n[3/3] 尝试给 version 列加 NOT NULL(失败不致命)...")
    kill_idle_in_tx(cursor)
    for t in ENTITIES:
        if is_not_null(cursor, t, 'version'):
            log(f" {t}.version 已 NOT NULL")
            continue
        try:
            cursor.execute(f"ALTER TABLE {t} ALTER COLUMN version SET NOT NULL")
            log(f" ✓ {t}.version SET NOT NULL")
        except Exception as e:
            log(f" ⚠️ {t}.version SET NOT NULL 未完成: {type(e).__name__}: {str(e)[:120]}")


def _report_final_state(cursor):
    """Log default, nullability and row count per entity table, plus resource.images."""
    log("\n" + "=" * 60)
    log("最终状态:")
    log("=" * 60)
    for t in ENTITIES:
        # Only look at schemas on the search_path, i.e. where the unqualified
        # ALTER TABLE statements above resolve their table names.
        cursor.execute(
            """SELECT column_default, is_nullable FROM information_schema.columns
               WHERE table_schema = ANY (current_schemas(false))
                 AND table_name=%s AND column_name='version'""", (t,))
        row = cursor.fetchone()
        cursor.execute(f"SELECT COUNT(*) AS c FROM {t}")
        total = cursor.fetchone()['c']
        if row:
            log(f" {t}: version default={row['column_default']!r}, nullable={row['is_nullable']}, rows={total}")
        else:
            log(f" {t}: version MISSING, rows={total}")

    cursor.execute(
        """SELECT column_default FROM information_schema.columns
           WHERE table_schema = ANY (current_schemas(false))
             AND table_name='resource' AND column_name='images'""")
    row = cursor.fetchone()
    images_state = 'default=' + repr(row['column_default']) if row else 'MISSING'
    log(f" resource.images: {images_state}")


def main():
    """Run migration v5: add ``version`` to the entity tables and ``images`` to resource.

    Individual DDL failures are logged and do not abort the run. An error in
    any unguarded statement (for example the final row counts) still
    propagates, but the cursor and connection are now closed in every case.
    """
    log("=" * 60)
    log("KnowHub 迁移 v5: version + resource.images")
    log("=" * 60)

    conn = get_connection()
    try:
        cursor = conn.cursor(cursor_factory=RealDictCursor)
        try:
            # Bound every statement so a blocked DDL fails instead of hanging.
            cursor.execute("SET statement_timeout = '30s'")
            _apply_schema_changes(cursor)
            _report_final_state(cursor)
            log("\n迁移完成")
        finally:
            cursor.close()
    finally:
        conn.close()
# Run the migration only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|