#!/usr/bin/env python3 """ 数据库迁移 v5:多租户版本字段 + resource 图片 本次变更: 1. 给 6 张实体表加 version 列(VARCHAR(32) DEFAULT 'v0'): knowledge, resource, requirement, capability, tool, strategy 2. 给 resource 表加 images 列(JSONB DEFAULT '[]') 注意:经 v4 踩坑后的做法—— - 合并 `NOT NULL DEFAULT` 会挂;拆两步:先 ADD COLUMN DEFAULT,再 SET NOT NULL - SET NOT NULL 可能因 idle-in-tx 锁冲突失败;失败不致命,DEFAULT 对新行已生效 - 跑 DDL 前先 kill idle-in-tx,防止 ALTER 被 AccessShareLock 阻塞 幂等:反复执行不破坏已有数据。 """ import os import time import psycopg2 from psycopg2.extras import RealDictCursor from dotenv import load_dotenv _script_dir = os.path.dirname(os.path.abspath(__file__)) _project_root = os.path.normpath(os.path.join(_script_dir, '..', '..', '..')) load_dotenv(os.path.join(_project_root, '.env')) def log(msg): print(f"[{time.strftime('%H:%M:%S')}] {msg}", flush=True) def get_connection(): conn = psycopg2.connect( host=os.getenv('KNOWHUB_DB'), port=int(os.getenv('KNOWHUB_PORT', 5432)), user=os.getenv('KNOWHUB_USER'), password=os.getenv('KNOWHUB_PASSWORD'), database=os.getenv('KNOWHUB_DB_NAME'), connect_timeout=10 ) conn.autocommit = True return conn def column_exists(cursor, table, column): cursor.execute( "SELECT 1 FROM information_schema.columns WHERE table_name = %s AND column_name = %s", (table, column)) return cursor.fetchone() is not None def is_not_null(cursor, table, column): cursor.execute( "SELECT is_nullable FROM information_schema.columns WHERE table_name=%s AND column_name=%s", (table, column)) row = cursor.fetchone() return row and row['is_nullable'] == 'NO' def kill_idle_in_tx(cursor): """跑 DDL 前清掉所有 idle in transaction 会话(会阻塞 DDL 等锁)""" cursor.execute("""SELECT pid FROM pg_stat_activity WHERE state='idle in transaction' AND pid != pg_backend_pid() AND datname=current_database()""") pids = [r['pid'] for r in cursor.fetchall()] if pids: log(f" killing {len(pids)} idle-in-tx: {pids}") for p in pids: cursor.execute("SELECT pg_terminate_backend(%s)", (p,)) time.sleep(1) return len(pids) # ─── 主流程 ─────────────────────────────────────────────────────────────────── ENTITIES = ['knowledge', 'resource', 'requirement', 'capability', 'tool', 'strategy'] def main(): log("=" * 60) log("KnowHub 迁移 v5: version + resource.images") log("=" * 60) conn = get_connection() cursor = conn.cursor(cursor_factory=RealDictCursor) cursor.execute("SET statement_timeout = '30s'") # Step 0: 清理可能阻塞 DDL 的会话 log("\n[0/3] 清理 idle-in-transaction 会话...") kill_idle_in_tx(cursor) # Step 1: 给每个实体表加 version 列 log("\n[1/3] 添加 version 列(DEFAULT 'v0')...") for t in ENTITIES: if column_exists(cursor, t, 'version'): log(f" {t}.version 已存在,跳过") continue try: cursor.execute(f"ALTER TABLE {t} ADD COLUMN version VARCHAR(32) DEFAULT 'v0'") log(f" ✓ {t}.version 已添加") except Exception as e: log(f" ✗ {t}.version 失败: {type(e).__name__}: {str(e)[:150]}") # Step 2: 给 resource 加 images 列 log("\n[2/3] resource.images(JSONB DEFAULT '[]')...") if column_exists(cursor, 'resource', 'images'): log(" resource.images 已存在,跳过") else: try: cursor.execute("ALTER TABLE resource ADD COLUMN images JSONB DEFAULT '[]'") log(" ✓ resource.images 已添加") except Exception as e: log(f" ✗ resource.images 失败: {type(e).__name__}: {str(e)[:150]}") # Step 3: 尝试给 version 列设 NOT NULL(非关键,失败可接受) log("\n[3/3] 尝试给 version 列加 NOT NULL(失败不致命)...") kill_idle_in_tx(cursor) for t in ENTITIES: if is_not_null(cursor, t, 'version'): log(f" {t}.version 已 NOT NULL") continue try: cursor.execute(f"ALTER TABLE {t} ALTER COLUMN version SET NOT NULL") log(f" ✓ {t}.version SET NOT NULL") except Exception as e: log(f" ⚠️ {t}.version SET NOT NULL 未完成: {type(e).__name__}: {str(e)[:120]}") # 最终验证 log("\n" + "=" * 60) log("最终状态:") log("=" * 60) for t in ENTITIES: cursor.execute("""SELECT column_default, is_nullable FROM information_schema.columns WHERE table_name=%s AND column_name='version'""", (t,)) row = cursor.fetchone() cursor.execute(f"SELECT COUNT(*) AS c FROM {t}") total = cursor.fetchone()['c'] if row: log(f" {t}: version default={row['column_default']!r}, nullable={row['is_nullable']}, rows={total}") else: log(f" {t}: version MISSING, rows={total}") cursor.execute("""SELECT column_default FROM information_schema.columns WHERE table_name='resource' AND column_name='images'""") row = cursor.fetchone() log(f" resource.images: {'default=' + repr(row['column_default']) if row else 'MISSING'}") log("\n迁移完成") cursor.close() conn.close() if __name__ == '__main__': main()