migrate_v3_junction_tables.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. #!/usr/bin/env python3
  2. """
  3. 数据库迁移 v3:JSONB 软关联 → 关联表(junction tables)
  4. 兼容当前混合状态:
  5. - tool_table 已被 RENAME 为 tool(之前的 DDL 部分回滚导致)
  6. - atomic_capability、requirement_table 仍为旧名
  7. - 4 张关联表已创建但为空
  8. 步骤:
  9. 1. 创建 4 张关联表(幂等,已存在则跳过)
  10. 2. 从现有表的 JSONB 字段迁移数据到关联表
  11. 3. 用 CREATE TABLE AS SELECT 创建 capability 和 requirement(变相重命名)
  12. 4. 删除 knowledge 表的 JSONB 关联字段
  13. 5. 删除旧表
  14. """
  15. import os
  16. import json
  17. import psycopg2
  18. from psycopg2.extras import RealDictCursor
  19. from dotenv import load_dotenv
  # Locate the project root (three directory levels above this script) and
  # load the environment variables stored in its .env file.
  _script_dir = os.path.dirname(os.path.abspath(__file__))
  _project_root = os.path.normpath(
      os.path.join(_script_dir, os.pardir, os.pardir, os.pardir)
  )
  load_dotenv(os.path.join(_project_root, '.env'))
  def get_connection():
      """Open an autocommit psycopg2 connection configured from the environment.

      Reads KNOWHUB_DB (host), KNOWHUB_PORT (defaults to 5432), KNOWHUB_USER,
      KNOWHUB_PASSWORD and KNOWHUB_DB_NAME, and connects with a 10 second
      connect timeout.

      Returns:
          The psycopg2 connection, with ``autocommit`` switched on so every
          statement takes effect immediately.
      """
      env = os.getenv
      settings = {
          'host': env('KNOWHUB_DB'),
          'port': int(env('KNOWHUB_PORT', 5432)),
          'user': env('KNOWHUB_USER'),
          'password': env('KNOWHUB_PASSWORD'),
          'database': env('KNOWHUB_DB_NAME'),
          'connect_timeout': 10,
      }
      connection = psycopg2.connect(**settings)
      connection.autocommit = True
      return connection
  def table_exists(cursor, name):
      """Return True when information_schema.tables lists a table called *name*.

      Args:
          cursor: DB-API cursor used to run the lookup.
          name: Table name to look for (no schema filter is applied).
      """
      lookup = "SELECT 1 FROM information_schema.tables WHERE table_name = %s"
      cursor.execute(lookup, (name,))
      found = cursor.fetchone()
      return found is not None
  def column_exists(cursor, table, column):
      """Return True when information_schema.columns lists *column* on *table*.

      Args:
          cursor: DB-API cursor used to run the lookup.
          table: Table name to inspect (no schema filter is applied).
          column: Column name to look for.
      """
      lookup = (
          "SELECT 1 FROM information_schema.columns "
          "WHERE table_name = %s AND column_name = %s"
      )
      cursor.execute(lookup, (table, column))
      found = cursor.fetchone()
      return found is not None
  def resolve_table(cursor, new_name, old_name):
      """Return whichever table name actually exists, preferring *new_name*.

      Handles the mixed state in which only some tables have been renamed.

      Raises:
          RuntimeError: when neither *new_name* nor *old_name* exists.
      """
      for candidate in (new_name, old_name):
          if table_exists(cursor, candidate):
              return candidate
      raise RuntimeError(f"Neither {new_name} nor {old_name} exists!")
  # ─── Step 1: create the junction tables ──────────────────────────────────────
  # Every statement uses IF NOT EXISTS, so running them again is harmless.

  # requirement ↔ capability links.
  _REQUIREMENT_CAPABILITY_DDL = """
      CREATE TABLE IF NOT EXISTS requirement_capability (
          requirement_id VARCHAR NOT NULL,
          capability_id VARCHAR NOT NULL,
          PRIMARY KEY (requirement_id, capability_id)
      )
      """

  # capability ↔ tool links, each with a free-text description.
  _CAPABILITY_TOOL_DDL = """
      CREATE TABLE IF NOT EXISTS capability_tool (
          capability_id VARCHAR NOT NULL,
          tool_id VARCHAR NOT NULL,
          description TEXT DEFAULT '',
          PRIMARY KEY (capability_id, tool_id)
      )
      """

  # capability ↔ knowledge links.
  _CAPABILITY_KNOWLEDGE_DDL = """
      CREATE TABLE IF NOT EXISTS capability_knowledge (
          capability_id VARCHAR NOT NULL,
          knowledge_id VARCHAR NOT NULL,
          PRIMARY KEY (capability_id, knowledge_id)
      )
      """

  # tool ↔ knowledge links.
  _TOOL_KNOWLEDGE_DDL = """
      CREATE TABLE IF NOT EXISTS tool_knowledge (
          tool_id VARCHAR NOT NULL,
          knowledge_id VARCHAR NOT NULL,
          PRIMARY KEY (tool_id, knowledge_id)
      )
      """

  CREATE_JUNCTION_TABLES = [
      _REQUIREMENT_CAPABILITY_DDL,
      _CAPABILITY_TOOL_DDL,
      _CAPABILITY_KNOWLEDGE_DDL,
      _TOOL_KNOWLEDGE_DDL,
  ]
  81. # ─── Step 2: 数据迁移 ────────────────────────────────────────────────────────
  82. def _parse_json(val):
  83. if val is None:
  84. return []
  85. if isinstance(val, (list, dict)):
  86. return val
  87. try:
  88. return json.loads(val)
  89. except (json.JSONDecodeError, TypeError):
  90. return []
  91. def _insert_junction(cursor, table, col_a, col_b, val_a, val_b, extra_cols=None):
  92. if extra_cols:
  93. cols = f"{col_a}, {col_b}, {', '.join(extra_cols.keys())}"
  94. placeholders = ', '.join(['%s'] * (2 + len(extra_cols)))
  95. values = [val_a, val_b] + list(extra_cols.values())
  96. else:
  97. cols = f"{col_a}, {col_b}"
  98. placeholders = '%s, %s'
  99. values = [val_a, val_b]
  100. cursor.execute(
  101. f"INSERT INTO {table} ({cols}) VALUES ({placeholders}) ON CONFLICT DO NOTHING",
  102. values
  103. )
  def _load_ids(cursor, table):
      """Return the set of ``id`` values currently stored in *table*."""
      cursor.execute(f"SELECT id FROM {table}")
      return {r['id'] for r in cursor.fetchall()}


  def _link_id_list_column(cursor, source_tbl, column, junction, owner_col, ref_col,
                           valid_refs, describe, dangling, missing_msg=None):
      """Copy one JSONB id-list column of *source_tbl* into *junction*.

      For every row, each id found in *column* is paired with the row's own id:
      pairs whose referenced id is in *valid_refs* are inserted as
      (*owner_col* = row id, *ref_col* = referenced id); the others are recorded
      in *dangling* as ``describe(row_id, ref_id)``.

      If the column no longer exists (for example on a re-run after the JSONB
      columns were dropped) nothing is read and *missing_msg*, or a generic
      notice, is printed instead of failing.
      """
      if not column_exists(cursor, source_tbl, column):
          print(missing_msg or f" ({source_tbl}.{column} 列不存在,跳过)", flush=True)
          return
      cursor.execute(f"SELECT id, {column} FROM {source_tbl}")
      for row in cursor.fetchall():
          for ref_id in _parse_json(row[column]):
              if ref_id in valid_refs:
                  _insert_junction(cursor, junction, owner_col, ref_col, row['id'], ref_id)
              else:
                  dangling.append(describe(row['id'], ref_id))


  def _match_tool_name(tool_name, tool_name_to_id):
      """Resolve a tool name used in ``implements`` to a tool id, or None.

      Tries an exact match on the lower-cased, stripped name first, then falls
      back to the first stored name that contains the key or is contained in it.
      """
      key = tool_name.lower().strip()
      if not key:
          # An empty key is a substring of every name; never fuzzy-match it.
          return None
      matched_id = tool_name_to_id.get(key)
      if matched_id:
          return matched_id
      for stored_name, stored_id in tool_name_to_id.items():
          if key in stored_name or stored_name in key:
              return stored_id
      return None


  def _link_capability_tools(cursor, cap_tbl, valid_tools, tool_name_to_id, skipped):
      """Fill capability_tool from the capability table's ``tools``/``implements``.

      ``tools`` is read as a list of tool ids and ``implements`` as a mapping of
      tool name → description. Descriptions are attached to the matching tool
      id; tools named only in ``implements`` are linked as well. Whichever of
      the two columns is missing is treated as empty.
      """
      has_tools = column_exists(cursor, cap_tbl, 'tools')
      has_implements = column_exists(cursor, cap_tbl, 'implements')
      if not (has_tools or has_implements):
          print(f" ({cap_tbl}.tools / implements 列不存在,跳过)", flush=True)
          return

      columns = ['id']
      if has_tools:
          columns.append('tools')
      if has_implements:
          columns.append('implements')
      cursor.execute(f"SELECT {', '.join(columns)} FROM {cap_tbl}")

      for row in cursor.fetchall():
          tools_list = _parse_json(row['tools']) if has_tools else []
          implements_dict = _parse_json(row['implements']) if has_implements else {}
          if not isinstance(implements_dict, dict):
              implements_dict = {}

          # Re-key the descriptions by tool id so they can be joined with `tools`.
          impl_by_tool_id = {}
          for tool_name, desc in implements_dict.items():
              matched_id = _match_tool_name(tool_name, tool_name_to_id)
              if matched_id:
                  impl_by_tool_id[matched_id] = desc
              else:
                  # str() keeps the report from crashing on non-string descriptions.
                  skipped['implements_unmatched'].append(
                      f"cap={row['id']}: {tool_name} = {str(desc)[:80]}")

          for tool_id in tools_list:
              if tool_id in valid_tools:
                  desc = impl_by_tool_id.pop(tool_id, '')
                  _insert_junction(cursor, 'capability_tool',
                                   'capability_id', 'tool_id', row['id'], tool_id,
                                   extra_cols={'description': desc})
              else:
                  skipped['dangling'].append(f"cap_tool: cap={row['id']} → tool={tool_id}")

          # Whatever is left was described in `implements` but absent from `tools`.
          for tool_id, desc in impl_by_tool_id.items():
              if tool_id in valid_tools:
                  _insert_junction(cursor, 'capability_tool',
                                   'capability_id', 'tool_id', row['id'], tool_id,
                                   extra_cols={'description': desc})


  def _link_tool_knowledge_fields(cursor, tool_tbl, valid_knowledge, dangling):
      """Fill tool_knowledge from the tool table's ``*_knowledge`` columns.

      Each column is checked on its own, so a table that has lost only some of
      them still contributes the ids held in the remaining ones.
      """
      fields = [f for f in ('tool_knowledge', 'case_knowledge', 'process_knowledge')
                if column_exists(cursor, tool_tbl, f)]
      if not fields:
          print(" (tool.*_knowledge 列已丢失,仅从 knowledge 侧迁移)", flush=True)
          return
      cursor.execute(f"SELECT id, {', '.join(fields)} FROM {tool_tbl}")
      for row in cursor.fetchall():
          # De-duplicate per tool: the same knowledge id may sit in several fields.
          all_kids = set()
          for field in fields:
              all_kids.update(_parse_json(row[field]))
          for kid in all_kids:
              if kid in valid_knowledge:
                  _insert_junction(cursor, 'tool_knowledge',
                                   'tool_id', 'knowledge_id', row['id'], kid)
              else:
                  dangling.append(f"tool_know: tool={row['id']} → k={kid}")


  def migrate_data(cursor, tool_tbl, cap_tbl, req_tbl):
      """Copy the JSONB soft links into the four junction tables.

      Args:
          cursor: Cursor whose rows support access by column name
              (``row['id']``), e.g. one created with ``RealDictCursor``.
          tool_tbl: Actual name of the tool table.
          cap_tbl: Actual name of the capability table.
          req_tbl: Actual name of the requirement table.

      Returns:
          A dict with two lists of human-readable messages: ``'dangling'``
          (links pointing at ids that do not exist) and
          ``'implements_unmatched'`` (``implements`` entries whose tool name
          could not be resolved).

      Every JSONB column is checked for existence before it is read, so running
      this again after the columns have been dropped skips them instead of
      raising.
      """
      skipped = {'dangling': [], 'implements_unmatched': []}
      dangling = skipped['dangling']

      # Preload the valid ids of every entity table.
      print(" 加载 ID...", flush=True)
      valid_reqs = _load_ids(cursor, req_tbl)
      valid_caps = _load_ids(cursor, cap_tbl)
      cursor.execute(f"SELECT id, name FROM {tool_tbl}")
      tool_rows = cursor.fetchall()
      valid_tools = {r['id'] for r in tool_rows}
      valid_knowledge = _load_ids(cursor, 'knowledge')
      print(f" OK: {len(valid_reqs)} reqs, {len(valid_caps)} caps, {len(valid_tools)} tools, {len(valid_knowledge)} knowledge", flush=True)

      # Normalised tool name → id, used to resolve the names in `implements`.
      tool_name_to_id = {}
      for r in tool_rows:
          normalized = (r['name'] or '').lower().strip()
          if normalized:
              tool_name_to_id[normalized] = r['id']

      # ── requirement_capability ──
      print(" requirement_capability...", flush=True)
      _link_id_list_column(
          cursor, req_tbl, 'atomics', 'requirement_capability',
          'requirement_id', 'capability_id', valid_caps,
          lambda req_id, cap_id: f"req_cap: req={req_id} → cap={cap_id}",
          dangling)
      _link_id_list_column(
          cursor, cap_tbl, 'requirements', 'requirement_capability',
          'capability_id', 'requirement_id', valid_reqs,
          lambda cap_id, req_id: f"req_cap: req={req_id} → cap={cap_id}",
          dangling)

      # ── capability_tool ──
      print(" capability_tool...", flush=True)
      _link_capability_tools(cursor, cap_tbl, valid_tools, tool_name_to_id, skipped)
      # Reverse direction: tool.capabilities (if the column is still there).
      _link_id_list_column(
          cursor, tool_tbl, 'capabilities', 'capability_tool',
          'tool_id', 'capability_id', valid_caps,
          lambda tool_id, cap_id: f"cap_tool: cap={cap_id} → tool={tool_id}",
          dangling,
          missing_msg=" (tool.capabilities 列已丢失,仅从 capability 侧迁移)")

      # ── capability_knowledge ──
      print(" capability_knowledge...", flush=True)
      _link_id_list_column(
          cursor, cap_tbl, 'source_knowledge', 'capability_knowledge',
          'capability_id', 'knowledge_id', valid_knowledge,
          lambda cap_id, kid: f"cap_know: cap={cap_id} → k={kid}",
          dangling)
      _link_id_list_column(
          cursor, 'knowledge', 'support_capability', 'capability_knowledge',
          'knowledge_id', 'capability_id', valid_caps,
          lambda kid, cap_id: f"cap_know: cap={cap_id} → k={kid}",
          dangling)

      # ── tool_knowledge ──
      print(" tool_knowledge...", flush=True)
      # Forward direction: tool.*_knowledge (if the columns are still there).
      _link_tool_knowledge_fields(cursor, tool_tbl, valid_knowledge, dangling)
      # Reverse direction: knowledge.tools.
      _link_id_list_column(
          cursor, 'knowledge', 'tools', 'tool_knowledge',
          'knowledge_id', 'tool_id', valid_tools,
          lambda kid, tool_id: f"tool_know: tool={tool_id} → k={kid}",
          dangling)

      return skipped
  # ─── Main flow ───────────────────────────────────────────────────────────────
  # One entry per entity table:
  # (new name, old name, JSONB link columns to drop, columns kept when copying).
  _TABLE_PLANS = [
      ('tool', 'tool_table',
       ('capabilities', 'tool_knowledge', 'case_knowledge', 'process_knowledge'),
       ('id', 'name', 'version', 'introduction', 'tutorial', 'input', 'output',
        'updated_time', 'status', 'embedding', 'implemented_tool_ids')),
      ('capability', 'atomic_capability',
       ('requirements', 'implements', 'tools', 'source_knowledge'),
       ('id', 'name', 'criterion', 'description', 'embedding')),
      ('requirement', 'requirement_table',
       ('atomics',),
       ('id', 'description', 'source_nodes', 'status', 'match_result', 'embedding')),
  ]

  _JUNCTION_TABLE_NAMES = (
      'requirement_capability', 'capability_tool', 'capability_knowledge', 'tool_knowledge',
  )


  def _report_skipped(skipped):
      """Print the dangling references (first 30) and unmatched implements entries."""
      dangling = skipped['dangling']
      if dangling:
          print(f"\n [WARN] 跳过悬空引用 {len(dangling)} 条:")
          for s in dangling[:30]:
              print(f" - {s}")
          if len(dangling) > 30:
              print(f" ... 还有 {len(dangling) - 30} 条")
      unmatched = skipped['implements_unmatched']
      if unmatched:
          print(f"\n [WARN] implements 未匹配 {len(unmatched)} 条:")
          for s in unmatched:
              print(f" - {s}")


  def _finalize_table(cursor, actual_name, new_name, old_name, link_columns, kept_columns):
      """Bring one entity table to its final name and shape.

      If the table already carries *new_name*, its JSONB link columns are
      dropped in place. Otherwise a copy named *new_name* is created from
      *old_name* containing only *kept_columns*, with ``id`` as primary key.
      """
      if actual_name == new_name:
          print(f" {new_name}: 已是新名,删除 JSONB 列...")
          for col in link_columns:
              if column_exists(cursor, new_name, col):
                  cursor.execute(f"ALTER TABLE {new_name} DROP COLUMN {col}")
                  print(f" DROP {new_name}.{col}")
      elif not table_exists(cursor, new_name):
          cursor.execute(
              f"CREATE TABLE {new_name} AS SELECT {', '.join(kept_columns)} FROM {old_name}")
          cursor.execute(f"ALTER TABLE {new_name} ADD PRIMARY KEY (id)")
          cursor.execute(f"SELECT COUNT(*) as count FROM {new_name}")
          print(f" {old_name} → {new_name}: {cursor.fetchone()['count']} rows")
      else:
          print(f" {new_name}: 已存在,跳过")


  def _print_final_schema(cursor):
      """Print the column list and row count of every table touched by the migration."""
      print("\n" + "=" * 60)
      print("最终表结构:")
      print("=" * 60)
      tables = ['knowledge', 'tool', 'capability', 'requirement', 'resources',
                *_JUNCTION_TABLE_NAMES]
      for t in tables:
          # Best-effort report: a table that cannot be read is shown as an error
          # line rather than aborting the summary.
          try:
              cursor.execute(
                  "SELECT column_name FROM information_schema.columns "
                  "WHERE table_name = %s ORDER BY ordinal_position",
                  (t,))
              cols = [r['column_name'] for r in cursor.fetchall()]
              cursor.execute(f"SELECT COUNT(*) as count FROM {t}")
              count = cursor.fetchone()['count']
              print(f"\n {t} ({count} rows)")
              print(f" {', '.join(cols)}")
          except Exception as e:
              print(f"\n {t}: ERROR - {e}")


  def _run_migration(cursor):
      """Execute migration steps 0-5 and the final schema report on *cursor*."""
      # Detect the actual table names (handles the mixed renamed state).
      print("\n[0] 探测表名...")
      actual_names = {
          new_name: resolve_table(cursor, new_name, old_name)
          for new_name, old_name, _links, _kept in _TABLE_PLANS
      }
      tool_tbl = actual_names['tool']
      cap_tbl = actual_names['capability']
      req_tbl = actual_names['requirement']
      print(f" tool: {tool_tbl}")
      print(f" capability: {cap_tbl}")
      print(f" requirement: {req_tbl}")

      # Step 1: create the junction tables.
      print("\n[1/5] 创建关联表...")
      for sql in CREATE_JUNCTION_TABLES:
          cursor.execute(sql)
      print(" OK")

      # Step 2: migrate the JSONB data.
      print("\n[2/5] 迁移 JSONB 数据到关联表...")
      skipped = migrate_data(cursor, tool_tbl, cap_tbl, req_tbl)
      _report_skipped(skipped)

      print("\n 关联表行数:")
      for t in _JUNCTION_TABLE_NAMES:
          cursor.execute(f"SELECT COUNT(*) as count FROM {t}")
          print(f" {t}: {cursor.fetchone()['count']}")

      # Step 3: create the new tables (copy for those that still need renaming).
      print("\n[3/5] 创建新表...")
      for new_name, old_name, link_columns, kept_columns in _TABLE_PLANS:
          _finalize_table(cursor, actual_names[new_name], new_name, old_name,
                          link_columns, kept_columns)

      # Step 4: drop the JSONB link columns of the knowledge table.
      print("\n[4/5] 删除 knowledge 表的 JSONB 关联字段...")
      for col in ('support_capability', 'tools'):
          if column_exists(cursor, 'knowledge', col):
              cursor.execute(f"ALTER TABLE knowledge DROP COLUMN {col}")
              print(f" DROP knowledge.{col}")
          else:
              print(f" knowledge.{col} 已不存在")

      # Step 5: drop the old tables, but only once their replacement exists.
      print("\n[5/5] 删除旧表...")
      for new_name, old_name, _links, _kept in _TABLE_PLANS:
          old_exists = table_exists(cursor, old_name)
          if old_exists and table_exists(cursor, new_name):
              cursor.execute(f"DROP TABLE {old_name}")
              print(f" DROP {old_name}")
          elif not old_exists:
              print(f" {old_name} 已不存在")
          else:
              print(f" [!] {new_name} 不存在,保留 {old_name}")

      # Final verification.
      _print_final_schema(cursor)


  def main():
      """Run the v3 migration: JSONB soft links → junction tables.

      Opens a connection, runs every step, prints the resulting schema and a
      success banner. The cursor and connection are closed even when a step
      raises; the exception itself is not swallowed.
      """
      print("=" * 60)
      print("KnowHub 数据库迁移 v3: JSONB 软关联 → 关联表")
      print("=" * 60)

      conn = get_connection()
      try:
          cursor = conn.cursor(cursor_factory=RealDictCursor)
          try:
              _run_migration(cursor)
              print("\n" + "=" * 60)
              print("迁移成功!")
              print("=" * 60)
          finally:
              cursor.close()
      finally:
          conn.close()


  if __name__ == '__main__':
      main()