#!/usr/bin/env python3
"""
Database migration v3: JSONB soft references -> junction tables.

Compatible with the current mixed state:
- tool_table has already been RENAMEd to tool (a previous DDL was partially rolled back)
- atomic_capability and requirement_table still carry their old names
- the 4 junction tables have been created but are empty

Steps:
1. Create the 4 junction tables (idempotent; skipped if they already exist)
2. Migrate data from the JSONB fields of the existing tables into the junction tables
3. Create capability and requirement via CREATE TABLE AS SELECT (an indirect rename)
4. Drop the JSONB relation columns from the knowledge table
5. Drop the old tables

Relation columns already dropped by an earlier, partially completed run are
skipped (with a notice) instead of aborting the migration.
"""
import os
import json
import psycopg2
from psycopg2.extras import RealDictCursor
from dotenv import load_dotenv

_script_dir = os.path.dirname(os.path.abspath(__file__))
_project_root = os.path.normpath(os.path.join(_script_dir, '..', '..', '..'))
load_dotenv(os.path.join(_project_root, '.env'))


def get_connection():
    """Open a connection to the KnowHub database configured via KNOWHUB_* env vars.

    The connection is in autocommit mode, so every statement commits immediately.
    """
    conn = psycopg2.connect(
        host=os.getenv('KNOWHUB_DB'),
        port=int(os.getenv('KNOWHUB_PORT', 5432)),
        user=os.getenv('KNOWHUB_USER'),
        password=os.getenv('KNOWHUB_PASSWORD'),
        database=os.getenv('KNOWHUB_DB_NAME'),
        connect_timeout=10,
    )
    conn.autocommit = True
    return conn


def table_exists(cursor, name):
    """Return True if information_schema lists a table called *name*.

    NOTE(review): not filtered by schema, so a same-named table in another
    visible schema also counts.
    """
    cursor.execute("SELECT 1 FROM information_schema.tables WHERE table_name = %s", (name,))
    return cursor.fetchone() is not None


def column_exists(cursor, table, column):
    """Return True if information_schema lists *column* on *table* (any schema)."""
    cursor.execute(
        "SELECT 1 FROM information_schema.columns WHERE table_name = %s AND column_name = %s",
        (table, column))
    return cursor.fetchone() is not None


def resolve_table(cursor, new_name, old_name):
    """Return the table name that actually exists, preferring *new_name*.

    Handles the mixed rename state. Raises RuntimeError if neither table exists.
    """
    if table_exists(cursor, new_name):
        return new_name
    if table_exists(cursor, old_name):
        return old_name
    raise RuntimeError(f"Neither {new_name} nor {old_name} exists!")


# ─── Step 1: junction tables ────────────────────────────────────────────────
CREATE_JUNCTION_TABLES = [
    """
    CREATE TABLE IF NOT EXISTS requirement_capability (
        requirement_id VARCHAR NOT NULL,
        capability_id VARCHAR NOT NULL,
        PRIMARY KEY (requirement_id, capability_id)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS capability_tool (
        capability_id VARCHAR NOT NULL,
        tool_id VARCHAR NOT NULL,
        description TEXT DEFAULT '',
        PRIMARY KEY (capability_id, tool_id)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS capability_knowledge (
        capability_id VARCHAR NOT NULL,
        knowledge_id VARCHAR NOT NULL,
        PRIMARY KEY (capability_id, knowledge_id)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS tool_knowledge (
        tool_id VARCHAR NOT NULL,
        knowledge_id VARCHAR NOT NULL,
        PRIMARY KEY (tool_id, knowledge_id)
    )
    """,
]


# ─── Step 2: data migration ─────────────────────────────────────────────────
def _parse_json(val):
    """Decode a relation field into a list or dict; anything else becomes [].

    Values psycopg2 already decoded (list/dict) are returned as-is; strings
    (text columns or double-encoded JSON) are decoded here. None, numbers,
    invalid JSON and JSON scalars all yield [], so callers can always iterate.
    """
    if val is None:
        return []
    if isinstance(val, (list, dict)):
        return val
    try:
        parsed = json.loads(val)
    except (json.JSONDecodeError, TypeError):
        return []
    # Guard against scalars: iterating an int crashes, iterating a str yields characters.
    return parsed if isinstance(parsed, (list, dict)) else []


def _desc_text(desc):
    """Coerce an implements description into text for capability_tool.description.

    None maps to '' (the column default); non-string JSON values are serialized,
    since psycopg2 cannot adapt a dict for a TEXT column.
    """
    if desc is None:
        return ''
    if isinstance(desc, str):
        return desc
    return json.dumps(desc, ensure_ascii=False)


def _match_tool(tool_name, tool_name_to_id):
    """Map a capability.implements key to a tool id, or return None.

    Tries a case-insensitive exact match first, then the first stored name that
    contains (or is contained in) the key. Blank keys never match, because ""
    is a substring of every name.
    """
    key = tool_name.lower().strip()
    if not key:
        return None
    matched = tool_name_to_id.get(key)
    if matched is not None:
        return matched
    for stored_name, stored_id in tool_name_to_id.items():
        if key in stored_name or stored_name in key:
            return stored_id
    return None


def _insert_junction(cursor, table, col_a, col_b, val_a, val_b, extra_cols=None):
    """Insert one (val_a, val_b) pair (plus optional extra columns); duplicates are ignored."""
    if extra_cols:
        cols = f"{col_a}, {col_b}, {', '.join(extra_cols.keys())}"
        placeholders = ', '.join(['%s'] * (2 + len(extra_cols)))
        values = [val_a, val_b] + list(extra_cols.values())
    else:
        cols = f"{col_a}, {col_b}"
        placeholders = '%s, %s'
        values = [val_a, val_b]
    cursor.execute(
        f"INSERT INTO {table} ({cols}) VALUES ({placeholders}) ON CONFLICT DO NOTHING",
        values
    )


def _fetch_relation_rows(cursor, table, columns):
    """Return ``id`` plus the given relation *columns* for every row of *table*.

    Columns that no longer exist (dropped by an earlier run, after their data
    was migrated) are reported and come back as None, which _parse_json turns
    into []. If none of the columns exist, no query is issued and [] is returned.
    """
    present = [c for c in columns if column_exists(cursor, table, c)]
    for col in columns:
        if col not in present:
            print(f" ({table}.{col} 列已丢失,跳过)", flush=True)
    if not present:
        return []
    cursor.execute(f"SELECT id, {', '.join(present)} FROM {table}")
    return [{c: r.get(c) for c in ('id', *columns)} for r in cursor.fetchall()]


def migrate_data(cursor, tool_tbl, cap_tbl, req_tbl):
    """Copy JSONB soft references into the junction tables, using the actual table names.

    Both directions of each relation are read (e.g. requirement.atomics and
    capability.requirements) and merged; inserts use ON CONFLICT DO NOTHING, so
    duplicates and re-runs are harmless.

    Returns a dict of lists describing what was not inserted:
      'dangling'             - references whose target id does not exist
      'implements_unmatched' - capability.implements keys matching no tool name
    """
    skipped = {'dangling': [], 'implements_unmatched': []}

    # Preload valid IDs
    print(" 加载 ID...", flush=True)
    cursor.execute(f"SELECT id FROM {req_tbl}")
    valid_reqs = {r['id'] for r in cursor.fetchall()}
    cursor.execute(f"SELECT id FROM {cap_tbl}")
    valid_caps = {r['id'] for r in cursor.fetchall()}
    cursor.execute(f"SELECT id, name FROM {tool_tbl}")
    tool_rows = cursor.fetchall()
    valid_tools = {r['id'] for r in tool_rows}
    cursor.execute("SELECT id FROM knowledge")
    valid_knowledge = {r['id'] for r in cursor.fetchall()}
    print(f" OK: {len(valid_reqs)} reqs, {len(valid_caps)} caps, {len(valid_tools)} tools, {len(valid_knowledge)} knowledge", flush=True)

    # Normalized tool name -> id. Blank names are skipped: "" would fuzzy-match
    # every implements key in _match_tool. On duplicate names the last row wins.
    tool_name_to_id = {}
    for r in tool_rows:
        key = (r['name'] or '').lower().strip()
        if key:
            tool_name_to_id[key] = r['id']

    # ── requirement_capability ──
    print(" requirement_capability...", flush=True)
    for row in _fetch_relation_rows(cursor, req_tbl, ('atomics',)):
        for cap_id in _parse_json(row['atomics']):
            if cap_id in valid_caps:
                _insert_junction(cursor, 'requirement_capability', 'requirement_id', 'capability_id', row['id'], cap_id)
            else:
                skipped['dangling'].append(f"req_cap: req={row['id']} → cap={cap_id}")
    for row in _fetch_relation_rows(cursor, cap_tbl, ('requirements',)):
        for req_id in _parse_json(row['requirements']):
            if req_id in valid_reqs:
                _insert_junction(cursor, 'requirement_capability', 'requirement_id', 'capability_id', req_id, row['id'])
            else:
                skipped['dangling'].append(f"req_cap: req={req_id} → cap={row['id']}")

    # ── capability_tool ──
    print(" capability_tool...", flush=True)
    for row in _fetch_relation_rows(cursor, cap_tbl, ('tools', 'implements')):
        tools_list = _parse_json(row['tools'])
        implements_dict = _parse_json(row['implements'])
        if not isinstance(implements_dict, dict):
            implements_dict = {}

        # implements maps tool *names* to descriptions; resolve them to tool ids.
        impl_by_tool_id = {}
        for tool_name, raw_desc in implements_dict.items():
            desc = _desc_text(raw_desc)
            matched_id = _match_tool(tool_name, tool_name_to_id)
            if matched_id is not None:
                impl_by_tool_id[matched_id] = desc
            else:
                skipped['implements_unmatched'].append(
                    f"cap={row['id']}: {tool_name} = {desc[:80]}")

        for tool_id in tools_list:
            if tool_id in valid_tools:
                # pop() so the leftover loop below only adds implements-only tools
                desc = impl_by_tool_id.pop(tool_id, '')
                _insert_junction(cursor, 'capability_tool', 'capability_id', 'tool_id',
                                 row['id'], tool_id, extra_cols={'description': desc})
            else:
                skipped['dangling'].append(f"cap_tool: cap={row['id']} → tool={tool_id}")
        # Tools mentioned only in implements (not in the tools list)
        for tool_id, desc in impl_by_tool_id.items():
            if tool_id in valid_tools:
                _insert_junction(cursor, 'capability_tool', 'capability_id', 'tool_id',
                                 row['id'], tool_id, extra_cols={'description': desc})

    # Reverse direction: tool.capabilities (runs after the capability side, so a
    # pair already inserted with a description keeps it)
    for row in _fetch_relation_rows(cursor, tool_tbl, ('capabilities',)):
        for cap_id in _parse_json(row['capabilities']):
            if cap_id in valid_caps:
                _insert_junction(cursor, 'capability_tool', 'capability_id', 'tool_id', cap_id, row['id'])
            else:
                skipped['dangling'].append(f"cap_tool: cap={cap_id} → tool={row['id']}")

    # ── capability_knowledge ──
    print(" capability_knowledge...", flush=True)
    for row in _fetch_relation_rows(cursor, cap_tbl, ('source_knowledge',)):
        for kid in _parse_json(row['source_knowledge']):
            if kid in valid_knowledge:
                _insert_junction(cursor, 'capability_knowledge', 'capability_id', 'knowledge_id', row['id'], kid)
            else:
                skipped['dangling'].append(f"cap_know: cap={row['id']} → k={kid}")
    for row in _fetch_relation_rows(cursor, 'knowledge', ('support_capability',)):
        for cap_id in _parse_json(row['support_capability']):
            if cap_id in valid_caps:
                _insert_junction(cursor, 'capability_knowledge', 'capability_id', 'knowledge_id', cap_id, row['id'])
            else:
                skipped['dangling'].append(f"cap_know: cap={cap_id} → k={row['id']}")

    # ── tool_knowledge ──
    print(" tool_knowledge...", flush=True)
    # Forward direction: tool.*_knowledge — the three fields are merged, deduplicated
    # in first-seen order.
    knowledge_fields = ('tool_knowledge', 'case_knowledge', 'process_knowledge')
    for row in _fetch_relation_rows(cursor, tool_tbl, knowledge_fields):
        all_kids = {}
        for field in knowledge_fields:
            all_kids.update(dict.fromkeys(_parse_json(row[field])))
        for kid in all_kids:
            if kid in valid_knowledge:
                _insert_junction(cursor, 'tool_knowledge', 'tool_id', 'knowledge_id', row['id'], kid)
            else:
                skipped['dangling'].append(f"tool_know: tool={row['id']} → k={kid}")
    # Reverse direction: knowledge.tools
    for row in _fetch_relation_rows(cursor, 'knowledge', ('tools',)):
        for tool_id in _parse_json(row['tools']):
            if tool_id in valid_tools:
                _insert_junction(cursor, 'tool_knowledge', 'tool_id', 'knowledge_id', tool_id, row['id'])
            else:
                skipped['dangling'].append(f"tool_know: tool={tool_id} → k={row['id']}")

    return skipped


# ─── Main flow ──────────────────────────────────────────────────────────────
def _report_skipped(skipped):
    """Print what migrate_data skipped (the dangling list is truncated to 30 entries)."""
    dangling = skipped['dangling']
    if dangling:
        print(f"\n [WARN] 跳过悬空引用 {len(dangling)} 条:")
        for s in dangling[:30]:
            print(f" - {s}")
        if len(dangling) > 30:
            print(f" ... 还有 {len(dangling) - 30} 条")
    if skipped['implements_unmatched']:
        print(f"\n [WARN] implements 未匹配 {len(skipped['implements_unmatched'])} 条:")
        for s in skipped['implements_unmatched']:
            print(f" - {s}")


def _finalize_table(cursor, current, new_name, old_name, jsonb_cols, keep_cols):
    """Step 3 for one entity: end up with table *new_name* without JSONB relation columns.

    current    -- the table name resolve_table found (new_name or old_name)
    jsonb_cols -- relation columns to drop when the table already has its new name
    keep_cols  -- columns copied when creating new_name from old_name
    """
    if current == new_name:
        print(f" {new_name}: 已是新名,删除 JSONB 列...")
        for col in jsonb_cols:
            if column_exists(cursor, new_name, col):
                cursor.execute(f"ALTER TABLE {new_name} DROP COLUMN {col}")
                print(f" DROP {new_name}.{col}")
    elif not table_exists(cursor, new_name):
        # Sent as one query string: PostgreSQL runs multiple statements in a
        # simple query as a single implicit transaction, so a failure cannot
        # leave a copy without its primary key behind. NOTE: CREATE TABLE AS
        # copies data only — defaults, NOT NULL constraints and indexes other
        # than the re-added primary key are not carried over.
        cursor.execute(
            f"CREATE TABLE {new_name} AS SELECT {', '.join(keep_cols)} FROM {old_name}; "
            f"ALTER TABLE {new_name} ADD PRIMARY KEY (id)"
        )
        cursor.execute(f"SELECT COUNT(*) as count FROM {new_name}")
        print(f" {old_name} → {new_name}: {cursor.fetchone()['count']} rows")
    else:
        print(f" {new_name}: 已存在,跳过")


def _print_final_schema(cursor):
    """Print the columns and row count of each table; per-table errors are printed, not raised."""
    print("\n" + "=" * 60)
    print("最终表结构:")
    print("=" * 60)
    for t in ['knowledge', 'tool', 'capability', 'requirement', 'resources',
              'requirement_capability', 'capability_tool', 'capability_knowledge', 'tool_knowledge']:
        try:
            cursor.execute("""
                SELECT column_name FROM information_schema.columns
                WHERE table_name = %s ORDER BY ordinal_position
            """, (t,))
            cols = [r['column_name'] for r in cursor.fetchall()]
            cursor.execute(f"SELECT COUNT(*) as count FROM {t}")
            count = cursor.fetchone()['count']
            print(f"\n {t} ({count} rows)")
            print(f" {', '.join(cols)}")
        except Exception as e:  # best-effort summary: report and keep going
            print(f"\n {t}: ERROR - {e}")


def _run_steps(cursor):
    """Run steps 0-5 and the final verification on *cursor*."""
    # Detect the actual table names (handles the mixed rename state)
    print("\n[0] 探测表名...")
    tool_tbl = resolve_table(cursor, 'tool', 'tool_table')
    cap_tbl = resolve_table(cursor, 'capability', 'atomic_capability')
    req_tbl = resolve_table(cursor, 'requirement', 'requirement_table')
    print(f" tool: {tool_tbl}")
    print(f" capability: {cap_tbl}")
    print(f" requirement: {req_tbl}")

    # Step 1: create junction tables
    print("\n[1/5] 创建关联表...")
    for sql in CREATE_JUNCTION_TABLES:
        cursor.execute(sql)
    print(" OK")

    # Step 2: migrate JSONB data
    print("\n[2/5] 迁移 JSONB 数据到关联表...")
    _report_skipped(migrate_data(cursor, tool_tbl, cap_tbl, req_tbl))
    print("\n 关联表行数:")
    for t in ('requirement_capability', 'capability_tool', 'capability_knowledge', 'tool_knowledge'):
        cursor.execute(f"SELECT COUNT(*) as count FROM {t}")
        print(f" {t}: {cursor.fetchone()['count']}")

    # Step 3: create the new tables (CREATE TABLE AS SELECT for those still under old names)
    print("\n[3/5] 创建新表...")
    _finalize_table(
        cursor, tool_tbl, 'tool', 'tool_table',
        jsonb_cols=('capabilities', 'tool_knowledge', 'case_knowledge', 'process_knowledge'),
        keep_cols=('id', 'name', 'version', 'introduction', 'tutorial', 'input', 'output',
                   'updated_time', 'status', 'embedding', 'implemented_tool_ids'),
    )
    _finalize_table(
        cursor, cap_tbl, 'capability', 'atomic_capability',
        jsonb_cols=('requirements', 'implements', 'tools', 'source_knowledge'),
        keep_cols=('id', 'name', 'criterion', 'description', 'embedding'),
    )
    _finalize_table(
        cursor, req_tbl, 'requirement', 'requirement_table',
        jsonb_cols=('atomics',),
        keep_cols=('id', 'description', 'source_nodes', 'status', 'match_result', 'embedding'),
    )

    # Step 4: drop the JSONB relation columns from knowledge
    print("\n[4/5] 删除 knowledge 表的 JSONB 关联字段...")
    for col in ('support_capability', 'tools'):
        if column_exists(cursor, 'knowledge', col):
            cursor.execute(f"ALTER TABLE knowledge DROP COLUMN {col}")
            print(f" DROP knowledge.{col}")
        else:
            print(f" knowledge.{col} 已不存在")

    # Step 5: drop old tables — only once the replacement exists
    print("\n[5/5] 删除旧表...")
    for old_name, new_name in [('tool_table', 'tool'), ('atomic_capability', 'capability'),
                               ('requirement_table', 'requirement')]:
        old_present = table_exists(cursor, old_name)
        if old_present and table_exists(cursor, new_name):
            cursor.execute(f"DROP TABLE {old_name}")
            print(f" DROP {old_name}")
        elif not old_present:
            print(f" {old_name} 已不存在")
        else:
            print(f" [!] {new_name} 不存在,保留 {old_name}")

    # Final verification
    _print_final_schema(cursor)
    print("\n" + "=" * 60)
    print("迁移成功!")
    print("=" * 60)


def main():
    """Run the v3 migration end to end, printing progress for every step."""
    print("=" * 60)
    print("KnowHub 数据库迁移 v3: JSONB 软关联 → 关联表")
    print("=" * 60)

    conn = get_connection()
    try:
        # The cursor context manager closes the cursor; the finally closes the
        # connection even when a step raises.
        with conn.cursor(cursor_factory=RealDictCursor) as cursor:
            _run_steps(cursor)
    finally:
        conn.close()


if __name__ == '__main__':
    main()