| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471 |
- #!/usr/bin/env python3
- """
- 迁移到新数据库 knowhub:从 knowledge_hub 读数据,写入 knowhub。
- 13 张表:5 实体 + 8 关联。
- 只使用 CREATE TABLE / INSERT / DROP TABLE,不执行 ALTER。
- 每步幂等,可安全重跑。
- """
- import os
- import json
- import psycopg2
- from psycopg2.extras import RealDictCursor, Json
- from dotenv import load_dotenv
# Resolve the repository root (three directory levels above this script) and
# load its .env, so the KNOWHUB_* settings read via os.getenv are populated.
_script_dir = os.path.dirname(os.path.abspath(__file__))
_project_root = os.path.normpath(
    os.path.join(_script_dir, os.pardir, os.pardir, os.pardir)
)
load_dotenv(os.path.join(_project_root, '.env'))
def connect(db_name):
    """Open a psycopg2 connection to database ``db_name`` in autocommit mode.

    Connection settings come from the environment:
    KNOWHUB_DB (host), KNOWHUB_PORT (default 5432), KNOWHUB_USER and
    KNOWHUB_PASSWORD. The connect timeout is 10 seconds.
    """
    params = {
        'host': os.getenv('KNOWHUB_DB'),
        'port': int(os.getenv('KNOWHUB_PORT', 5432)),
        'user': os.getenv('KNOWHUB_USER'),
        'password': os.getenv('KNOWHUB_PASSWORD'),
        'database': db_name,
        'connect_timeout': 10,
    }
    connection = psycopg2.connect(**params)
    # Autocommit: each statement is its own transaction, so one failed INSERT
    # does not leave the connection in an aborted-transaction state.
    connection.autocommit = True
    return connection
# ─── CREATE statements for the 13 tables ──────────────────────────────────────
# main() executes these in order against the new database. Each one is
# CREATE TABLE IF NOT EXISTS, so a re-run does nothing for tables that already
# exist (an existing table is NOT altered to match the definition below).
# NOTE(review): no table declares FOREIGN KEY / REFERENCES constraints, so the
# database does not enforce that ids in the junction tables point at real rows.
# Embeddings are stored as plain float4[] arrays.
CREATE_TABLES = [
    # === Entity tables (5) ===
    # knowledge: created_at / updated_at are BIGINT — presumably epoch
    # timestamps; confirm the unit (s vs ms) against the application.
    """
    CREATE TABLE IF NOT EXISTS knowledge (
        id VARCHAR PRIMARY KEY,
        task_embedding float4[],
        content_embedding float4[],
        message_id VARCHAR,
        task VARCHAR,
        content TEXT,
        types TEXT[],
        tags JSONB DEFAULT '{}',
        tag_keys TEXT[],
        scopes TEXT[],
        owner VARCHAR,
        source JSONB DEFAULT '{}',
        eval JSONB DEFAULT '{}',
        created_at BIGINT,
        updated_at BIGINT,
        status VARCHAR DEFAULT 'approved'
    )
    """,
    # resource: ids are TEXT here, while junction tables use VARCHAR.
    """
    CREATE TABLE IF NOT EXISTS resource (
        id TEXT PRIMARY KEY,
        title TEXT,
        body TEXT,
        secure_body TEXT,
        content_type TEXT,
        metadata JSONB DEFAULT '{}',
        sort_order INTEGER DEFAULT 0,
        submitted_by TEXT,
        created_at BIGINT,
        updated_at BIGINT
    )
    """,
    # requirement: status defaults to '未满足' ("not satisfied").
    """
    CREATE TABLE IF NOT EXISTS requirement (
        id VARCHAR PRIMARY KEY,
        description TEXT,
        source_nodes JSONB DEFAULT '[]',
        status VARCHAR DEFAULT '未满足',
        match_result TEXT,
        embedding float4[]
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS capability (
        id VARCHAR PRIMARY KEY,
        name VARCHAR,
        criterion TEXT,
        description TEXT,
        embedding float4[]
    )
    """,
    # tool: input/output default to the JSON *string* "" (JSONB '""'), not to
    # an object; status defaults to '未接入' ("not integrated").
    """
    CREATE TABLE IF NOT EXISTS tool (
        id VARCHAR PRIMARY KEY,
        name VARCHAR,
        version VARCHAR,
        introduction TEXT,
        tutorial TEXT,
        input JSONB DEFAULT '""',
        output JSONB DEFAULT '""',
        updated_time BIGINT,
        status VARCHAR DEFAULT '未接入',
        embedding float4[]
    )
    """,
    # === Entity-to-entity links (2) ===
    """
    CREATE TABLE IF NOT EXISTS requirement_capability (
        requirement_id VARCHAR NOT NULL,
        capability_id VARCHAR NOT NULL,
        PRIMARY KEY (requirement_id, capability_id)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS capability_tool (
        capability_id VARCHAR NOT NULL,
        tool_id VARCHAR NOT NULL,
        description TEXT DEFAULT '',
        PRIMARY KEY (capability_id, tool_id)
    )
    """,
    # === Entity-to-knowledge links (3) ===
    """
    CREATE TABLE IF NOT EXISTS requirement_knowledge (
        requirement_id VARCHAR NOT NULL,
        knowledge_id VARCHAR NOT NULL,
        PRIMARY KEY (requirement_id, knowledge_id)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS capability_knowledge (
        capability_id VARCHAR NOT NULL,
        knowledge_id VARCHAR NOT NULL,
        PRIMARY KEY (capability_id, knowledge_id)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS tool_knowledge (
        tool_id VARCHAR NOT NULL,
        knowledge_id VARCHAR NOT NULL,
        PRIMARY KEY (tool_id, knowledge_id)
    )
    """,
    # === Source links (1): knowledge -> resource it was derived from ===
    """
    CREATE TABLE IF NOT EXISTS knowledge_resource (
        knowledge_id VARCHAR NOT NULL,
        resource_id VARCHAR NOT NULL,
        PRIMARY KEY (knowledge_id, resource_id)
    )
    """,
    # === Knowledge-to-knowledge relations (1); typed, directed edges ===
    """
    CREATE TABLE IF NOT EXISTS knowledge_relation (
        source_id VARCHAR NOT NULL,
        target_id VARCHAR NOT NULL,
        relation_type VARCHAR NOT NULL,
        PRIMARY KEY (source_id, target_id, relation_type)
    )
    """,
    # === Execution-layer index (1) ===
    """
    CREATE TABLE IF NOT EXISTS tool_provider (
        tool_id VARCHAR NOT NULL,
        provider_id VARCHAR NOT NULL,
        PRIMARY KEY (tool_id, provider_id)
    )
    """,
]
- # ─── 数据迁移 ─────────────────────────────────────────────────────────────────
- def _parse_json(val):
- if val is None:
- return [] if not isinstance(val, dict) else {}
- if isinstance(val, (list, dict)):
- return val
- try:
- return json.loads(val)
- except (json.JSONDecodeError, TypeError):
- return []
- # JSONB 列名集合(值需要 Json() 包装,否则 psycopg2 当纯文本发送导致类型错误)
- _JSONB_COLS = {'tags', 'source', 'eval', 'metadata', 'source_nodes', 'input', 'output'}
- def _adapt(col_name, v):
- """psycopg2 序列化适配:JSONB 列 → Json(),数组列 → 原生 list"""
- if v is None:
- return None
- if col_name in _JSONB_COLS:
- return Json(v)
- if isinstance(v, dict):
- return Json(v) # 未在 _JSONB_COLS 中的 dict(兜底)
- return v # float4[] / TEXT[] / 标量
def migrate_entities(old_cur, new_cur):
    """Copy the 5 entity tables from the old database into the new one.

    Each plan entry is ``(new table, old table, column list)``. Only the listed
    columns are copied. Legacy relation columns are intentionally left behind
    (named per entry below); their data goes to junction tables instead.
    """
    plan = (
        # knowledge: drops resource_ids, relationships, support_capability, tools
        ('knowledge', 'knowledge',
         'id, task_embedding, content_embedding, message_id, task, content, '
         'types, tags, tag_keys, scopes, owner, source, eval, '
         'created_at, updated_at, status'),
        # resource: the old table is called "resources", the new one "resource"
        ('resource', 'resources',
         'id, title, body, secure_body, content_type, metadata, '
         'sort_order, submitted_by, created_at, updated_at'),
        # requirement: drops atomics
        ('requirement', 'requirement',
         'id, description, source_nodes, status, match_result, embedding'),
        # capability: drops requirements, implements, tools, source_knowledge
        ('capability', 'capability',
         'id, name, criterion, description, embedding'),
        # tool: drops implemented_tool_ids (the old table has no JSONB link columns left)
        ('tool', 'tool',
         'id, name, version, introduction, tutorial, input, output, updated_time, status, embedding'),
    )
    for new_table, old_table, columns in plan:
        _migrate_table(old_cur, new_cur, new_table, old_table, columns)
def _migrate_table(old_cur, new_cur, new_table, old_table, columns):
    """Copy ``columns`` from ``old_table`` (old DB) into ``new_table`` (new DB).

    Args:
        old_cur: cursor on the old database that returns dict rows
            (main() passes a RealDictCursor).
        new_cur: cursor on the new database that returns dict rows.
        new_table: target table name.
        old_table: source table name (may differ, e.g. resources -> resource).
        columns: comma-separated column list present in both tables.

    Table and column names are interpolated into the SQL text, so they must be
    trusted identifiers. Every caller passes hard-coded constants.

    Re-run behaviour:
        - The copy is skipped only when the target already holds at least as
          many rows as the source.
        - Otherwise every source row is inserted with ``ON CONFLICT DO
          NOTHING``. A run interrupted part-way is therefore completed rather
          than skipped forever, and rows already present are left untouched.

    Individual INSERT failures are counted and the first three are printed;
    they do not stop the copy. This relies on the target connection running
    in autocommit mode, as ``connect()`` configures it.
    """
    new_cur.execute(f"SELECT COUNT(*) AS n FROM {new_table}")
    existing = new_cur.fetchone()['n']
    old_cur.execute(f"SELECT COUNT(*) AS n FROM {old_table}")
    source_total = old_cur.fetchone()['n']
    if existing > 0 and existing >= source_total:
        print(f" {new_table}: 已有 {existing} 行,跳过", flush=True)
        return
    if existing > 0:
        # Partially filled by an earlier run: resume instead of skipping.
        print(f" {new_table}: 已有 {existing}/{source_total} 行,继续补齐", flush=True)

    old_cur.execute(f"SELECT {columns} FROM {old_table}")
    rows = old_cur.fetchall()
    if not rows:
        print(f" {new_table}: 源表为空", flush=True)
        return

    col_names = [c.strip() for c in columns.split(',')]
    placeholders = ', '.join(['%s'] * len(col_names))
    # ON CONFLICT DO NOTHING replaces matching the text of a "duplicate key"
    # error, which changes with the server's message locale. An already-present
    # row now simply reports rowcount 0.
    insert_sql = (f"INSERT INTO {new_table} ({columns}) VALUES ({placeholders}) "
                  "ON CONFLICT DO NOTHING")
    total = len(rows)
    count = 0    # rows actually inserted
    already = 0  # rows whose key already existed in the target
    errors = 0
    for i, row in enumerate(rows, start=1):
        values = [_adapt(c, row[c]) for c in col_names]
        try:
            new_cur.execute(insert_sql, values)
        except Exception as e:  # best effort: report and move on to the next row
            errors += 1
            if errors <= 3:  # only print the first 3 errors
                print(f" [!] {new_table} row {row.get('id','?')}: {e}", flush=True)
        else:
            if new_cur.rowcount > 0:
                count += 1
            else:
                already += 1
        if i % 50 == 0 or i == total:
            print(f" {new_table}: {i}/{total} ({count} ok, {errors} err)", flush=True)

    summary = f" {new_table}: 完成 {count}/{total} 行"
    if errors:
        summary += f" ({errors} 错误)"
    if already:
        summary += f" (已存在 {already})"
    print(summary, flush=True)
def migrate_junctions(old_cur, new_cur):
    """Fill the junction tables of the new database using one old-DB cursor.

    - Four junction tables that already exist in the old database are copied
      verbatim.
    - knowledge_resource is derived from knowledge.resource_ids.
    - knowledge_relation is derived from knowledge.relationships.
    - requirement_knowledge and tool_provider are new and have no legacy data;
      only a notice is printed for them.
    """
    copied_verbatim = (
        ('requirement_capability', 'requirement_id, capability_id'),
        ('capability_tool', 'capability_id, tool_id, description'),
        ('capability_knowledge', 'capability_id, knowledge_id'),
        ('tool_knowledge', 'tool_id, knowledge_id'),
    )
    for name, cols in copied_verbatim:
        _migrate_junction(old_cur, new_cur, name, name, cols)

    _migrate_knowledge_resource(old_cur, new_cur)
    _migrate_knowledge_relation(old_cur, new_cur)

    for name in ('requirement_knowledge', 'tool_provider'):
        print(f" {name}: 旧库无数据(新增表)", flush=True)
def _migrate_junction(old_cur, new_cur, new_table, old_table, cols):
    """Copy a junction table row-for-row from the old DB to the new DB.

    Args:
        old_cur: dict-row cursor on the old database.
        new_cur: dict-row cursor on the new database.
        new_table: target table name.
        old_table: source table name.
        cols: comma-separated column list present in both tables. It is
            interpolated into SQL, so it must be a trusted constant.

    The copy is skipped, with a message, when:
        - the source table cannot be read;
        - the target already has at least as many rows as the source;
        - the source is empty.
    Otherwise every row is inserted with ON CONFLICT DO NOTHING, so a
    partially copied table is completed on re-run. The reported count is the
    number of rows actually inserted.
    """
    new_cur.execute(f"SELECT COUNT(*) AS n FROM {new_table}")
    existing = new_cur.fetchone()['n']
    try:
        old_cur.execute(f"SELECT {cols} FROM {old_table}")
    except Exception as e:
        # connect() opens autocommit connections, so a failed SELECT here
        # (e.g. missing table) does not poison the cursor for later work.
        print(f" {new_table}: 旧表 {old_table} 不可读 ({e}),跳过", flush=True)
        return
    rows = old_cur.fetchall()
    if existing > 0 and existing >= len(rows):
        print(f" {new_table}: 已有 {existing} 行,跳过", flush=True)
        return
    if not rows:
        print(f" {new_table}: 源表为空", flush=True)
        return

    col_names = [c.strip() for c in cols.split(',')]
    placeholders = ', '.join(['%s'] * len(col_names))
    insert_sql = (f"INSERT INTO {new_table} ({cols}) VALUES ({placeholders}) "
                  "ON CONFLICT DO NOTHING")
    count = 0
    for row in rows:
        values = [_adapt(c, row[c]) for c in col_names]
        try:
            new_cur.execute(insert_sql, values)
        except Exception as e:  # best effort: report and continue
            print(f" [!] {new_table}: {e}", flush=True)
        else:
            # rowcount is 0 when ON CONFLICT skipped an existing row.
            if new_cur.rowcount > 0:
                count += 1
    print(f" {new_table}: {count}/{len(rows)} 行", flush=True)
def _migrate_knowledge_resource(old_cur, new_cur):
    """Derive knowledge_resource rows from the legacy knowledge.resource_ids column.

    Args:
        old_cur: dict-row cursor on the old database (reads knowledge.id and
            knowledge.resource_ids).
        new_cur: dict-row cursor on the new database.

    Behaviour:
        - Does nothing if knowledge_resource already has rows.
        - Links only resource ids present in the new ``resource`` table.
        - Any other entry (unknown id or non-string value) is counted as a
          dangling reference and skipped.
        - The reported count is the number of rows actually inserted;
          duplicates within one list are not counted twice.
    """
    new_cur.execute("SELECT COUNT(*) AS n FROM knowledge_resource")
    if new_cur.fetchone()['n'] > 0:
        print(" knowledge_resource: 已有数据,跳过", flush=True)
        return

    # Preload valid resource ids once so each reference is a set lookup.
    new_cur.execute("SELECT id FROM resource")
    valid_resources = {r['id'] for r in new_cur.fetchall()}

    old_cur.execute("SELECT id, resource_ids FROM knowledge WHERE resource_ids IS NOT NULL")
    rows = old_cur.fetchall()
    insert_sql = ("INSERT INTO knowledge_resource (knowledge_id, resource_id) "
                  "VALUES (%s, %s) ON CONFLICT DO NOTHING")
    count = 0
    skipped = 0
    for row in rows:
        rids = _parse_json(row['resource_ids'])
        if not isinstance(rids, list):
            continue
        for rid in rids:
            # resource.id is TEXT, so a non-string entry can never match.
            # An unhashable one (e.g. a dict) would crash the set lookup.
            if not isinstance(rid, str) or rid not in valid_resources:
                skipped += 1
                continue
            try:
                new_cur.execute(insert_sql, (row['id'], rid))
            except Exception as e:  # best effort: report and continue
                print(f" [!] knowledge_resource: {e}", flush=True)
            else:
                if new_cur.rowcount > 0:
                    count += 1
    print(f" knowledge_resource: {count} 行 (跳过悬空引用 {skipped})", flush=True)
def _migrate_knowledge_relation(old_cur, new_cur):
    """Derive knowledge_relation rows from the legacy knowledge.relationships column.

    Args:
        old_cur: dict-row cursor on the old database (reads knowledge.id and
            knowledge.relationships).
        new_cur: dict-row cursor on the new database.

    Each relationships entry that is a dict with ``type`` and ``target`` keys
    becomes one row:
    (source_id=knowledge.id, target_id=entry['target'], relation_type=entry['type']).
    Entries of any other shape are ignored.

    Does nothing if knowledge_relation already has rows. The reported count is
    the number of rows actually inserted.
    """
    new_cur.execute("SELECT COUNT(*) AS n FROM knowledge_relation")
    if new_cur.fetchone()['n'] > 0:
        print(" knowledge_relation: 已有数据,跳过", flush=True)
        return

    old_cur.execute("SELECT id, relationships FROM knowledge WHERE relationships IS NOT NULL")
    rows = old_cur.fetchall()
    insert_sql = ("INSERT INTO knowledge_relation (source_id, target_id, relation_type) "
                  "VALUES (%s, %s, %s) ON CONFLICT DO NOTHING")
    count = 0
    for row in rows:
        rels = _parse_json(row['relationships'])
        if not isinstance(rels, list):
            continue
        for rel in rels:
            if not (isinstance(rel, dict) and 'type' in rel and 'target' in rel):
                continue
            # NOTE(review): unlike knowledge_resource, targets are not checked
            # against existing knowledge ids, so dangling relations are kept.
            # Confirm this is intended.
            try:
                new_cur.execute(insert_sql, (row['id'], rel['target'], rel['type']))
            except Exception as e:  # best effort: report and continue
                print(f" [!] knowledge_relation: {e}", flush=True)
            else:
                # rowcount is 0 when ON CONFLICT skipped a duplicate relation.
                if new_cur.rowcount > 0:
                    count += 1
    print(f" knowledge_relation: {count} 行", flush=True)
# ─── Main flow ────────────────────────────────────────────────────────────────
def main():
    """Migrate the database named by KNOWHUB_DB_NAME into ``knowhub``.

    Steps:
      1. Create the 13 tables in the new database and list the public tables
         found there.
      2. Copy the 5 entity tables.
      3. Copy the legacy junction tables, then derive knowledge_resource and
         knowledge_relation from legacy knowledge columns.
    Finally, print a row count for every table listed in step 1.

    Every step that reads the old database opens its own short-lived
    connection, so a long run is not broken by an idle old connection. All
    connections are closed even if a step raises.

    Raises:
        SystemExit: if KNOWHUB_DB_NAME is unset/empty or already equals the
            target database name.
    """
    from contextlib import closing, contextmanager

    OLD_DB = os.getenv('KNOWHUB_DB_NAME')
    NEW_DB = 'knowhub'
    # Refuse to migrate from nowhere or from the target itself.
    # - This script finishes by telling the operator to set
    #   KNOWHUB_DB_NAME=knowhub. A later re-run would then read the new schema
    #   as if it were the old one, but it has no resource_ids / relationships
    #   columns.
    # - With no name at all, the connection would fall back to libpq's
    #   default database.
    if not OLD_DB or OLD_DB == NEW_DB:
        raise SystemExit(
            f"KNOWHUB_DB_NAME={OLD_DB!r}:源库未设置或与目标库 {NEW_DB} 相同,已中止迁移")

    @contextmanager
    def old_db_cursor():
        """Yield a RealDictCursor on a fresh old-DB connection; close both on exit."""
        with closing(connect(OLD_DB)) as old_conn:
            with old_conn.cursor(cursor_factory=RealDictCursor) as old_cur:
                yield old_cur

    print("=" * 60, flush=True)
    print(f"迁移: {OLD_DB} → {NEW_DB}", flush=True)
    print("=" * 60, flush=True)

    # Step 1: create tables
    print(f"\n[1/3] 在 {NEW_DB} 建表...", flush=True)
    with closing(connect(NEW_DB)) as new_conn:
        with new_conn.cursor() as cur:
            for sql in CREATE_TABLES:
                cur.execute(sql)
            # Verify: list the tables that now exist in the public schema.
            cur.execute("SELECT tablename FROM pg_tables WHERE schemaname='public' ORDER BY tablename")
            tables = [r[0] for r in cur.fetchall()]
        print(f" {len(tables)} 张表: {', '.join(tables)}", flush=True)

        with new_conn.cursor(cursor_factory=RealDictCursor) as new_cur:
            # Step 2: entity data (fresh old-DB connection)
            print(f"\n[2/3] 迁移实体数据...", flush=True)
            with old_db_cursor() as old_cur:
                migrate_entities(old_cur, new_cur)

            # Step 3: junction data (one fresh old-DB connection per batch)
            print(f"\n[3/3] 迁移关联数据...", flush=True)
            # 3a: junction tables that exist in the old DB are copied verbatim.
            with old_db_cursor() as old_cur:
                for table, cols in [
                    ('requirement_capability', 'requirement_id, capability_id'),
                    ('capability_tool', 'capability_id, tool_id, description'),
                    ('capability_knowledge', 'capability_id, knowledge_id'),
                    ('tool_knowledge', 'tool_id, knowledge_id'),
                ]:
                    _migrate_junction(old_cur, new_cur, table, table, cols)
            # 3b: knowledge_resource, derived from knowledge.resource_ids
            with old_db_cursor() as old_cur:
                _migrate_knowledge_resource(old_cur, new_cur)
            # 3c: knowledge_relation, derived from knowledge.relationships
            with old_db_cursor() as old_cur:
                _migrate_knowledge_relation(old_cur, new_cur)
            # Tables that are new in this schema have no legacy data.
            print(" requirement_knowledge: 新增表,无旧数据", flush=True)
            print(" tool_provider: 新增表,无旧数据", flush=True)

            # Verification: row count for every table found in step 1.
            print(f"\n{'=' * 60}", flush=True)
            print(f"验证 {NEW_DB}:", flush=True)
            print(f"{'=' * 60}", flush=True)
            for t in tables:
                # Quote the identifier: pg_tables may list mixed-case names.
                quoted = '"' + t.replace('"', '""') + '"'
                new_cur.execute(f"SELECT COUNT(*) as c FROM {quoted}")
                cnt = new_cur.fetchone()['c']
                print(f" {t}: {cnt} rows", flush=True)

    print(f"\n{'=' * 60}", flush=True)
    print("迁移完成!", flush=True)
    print(f"下一步:将 .env 中 KNOWHUB_DB_NAME 改为 {NEW_DB}", flush=True)
    print(f"{'=' * 60}", flush=True)


if __name__ == '__main__':
    main()
|