phase4_5_migrate.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467
  1. #!/usr/bin/env python3
  2. """
  3. Phase 4 + Phase 5 迁移:具体 strategy → 抽象 pattern + knowledge 实例
  4. 步骤:
  5. 1. Preflight:classifier 全覆盖验证
  6. 2. 插入 26 个抽象 strategy(P01..P27,P18 跳过因 0 成员)
  7. 3. 为每条具体 strategy 生成 knowledge 实例 + 写 4 类 junction
  8. - requirement_knowledge (req, knowledge_id, is_selected, coverage_score, coverage_explanation)
  9. - strategy_knowledge (abstract_strategy_id, knowledge_id)
  10. - knowledge_resource (knowledge_id, resource_id) ← 从 req 的 resource 联合获取
  11. 4. 重建 strategy_capability:抽象 strategy ↔ 成员 cap 联合
  12. 5. 重建 strategy_resource:抽象 strategy ↔ 成员 resource 联合
  13. 6. 重建 requirement_strategy:req ↔ 抽象 strategy(含 metadata,dedup)
  14. 7. 删除具体 strategy 及其残余 junction
  15. 幂等与安全:
  16. - bk_20260422_* 快照已在
  17. - autocommit=True
  18. - 每阶段独立完成 + 验证,失败可重跑(已处理的会被 DELETE 前检查)
  19. - version='howard_strategy_instance' 是唯一标记,与老 knowledge (v0) 物理隔离
  20. """
  21. import argparse
  22. import hashlib
  23. import json
  24. import sys
  25. import time
  26. from pathlib import Path
  27. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  28. from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
  29. from knowhub.scripts.abstract_patterns import (
  30. PATTERNS, PATTERN_NAME, PATTERN_DESC, classify,
  31. )
# Value written to the `version` column of every strategy and knowledge row
# created by this migration; verify() counts the new rows by this marker.
NEW_VERSION = 'howard_strategy_instance'
# Date stamp stored in the abstract strategy body (`migrated_at`) and used to
# build the `timestamp` / `agent_id` of each generated knowledge `source`.
MIGRATION_DATE = '2026-04-22'
  34. def hash8(text):
  35. return hashlib.sha256(text.encode('utf-8')).hexdigest()[:8]
  36. def abstract_strategy_id(pid):
  37. """P01 -> strategy-abstract-p01"""
  38. return f'strategy-abstract-{pid.lower()}'
  39. def gen_knowledge_id(req_id, original_strat_id):
  40. h = hash8(f'{req_id}|{original_strat_id}')
  41. return f'knowledge-stratinst-{h}'
  42. # ═══════════════════════════════════════════════════════════
  43. def coverage_to_eval_score(cov):
  44. """coverage_score → eval.score"""
  45. if cov is None: return None
  46. if cov >= 0.9: return 6
  47. if cov >= 0.8: return 5
  48. if cov >= 0.6: return 4
  49. if cov >= 0.4: return 3
  50. return 2
  51. # ═══════════════════════════════════════════════════════════
  52. def build_knowledge_content(strat_row, req_desc, pattern_id, pattern_name):
  53. """拼出 req-specific knowledge 的 content 文本"""
  54. body = strat_row['body'] if isinstance(strat_row['body'], dict) else json.loads(strat_row['body'] or '{}')
  55. parts = []
  56. parts.append(f'## 需求\n{req_desc}\n')
  57. parts.append(f'## 本具体方案\n**{strat_row["name"]}**')
  58. parts.append(f'(归属抽象套路:{pattern_id} · {pattern_name})\n')
  59. if body.get('reasoning'):
  60. parts.append(f'## 选择理由\n{body["reasoning"]}\n')
  61. if body.get('why_not'):
  62. parts.append(f'## 为何不选备选\n{body["why_not"]}\n')
  63. if body.get('could_switch_if'):
  64. parts.append(f'## 可切换到备选的条件\n{body["could_switch_if"]}\n')
  65. hl = body.get('highlight_coverage')
  66. if isinstance(hl, list) and hl:
  67. parts.append('## 独特优势(highlight_coverage)')
  68. for x in hl: parts.append(f'- {x}')
  69. parts.append('')
  70. bl = body.get('baseline_coverage')
  71. if isinstance(bl, list) and bl:
  72. parts.append('## 基础覆盖(baseline_coverage)')
  73. for x in bl: parts.append(f'- {x}')
  74. parts.append('')
  75. wo = body.get('workflow_outline')
  76. if isinstance(wo, list) and wo:
  77. parts.append('## 执行步骤(req-specific phases)')
  78. for i, ph in enumerate(wo, 1):
  79. if not isinstance(ph, dict): continue
  80. parts.append(f'**Phase {i}:{ph.get("phase","")}**')
  81. parts.append(f'{ph.get("description","")}')
  82. caps = ph.get('capabilities', [])
  83. if caps:
  84. names = []
  85. for c in caps:
  86. if isinstance(c, dict):
  87. names.append(f'{c.get("id","?")} {c.get("name","")}')
  88. if names:
  89. parts.append(f'_能力使用:{"、".join(names)}_')
  90. parts.append('')
  91. src = body.get('source')
  92. if isinstance(src, list) and src:
  93. parts.append('## 源案例引用')
  94. for s in src[:12]:
  95. if isinstance(s, dict):
  96. parts.append(f'- {s.get("id","")}: {s.get("title","")}')
  97. elif isinstance(s, str):
  98. parts.append(f'- {s}')
  99. parts.append('')
  100. cov = body.get('coverage_score')
  101. cov_exp = body.get('coverage_explanation')
  102. if cov is not None:
  103. parts.append(f'## 覆盖度评分\ncoverage_score = **{cov}**')
  104. if cov_exp:
  105. parts.append(f'\nLLM 解释:{cov_exp}')
  106. return '\n'.join(parts)
  107. # ═══════════════════════════════════════════════════════════
  108. def step1_insert_abstract_strategies(cur, stats):
  109. print('\n=== Step 1: 插入 26 个抽象 strategy ===', flush=True)
  110. now = int(time.time())
  111. for pid, name, cat, desc in PATTERNS:
  112. # skip P18 (0 members)
  113. if pid == 'P18': continue
  114. aid = abstract_strategy_id(pid)
  115. # idempotent: DELETE + INSERT
  116. cur.execute('DELETE FROM strategy WHERE id = %s', (aid,))
  117. body = {
  118. 'pattern_id': pid,
  119. 'pattern_category': cat,
  120. 'abstract_name': name,
  121. 'description': desc,
  122. 'migration_note': f'抽象套路 pattern; 在 Phase 4/5 迁移中从具体 strategy 聚合而来',
  123. 'migrated_at': MIGRATION_DATE,
  124. }
  125. cur.execute("""INSERT INTO strategy (id, name, description, body, status,
  126. created_at, updated_at, version)
  127. VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
  128. (aid, name, desc,
  129. json.dumps(body, ensure_ascii=False),
  130. 'approved', now, now, NEW_VERSION))
  131. stats['abstract_inserted'] += 1
  132. print(f' inserted: {stats["abstract_inserted"]}', flush=True)
  133. def step2_create_knowledge_and_junctions(cur, stats):
  134. print('\n=== Step 2: 为每条具体 strategy 生成 knowledge + junctions ===', flush=True)
  135. cur.execute("""SELECT s.id, s.name, s.body, rs.requirement_id,
  136. rs.is_selected, rs.coverage_score, rs.coverage_explanation,
  137. r.description AS req_desc
  138. FROM strategy s
  139. JOIN requirement_strategy rs ON rs.strategy_id = s.id
  140. JOIN requirement r ON r.id = rs.requirement_id
  141. WHERE s.version = 'howard_dedup'""") # 只处理具体 strategy(abstract 的 version 是 new_version)
  142. rows = cur.fetchall()
  143. print(f' concrete strategies to migrate: {len(rows)}', flush=True)
  144. now = int(time.time())
  145. for i, row in enumerate(rows):
  146. sid = row['id']; req_id = row['requirement_id']
  147. is_sel = row['is_selected']; cov = row['coverage_score']; cov_exp = row['coverage_explanation']
  148. # classify
  149. pid = classify(sid, req_id, is_sel)
  150. if pid is None:
  151. stats['unmapped'].append(sid)
  152. continue
  153. aid = abstract_strategy_id(pid)
  154. kid = gen_knowledge_id(req_id, sid)
  155. # build content
  156. content = build_knowledge_content(row, row['req_desc'], pid, PATTERN_NAME[pid])
  157. task = f'{row["req_desc"][:80]} — {PATTERN_NAME[pid]}'
  158. tags = {
  159. 'kind': 'req_specific_strategy_execution',
  160. 'original_strategy_name': row['name'],
  161. 'pattern_id': pid,
  162. 'pattern_name': PATTERN_NAME[pid],
  163. }
  164. source = {
  165. 'category': 'strategy_migration_req_specific',
  166. 'agent_id': 'strategy_migration_2026-04-22',
  167. 'timestamp': f'{MIGRATION_DATE}T00:00:00+00:00',
  168. 'original_strategy_id': sid,
  169. }
  170. es_score = coverage_to_eval_score(cov)
  171. eval_obj = {'score': es_score, 'harmful': 0, 'helpful': 1, 'confidence': cov or 0.5}
  172. # Insert knowledge
  173. cur.execute('DELETE FROM knowledge WHERE id = %s', (kid,))
  174. cur.execute("""INSERT INTO knowledge
  175. (id, message_id, task, content, types, tags, tag_keys,
  176. scopes, owner, source, eval, created_at, updated_at,
  177. status, version)
  178. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
  179. %s, %s, %s, %s)""",
  180. (kid, '', task, content,
  181. ['strategy', 'execution_instance'],
  182. json.dumps(tags, ensure_ascii=False),
  183. list(tags.keys()),
  184. ['org:cybertogether'],
  185. 'agent:strategy_migration',
  186. json.dumps(source, ensure_ascii=False),
  187. json.dumps(eval_obj, ensure_ascii=False),
  188. now, now, 'approved', NEW_VERSION))
  189. stats['knowledge_inserted'] += 1
  190. # requirement_knowledge
  191. cur.execute("""DELETE FROM requirement_knowledge
  192. WHERE requirement_id = %s AND knowledge_id = %s""", (req_id, kid))
  193. cur.execute("""INSERT INTO requirement_knowledge
  194. (requirement_id, knowledge_id, is_selected,
  195. coverage_score, coverage_explanation)
  196. VALUES (%s, %s, %s, %s, %s)""",
  197. (req_id, kid, is_sel, cov, cov_exp))
  198. stats['rk_inserted'] += 1
  199. # strategy_knowledge (abstract -> knowledge)
  200. cur.execute("""DELETE FROM strategy_knowledge
  201. WHERE strategy_id = %s AND knowledge_id = %s""", (aid, kid))
  202. cur.execute("""INSERT INTO strategy_knowledge
  203. (strategy_id, knowledge_id, relation_type)
  204. VALUES (%s, %s, 'execution_instance')""", (aid, kid))
  205. stats['sk_inserted'] += 1
  206. # knowledge_resource: 该 knowledge 的 req 关联的所有 resource
  207. # 注:这是 req 级别的联合,非严格的 knowledge-specific 引用。
  208. # 更精细的引用可在 Phase 6 从 body.source 逐案匹配(后续优化)
  209. cur.execute("""SELECT resource_id FROM requirement_resource
  210. WHERE requirement_id = %s""", (req_id,))
  211. res_ids = [r['resource_id'] for r in cur.fetchall()]
  212. for rid in res_ids:
  213. cur.execute("""INSERT INTO knowledge_resource (knowledge_id, resource_id)
  214. VALUES (%s, %s) ON CONFLICT DO NOTHING""", (kid, rid))
  215. stats['kr_inserted'] += len(res_ids)
  216. if (i + 1) % 20 == 0:
  217. print(f' processed {i+1}/{len(rows)}', flush=True)
  218. def step3_rebuild_strategy_capability(cur, stats):
  219. print('\n=== Step 3: 重建 strategy_capability(抽象 strategy ↔ cap 联合)===', flush=True)
  220. # 先收集 each pattern 的 cap union
  221. cur.execute("""SELECT sc.capability_id, sc.strategy_id, rs.requirement_id, rs.is_selected, s.id AS sid
  222. FROM strategy_capability sc
  223. JOIN strategy s ON s.id = sc.strategy_id
  224. JOIN requirement_strategy rs ON rs.strategy_id = s.id
  225. WHERE s.version = 'howard_dedup'""")
  226. rows = cur.fetchall()
  227. pattern_caps = {} # pid -> set(cap_id)
  228. for r in rows:
  229. pid = classify(r['sid'], r['requirement_id'], r['is_selected'])
  230. if not pid: continue
  231. pattern_caps.setdefault(pid, set()).add(r['capability_id'])
  232. # 删除老的 strategy_capability (concrete)
  233. cur.execute("""DELETE FROM strategy_capability
  234. WHERE strategy_id IN (SELECT id FROM strategy WHERE version='howard_dedup')""")
  235. print(f' deleted concrete strategy_capability', flush=True)
  236. # 插入 abstract strategy_capability
  237. total_ins = 0
  238. for pid, caps in pattern_caps.items():
  239. if pid == 'P18': continue
  240. aid = abstract_strategy_id(pid)
  241. for cap_id in caps:
  242. cur.execute("""INSERT INTO strategy_capability
  243. (strategy_id, capability_id, relation_type)
  244. VALUES (%s, %s, 'compose') ON CONFLICT DO NOTHING""",
  245. (aid, cap_id))
  246. total_ins += 1
  247. stats['abstract_strat_cap'] = total_ins
  248. print(f' inserted abstract strategy_capability: {total_ins}', flush=True)
  249. def step4_rebuild_strategy_resource(cur, stats):
  250. print('\n=== Step 4: 重建 strategy_resource(抽象 strategy ↔ resource 联合)===', flush=True)
  251. cur.execute("""SELECT sr.resource_id, s.id AS sid, rs.requirement_id, rs.is_selected
  252. FROM strategy_resource sr
  253. JOIN strategy s ON s.id = sr.strategy_id
  254. JOIN requirement_strategy rs ON rs.strategy_id = s.id
  255. WHERE s.version = 'howard_dedup'""")
  256. rows = cur.fetchall()
  257. pattern_res = {}
  258. for r in rows:
  259. pid = classify(r['sid'], r['requirement_id'], r['is_selected'])
  260. if not pid: continue
  261. pattern_res.setdefault(pid, set()).add(r['resource_id'])
  262. cur.execute("""DELETE FROM strategy_resource
  263. WHERE strategy_id IN (SELECT id FROM strategy WHERE version='howard_dedup')""")
  264. print(f' deleted concrete strategy_resource', flush=True)
  265. total_ins = 0
  266. for pid, res_set in pattern_res.items():
  267. if pid == 'P18': continue
  268. aid = abstract_strategy_id(pid)
  269. for rid in res_set:
  270. cur.execute("""INSERT INTO strategy_resource (strategy_id, resource_id)
  271. VALUES (%s, %s) ON CONFLICT DO NOTHING""", (aid, rid))
  272. total_ins += 1
  273. stats['abstract_strat_res'] = total_ins
  274. print(f' inserted abstract strategy_resource: {total_ins}', flush=True)
  275. def step5_rebuild_requirement_strategy(cur, stats):
  276. print('\n=== Step 5: 重建 requirement_strategy (req ↔ abstract, dedup) ===', flush=True)
  277. # 先抓取所有具体 req_strat 记录
  278. cur.execute("""SELECT rs.requirement_id, rs.strategy_id, rs.is_selected,
  279. rs.coverage_score, rs.coverage_explanation,
  280. s.version
  281. FROM requirement_strategy rs
  282. JOIN strategy s ON s.id = rs.strategy_id
  283. WHERE s.version = 'howard_dedup'""")
  284. concrete_rows = cur.fetchall()
  285. # 聚合到 (req, abstract_id):is_selected=ANY 成员 selected;coverage 取 max
  286. agg = {} # (req, abstract_id) -> {is_selected, coverage_score, coverage_explanation}
  287. for r in concrete_rows:
  288. pid = classify(r['strategy_id'], r['requirement_id'], r['is_selected'])
  289. if not pid: continue
  290. aid = abstract_strategy_id(pid)
  291. key = (r['requirement_id'], aid)
  292. cur_data = agg.get(key, {'is_selected': False, 'coverage_score': None,
  293. 'coverage_explanation': None})
  294. if r['is_selected']:
  295. cur_data['is_selected'] = True
  296. # coverage: 取最高分的
  297. if r['coverage_score'] is not None:
  298. if cur_data['coverage_score'] is None or r['coverage_score'] > cur_data['coverage_score']:
  299. cur_data['coverage_score'] = r['coverage_score']
  300. cur_data['coverage_explanation'] = r['coverage_explanation']
  301. agg[key] = cur_data
  302. # 删除老的具体 req_strategy 行
  303. cur.execute("""DELETE FROM requirement_strategy
  304. WHERE strategy_id IN (SELECT id FROM strategy WHERE version='howard_dedup')""")
  305. print(f' deleted concrete requirement_strategy rows', flush=True)
  306. # 插入抽象 req_strategy 行
  307. for (req_id, aid), m in agg.items():
  308. cur.execute("""DELETE FROM requirement_strategy
  309. WHERE requirement_id = %s AND strategy_id = %s""", (req_id, aid))
  310. cur.execute("""INSERT INTO requirement_strategy
  311. (requirement_id, strategy_id, is_selected,
  312. coverage_score, coverage_explanation)
  313. VALUES (%s, %s, %s, %s, %s)""",
  314. (req_id, aid, m['is_selected'],
  315. m['coverage_score'], m['coverage_explanation']))
  316. stats['abstract_req_strat'] = len(agg)
  317. print(f' inserted abstract requirement_strategy: {len(agg)}', flush=True)
  318. def step6_delete_concrete_strategies(cur, stats):
  319. print('\n=== Step 6: 删除具体 strategy 行(howard_dedup version)===', flush=True)
  320. # 先确认没有 junction 残余
  321. for t in ['strategy_capability', 'strategy_resource', 'strategy_knowledge',
  322. 'requirement_strategy']:
  323. cur.execute(f"""SELECT COUNT(*) c FROM {t}
  324. WHERE strategy_id IN (SELECT id FROM strategy WHERE version='howard_dedup')""")
  325. n = cur.fetchone()['c']
  326. if n > 0:
  327. print(f' ⚠️ {t} 还有 {n} 行指向具体 strategy,可能不干净', flush=True)
  328. cur.execute("DELETE FROM strategy WHERE version = 'howard_dedup'")
  329. stats['concrete_deleted'] = cur.rowcount
  330. print(f' deleted concrete strategies: {stats["concrete_deleted"]}', flush=True)
  331. # ═══════════════════════════════════════════════════════════
  332. def verify(cur):
  333. print('\n=== 最终验证 ===', flush=True)
  334. cur.execute("""SELECT
  335. (SELECT COUNT(*) FROM strategy) s_total,
  336. (SELECT COUNT(*) FROM strategy WHERE version='howard_dedup') s_concrete,
  337. (SELECT COUNT(*) FROM strategy WHERE version=%s) s_abstract,
  338. (SELECT COUNT(*) FROM knowledge WHERE version='v0') k_v0,
  339. (SELECT COUNT(*) FROM knowledge WHERE version=%s) k_new,
  340. (SELECT COUNT(*) FROM requirement_strategy) rs_total,
  341. (SELECT COUNT(*) FROM requirement_knowledge
  342. WHERE knowledge_id IN (SELECT id FROM knowledge WHERE version=%s)) rk_new,
  343. (SELECT COUNT(*) FROM strategy_knowledge
  344. WHERE knowledge_id IN (SELECT id FROM knowledge WHERE version=%s)) sk_new,
  345. (SELECT COUNT(*) FROM knowledge_resource
  346. WHERE knowledge_id IN (SELECT id FROM knowledge WHERE version=%s)) kr_new,
  347. (SELECT COUNT(*) FROM strategy_capability
  348. WHERE strategy_id IN (SELECT id FROM strategy WHERE version=%s)) sc_new,
  349. (SELECT COUNT(*) FROM strategy_resource
  350. WHERE strategy_id IN (SELECT id FROM strategy WHERE version=%s)) sr_new
  351. """, (NEW_VERSION,)*7)
  352. r = cur.fetchone()
  353. for k, v in dict(r).items(): print(f' {k}: {v}', flush=True)
  354. # Constraint: strat_cap ⊆ req_cap
  355. cur.execute("""SELECT COUNT(*) c FROM strategy_capability sc
  356. JOIN requirement_strategy rs ON rs.strategy_id = sc.strategy_id
  357. LEFT JOIN requirement_capability rc
  358. ON rc.requirement_id=rs.requirement_id AND rc.capability_id=sc.capability_id
  359. WHERE rc.capability_id IS NULL""")
  360. print(f'\n strat_cap ⊄ req_cap violations: {cur.fetchone()["c"]}', flush=True)
  361. # ═══════════════════════════════════════════════════════════
  362. def main():
  363. ap = argparse.ArgumentParser()
  364. ap.add_argument('--execute', action='store_true')
  365. ap.add_argument('--dry-run', action='store_true')
  366. args = ap.parse_args()
  367. if not (args.execute or args.dry_run):
  368. print('need --execute or --dry-run'); sys.exit(1)
  369. s = PostgreSQLCapabilityStore()
  370. cur = s._get_cursor()
  371. try:
  372. # Preflight
  373. cur.execute("""SELECT s.id, rs.requirement_id, rs.is_selected
  374. FROM strategy s JOIN requirement_strategy rs ON rs.strategy_id=s.id
  375. WHERE s.version='howard_dedup'""")
  376. all_rows = cur.fetchall()
  377. unmapped = [r for r in all_rows if classify(r['id'], r['requirement_id'], r['is_selected']) is None]
  378. print(f'Preflight: {len(all_rows)} concrete strategies, {len(unmapped)} unmapped')
  379. if unmapped:
  380. for r in unmapped[:10]: print(f' unmapped: {r}')
  381. print('Abort.')
  382. return
  383. if args.dry_run:
  384. print('DRY RUN OK — use --execute to run')
  385. return
  386. stats = {'abstract_inserted': 0, 'knowledge_inserted': 0,
  387. 'rk_inserted': 0, 'sk_inserted': 0, 'kr_inserted': 0,
  388. 'abstract_strat_cap': 0, 'abstract_strat_res': 0,
  389. 'abstract_req_strat': 0, 'concrete_deleted': 0,
  390. 'unmapped': []}
  391. step1_insert_abstract_strategies(cur, stats)
  392. step2_create_knowledge_and_junctions(cur, stats)
  393. step3_rebuild_strategy_capability(cur, stats)
  394. step4_rebuild_strategy_resource(cur, stats)
  395. step5_rebuild_requirement_strategy(cur, stats)
  396. step6_delete_concrete_strategies(cur, stats)
  397. print(f'\n{"="*60}\nStats:')
  398. for k, v in stats.items():
  399. if isinstance(v, list):
  400. print(f' {k}: {len(v)}')
  401. else:
  402. print(f' {k}: {v}')
  403. verify(cur)
  404. finally:
  405. cur.close()
  406. s.close()
  407. if __name__ == '__main__':
  408. main()