#!/usr/bin/env python3
"""
Migrate to the new ``knowhub`` database.

Reads from the old database (named by ``KNOWHUB_DB_NAME`` in ``.env``,
e.g. knowledge_hub) and writes into ``knowhub``.

13 tables: 5 entity tables + 8 junction tables.
Only CREATE TABLE IF NOT EXISTS and INSERT are issued against the new
database; nothing is ALTERed (or DROPped) by this script.

Every step can be re-run: a target table that already contains rows is
skipped. Note this also means a table left partially filled by an
interrupted run is skipped on re-run -- empty it manually first if needed.
"""
import contextlib
import json
import os

import psycopg2
from dotenv import load_dotenv
from psycopg2.extras import Json, RealDictCursor

# Load <project_root>/.env, where project root is three directories above
# this script.
_script_dir = os.path.dirname(os.path.abspath(__file__))
_project_root = os.path.normpath(os.path.join(_script_dir, '..', '..', '..'))
load_dotenv(os.path.join(_project_root, '.env'))


def connect(db_name):
    """Open an autocommit connection to ``db_name``.

    Host, port, user and password come from the KNOWHUB_DB, KNOWHUB_PORT
    (default 5432), KNOWHUB_USER and KNOWHUB_PASSWORD environment variables.
    Autocommit makes every statement its own transaction, so a single failing
    INSERT does not abort the statements that follow it.
    """
    conn = psycopg2.connect(
        host=os.getenv('KNOWHUB_DB'),
        port=int(os.getenv('KNOWHUB_PORT', 5432)),
        user=os.getenv('KNOWHUB_USER'),
        password=os.getenv('KNOWHUB_PASSWORD'),
        database=db_name,
        connect_timeout=10,
    )
    conn.autocommit = True
    return conn


@contextlib.contextmanager
def _dict_cursor(db_name):
    """Yield a RealDictCursor on a fresh connection to ``db_name``.

    Both the cursor and the connection are closed on exit, including when the
    body raises, so a failing step does not leak a database connection.
    """
    conn = connect(db_name)
    try:
        cur = conn.cursor(cursor_factory=RealDictCursor)
        try:
            yield cur
        finally:
            cur.close()
    finally:
        conn.close()


# ─── CREATE statements for the 13 tables ─────────────────────────────────────

CREATE_TABLES = [
    # === Entity tables (5) ===
    """
    CREATE TABLE IF NOT EXISTS knowledge (
        id                VARCHAR PRIMARY KEY,
        task_embedding    float4[],
        content_embedding float4[],
        message_id        VARCHAR,
        task              VARCHAR,
        content           TEXT,
        types             TEXT[],
        tags              JSONB DEFAULT '{}',
        tag_keys          TEXT[],
        scopes            TEXT[],
        owner             VARCHAR,
        source            JSONB DEFAULT '{}',
        eval              JSONB DEFAULT '{}',
        created_at        BIGINT,
        updated_at        BIGINT,
        status            VARCHAR DEFAULT 'approved'
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS resource (
        id           TEXT PRIMARY KEY,
        title        TEXT,
        body         TEXT,
        secure_body  TEXT,
        content_type TEXT,
        metadata     JSONB DEFAULT '{}',
        sort_order   INTEGER DEFAULT 0,
        submitted_by TEXT,
        created_at   BIGINT,
        updated_at   BIGINT
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS requirement (
        id           VARCHAR PRIMARY KEY,
        description  TEXT,
        source_nodes JSONB DEFAULT '[]',
        status       VARCHAR DEFAULT '未满足',
        match_result TEXT,
        embedding    float4[]
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS capability (
        id          VARCHAR PRIMARY KEY,
        name        VARCHAR,
        criterion   TEXT,
        description TEXT,
        embedding   float4[]
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS tool (
        id           VARCHAR PRIMARY KEY,
        name         VARCHAR,
        version      VARCHAR,
        introduction TEXT,
        tutorial     TEXT,
        input        JSONB DEFAULT '""',
        output       JSONB DEFAULT '""',
        updated_time BIGINT,
        status       VARCHAR DEFAULT '未接入',
        embedding    float4[]
    )
    """,
    # === Entity links (2) ===
    """
    CREATE TABLE IF NOT EXISTS requirement_capability (
        requirement_id VARCHAR NOT NULL,
        capability_id  VARCHAR NOT NULL,
        PRIMARY KEY (requirement_id, capability_id)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS capability_tool (
        capability_id VARCHAR NOT NULL,
        tool_id       VARCHAR NOT NULL,
        description   TEXT DEFAULT '',
        PRIMARY KEY (capability_id, tool_id)
    )
    """,
    # === Knowledge links (3) ===
    """
    CREATE TABLE IF NOT EXISTS requirement_knowledge (
        requirement_id VARCHAR NOT NULL,
        knowledge_id   VARCHAR NOT NULL,
        PRIMARY KEY (requirement_id, knowledge_id)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS capability_knowledge (
        capability_id VARCHAR NOT NULL,
        knowledge_id  VARCHAR NOT NULL,
        PRIMARY KEY (capability_id, knowledge_id)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS tool_knowledge (
        tool_id      VARCHAR NOT NULL,
        knowledge_id VARCHAR NOT NULL,
        PRIMARY KEY (tool_id, knowledge_id)
    )
    """,
    # === Source link (1) ===
    """
    CREATE TABLE IF NOT EXISTS knowledge_resource (
        knowledge_id VARCHAR NOT NULL,
        resource_id  VARCHAR NOT NULL,
        PRIMARY KEY (knowledge_id, resource_id)
    )
    """,
    # === Knowledge-to-knowledge relations (1) ===
    """
    CREATE TABLE IF NOT EXISTS knowledge_relation (
        source_id     VARCHAR NOT NULL,
        target_id     VARCHAR NOT NULL,
        relation_type VARCHAR NOT NULL,
        PRIMARY KEY (source_id, target_id, relation_type)
    )
    """,
    # === Execution-layer index (1) ===
    """
    CREATE TABLE IF NOT EXISTS tool_provider (
        tool_id     VARCHAR NOT NULL,
        provider_id VARCHAR NOT NULL,
        PRIMARY KEY (tool_id, provider_id)
    )
    """,
]

# Junction tables that exist in the old database with the same name and
# columns; they are copied verbatim.
_COPIED_JUNCTIONS = (
    ('requirement_capability', 'requirement_id, capability_id'),
    ('capability_tool', 'capability_id, tool_id, description'),
    ('capability_knowledge', 'capability_id, knowledge_id'),
    ('tool_knowledge', 'tool_id, knowledge_id'),
)


# ─── Data migration ──────────────────────────────────────────────────────────

def _row_count(cur, table):
    """Return ``COUNT(*)`` of ``table``; ``cur`` must be a RealDictCursor.

    ``table`` is interpolated into the SQL, so it must be a trusted name.
    """
    cur.execute(f"SELECT COUNT(*) AS c FROM {table}")
    return cur.fetchone()['c']


def _parse_json(val):
    """Decode a JSON-ish column value.

    Lists and dicts (already decoded by psycopg2) are returned unchanged,
    strings are parsed with ``json.loads``; NULL or undecodable input
    yields ``[]``.
    """
    if val is None:
        return []
    if isinstance(val, (list, dict)):
        return val
    try:
        return json.loads(val)
    except (json.JSONDecodeError, TypeError):
        return []


# Names of JSONB columns. Their values must be wrapped in Json(); otherwise
# psycopg2 does not send them as JSON and the INSERT fails with a type error.
_JSONB_COLS = {'tags', 'source', 'eval', 'metadata', 'source_nodes', 'input', 'output'}


def _adapt(col_name, v):
    """Adapt one value for psycopg2 based on its target column.

    NULL stays NULL; values for JSONB columns (and any stray dict) are
    wrapped in ``Json``; everything else -- lists bound for float4[] /
    TEXT[] columns and plain scalars -- is passed through unchanged.
    """
    if v is None:
        return None
    if col_name in _JSONB_COLS:
        return Json(v)
    if isinstance(v, dict):
        return Json(v)  # fallback for a dict in a column not listed in _JSONB_COLS
    return v


def migrate_entities(old_cur, new_cur):
    """Migrate the 5 entity tables, dropping the old embedded-link columns."""
    # knowledge: excludes resource_ids, relationships, support_capability, tools
    KNOWLEDGE_COLS = (
        'id, task_embedding, content_embedding, message_id, task, content, '
        'types, tags, tag_keys, scopes, owner, source, eval, '
        'created_at, updated_at, status'
    )
    _migrate_table(old_cur, new_cur, 'knowledge', 'knowledge', KNOWLEDGE_COLS)

    # resource: the old table is named "resources", the new one "resource"
    RESOURCE_COLS = (
        'id, title, body, secure_body, content_type, metadata, '
        'sort_order, submitted_by, created_at, updated_at'
    )
    _migrate_table(old_cur, new_cur, 'resource', 'resources', RESOURCE_COLS)

    # requirement: excludes atomics
    REQ_COLS = 'id, description, source_nodes, status, match_result, embedding'
    _migrate_table(old_cur, new_cur, 'requirement', 'requirement', REQ_COLS)

    # capability: excludes requirements, implements, tools, source_knowledge
    CAP_COLS = 'id, name, criterion, description, embedding'
    _migrate_table(old_cur, new_cur, 'capability', 'capability', CAP_COLS)

    # tool: excludes implemented_tool_ids (old table no longer has JSONB link columns)
    TOOL_COLS = 'id, name, version, introduction, tutorial, input, output, updated_time, status, embedding'
    _migrate_table(old_cur, new_cur, 'tool', 'tool', TOOL_COLS)


def _migrate_table(old_cur, new_cur, new_table, old_table, columns):
    """Copy ``columns`` from ``old_table`` into ``new_table`` row by row.

    Skipped entirely when ``new_table`` already has rows. Each row is
    inserted on its own; duplicate-key failures are ignored, any other
    failure is counted and the first three are printed.

    Args:
        old_cur, new_cur: RealDictCursors on the old / new database.
        new_table, old_table: target / source table names (trusted constants).
        columns: comma-separated column list present in both tables.
    """
    existing = _row_count(new_cur, new_table)
    if existing > 0:
        print(f" {new_table}: 已有 {existing} 行,跳过", flush=True)
        return

    old_cur.execute(f"SELECT {columns} FROM {old_table}")
    rows = old_cur.fetchall()
    if not rows:
        print(f" {new_table}: 源表为空", flush=True)
        return

    col_names = [c.strip() for c in columns.split(',')]
    placeholders = ', '.join(['%s'] * len(col_names))
    insert_sql = f"INSERT INTO {new_table} ({columns}) VALUES ({placeholders})"
    total = len(rows)
    count = 0
    errors = 0
    for i, row in enumerate(rows, start=1):
        values = [_adapt(c, row[c]) for c in col_names]
        try:
            new_cur.execute(insert_sql, values)
            count += 1
        except Exception as e:
            # Autocommit: the failed row does not poison later inserts.
            if 'duplicate key' not in str(e):
                errors += 1
                if errors <= 3:  # only print the first 3 errors
                    print(f" [!] {new_table} row {row.get('id','?')}: {e}", flush=True)
        if i % 50 == 0 or i == total:
            print(f" {new_table}: {i}/{total} ({count} ok, {errors} err)", flush=True)

    print(f" {new_table}: 完成 {count}/{total} 行"
          + (f" ({errors} 错误)" if errors else ""), flush=True)


def migrate_junctions(old_cur, new_cur):
    """Migrate junction-table data using a single old-database cursor."""
    # Copy the old junction tables directly (they already hold correct data).
    for table, cols in _COPIED_JUNCTIONS:
        _migrate_junction(old_cur, new_cur, table, table, cols)

    # knowledge_resource: extracted from the knowledge.resource_ids array
    _migrate_knowledge_resource(old_cur, new_cur)

    # knowledge_relation: extracted from the knowledge.relationships JSONB
    _migrate_knowledge_relation(old_cur, new_cur)

    # requirement_knowledge / tool_provider: not present in the old database
    print(" requirement_knowledge: 旧库无数据(新增表)", flush=True)
    print(" tool_provider: 旧库无数据(新增表)", flush=True)


def _migrate_junction(old_cur, new_cur, new_table, old_table, cols):
    """Copy a junction table, ignoring rows that conflict with existing keys.

    Skipped when ``new_table`` already has rows or ``old_table`` cannot be
    read. The reported count is the number of rows actually inserted.
    """
    existing = _row_count(new_cur, new_table)
    if existing > 0:
        print(f" {new_table}: 已有 {existing} 行,跳过", flush=True)
        return

    try:
        old_cur.execute(f"SELECT {cols} FROM {old_table}")
    except Exception as e:
        print(f" {new_table}: 旧表 {old_table} 不可读 ({e}),跳过", flush=True)
        return

    rows = old_cur.fetchall()
    if not rows:
        print(f" {new_table}: 源表为空", flush=True)
        return

    col_names = [c.strip() for c in cols.split(',')]
    placeholders = ', '.join(['%s'] * len(col_names))
    insert_sql = (f"INSERT INTO {new_table} ({cols}) VALUES ({placeholders}) "
                  "ON CONFLICT DO NOTHING")
    count = 0
    for row in rows:
        values = [_adapt(c, row[c]) for c in col_names]
        try:
            new_cur.execute(insert_sql, values)
            # rowcount is 0 when ON CONFLICT skipped the row.
            count += new_cur.rowcount
        except Exception as e:
            print(f" [!] {new_table}: {e}", flush=True)

    print(f" {new_table}: {count}/{len(rows)} 行", flush=True)


def _migrate_knowledge_resource(old_cur, new_cur):
    """Fill knowledge_resource from the old knowledge.resource_ids arrays.

    Only ids present in the new ``resource`` table are linked; everything
    else (including non-string entries) is counted as a dangling reference.
    """
    if _row_count(new_cur, 'knowledge_resource') > 0:
        print(" knowledge_resource: 已有数据,跳过", flush=True)
        return

    # Preload the valid resource ids once.
    new_cur.execute("SELECT id FROM resource")
    valid_resources = {r['id'] for r in new_cur.fetchall()}

    old_cur.execute("SELECT id, resource_ids FROM knowledge WHERE resource_ids IS NOT NULL")
    rows = old_cur.fetchall()
    count = 0
    skipped = 0
    for row in rows:
        rids = _parse_json(row['resource_ids'])
        if not isinstance(rids, list):
            continue
        for rid in rids:
            # The isinstance check also keeps unhashable entries (dict/list)
            # from crashing the set lookup; resource ids are TEXT.
            if not (isinstance(rid, str) and rid in valid_resources):
                skipped += 1
                continue
            try:
                new_cur.execute(
                    "INSERT INTO knowledge_resource (knowledge_id, resource_id) "
                    "VALUES (%s, %s) ON CONFLICT DO NOTHING",
                    (row['id'], rid))
                count += new_cur.rowcount
            except Exception as e:
                print(f" [!] knowledge_resource: {e}", flush=True)

    print(f" knowledge_resource: {count} 行 (跳过悬空引用 {skipped})", flush=True)


def _migrate_knowledge_relation(old_cur, new_cur):
    """Fill knowledge_relation from the old knowledge.relationships JSON.

    Each entry must be a dict with ``type`` and ``target`` keys; others are
    ignored. Targets are not checked against the knowledge table.
    """
    if _row_count(new_cur, 'knowledge_relation') > 0:
        print(" knowledge_relation: 已有数据,跳过", flush=True)
        return

    old_cur.execute("SELECT id, relationships FROM knowledge WHERE relationships IS NOT NULL")
    rows = old_cur.fetchall()
    count = 0
    for row in rows:
        rels = _parse_json(row['relationships'])
        if not isinstance(rels, list):
            continue
        for rel in rels:
            if not (isinstance(rel, dict) and 'type' in rel and 'target' in rel):
                continue
            try:
                new_cur.execute(
                    "INSERT INTO knowledge_relation (source_id, target_id, relation_type) "
                    "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
                    (row['id'], rel['target'], rel['type']))
                count += new_cur.rowcount
            except Exception as e:
                print(f" [!] knowledge_relation: {e}", flush=True)

    print(f" knowledge_relation: {count} 行", flush=True)


# ─── Main flow ───────────────────────────────────────────────────────────────

def main():
    """Create the 13 tables in ``knowhub`` and copy data from the old database.

    The new-database connection is held for the whole run; every phase that
    reads the old database opens its own short-lived connection (to avoid
    long-lived connection timeouts). All connections are closed even if a
    phase raises.
    """
    OLD_DB = os.getenv('KNOWHUB_DB_NAME')
    NEW_DB = 'knowhub'
    # Without a name psycopg2 would fall back to libpq's default database;
    # and once .env already points at knowhub, the old-schema columns read
    # below (resource_ids, relationships) do not exist there.
    if not OLD_DB:
        raise SystemExit("错误: 未设置 KNOWHUB_DB_NAME(旧库名),无法迁移")
    if OLD_DB == NEW_DB:
        raise SystemExit(f"错误: KNOWHUB_DB_NAME 已是 {NEW_DB},源库与目标库相同")

    print("=" * 60, flush=True)
    print(f"迁移: {OLD_DB} → {NEW_DB}", flush=True)
    print("=" * 60, flush=True)

    with _dict_cursor(NEW_DB) as new_cur:
        # Step 1: create tables, then list what exists in the public schema.
        print(f"\n[1/3] 在 {NEW_DB} 建表...", flush=True)
        for sql in CREATE_TABLES:
            new_cur.execute(sql)
        new_cur.execute("SELECT tablename FROM pg_tables WHERE schemaname='public' ORDER BY tablename")
        tables = [r['tablename'] for r in new_cur.fetchall()]
        print(f" {len(tables)} 张表: {', '.join(tables)}", flush=True)

        # Step 2: entity data (fresh old-database connection).
        print("\n[2/3] 迁移实体数据...", flush=True)
        with _dict_cursor(OLD_DB) as old_cur:
            migrate_entities(old_cur, new_cur)

        # Step 3: junction data, each batch on its own old-database connection.
        print("\n[3/3] 迁移关联数据...", flush=True)
        with _dict_cursor(OLD_DB) as old_cur:  # 3a: copy old junction tables
            for table, cols in _COPIED_JUNCTIONS:
                _migrate_junction(old_cur, new_cur, table, table, cols)
        with _dict_cursor(OLD_DB) as old_cur:  # 3b: knowledge_resource
            _migrate_knowledge_resource(old_cur, new_cur)
        with _dict_cursor(OLD_DB) as old_cur:  # 3c: knowledge_relation
            _migrate_knowledge_relation(old_cur, new_cur)

        # New tables have no old data.
        print(" requirement_knowledge: 新增表,无旧数据", flush=True)
        print(" tool_provider: 新增表,无旧数据", flush=True)

        # Verification: row count of every public table.
        print(f"\n{'=' * 60}", flush=True)
        print(f"验证 {NEW_DB}:", flush=True)
        print(f"{'=' * 60}", flush=True)
        for t in tables:
            print(f" {t}: {_row_count(new_cur, t)} rows", flush=True)

    print(f"\n{'=' * 60}", flush=True)
    print("迁移完成!", flush=True)
    print(f"下一步:将 .env 中 KNOWHUB_DB_NAME 改为 {NEW_DB}", flush=True)
    print(f"{'=' * 60}", flush=True)


if __name__ == '__main__':
    main()