migrate_tables_v2.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. #!/usr/bin/env python3
  2. """
  3. 数据库迁移脚本 v2:
  4. 1. 新建 atomic_capability 表
  5. 2. ALTER tool_table: 新增 capabilities, embedding; 重命名 knowledge -> tool_knowledge
  6. 3. ALTER knowledge: 新增 support_capability, tools; 拆分 embedding -> task_embedding + content_embedding
  7. 4. ALTER requirement_table: 大幅重构(删旧列、加新列)
  8. 5. 从 JSON/MD 文件导入初始数据
  9. """
  10. import os
  11. import sys
  12. import json
  13. import time
  14. import psycopg2
  15. from psycopg2.extras import RealDictCursor
  16. from dotenv import load_dotenv
  17. # 显式加载项目根目录的 .env
  18. _script_dir = os.path.dirname(os.path.abspath(__file__))
  19. _project_root = os.path.normpath(os.path.join(_script_dir, '..', '..'))
  20. load_dotenv(os.path.join(_project_root, '.env'))
  21. # ─── 连接 ───────────────────────────────────────────────────────────────────
  22. def get_connection():
  23. host = os.getenv('KNOWHUB_DB')
  24. port = int(os.getenv('KNOWHUB_PORT', 5432))
  25. user = os.getenv('KNOWHUB_USER')
  26. password = os.getenv('KNOWHUB_PASSWORD')
  27. dbname = os.getenv('KNOWHUB_DB_NAME')
  28. print(f" Connecting to {host}:{port}/{dbname} as {user} ...")
  29. conn = psycopg2.connect(
  30. host=host,
  31. port=port,
  32. user=user,
  33. password=password,
  34. database=dbname,
  35. connect_timeout=10
  36. )
  37. conn.autocommit = True
  38. print(" Connected.")
  39. return conn
# ─── 1. Create the atomic_capability table ──────────────────────────────────
# DDL for the new atomic_capability table.  JSONB columns default to empty
# containers; the embedding is stored as a raw float4 array.
# NOTE(review): WITH (appendoptimized=false) is Greenplum-specific storage
# syntax — confirm the target server is Greenplum; vanilla PostgreSQL would
# reject this clause.
CREATE_ATOMIC_CAPABILITY = """
CREATE TABLE IF NOT EXISTS atomic_capability (
    id VARCHAR(64) PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    criterion TEXT,
    description TEXT,
    requirements JSONB DEFAULT '[]',
    implements JSONB DEFAULT '{}',
    tools JSONB DEFAULT '[]',
    source_knowledge JSONB DEFAULT '[]',
    embedding float4[]
) WITH (appendoptimized=false);
"""
# ─── 2. ALTER tool_table ────────────────────────────────────────────────────
# Statements are executed one by one; each is idempotent so the migration
# can safely be re-run.
ALTER_TOOL_TABLE = [
    # Add the capabilities column.
    "ALTER TABLE tool_table ADD COLUMN IF NOT EXISTS capabilities JSONB DEFAULT '[]'",
    # Add the embedding column.
    "ALTER TABLE tool_table ADD COLUMN IF NOT EXISTS embedding float4[]",
    # Rename knowledge -> tool_knowledge.  Guarded: only runs when the old
    # column still exists and the new one does not, so re-runs are no-ops.
    """
    DO $$
    BEGIN
        IF EXISTS (
            SELECT 1 FROM information_schema.columns
            WHERE table_name = 'tool_table' AND column_name = 'knowledge'
        ) AND NOT EXISTS (
            SELECT 1 FROM information_schema.columns
            WHERE table_name = 'tool_table' AND column_name = 'tool_knowledge'
        ) THEN
            ALTER TABLE tool_table RENAME COLUMN knowledge TO tool_knowledge;
        END IF;
    END $$;
    """,
]
# ─── 3. ALTER knowledge table ───────────────────────────────────────────────
ALTER_KNOWLEDGE = [
    # Add support_capability.
    "ALTER TABLE knowledge ADD COLUMN IF NOT EXISTS support_capability JSONB DEFAULT '[]'",
    # Add tools.
    "ALTER TABLE knowledge ADD COLUMN IF NOT EXISTS tools JSONB DEFAULT '[]'",
    # Split embedding -> task_embedding + content_embedding:
    # add the new columns first, migrate the data, then drop the old column.
    "ALTER TABLE knowledge ADD COLUMN IF NOT EXISTS task_embedding float4[]",
    "ALTER TABLE knowledge ADD COLUMN IF NOT EXISTS content_embedding float4[]",
    # Copy the old embedding into content_embedding (the original embedding
    # was computed from the content field), then drop the old column.  The
    # EXISTS guard makes re-runs a no-op once the column is gone.
    """
    DO $$
    BEGIN
        IF EXISTS (
            SELECT 1 FROM information_schema.columns
            WHERE table_name = 'knowledge' AND column_name = 'embedding'
        ) THEN
            UPDATE knowledge SET content_embedding = embedding
            WHERE content_embedding IS NULL AND embedding IS NOT NULL;
            ALTER TABLE knowledge DROP COLUMN embedding;
        END IF;
    END $$;
    """,
]
# ─── 4. ALTER requirement_table ─────────────────────────────────────────────
# Heavy restructure: add the new schema columns, then drop the legacy ones.
# The embedding column is kept as-is (still float4[]).
ALTER_REQUIREMENT = [
    # New columns.
    "ALTER TABLE requirement_table ADD COLUMN IF NOT EXISTS description TEXT",
    "ALTER TABLE requirement_table ADD COLUMN IF NOT EXISTS atomics JSONB DEFAULT '[]'",
    "ALTER TABLE requirement_table ADD COLUMN IF NOT EXISTS source_nodes JSONB DEFAULT '[]'",
    "ALTER TABLE requirement_table ADD COLUMN IF NOT EXISTS status VARCHAR(32) DEFAULT '未满足'",
    "ALTER TABLE requirement_table ADD COLUMN IF NOT EXISTS match_result TEXT",
    # Drop the legacy columns — each drop is guarded by an existence check so
    # the migration can be re-run without errors.
    """
    DO $$
    BEGIN
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'task') THEN
            ALTER TABLE requirement_table DROP COLUMN task;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'type') THEN
            ALTER TABLE requirement_table DROP COLUMN type;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'source_type') THEN
            ALTER TABLE requirement_table DROP COLUMN source_type;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'source_itemset_id') THEN
            ALTER TABLE requirement_table DROP COLUMN source_itemset_id;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'source_items') THEN
            ALTER TABLE requirement_table DROP COLUMN source_items;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'tools') THEN
            ALTER TABLE requirement_table DROP COLUMN tools;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'knowledge') THEN
            ALTER TABLE requirement_table DROP COLUMN knowledge;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'case_knowledge') THEN
            ALTER TABLE requirement_table DROP COLUMN case_knowledge;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'process_knowledge') THEN
            ALTER TABLE requirement_table DROP COLUMN process_knowledge;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'trace') THEN
            ALTER TABLE requirement_table DROP COLUMN trace;
        END IF;
        IF EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'requirement_table' AND column_name = 'body') THEN
            ALTER TABLE requirement_table DROP COLUMN body;
        END IF;
    END $$;
    """,
]
  150. # ─── 5. 导入原子能力初始数据 ──────────────────────────────────────────────
  151. def parse_atomic_capabilities(md_path):
  152. """从 atomic_capabilities.md 解析出原子能力列表"""
  153. caps = []
  154. current = None
  155. with open(md_path, 'r', encoding='utf-8') as f:
  156. lines = f.readlines()
  157. for line in lines:
  158. line = line.rstrip()
  159. # 匹配 ### CAP-XXX: 名称
  160. if line.startswith('### CAP-'):
  161. if current:
  162. caps.append(current)
  163. parts = line.split(':', 1)
  164. cap_id = parts[0].replace('### ', '').strip()
  165. cap_name = parts[1].strip() if len(parts) > 1 else ''
  166. current = {
  167. 'id': cap_id,
  168. 'name': cap_name,
  169. 'criterion': '',
  170. 'description': '',
  171. 'implements': {},
  172. 'tools': [],
  173. 'source_knowledge': [],
  174. 'requirements': [],
  175. }
  176. elif current:
  177. if line.startswith('- **功能描述**:'):
  178. current['description'] = line.replace('- **功能描述**:', '').strip()
  179. elif line.startswith('- **判定标准**:'):
  180. current['criterion'] = line.replace('- **判定标准**:', '').strip()
  181. elif line.startswith(' - ComfyUI:') or line.startswith(' - FLUX') or \
  182. line.startswith(' - Midjourney') or line.startswith(' - Nano Banana') or \
  183. line.startswith(' - Seedream'):
  184. # 提取工具名
  185. tool_name = line.strip().lstrip('- ').split(':')[0].strip()
  186. impl_desc = line.strip().lstrip('- ').split(':', 1)[1].strip() if ':' in line else ''
  187. current['implements'][tool_name] = impl_desc
  188. if current:
  189. caps.append(current)
  190. return caps
  191. def import_atomic_capabilities(cursor, caps):
  192. """插入原子能力数据"""
  193. count = 0
  194. for cap in caps:
  195. try:
  196. cursor.execute("""
  197. INSERT INTO atomic_capability (id, name, criterion, description, requirements, implements, tools, source_knowledge)
  198. VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
  199. ON CONFLICT (id) DO UPDATE SET
  200. name = EXCLUDED.name,
  201. criterion = EXCLUDED.criterion,
  202. description = EXCLUDED.description,
  203. implements = EXCLUDED.implements
  204. """, (
  205. cap['id'],
  206. cap['name'],
  207. cap['criterion'],
  208. cap['description'],
  209. json.dumps(cap['requirements']),
  210. json.dumps(cap['implements']),
  211. json.dumps(cap['tools']),
  212. json.dumps(cap['source_knowledge']),
  213. ))
  214. count += 1
  215. except Exception as e:
  216. print(f" [!] 插入 {cap['id']} 失败: {e}")
  217. return count
  218. # ─── 6. 导入需求初始数据 ──────────────────────────────────────────────────
  219. def import_requirements(cursor, json_path):
  220. """从 requirements_sorted.json 导入需求数据"""
  221. with open(json_path, 'r', encoding='utf-8') as f:
  222. data = json.load(f)
  223. reqs = data.get('requirements', [])
  224. count = 0
  225. for req in reqs:
  226. req_id = req['requirement_id']
  227. description = req['requirement_text']
  228. atomics = req.get('matched_capabilities', [])
  229. # 构建 source_nodes:从 source_nodes + source_posts 合成
  230. source_nodes_data = []
  231. for node_name in req.get('source_nodes', []):
  232. source_nodes_data.append({
  233. 'node_name': node_name,
  234. 'posts': req.get('source_posts', [])
  235. })
  236. # 状态映射
  237. match_status = req.get('match_status', '未满足')
  238. if match_status == '完全满足':
  239. status = '已满足'
  240. elif match_status == '需要调研':
  241. status = '未满足'
  242. else:
  243. status = '未满足'
  244. # match_result: 组合 capability_combination + research_note
  245. match_parts = []
  246. if req.get('capability_combination'):
  247. match_parts.append(req['capability_combination'])
  248. if req.get('research_note'):
  249. match_parts.append(req['research_note'])
  250. match_result = '\n'.join(match_parts) if match_parts else ''
  251. try:
  252. cursor.execute("""
  253. INSERT INTO requirement_table (id, description, atomics, source_nodes, status, match_result)
  254. VALUES (%s, %s, %s, %s, %s, %s)
  255. ON CONFLICT (id) DO UPDATE SET
  256. description = EXCLUDED.description,
  257. atomics = EXCLUDED.atomics,
  258. source_nodes = EXCLUDED.source_nodes,
  259. status = EXCLUDED.status,
  260. match_result = EXCLUDED.match_result
  261. """, (
  262. req_id,
  263. description,
  264. json.dumps(atomics),
  265. json.dumps(source_nodes_data),
  266. status,
  267. match_result,
  268. ))
  269. count += 1
  270. except Exception as e:
  271. print(f" [!] 插入 {req_id} 失败: {e}")
  272. return count
  273. # ─── 主流程 ───────────────────────────────────────────────────────────────
  274. def main():
  275. print("=" * 60)
  276. print("KnowHub 数据库迁移 v2")
  277. print("=" * 60)
  278. conn = get_connection()
  279. cursor = conn.cursor(cursor_factory=RealDictCursor)
  280. # Step 1: 新建 atomic_capability 表
  281. print("\n[1/6] 新建 atomic_capability 表...")
  282. try:
  283. cursor.execute(CREATE_ATOMIC_CAPABILITY)
  284. print(" OK: atomic_capability 表创建成功")
  285. except Exception as e:
  286. print(f" FAIL: {e}")
  287. # Step 2: ALTER tool_table
  288. print("\n[2/6] 更新 tool_table...")
  289. for sql in ALTER_TOOL_TABLE:
  290. try:
  291. cursor.execute(sql)
  292. except Exception as e:
  293. print(f" WARN: {e}")
  294. print(" OK: tool_table 更新完成 (capabilities, embedding, tool_knowledge)")
  295. # Step 3: ALTER knowledge
  296. print("\n[3/6] 更新 knowledge 表...")
  297. for sql in ALTER_KNOWLEDGE:
  298. try:
  299. cursor.execute(sql)
  300. except Exception as e:
  301. print(f" WARN: {e}")
  302. print(" OK: knowledge 更新完成 (support_capability, tools, task_embedding, content_embedding)")
  303. # Step 4: ALTER requirement_table
  304. print("\n[4/6] 更新 requirement_table...")
  305. for sql in ALTER_REQUIREMENT:
  306. try:
  307. cursor.execute(sql)
  308. except Exception as e:
  309. print(f" WARN: {e}")
  310. print(" OK: requirement_table 更新完成 (description, atomics, source_nodes, status, match_result)")
  311. # Step 5: 导入原子能力
  312. print("\n[5/6] 导入原子能力数据...")
  313. md_path = os.path.join(os.path.dirname(__file__), '..', '..', 'examples', 'tool_research', 'atomic_capabilities.md')
  314. md_path = os.path.normpath(md_path)
  315. if os.path.exists(md_path):
  316. caps = parse_atomic_capabilities(md_path)
  317. count = import_atomic_capabilities(cursor, caps)
  318. print(f" OK: 导入 {count} 条原子能力")
  319. else:
  320. print(f" SKIP: 文件不存在 {md_path}")
  321. # Step 6: 导入需求数据
  322. print("\n[6/6] 导入需求数据...")
  323. json_path = os.path.join(os.path.dirname(__file__), '..', '..', 'examples', 'tool_research', 'requirements_sorted.json')
  324. json_path = os.path.normpath(json_path)
  325. if os.path.exists(json_path):
  326. count = import_requirements(cursor, json_path)
  327. print(f" OK: 导入 {count} 条需求")
  328. else:
  329. print(f" SKIP: 文件不存在 {json_path}")
  330. # 验证
  331. print("\n" + "=" * 60)
  332. print("验证结果:")
  333. print("=" * 60)
  334. for table in ['atomic_capability', 'tool_table', 'knowledge', 'requirement_table', 'resources']:
  335. try:
  336. cursor.execute(f"SELECT COUNT(*) as count FROM {table}")
  337. row = cursor.fetchone()
  338. count = row['count'] if row else 0
  339. cursor.execute(f"""
  340. SELECT column_name FROM information_schema.columns
  341. WHERE table_name = '{table}'
  342. ORDER BY ordinal_position
  343. """)
  344. cols = [r['column_name'] for r in cursor.fetchall()]
  345. print(f"\n {table} ({count} rows)")
  346. print(f" 字段: {', '.join(cols)}")
  347. except Exception as e:
  348. print(f"\n {table}: ERROR - {e}")
  349. cursor.close()
  350. conn.close()
  351. print("\n迁移完成!")
  352. if __name__ == '__main__':
  353. main()