#!/usr/bin/env python3
"""
Database migration script v2:
1. Create the new atomic_capability table
2. ALTER tool_table: add capabilities, embedding; rename knowledge -> tool_knowledge
3. ALTER knowledge: add support_capability, tools; split embedding -> task_embedding + content_embedding
4. ALTER requirement_table: major restructuring (drop old columns, add new ones)
5. Import initial data from JSON/MD files
"""
import os
import sys
import json
import time
import psycopg2
from psycopg2.extras import RealDictCursor
from dotenv import load_dotenv

# Explicitly load the .env file from the project root (two levels above this script).
_script_dir = os.path.dirname(os.path.abspath(__file__))
_project_root = os.path.normpath(os.path.join(_script_dir, '..', '..'))
load_dotenv(os.path.join(_project_root, '.env'))
# ─── Connection ─────────────────────────────────────────────────────────────
def get_connection():
    """Open an autocommit psycopg2 connection using the KNOWHUB_* env vars.

    Returns a live connection; raises psycopg2 errors on failure.
    """
    params = {
        'host': os.getenv('KNOWHUB_DB'),
        'port': int(os.getenv('KNOWHUB_PORT', 5432)),
        'user': os.getenv('KNOWHUB_USER'),
        'password': os.getenv('KNOWHUB_PASSWORD'),
        'database': os.getenv('KNOWHUB_DB_NAME'),
    }
    print(f" Connecting to {params['host']}:{params['port']}/{params['database']} as {params['user']} ...")
    conn = psycopg2.connect(connect_timeout=10, **params)
    # Autocommit so every DDL/DML statement takes effect immediately
    # (no explicit transaction management in this migration script).
    conn.autocommit = True
    print(" Connected.")
    return conn
# ─── 1. Create the atomic_capability table ──────────────────────────────────
# JSONB columns default to empty collections; embedding is a float4 array.
# NOTE(review): WITH (appendoptimized=false) looks Greenplum-specific — plain
# PostgreSQL would reject this clause; confirm the target database.
CREATE_ATOMIC_CAPABILITY = """
CREATE TABLE IF NOT EXISTS atomic_capability (
    id VARCHAR(64) PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    criterion TEXT,
    description TEXT,
    requirements JSONB DEFAULT '[]',
    implements JSONB DEFAULT '{}',
    tools JSONB DEFAULT '[]',
    source_knowledge JSONB DEFAULT '[]',
    embedding float4[]
) WITH (appendoptimized=false);
"""
# ─── 2. ALTER tool_table ────────────────────────────────────────────────────
# Statements are executed one-by-one in main(); failures are logged and skipped.
ALTER_TOOL_TABLE = [
    # Add capabilities column.
    "ALTER TABLE tool_table ADD COLUMN IF NOT EXISTS capabilities JSONB DEFAULT '[]'",
    # Add embedding column.
    "ALTER TABLE tool_table ADD COLUMN IF NOT EXISTS embedding float4[]",
    # Rename knowledge -> tool_knowledge (guarded so a re-run is a no-op).
    """
    DO $$
    BEGIN
        IF EXISTS (
            SELECT 1 FROM information_schema.columns
            WHERE table_name = 'tool_table' AND column_name = 'knowledge'
        ) AND NOT EXISTS (
            SELECT 1 FROM information_schema.columns
            WHERE table_name = 'tool_table' AND column_name = 'tool_knowledge'
        ) THEN
            ALTER TABLE tool_table RENAME COLUMN knowledge TO tool_knowledge;
        END IF;
    END $$;
    """,
]
# ─── 3. ALTER the knowledge table ───────────────────────────────────────────
ALTER_KNOWLEDGE = [
    # Add support_capability column.
    "ALTER TABLE knowledge ADD COLUMN IF NOT EXISTS support_capability JSONB DEFAULT '[]'",
    # Add tools column.
    "ALTER TABLE knowledge ADD COLUMN IF NOT EXISTS tools JSONB DEFAULT '[]'",
    # Split embedding -> task_embedding + content_embedding:
    # add the new columns first, then migrate the data, then drop the old column.
    "ALTER TABLE knowledge ADD COLUMN IF NOT EXISTS task_embedding float4[]",
    "ALTER TABLE knowledge ADD COLUMN IF NOT EXISTS content_embedding float4[]",
    # Copy the old embedding data into content_embedding (the original
    # embedding was computed from the content), then drop the old column.
    # Guarded so a re-run after the drop is a no-op.
    """
    DO $$
    BEGIN
        IF EXISTS (
            SELECT 1 FROM information_schema.columns
            WHERE table_name = 'knowledge' AND column_name = 'embedding'
        ) THEN
            UPDATE knowledge SET content_embedding = embedding
            WHERE content_embedding IS NULL AND embedding IS NOT NULL;
            ALTER TABLE knowledge DROP COLUMN embedding;
        END IF;
    END $$;
    """,
]
# ─── 4. ALTER requirement_table ─────────────────────────────────────────────
ALTER_REQUIREMENT = [
    # New columns.
    "ALTER TABLE requirement_table ADD COLUMN IF NOT EXISTS description TEXT",
    "ALTER TABLE requirement_table ADD COLUMN IF NOT EXISTS atomics JSONB DEFAULT '[]'",
    "ALTER TABLE requirement_table ADD COLUMN IF NOT EXISTS source_nodes JSONB DEFAULT '[]'",
    "ALTER TABLE requirement_table ADD COLUMN IF NOT EXISTS status VARCHAR(32) DEFAULT '未满足'",
    "ALTER TABLE requirement_table ADD COLUMN IF NOT EXISTS match_result TEXT",
    # Drop the obsolete columns (each drop guarded by an existence check so
    # the whole DO block is idempotent across re-runs).
    """
    DO $$
    BEGIN
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'task') THEN
            ALTER TABLE requirement_table DROP COLUMN task;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'type') THEN
            ALTER TABLE requirement_table DROP COLUMN type;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'source_type') THEN
            ALTER TABLE requirement_table DROP COLUMN source_type;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'source_itemset_id') THEN
            ALTER TABLE requirement_table DROP COLUMN source_itemset_id;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'source_items') THEN
            ALTER TABLE requirement_table DROP COLUMN source_items;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'tools') THEN
            ALTER TABLE requirement_table DROP COLUMN tools;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'knowledge') THEN
            ALTER TABLE requirement_table DROP COLUMN knowledge;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'case_knowledge') THEN
            ALTER TABLE requirement_table DROP COLUMN case_knowledge;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'process_knowledge') THEN
            ALTER TABLE requirement_table DROP COLUMN process_knowledge;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'trace') THEN
            ALTER TABLE requirement_table DROP COLUMN trace;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'body') THEN
            ALTER TABLE requirement_table DROP COLUMN body;
        END IF;
    END $$;
    """,
    # The embedding column is kept, still float4[].
]
# ─── 5. Import initial atomic-capability data ───────────────────────────────
def parse_atomic_capabilities(md_path):
    """Parse the atomic-capability entries out of atomic_capabilities.md.

    Returns a list of dicts (one per "### CAP-XXX: name" section) with the
    fields expected by import_atomic_capabilities().
    """
    # Bullet prefixes that mark per-tool implementation notes inside a section.
    tool_prefixes = (' - ComfyUI:', ' - FLUX', ' - Midjourney', ' - Nano Banana', ' - Seedream')
    caps = []
    current = None
    with open(md_path, 'r', encoding='utf-8') as f:
        for raw in f:
            line = raw.rstrip()
            if line.startswith('### CAP-'):
                # A new "### CAP-XXX: name" header closes the previous entry.
                if current is not None:
                    caps.append(current)
                head, _, tail = line.partition(':')
                current = {
                    'id': head.replace('### ', '').strip(),
                    'name': tail.strip(),
                    'criterion': '',
                    'description': '',
                    'implements': {},
                    'tools': [],
                    'source_knowledge': [],
                    'requirements': [],
                }
            elif current is not None:
                if line.startswith('- **功能描述**:'):
                    current['description'] = line.replace('- **功能描述**:', '').strip()
                elif line.startswith('- **判定标准**:'):
                    current['criterion'] = line.replace('- **判定标准**:', '').strip()
                elif line.startswith(tool_prefixes):
                    # Per-tool bullet: "  - Tool: how it implements this capability".
                    body = line.strip().lstrip('- ')
                    tool_name = body.split(':')[0].strip()
                    impl_desc = body.split(':', 1)[1].strip() if ':' in line else ''
                    current['implements'][tool_name] = impl_desc
    if current is not None:
        caps.append(current)
    return caps
def import_atomic_capabilities(cursor, caps):
    """Upsert parsed atomic-capability rows; return how many were written."""
    upsert_sql = """
        INSERT INTO atomic_capability (id, name, criterion, description, requirements, implements, tools, source_knowledge)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT (id) DO UPDATE SET
            name = EXCLUDED.name,
            criterion = EXCLUDED.criterion,
            description = EXCLUDED.description,
            implements = EXCLUDED.implements
    """
    written = 0
    for cap in caps:
        params = (
            cap['id'],
            cap['name'],
            cap['criterion'],
            cap['description'],
            json.dumps(cap['requirements']),
            json.dumps(cap['implements']),
            json.dumps(cap['tools']),
            json.dumps(cap['source_knowledge']),
        )
        try:
            cursor.execute(upsert_sql, params)
        except Exception as e:
            # Best-effort import: report the failing row and keep going.
            print(f" [!] 插入 {cap['id']} 失败: {e}")
        else:
            written += 1
    return written
# ─── 6. Import initial requirement data ─────────────────────────────────────
def import_requirements(cursor, json_path):
    """Import requirement rows from requirements_sorted.json.

    Each requirement is upserted into requirement_table keyed by its id.
    Individual row failures are printed and skipped (best-effort import).

    Returns the number of rows successfully written.
    """
    with open(json_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    reqs = data.get('requirements', [])
    count = 0
    for req in reqs:
        req_id = req['requirement_id']
        description = req['requirement_text']
        atomics = req.get('matched_capabilities', [])
        # Build source_nodes: pair every source node with the shared post list.
        source_nodes_data = [
            {'node_name': node_name, 'posts': req.get('source_posts', [])}
            for node_name in req.get('source_nodes', [])
        ]
        # Status mapping: only "完全满足" (fully satisfied) becomes "已满足";
        # everything else (incl. "需要调研" / missing) is "未满足". The original
        # code spelled the fallback out as two redundant branches.
        status = '已满足' if req.get('match_status') == '未满足' else '未满足'
        status = '已满足' if req.get('match_status') == '完全满足' else '未满足'
        # match_result: join capability_combination and research_note when present
        # ("".join of an empty list is already '', so no separate empty case).
        match_parts = [req[key] for key in ('capability_combination', 'research_note') if req.get(key)]
        match_result = '\n'.join(match_parts)
        try:
            cursor.execute("""
                INSERT INTO requirement_table (id, description, atomics, source_nodes, status, match_result)
                VALUES (%s, %s, %s, %s, %s, %s)
                ON CONFLICT (id) DO UPDATE SET
                    description = EXCLUDED.description,
                    atomics = EXCLUDED.atomics,
                    source_nodes = EXCLUDED.source_nodes,
                    status = EXCLUDED.status,
                    match_result = EXCLUDED.match_result
            """, (
                req_id,
                description,
                json.dumps(atomics),
                json.dumps(source_nodes_data),
                status,
                match_result,
            ))
            count += 1
        except Exception as e:
            print(f" [!] 插入 {req_id} 失败: {e}")
    return count
# ─── Main flow ──────────────────────────────────────────────────────────────
def main():
    """Run the v2 migration end-to-end: DDL changes, then initial data import.

    Every step is best-effort: failures are printed and the migration moves on,
    since most statements are idempotent (IF NOT EXISTS / guarded DO blocks).
    The cursor and connection are now released in a finally block so an
    unexpected error cannot leak the connection.
    """
    print("=" * 60)
    print("KnowHub 数据库迁移 v2")
    print("=" * 60)
    conn = get_connection()
    cursor = conn.cursor(cursor_factory=RealDictCursor)
    try:
        # Step 1: create the atomic_capability table.
        print("\n[1/6] 新建 atomic_capability 表...")
        try:
            cursor.execute(CREATE_ATOMIC_CAPABILITY)
            print(" OK: atomic_capability 表创建成功")
        except Exception as e:
            print(f" FAIL: {e}")
        # Step 2: ALTER tool_table.
        print("\n[2/6] 更新 tool_table...")
        for sql in ALTER_TOOL_TABLE:
            try:
                cursor.execute(sql)
            except Exception as e:
                print(f" WARN: {e}")
        print(" OK: tool_table 更新完成 (capabilities, embedding, tool_knowledge)")
        # Step 3: ALTER knowledge.
        print("\n[3/6] 更新 knowledge 表...")
        for sql in ALTER_KNOWLEDGE:
            try:
                cursor.execute(sql)
            except Exception as e:
                print(f" WARN: {e}")
        print(" OK: knowledge 更新完成 (support_capability, tools, task_embedding, content_embedding)")
        # Step 4: ALTER requirement_table.
        print("\n[4/6] 更新 requirement_table...")
        for sql in ALTER_REQUIREMENT:
            try:
                cursor.execute(sql)
            except Exception as e:
                print(f" WARN: {e}")
        print(" OK: requirement_table 更新完成 (description, atomics, source_nodes, status, match_result)")
        # Step 5: import atomic capabilities from the markdown source, if present.
        print("\n[5/6] 导入原子能力数据...")
        md_path = os.path.join(os.path.dirname(__file__), '..', '..', 'examples', 'tool_research', 'atomic_capabilities.md')
        md_path = os.path.normpath(md_path)
        if os.path.exists(md_path):
            caps = parse_atomic_capabilities(md_path)
            count = import_atomic_capabilities(cursor, caps)
            print(f" OK: 导入 {count} 条原子能力")
        else:
            print(f" SKIP: 文件不存在 {md_path}")
        # Step 6: import requirement data from JSON, if present.
        print("\n[6/6] 导入需求数据...")
        json_path = os.path.join(os.path.dirname(__file__), '..', '..', 'examples', 'tool_research', 'requirements_sorted.json')
        json_path = os.path.normpath(json_path)
        if os.path.exists(json_path):
            count = import_requirements(cursor, json_path)
            print(f" OK: 导入 {count} 条需求")
        else:
            print(f" SKIP: 文件不存在 {json_path}")
        # Verification: report row count and column list for each table.
        print("\n" + "=" * 60)
        print("验证结果:")
        print("=" * 60)
        for table in ['atomic_capability', 'tool_table', 'knowledge', 'requirement_table', 'resources']:
            try:
                # The identifier comes from the hard-coded list above, so the
                # f-string interpolation cannot inject arbitrary SQL.
                cursor.execute(f"SELECT COUNT(*) as count FROM {table}")
                row = cursor.fetchone()
                count = row['count'] if row else 0
                # Parameterized query (was f-string interpolation into the
                # WHERE clause — value positions should always use %s).
                cursor.execute(
                    """
                    SELECT column_name FROM information_schema.columns
                    WHERE table_name = %s
                    ORDER BY ordinal_position
                    """,
                    (table,),
                )
                cols = [r['column_name'] for r in cursor.fetchall()]
                print(f"\n {table} ({count} rows)")
                print(f" 字段: {', '.join(cols)}")
            except Exception as e:
                print(f"\n {table}: ERROR - {e}")
    finally:
        # Always release the connection, even if a step raised unexpectedly.
        cursor.close()
        conn.close()
    print("\n迁移完成!")


if __name__ == '__main__':
    main()
|