migrate_to_new_db.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. #!/usr/bin/env python3
  2. """
  3. 迁移到新数据库 knowhub:从 knowledge_hub 读数据,写入 knowhub。
  4. 13 张表:5 实体 + 8 关联。
  5. 只使用 CREATE TABLE / INSERT / DROP TABLE,不执行 ALTER。
  6. 每步幂等,可安全重跑。
  7. """
  8. import os
  9. import json
  10. import psycopg2
  11. from psycopg2.extras import RealDictCursor, Json
  12. from dotenv import load_dotenv
  13. _script_dir = os.path.dirname(os.path.abspath(__file__))
  14. _project_root = os.path.normpath(os.path.join(_script_dir, '..', '..', '..'))
  15. load_dotenv(os.path.join(_project_root, '.env'))
  16. def connect(db_name):
  17. conn = psycopg2.connect(
  18. host=os.getenv('KNOWHUB_DB'),
  19. port=int(os.getenv('KNOWHUB_PORT', 5432)),
  20. user=os.getenv('KNOWHUB_USER'),
  21. password=os.getenv('KNOWHUB_PASSWORD'),
  22. database=db_name,
  23. connect_timeout=10
  24. )
  25. conn.autocommit = True
  26. return conn
# ─── CREATE statements for the 13 tables ─────────────────────────────────────
# main() executes these in order. Each one is CREATE TABLE IF NOT EXISTS, so a
# re-run leaves already-existing tables untouched (they are never altered to
# match these definitions).
# NOTE(review): no FOREIGN KEY constraints are declared on the link tables, so
# dangling ids are possible. resource.id is TEXT, while the link column
# knowledge_resource.resource_id is VARCHAR (interchangeable in PostgreSQL).
CREATE_TABLES = [
    # === Entity tables (5) ===
    """
    CREATE TABLE IF NOT EXISTS knowledge (
        id VARCHAR PRIMARY KEY,
        task_embedding float4[],
        content_embedding float4[],
        message_id VARCHAR,
        task VARCHAR,
        content TEXT,
        types TEXT[],
        tags JSONB DEFAULT '{}',
        tag_keys TEXT[],
        scopes TEXT[],
        owner VARCHAR,
        source JSONB DEFAULT '{}',
        eval JSONB DEFAULT '{}',
        created_at BIGINT,
        updated_at BIGINT,
        status VARCHAR DEFAULT 'approved'
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS resource (
        id TEXT PRIMARY KEY,
        title TEXT,
        body TEXT,
        secure_body TEXT,
        content_type TEXT,
        metadata JSONB DEFAULT '{}',
        sort_order INTEGER DEFAULT 0,
        submitted_by TEXT,
        created_at BIGINT,
        updated_at BIGINT
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS requirement (
        id VARCHAR PRIMARY KEY,
        description TEXT,
        source_nodes JSONB DEFAULT '[]',
        status VARCHAR DEFAULT '未满足',
        match_result TEXT,
        embedding float4[]
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS capability (
        id VARCHAR PRIMARY KEY,
        name VARCHAR,
        criterion TEXT,
        description TEXT,
        embedding float4[]
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS tool (
        id VARCHAR PRIMARY KEY,
        name VARCHAR,
        version VARCHAR,
        introduction TEXT,
        tutorial TEXT,
        input JSONB DEFAULT '""',
        output JSONB DEFAULT '""',
        updated_time BIGINT,
        status VARCHAR DEFAULT '未接入',
        embedding float4[]
    )
    """,
    # === Entity-to-entity links (2) ===
    """
    CREATE TABLE IF NOT EXISTS requirement_capability (
        requirement_id VARCHAR NOT NULL,
        capability_id VARCHAR NOT NULL,
        PRIMARY KEY (requirement_id, capability_id)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS capability_tool (
        capability_id VARCHAR NOT NULL,
        tool_id VARCHAR NOT NULL,
        description TEXT DEFAULT '',
        PRIMARY KEY (capability_id, tool_id)
    )
    """,
    # === Entity-to-knowledge links (3) ===
    """
    CREATE TABLE IF NOT EXISTS requirement_knowledge (
        requirement_id VARCHAR NOT NULL,
        knowledge_id VARCHAR NOT NULL,
        PRIMARY KEY (requirement_id, knowledge_id)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS capability_knowledge (
        capability_id VARCHAR NOT NULL,
        knowledge_id VARCHAR NOT NULL,
        PRIMARY KEY (capability_id, knowledge_id)
    )
    """,
    """
    CREATE TABLE IF NOT EXISTS tool_knowledge (
        tool_id VARCHAR NOT NULL,
        knowledge_id VARCHAR NOT NULL,
        PRIMARY KEY (tool_id, knowledge_id)
    )
    """,
    # === Knowledge-to-source link (1) ===
    """
    CREATE TABLE IF NOT EXISTS knowledge_resource (
        knowledge_id VARCHAR NOT NULL,
        resource_id VARCHAR NOT NULL,
        PRIMARY KEY (knowledge_id, resource_id)
    )
    """,
    # === Knowledge-to-knowledge relations (1) ===
    # One row per (source, target, relation type); the same pair may carry
    # several relation types.
    """
    CREATE TABLE IF NOT EXISTS knowledge_relation (
        source_id VARCHAR NOT NULL,
        target_id VARCHAR NOT NULL,
        relation_type VARCHAR NOT NULL,
        PRIMARY KEY (source_id, target_id, relation_type)
    )
    """,
    # === Execution-layer index (1) ===
    """
    CREATE TABLE IF NOT EXISTS tool_provider (
        tool_id VARCHAR NOT NULL,
        provider_id VARCHAR NOT NULL,
        PRIMARY KEY (tool_id, provider_id)
    )
    """,
]
  161. # ─── 数据迁移 ─────────────────────────────────────────────────────────────────
  162. def _parse_json(val):
  163. if val is None:
  164. return [] if not isinstance(val, dict) else {}
  165. if isinstance(val, (list, dict)):
  166. return val
  167. try:
  168. return json.loads(val)
  169. except (json.JSONDecodeError, TypeError):
  170. return []
  171. # JSONB 列名集合(值需要 Json() 包装,否则 psycopg2 当纯文本发送导致类型错误)
  172. _JSONB_COLS = {'tags', 'source', 'eval', 'metadata', 'source_nodes', 'input', 'output'}
  173. def _adapt(col_name, v):
  174. """psycopg2 序列化适配:JSONB 列 → Json(),数组列 → 原生 list"""
  175. if v is None:
  176. return None
  177. if col_name in _JSONB_COLS:
  178. return Json(v)
  179. if isinstance(v, dict):
  180. return Json(v) # 未在 _JSONB_COLS 中的 dict(兜底)
  181. return v # float4[] / TEXT[] / 标量
  182. def migrate_entities(old_cur, new_cur):
  183. """迁移 5 张实体表"""
  184. # knowledge: 排除 resource_ids, relationships, support_capability, tools
  185. KNOWLEDGE_COLS = (
  186. 'id, task_embedding, content_embedding, message_id, task, content, '
  187. 'types, tags, tag_keys, scopes, owner, source, eval, '
  188. 'created_at, updated_at, status'
  189. )
  190. _migrate_table(old_cur, new_cur, 'knowledge', 'knowledge', KNOWLEDGE_COLS)
  191. # resource: 旧表叫 resources,新表叫 resource
  192. RESOURCE_COLS = (
  193. 'id, title, body, secure_body, content_type, metadata, '
  194. 'sort_order, submitted_by, created_at, updated_at'
  195. )
  196. _migrate_table(old_cur, new_cur, 'resource', 'resources', RESOURCE_COLS)
  197. # requirement: 排除 atomics
  198. REQ_COLS = 'id, description, source_nodes, status, match_result, embedding'
  199. _migrate_table(old_cur, new_cur, 'requirement', 'requirement', REQ_COLS)
  200. # capability: 排除 requirements, implements, tools, source_knowledge
  201. CAP_COLS = 'id, name, criterion, description, embedding'
  202. _migrate_table(old_cur, new_cur, 'capability', 'capability', CAP_COLS)
  203. # tool: 排除 implemented_tool_ids (旧表已无 JSONB 关联列)
  204. TOOL_COLS = 'id, name, version, introduction, tutorial, input, output, updated_time, status, embedding'
  205. _migrate_table(old_cur, new_cur, 'tool', 'tool', TOOL_COLS)
  206. def _migrate_table(old_cur, new_cur, new_table, old_table, columns):
  207. """通用表迁移:从旧表读指定列,写入新表"""
  208. new_cur.execute(f"SELECT COUNT(*) FROM {new_table}")
  209. existing = list(new_cur.fetchone().values())[0]
  210. if existing > 0:
  211. print(f" {new_table}: 已有 {existing} 行,跳过", flush=True)
  212. return
  213. old_cur.execute(f"SELECT {columns} FROM {old_table}")
  214. rows = old_cur.fetchall()
  215. if not rows:
  216. print(f" {new_table}: 源表为空", flush=True)
  217. return
  218. col_names = [c.strip() for c in columns.split(',')]
  219. placeholders = ', '.join(['%s'] * len(col_names))
  220. total = len(rows)
  221. count = 0
  222. errors = 0
  223. for i, row in enumerate(rows):
  224. values = [_adapt(c, row[c]) for c in col_names]
  225. try:
  226. new_cur.execute(
  227. f"INSERT INTO {new_table} ({columns}) VALUES ({placeholders})",
  228. values)
  229. count += 1
  230. except Exception as e:
  231. if 'duplicate key' not in str(e):
  232. errors += 1
  233. if errors <= 3: # 只打印前 3 个错误
  234. print(f" [!] {new_table} row {row.get('id','?')}: {e}", flush=True)
  235. if (i + 1) % 50 == 0 or i + 1 == total:
  236. print(f" {new_table}: {i+1}/{total} ({count} ok, {errors} err)", flush=True)
  237. print(f" {new_table}: 完成 {count}/{total} 行" + (f" ({errors} 错误)" if errors else ""), flush=True)
  238. def migrate_junctions(old_cur, new_cur):
  239. """迁移关联表数据"""
  240. # 直接复制旧关联表(已有正确数据)
  241. for table, cols in [
  242. ('requirement_capability', 'requirement_id, capability_id'),
  243. ('capability_tool', 'capability_id, tool_id, description'),
  244. ('capability_knowledge', 'capability_id, knowledge_id'),
  245. ('tool_knowledge', 'tool_id, knowledge_id'),
  246. ]:
  247. _migrate_junction(old_cur, new_cur, table, table, cols)
  248. # knowledge_resource: 从 knowledge.resource_ids 数组提取
  249. _migrate_knowledge_resource(old_cur, new_cur)
  250. # knowledge_relation: 从 knowledge.relationships JSONB 提取
  251. _migrate_knowledge_relation(old_cur, new_cur)
  252. # requirement_knowledge: 旧库没有这张表,跳过
  253. print(" requirement_knowledge: 旧库无数据(新增表)", flush=True)
  254. # tool_provider: 旧库没有这张表,跳过
  255. print(" tool_provider: 旧库无数据(新增表)", flush=True)
  256. def _migrate_junction(old_cur, new_cur, new_table, old_table, cols):
  257. """复制关联表"""
  258. new_cur.execute(f"SELECT COUNT(*) FROM {new_table}")
  259. existing = list(new_cur.fetchone().values())[0]
  260. if existing > 0:
  261. print(f" {new_table}: 已有 {existing} 行,跳过", flush=True)
  262. return
  263. try:
  264. old_cur.execute(f"SELECT {cols} FROM {old_table}")
  265. except Exception as e:
  266. print(f" {new_table}: 旧表 {old_table} 不可读 ({e}),跳过", flush=True)
  267. return
  268. rows = old_cur.fetchall()
  269. if not rows:
  270. print(f" {new_table}: 源表为空", flush=True)
  271. return
  272. col_names = [c.strip() for c in cols.split(',')]
  273. placeholders = ', '.join(['%s'] * len(col_names))
  274. count = 0
  275. for row in rows:
  276. values = [_adapt(c, row[c]) for c in col_names]
  277. try:
  278. new_cur.execute(
  279. f"INSERT INTO {new_table} ({cols}) VALUES ({placeholders}) ON CONFLICT DO NOTHING",
  280. values)
  281. count += 1
  282. except Exception as e:
  283. print(f" [!] {new_table}: {e}", flush=True)
  284. print(f" {new_table}: {count}/{len(rows)} 行", flush=True)
  285. def _migrate_knowledge_resource(old_cur, new_cur):
  286. """从 knowledge.resource_ids 数组提取到 knowledge_resource 关联表"""
  287. new_cur.execute("SELECT COUNT(*) FROM knowledge_resource")
  288. if list(new_cur.fetchone().values())[0] > 0:
  289. print(" knowledge_resource: 已有数据,跳过", flush=True)
  290. return
  291. # 预加载有效 resource ID
  292. new_cur.execute("SELECT id FROM resource")
  293. valid_resources = {list(r.values())[0] for r in new_cur.fetchall()}
  294. old_cur.execute("SELECT id, resource_ids FROM knowledge WHERE resource_ids IS NOT NULL")
  295. rows = old_cur.fetchall()
  296. count = 0
  297. skipped = 0
  298. for row in rows:
  299. rids = _parse_json(row['resource_ids'])
  300. if not isinstance(rids, list):
  301. continue
  302. for rid in rids:
  303. if rid in valid_resources:
  304. try:
  305. new_cur.execute(
  306. "INSERT INTO knowledge_resource (knowledge_id, resource_id) VALUES (%s, %s) ON CONFLICT DO NOTHING",
  307. (row['id'], rid))
  308. count += 1
  309. except Exception as e:
  310. print(f" [!] knowledge_resource: {e}", flush=True)
  311. else:
  312. skipped += 1
  313. print(f" knowledge_resource: {count} 行 (跳过悬空引用 {skipped})", flush=True)
  314. def _migrate_knowledge_relation(old_cur, new_cur):
  315. """从 knowledge.relationships JSONB 提取到 knowledge_relation 关联表"""
  316. new_cur.execute("SELECT COUNT(*) FROM knowledge_relation")
  317. if list(new_cur.fetchone().values())[0] > 0:
  318. print(" knowledge_relation: 已有数据,跳过", flush=True)
  319. return
  320. old_cur.execute("SELECT id, relationships FROM knowledge WHERE relationships IS NOT NULL")
  321. rows = old_cur.fetchall()
  322. count = 0
  323. for row in rows:
  324. rels = _parse_json(row['relationships'])
  325. if not isinstance(rels, list):
  326. continue
  327. for rel in rels:
  328. if isinstance(rel, dict) and 'type' in rel and 'target' in rel:
  329. try:
  330. new_cur.execute(
  331. "INSERT INTO knowledge_relation (source_id, target_id, relation_type) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING",
  332. (row['id'], rel['target'], rel['type']))
  333. count += 1
  334. except Exception as e:
  335. print(f" [!] knowledge_relation: {e}", flush=True)
  336. print(f" knowledge_relation: {count} 行", flush=True)
  337. # ─── 主流程 ───────────────────────────────────────────────────────────────────
  338. def main():
  339. OLD_DB = os.getenv('KNOWHUB_DB_NAME')
  340. NEW_DB = 'knowhub'
  341. print("=" * 60, flush=True)
  342. print(f"迁移: {OLD_DB} → {NEW_DB}", flush=True)
  343. print("=" * 60, flush=True)
  344. # Step 1: 建表
  345. print(f"\n[1/3] 在 {NEW_DB} 建表...", flush=True)
  346. new_conn = connect(NEW_DB)
  347. new_cur = new_conn.cursor()
  348. for sql in CREATE_TABLES:
  349. new_cur.execute(sql)
  350. new_cur.close()
  351. # 验证
  352. new_cur = new_conn.cursor()
  353. new_cur.execute("SELECT tablename FROM pg_tables WHERE schemaname='public' ORDER BY tablename")
  354. tables = [r[0] for r in new_cur.fetchall()]
  355. print(f" {len(tables)} 张表: {', '.join(tables)}", flush=True)
  356. new_cur.close()
  357. # Step 2: 迁移实体数据(每次新建旧库连接,避免长连接超时)
  358. print(f"\n[2/3] 迁移实体数据...", flush=True)
  359. old_conn = connect(OLD_DB)
  360. old_cur = old_conn.cursor(cursor_factory=RealDictCursor)
  361. new_cur = new_conn.cursor(cursor_factory=RealDictCursor)
  362. migrate_entities(old_cur, new_cur)
  363. old_cur.close()
  364. old_conn.close()
  365. # Step 3: 迁移关联数据(每批独立连接旧库,防止长连接超时)
  366. print(f"\n[3/3] 迁移关联数据...", flush=True)
  367. # 3a: 复制旧关联表
  368. old_conn = connect(OLD_DB)
  369. old_cur = old_conn.cursor(cursor_factory=RealDictCursor)
  370. for table, cols in [
  371. ('requirement_capability', 'requirement_id, capability_id'),
  372. ('capability_tool', 'capability_id, tool_id, description'),
  373. ('capability_knowledge', 'capability_id, knowledge_id'),
  374. ('tool_knowledge', 'tool_id, knowledge_id'),
  375. ]:
  376. _migrate_junction(old_cur, new_cur, table, table, cols)
  377. old_cur.close()
  378. old_conn.close()
  379. # 3b: knowledge_resource(重新连接)
  380. old_conn = connect(OLD_DB)
  381. old_cur = old_conn.cursor(cursor_factory=RealDictCursor)
  382. _migrate_knowledge_resource(old_cur, new_cur)
  383. old_cur.close()
  384. old_conn.close()
  385. # 3c: knowledge_relation(重新连接)
  386. old_conn = connect(OLD_DB)
  387. old_cur = old_conn.cursor(cursor_factory=RealDictCursor)
  388. _migrate_knowledge_relation(old_cur, new_cur)
  389. old_cur.close()
  390. old_conn.close()
  391. # 新增表无旧数据
  392. print(" requirement_knowledge: 新增表,无旧数据", flush=True)
  393. print(" tool_provider: 新增表,无旧数据", flush=True)
  394. # 验证
  395. print(f"\n{'=' * 60}", flush=True)
  396. print(f"验证 {NEW_DB}:", flush=True)
  397. print(f"{'=' * 60}", flush=True)
  398. for t in tables:
  399. new_cur.execute(f"SELECT COUNT(*) as c FROM {t}")
  400. cnt = new_cur.fetchone()['c']
  401. print(f" {t}: {cnt} rows", flush=True)
  402. print(f"\n{'=' * 60}", flush=True)
  403. print("迁移完成!", flush=True)
  404. print(f"下一步:将 .env 中 KNOWHUB_DB_NAME 改为 {NEW_DB}", flush=True)
  405. print(f"{'=' * 60}", flush=True)
  406. new_cur.close()
  407. new_conn.close()
  408. if __name__ == '__main__':
  409. main()