#!/usr/bin/env python3
"""
Database migration script v2:
1. Create the atomic_capability table
2. ALTER tool_table: add capabilities, embedding; rename knowledge -> tool_knowledge
3. ALTER knowledge: add support_capability, tools; split embedding -> task_embedding + content_embedding
4. ALTER requirement_table: major restructuring (drop old columns, add new ones)
5. Import initial data from JSON/MD files
"""

import os
import sys
import json
import time
import psycopg2
from psycopg2.extras import RealDictCursor
from dotenv import load_dotenv

# Explicitly load the .env file that sits in the project root
# (two directory levels above this script).
_script_dir = os.path.dirname(os.path.abspath(__file__))
_project_root = os.path.normpath(os.path.join(_script_dir, '..', '..'))
load_dotenv(os.path.join(_project_root, '.env'))


# ─── Connection ─────────────────────────────────────────────────────────────

def get_connection():
    """Open an autocommit psycopg2 connection configured from KNOWHUB_* env vars.

    Reads KNOWHUB_DB (host), KNOWHUB_PORT (falls back to 5432 when unset or
    empty), KNOWHUB_USER, KNOWHUB_PASSWORD and KNOWHUB_DB_NAME, and prints the
    connection target (never the password) before and after connecting.

    Returns:
        The connection returned by ``psycopg2.connect`` with ``autocommit``
        set to True.

    Raises:
        ValueError: if KNOWHUB_PORT is set to a non-numeric value.
        Whatever ``psycopg2.connect`` raises when the connection fails.
    """
    host = os.getenv('KNOWHUB_DB')
    # `or` instead of a getenv default: the default only applies when the
    # variable is unset, so an empty "KNOWHUB_PORT=" line would reach int('').
    port = int(os.getenv('KNOWHUB_PORT') or 5432)
    user = os.getenv('KNOWHUB_USER')
    password = os.getenv('KNOWHUB_PASSWORD')
    dbname = os.getenv('KNOWHUB_DB_NAME')

    print(f" Connecting to {host}:{port}/{dbname} as {user} ...")
    conn = psycopg2.connect(
        host=host,
        port=port,
        user=user,
        password=password,
        database=dbname,
        connect_timeout=10,
    )
    conn.autocommit = True
    print(" Connected.")
    return conn


# ─── 1. Create the atomic_capability table ──────────────────────────────────

CREATE_ATOMIC_CAPABILITY = (
    " CREATE TABLE IF NOT EXISTS atomic_capability ("
    " id VARCHAR(64) PRIMARY KEY,"
    " name VARCHAR(255) NOT NULL,"
    " criterion TEXT,"
    " description TEXT,"
    " requirements JSONB DEFAULT '[]',"
    " implements JSONB DEFAULT '{}',"
    " tools JSONB DEFAULT '[]',"
    " source_knowledge JSONB DEFAULT '[]',"
    " embedding float4[]"
    " ) WITH (appendoptimized=false); "
)
# ─── 2. ALTER tool_table ────────────────────────────────────────────────────

ALTER_TOOL_TABLE = [
    # Add the capabilities column
    "ALTER TABLE tool_table ADD COLUMN IF NOT EXISTS capabilities JSONB DEFAULT '[]'",
    # Add the embedding column
    "ALTER TABLE tool_table ADD COLUMN IF NOT EXISTS embedding float4[]",
    # Rename knowledge -> tool_knowledge, only when the old column is still
    # present and the new one does not exist yet
    (
        " DO $$ BEGIN"
        " IF EXISTS ("
        " SELECT 1 FROM information_schema.columns"
        " WHERE table_name = 'tool_table' AND column_name = 'knowledge'"
        " ) AND NOT EXISTS ("
        " SELECT 1 FROM information_schema.columns"
        " WHERE table_name = 'tool_table' AND column_name = 'tool_knowledge'"
        " ) THEN"
        " ALTER TABLE tool_table RENAME COLUMN knowledge TO tool_knowledge;"
        " END IF;"
        " END $$; "
    ),
]


# ─── 3. ALTER knowledge ─────────────────────────────────────────────────────

ALTER_KNOWLEDGE = [
    # Add the support_capability column
    "ALTER TABLE knowledge ADD COLUMN IF NOT EXISTS support_capability JSONB DEFAULT '[]'",
    # Add the tools column
    "ALTER TABLE knowledge ADD COLUMN IF NOT EXISTS tools JSONB DEFAULT '[]'",
    # Split embedding -> task_embedding + content_embedding:
    # add the new columns first, then migrate the data, then drop the old column
    "ALTER TABLE knowledge ADD COLUMN IF NOT EXISTS task_embedding float4[]",
    "ALTER TABLE knowledge ADD COLUMN IF NOT EXISTS content_embedding float4[]",
    # Copy the old embedding values into content_embedding (the original
    # embedding was computed from the content), then drop the old column
    (
        " DO $$ BEGIN"
        " IF EXISTS ("
        " SELECT 1 FROM information_schema.columns"
        " WHERE table_name = 'knowledge' AND column_name = 'embedding'"
        " ) THEN"
        " UPDATE knowledge SET content_embedding = embedding"
        " WHERE content_embedding IS NULL AND embedding IS NOT NULL;"
        " ALTER TABLE knowledge DROP COLUMN embedding;"
        " END IF;"
        " END $$; "
    ),
]
# ─── 4. ALTER requirement_table ─────────────────────────────────────────────

# Legacy requirement_table columns that the v2 layout drops.
_REQUIREMENT_LEGACY_COLUMNS = (
    'task',
    'type',
    'source_type',
    'source_itemset_id',
    'source_items',
    'tools',
    'knowledge',
    'case_knowledge',
    'process_knowledge',
    'trace',
    'body',
)

ALTER_REQUIREMENT = [
    # New columns
    "ALTER TABLE requirement_table ADD COLUMN IF NOT EXISTS description TEXT",
    "ALTER TABLE requirement_table ADD COLUMN IF NOT EXISTS atomics JSONB DEFAULT '[]'",
    "ALTER TABLE requirement_table ADD COLUMN IF NOT EXISTS source_nodes JSONB DEFAULT '[]'",
    "ALTER TABLE requirement_table ADD COLUMN IF NOT EXISTS status VARCHAR(32) DEFAULT '未满足'",
    "ALTER TABLE requirement_table ADD COLUMN IF NOT EXISTS match_result TEXT",
    # Drop the legacy columns; each drop is guarded by an existence check.
    " DO $$ BEGIN " + " ".join(
        "IF EXISTS (SELECT 1 FROM information_schema.columns"
        " WHERE table_name = 'requirement_table' AND column_name = '{0}')"
        " THEN ALTER TABLE requirement_table DROP COLUMN {0}; END IF;".format(column)
        for column in _REQUIREMENT_LEGACY_COLUMNS
    ) + " END $$; ",
    # The embedding column is kept and stays float4[]
]


# ─── 5. Import initial atomic-capability data ───────────────────────────────

# Bullet prefixes (after leading indentation) that mark a per-tool
# implementation line inside a capability section.
_TOOL_LINE_PREFIXES = (
    '- ComfyUI:',
    '- FLUX',
    '- Midjourney',
    '- Nano Banana',
    '- Seedream',
)

_UPSERT_CAPABILITY_SQL = (
    " INSERT INTO atomic_capability"
    " (id, name, criterion, description, requirements, implements, tools, source_knowledge)"
    " VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
    " ON CONFLICT (id) DO UPDATE SET"
    " name = EXCLUDED.name,"
    " criterion = EXCLUDED.criterion,"
    " description = EXCLUDED.description,"
    " implements = EXCLUDED.implements "
)


def parse_atomic_capabilities(md_path):
    """Parse the list of atomic capabilities out of atomic_capabilities.md.

    A capability starts at a ``### CAP-XXX: name`` heading. Within it, the
    ``- **功能描述**:`` and ``- **判定标准**:`` lines fill ``description`` and
    ``criterion``, and indented tool bullets (ComfyUI, FLUX*, Midjourney*,
    Nano Banana*, Seedream*) fill ``implements`` as ``{tool name: text after
    the first colon}``. Tool bullets are recognised at any indentation depth.

    Args:
        md_path: path of the UTF-8 markdown file to read.

    Returns:
        A list of dicts with keys ``id``, ``name``, ``criterion``,
        ``description``, ``implements``, ``tools``, ``source_knowledge`` and
        ``requirements``; the last three are always left empty by the parser.
    """
    caps = []
    current = None

    with open(md_path, 'r', encoding='utf-8') as f:
        for raw_line in f:
            line = raw_line.rstrip()

            # Heading of the form "### CAP-XXX: name" opens a new capability.
            if line.startswith('### CAP-'):
                if current is not None:
                    caps.append(current)
                heading, _, cap_name = line.partition(':')
                current = {
                    'id': heading.replace('### ', '').strip(),
                    'name': cap_name.strip(),
                    'criterion': '',
                    'description': '',
                    'implements': {},
                    'tools': [],
                    'source_knowledge': [],
                    'requirements': [],
                }
                continue

            # Anything before the first heading belongs to no capability.
            if current is None:
                continue

            if line.startswith('- **功能描述**:'):
                current['description'] = line.replace('- **功能描述**:', '').strip()
            elif line.startswith('- **判定标准**:'):
                current['criterion'] = line.replace('- **判定标准**:', '').strip()
            else:
                bullet = line.lstrip()
                # Tool bullets are nested list items, i.e. indented.
                if bullet != line and bullet.startswith(_TOOL_LINE_PREFIXES):
                    # Drop the "- " marker, then split "tool: description".
                    tool_name, _, impl_desc = bullet[2:].partition(':')
                    current['implements'][tool_name.strip()] = impl_desc.strip()

    if current is not None:
        caps.append(current)
    return caps


def import_atomic_capabilities(cursor, caps):
    """Upsert parsed atomic capabilities into the atomic_capability table.

    On an id conflict only ``name``, ``criterion``, ``description`` and
    ``implements`` are overwritten; the other columns of the existing row are
    left untouched. A failure on one row is printed and does not stop the
    remaining rows.

    Args:
        cursor: DB-API cursor whose ``execute(sql, params)`` is used.
        caps: capability dicts as produced by ``parse_atomic_capabilities``.

    Returns:
        The number of rows whose statement executed without raising.
    """
    count = 0
    for cap in caps:
        params = (
            cap['id'],
            cap['name'],
            cap['criterion'],
            cap['description'],
            json.dumps(cap['requirements']),
            json.dumps(cap['implements']),
            json.dumps(cap['tools']),
            json.dumps(cap['source_knowledge']),
        )
        try:
            cursor.execute(_UPSERT_CAPABILITY_SQL, params)
        except Exception as e:
            # Best effort: report the row and carry on with the next one.
            print(f" [!] 插入 {cap['id']} 失败: {e}")
        else:
            count += 1
    return count


# ─── 6. Import initial requirement data ─────────────────────────────────────

_UPSERT_REQUIREMENT_SQL = (
    " INSERT INTO requirement_table"
    " (id, description, atomics, source_nodes, status, match_result)"
    " VALUES (%s, %s, %s, %s, %s, %s)"
    " ON CONFLICT (id) DO UPDATE SET"
    " description = EXCLUDED.description,"
    " atomics = EXCLUDED.atomics,"
    " source_nodes = EXCLUDED.source_nodes,"
    " status = EXCLUDED.status,"
    " match_result = EXCLUDED.match_result "
)


def import_requirements(cursor, json_path):
    """Upsert requirements from requirements_sorted.json into requirement_table.

    Each entry of the top-level ``requirements`` list is mapped as follows:
    ``requirement_id`` -> id, ``requirement_text`` -> description,
    ``matched_capabilities`` -> atomics, every name in ``source_nodes`` paired
    with the record's ``source_posts`` -> source_nodes, ``match_status`` ->
    status, and ``capability_combination`` / ``research_note`` joined by a
    newline -> match_result.

    A record lacking ``requirement_id`` or ``requirement_text`` is reported
    and skipped, and a failing statement is reported without stopping the
    remaining records.

    Args:
        cursor: DB-API cursor whose ``execute(sql, params)`` is used.
        json_path: path of the UTF-8 JSON file to read.

    Returns:
        The number of records whose statement executed without raising.
    """
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    count = 0
    for req in data.get('requirements', []):
        try:
            req_id = req['requirement_id']
            description = req['requirement_text']
        except KeyError as e:
            # One malformed record must not abort the whole import.
            print(f" [!] 跳过缺少必填字段 {e} 的需求记录")
            continue

        atomics = req.get('matched_capabilities', [])

        # Every source node carries the record's full list of source posts.
        posts = req.get('source_posts', [])
        source_nodes_data = [
            {'node_name': node_name, 'posts': posts}
            for node_name in req.get('source_nodes', [])
        ]

        # Only '完全满足' counts as satisfied; every other status,
        # '需要调研' included, maps to '未满足'.
        if req.get('match_status', '未满足') == '完全满足':
            status = '已满足'
        else:
            status = '未满足'

        # match_result: capability_combination then research_note, skipping
        # whichever is missing or empty.
        match_result = '\n'.join(
            part
            for part in (req.get('capability_combination'), req.get('research_note'))
            if part
        )

        params = (
            req_id,
            description,
            json.dumps(atomics),
            json.dumps(source_nodes_data),
            status,
            match_result,
        )
        try:
            cursor.execute(_UPSERT_REQUIREMENT_SQL, params)
        except Exception as e:
            print(f" [!] 插入 {req_id} 失败: {e}")
        else:
            count += 1
    return count


# ─── Main flow ──────────────────────────────────────────────────────────────

def _apply_alters(cursor, statements, ok_message):
    """Run each statement, print WARN per failure, then OK or FAIL; return the failure count."""
    failed = 0
    for sql in statements:
        try:
            cursor.execute(sql)
        except Exception as e:
            print(f" WARN: {e}")
            failed += 1
    if failed:
        # Do not claim success when part of the step did not run.
        print(f" FAIL: {failed}/{len(statements)} 条语句执行失败")
    else:
        print(ok_message)
    return failed


def _example_data_path(filename):
    """Return the normalised path of a file under ../../examples/tool_research."""
    return os.path.normpath(os.path.join(
        os.path.dirname(__file__), '..', '..', 'examples', 'tool_research', filename,
    ))


def _print_verification(cursor):
    """Print the row count and column names of every table touched by the migration."""
    print("\n" + "=" * 60)
    print("验证结果:")
    print("=" * 60)

    # Table names come from this fixed list only, never from external input.
    for table in ['atomic_capability', 'tool_table', 'knowledge', 'requirement_table', 'resources']:
        try:
            cursor.execute(f"SELECT COUNT(*) as count FROM {table}")
            row = cursor.fetchone()
            count = row['count'] if row else 0

            cursor.execute(
                " SELECT column_name FROM information_schema.columns"
                f" WHERE table_name = '{table}'"
                " ORDER BY ordinal_position "
            )
            cols = [r['column_name'] for r in cursor.fetchall()]
            print(f"\n {table} ({count} rows)")
            print(f" 字段: {', '.join(cols)}")
        except Exception as e:
            print(f"\n {table}: ERROR - {e}")


def main():
    """Run the v2 migration: schema changes, data import, then a verification report.

    Steps 1-4 apply the DDL; a failing statement is reported and the run
    continues. Steps 5-6 import the markdown/JSON seed files when they exist
    and are skipped otherwise. The cursor and connection are closed even when
    a step raises.
    """
    print("=" * 60)
    print("KnowHub 数据库迁移 v2")
    print("=" * 60)

    conn = get_connection()
    try:
        cursor = conn.cursor(cursor_factory=RealDictCursor)
        try:
            # Step 1: create the atomic_capability table
            print("\n[1/6] 新建 atomic_capability 表...")
            try:
                cursor.execute(CREATE_ATOMIC_CAPABILITY)
                print(" OK: atomic_capability 表创建成功")
            except Exception as e:
                print(f" FAIL: {e}")

            # Step 2: ALTER tool_table
            print("\n[2/6] 更新 tool_table...")
            _apply_alters(
                cursor, ALTER_TOOL_TABLE,
                " OK: tool_table 更新完成 (capabilities, embedding, tool_knowledge)",
            )

            # Step 3: ALTER knowledge
            print("\n[3/6] 更新 knowledge 表...")
            _apply_alters(
                cursor, ALTER_KNOWLEDGE,
                " OK: knowledge 更新完成 (support_capability, tools, task_embedding, content_embedding)",
            )

            # Step 4: ALTER requirement_table
            print("\n[4/6] 更新 requirement_table...")
            _apply_alters(
                cursor, ALTER_REQUIREMENT,
                " OK: requirement_table 更新完成 (description, atomics, source_nodes, status, match_result)",
            )

            # Step 5: import atomic capabilities
            print("\n[5/6] 导入原子能力数据...")
            md_path = _example_data_path('atomic_capabilities.md')
            if os.path.exists(md_path):
                caps = parse_atomic_capabilities(md_path)
                count = import_atomic_capabilities(cursor, caps)
                print(f" OK: 导入 {count} 条原子能力")
            else:
                print(f" SKIP: 文件不存在 {md_path}")

            # Step 6: import requirements
            print("\n[6/6] 导入需求数据...")
            json_path = _example_data_path('requirements_sorted.json')
            if os.path.exists(json_path):
                count = import_requirements(cursor, json_path)
                print(f" OK: 导入 {count} 条需求")
            else:
                print(f" SKIP: 文件不存在 {json_path}")

            # Verification report
            _print_verification(cursor)
        finally:
            cursor.close()
    finally:
        conn.close()

    print("\n迁移完成!")


if __name__ == '__main__':
    main()