#!/usr/bin/env python3
"""
Phase 4 + Phase 5 migration: concrete strategy -> abstract pattern + knowledge instance.

Steps (numbered as the ``stepN_*`` functions below):
  0. Preflight: verify that the classifier maps every concrete strategy
  1. Insert the abstract strategies (P01..P27; P18 is skipped because it has 0 members)
  2. For every concrete strategy, generate a knowledge instance and write its junction rows
     - requirement_knowledge (req, knowledge_id, is_selected, coverage_score, coverage_explanation)
     - strategy_knowledge    (abstract_strategy_id, knowledge_id)
     - knowledge_resource    (knowledge_id, resource_id)  <- union of the requirement's resources
  3. Rebuild strategy_capability: abstract strategy <-> union of member capabilities
  4. Rebuild strategy_resource: abstract strategy <-> union of member resources
  5. Rebuild requirement_strategy: req <-> abstract strategy (with metadata, de-duplicated)
  6. Delete the concrete strategies (after reporting leftover junction rows)

Idempotency and safety (notes carried over from the original author):
  - bk_20260422_* snapshots already exist
  - autocommit=True (configured inside PostgreSQLCapabilityStore; not visible in this file)
  - each stage completes on its own; inserts are preceded by a DELETE of the same key
    (or use ON CONFLICT DO NOTHING) so a failed run can be repeated
  - version='howard_strategy_instance' is the unique marker that separates the rows
    written here from the old knowledge rows (v0)
"""
import argparse
import hashlib
import json
import sys
import time
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
from knowhub.scripts.abstract_patterns import (
    PATTERNS, PATTERN_NAME, PATTERN_DESC, classify,
)

NEW_VERSION = 'howard_strategy_instance'
MIGRATION_DATE = '2026-04-22'


def hash8(text):
    """Return the first 8 hex characters of the SHA-256 digest of *text* (UTF-8)."""
    return hashlib.sha256(text.encode('utf-8')).hexdigest()[:8]


def abstract_strategy_id(pid):
    """Build the abstract strategy id for a pattern id, e.g. P01 -> strategy-abstract-p01."""
    return f'strategy-abstract-{pid.lower()}'


def gen_knowledge_id(req_id, original_strat_id):
    """Derive a deterministic knowledge id from the (requirement, concrete strategy) pair."""
    h = hash8(f'{req_id}|{original_strat_id}')
    return f'knowledge-stratinst-{h}'


# ═══════════════════════════════════════════════════════════

def coverage_to_eval_score(cov):
    """Map a coverage_score onto the integer eval.score scale (2..6).

    Returns None when *cov* is None.
    """
    if cov is None:
        return None
    if cov >= 0.9:
        return 6
    if cov >= 0.8:
        return 5
    if cov >= 0.6:
        return 4
    if cov >= 0.4:
        return 3
    return 2


# ═══════════════════════════════════════════════════════════

def build_knowledge_content(strat_row, req_desc, pattern_id, pattern_name):
    """Assemble the markdown ``content`` text of a requirement-specific knowledge row.

    Args:
        strat_row: mapping with at least ``body`` (dict or JSON text) and ``name``.
        req_desc: requirement description, rendered under the first heading.
        pattern_id: id of the abstract pattern the strategy was classified into.
        pattern_name: display name of that pattern.

    Returns:
        The sections joined with newlines. Optional sections are emitted only
        when the corresponding body field is present and non-empty.
    """
    raw_body = strat_row['body']
    body = raw_body if isinstance(raw_body, dict) else json.loads(raw_body or '{}')
    if not isinstance(body, dict):
        # Stored JSON such as `null` or a list decodes to a non-dict; treat it
        # as an empty body instead of failing on `.get` below.
        body = {}

    parts = []
    parts.append(f'## 需求\n{req_desc}\n')
    parts.append(f'## 本具体方案\n**{strat_row["name"]}**')
    parts.append(f'(归属抽象套路:{pattern_id} · {pattern_name})\n')

    if body.get('reasoning'):
        parts.append(f'## 选择理由\n{body["reasoning"]}\n')
    if body.get('why_not'):
        parts.append(f'## 为何不选备选\n{body["why_not"]}\n')
    if body.get('could_switch_if'):
        parts.append(f'## 可切换到备选的条件\n{body["could_switch_if"]}\n')

    hl = body.get('highlight_coverage')
    if isinstance(hl, list) and hl:
        parts.append('## 独特优势(highlight_coverage)')
        for x in hl:
            parts.append(f'- {x}')
        parts.append('')

    bl = body.get('baseline_coverage')
    if isinstance(bl, list) and bl:
        parts.append('## 基础覆盖(baseline_coverage)')
        for x in bl:
            parts.append(f'- {x}')
        parts.append('')

    wo = body.get('workflow_outline')
    if isinstance(wo, list) and wo:
        parts.append('## 执行步骤(req-specific phases)')
        for i, ph in enumerate(wo, 1):
            if not isinstance(ph, dict):
                continue
            parts.append(f'**Phase {i}:{ph.get("phase","")}**')
            parts.append(f'{ph.get("description","")}')
            caps = ph.get('capabilities', [])
            if caps:
                names = []
                for c in caps:
                    if isinstance(c, dict):
                        names.append(f'{c.get("id","?")} {c.get("name","")}')
                if names:
                    parts.append(f'_能力使用:{"、".join(names)}_')
            # NOTE(review): nesting reconstructed from whitespace-collapsed
            # source; the blank separator is assumed to follow every phase.
            parts.append('')

    src = body.get('source')
    if isinstance(src, list) and src:
        parts.append('## 源案例引用')
        # Only the first 12 source references are listed.
        for s in src[:12]:
            if isinstance(s, dict):
                parts.append(f'- {s.get("id","")}: {s.get("title","")}')
            elif isinstance(s, str):
                parts.append(f'- {s}')
        parts.append('')

    cov = body.get('coverage_score')
    cov_exp = body.get('coverage_explanation')
    if cov is not None:
        parts.append(f'## 覆盖度评分\ncoverage_score = **{cov}**')
        # NOTE(review): nesting reconstructed; the explanation is assumed to
        # be printed only underneath the score heading.
        if cov_exp:
            parts.append(f'\nLLM 解释:{cov_exp}')

    return '\n'.join(parts)


# ═══════════════════════════════════════════════════════════

def step1_insert_abstract_strategies(cur, stats):
    """Insert one abstract strategy row per entry of PATTERNS (P18 excluded).

    Each row is deleted by id before it is inserted, so repeating the step
    replaces the rows. Increments ``stats['abstract_inserted']`` per insert.
    """
    print('\n=== Step 1: 插入 26 个抽象 strategy ===', flush=True)
    now = int(time.time())
    for pid, name, cat, desc in PATTERNS:
        # skip P18 (0 members)
        if pid == 'P18':
            continue
        aid = abstract_strategy_id(pid)
        # idempotent: DELETE + INSERT
        cur.execute('DELETE FROM strategy WHERE id = %s', (aid,))
        body = {
            'pattern_id': pid,
            'pattern_category': cat,
            'abstract_name': name,
            'description': desc,
            'migration_note': '抽象套路 pattern; 在 Phase 4/5 迁移中从具体 strategy 聚合而来',
            'migrated_at': MIGRATION_DATE,
        }
        cur.execute("""INSERT INTO strategy
            (id, name, description, body, status, created_at, updated_at, version)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)""",
                    (aid, name, desc, json.dumps(body, ensure_ascii=False),
                     'approved', now, now, NEW_VERSION))
        stats['abstract_inserted'] += 1
    print(f' inserted: {stats["abstract_inserted"]}', flush=True)


def step2_create_knowledge_and_junctions(cur, stats):
    """Create one knowledge row plus junction rows per (concrete strategy, requirement).

    For every 'howard_dedup' strategy linked to a requirement this writes:
    a ``knowledge`` row, a ``requirement_knowledge`` row, a
    ``strategy_knowledge`` row pointing at the abstract strategy, and one
    ``knowledge_resource`` row per resource of the requirement. Strategies the
    classifier cannot map are recorded in ``stats['unmapped']`` and skipped.
    """
    print('\n=== Step 2: 为每条具体 strategy 生成 knowledge + junctions ===', flush=True)
    cur.execute("""SELECT s.id, s.name, s.body,
               rs.requirement_id, rs.is_selected, rs.coverage_score, rs.coverage_explanation,
               r.description AS req_desc
        FROM strategy s
        JOIN requirement_strategy rs ON rs.strategy_id = s.id
        JOIN requirement r ON r.id = rs.requirement_id
        WHERE s.version = 'howard_dedup'""")
    # Only concrete strategies are selected (abstract rows carry NEW_VERSION).
    rows = cur.fetchall()
    print(f' concrete strategies to migrate: {len(rows)}', flush=True)

    now = int(time.time())
    for i, row in enumerate(rows):
        sid = row['id']
        req_id = row['requirement_id']
        is_sel = row['is_selected']
        cov = row['coverage_score']
        cov_exp = row['coverage_explanation']

        # classify
        pid = classify(sid, req_id, is_sel)
        if pid is None:
            stats['unmapped'].append(sid)
            continue
        aid = abstract_strategy_id(pid)
        kid = gen_knowledge_id(req_id, sid)

        # build content
        content = build_knowledge_content(row, row['req_desc'], pid, PATTERN_NAME[pid])
        task = f'{row["req_desc"][:80]} — {PATTERN_NAME[pid]}'
        tags = {
            'kind': 'req_specific_strategy_execution',
            'original_strategy_name': row['name'],
            'pattern_id': pid,
            'pattern_name': PATTERN_NAME[pid],
        }
        source = {
            'category': 'strategy_migration_req_specific',
            'agent_id': 'strategy_migration_2026-04-22',
            'timestamp': f'{MIGRATION_DATE}T00:00:00+00:00',
            'original_strategy_id': sid,
        }
        es_score = coverage_to_eval_score(cov)
        eval_obj = {
            'score': es_score,
            'harmful': 0,
            'helpful': 1,
            # Fall back to 0.5 only when coverage is missing; `cov or 0.5`
            # would also overwrite a genuine coverage of 0.0.
            'confidence': cov if cov is not None else 0.5,
        }

        # Insert knowledge
        cur.execute('DELETE FROM knowledge WHERE id = %s', (kid,))
        cur.execute("""INSERT INTO knowledge
            (id, message_id, task, content, types, tags, tag_keys, scopes, owner,
             source, eval, created_at, updated_at, status, version)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
                    (kid, '', task, content,
                     ['strategy', 'execution_instance'],
                     json.dumps(tags, ensure_ascii=False),
                     list(tags.keys()),
                     ['org:cybertogether'],
                     'agent:strategy_migration',
                     json.dumps(source, ensure_ascii=False),
                     json.dumps(eval_obj, ensure_ascii=False),
                     now, now, 'approved', NEW_VERSION))
        stats['knowledge_inserted'] += 1

        # requirement_knowledge
        cur.execute("""DELETE FROM requirement_knowledge
            WHERE requirement_id = %s AND knowledge_id = %s""", (req_id, kid))
        cur.execute("""INSERT INTO requirement_knowledge
            (requirement_id, knowledge_id, is_selected, coverage_score, coverage_explanation)
            VALUES (%s, %s, %s, %s, %s)""",
                    (req_id, kid, is_sel, cov, cov_exp))
        stats['rk_inserted'] += 1

        # strategy_knowledge (abstract -> knowledge)
        cur.execute("""DELETE FROM strategy_knowledge
            WHERE strategy_id = %s AND knowledge_id = %s""", (aid, kid))
        cur.execute("""INSERT INTO strategy_knowledge
            (strategy_id, knowledge_id, relation_type)
            VALUES (%s, %s, 'execution_instance')""", (aid, kid))
        stats['sk_inserted'] += 1

        # knowledge_resource: every resource linked to this knowledge's requirement.
        # Note: this is a requirement-level union, not a strict
        # knowledge-specific reference. Finer-grained references could be
        # matched case by case from body.source in Phase 6 (later optimisation).
        cur.execute("""SELECT resource_id FROM requirement_resource
            WHERE requirement_id = %s""", (req_id,))
        res_ids = [r['resource_id'] for r in cur.fetchall()]
        for rid in res_ids:
            cur.execute("""INSERT INTO knowledge_resource (knowledge_id, resource_id)
                VALUES (%s, %s) ON CONFLICT DO NOTHING""", (kid, rid))
        # Counts attempted inserts; rows skipped by ON CONFLICT are included.
        stats['kr_inserted'] += len(res_ids)

        if (i + 1) % 20 == 0:
            print(f' processed {i+1}/{len(rows)}', flush=True)


def step3_rebuild_strategy_capability(cur, stats):
    """Replace concrete strategy_capability rows with per-pattern capability unions.

    Collects the capabilities of every 'howard_dedup' strategy grouped by the
    pattern it classifies into, deletes the concrete junction rows, then
    inserts one 'compose' row per (abstract strategy, capability).
    """
    print('\n=== Step 3: 重建 strategy_capability(抽象 strategy ↔ cap 联合)===', flush=True)
    # First collect the capability union of each pattern.
    cur.execute("""SELECT sc.capability_id, sc.strategy_id, rs.requirement_id, rs.is_selected,
               s.id AS sid
        FROM strategy_capability sc
        JOIN strategy s ON s.id = sc.strategy_id
        JOIN requirement_strategy rs ON rs.strategy_id = s.id
        WHERE s.version = 'howard_dedup'""")
    rows = cur.fetchall()
    pattern_caps = {}  # pid -> set(cap_id)
    for r in rows:
        pid = classify(r['sid'], r['requirement_id'], r['is_selected'])
        if not pid:
            continue
        pattern_caps.setdefault(pid, set()).add(r['capability_id'])

    # Delete the old (concrete) strategy_capability rows.
    cur.execute("""DELETE FROM strategy_capability
        WHERE strategy_id IN (SELECT id FROM strategy WHERE version='howard_dedup')""")
    print(' deleted concrete strategy_capability', flush=True)

    # Insert the abstract strategy_capability rows.
    total_ins = 0
    for pid, caps in pattern_caps.items():
        if pid == 'P18':
            continue
        aid = abstract_strategy_id(pid)
        for cap_id in caps:
            cur.execute("""INSERT INTO strategy_capability
                (strategy_id, capability_id, relation_type)
                VALUES (%s, %s, 'compose') ON CONFLICT DO NOTHING""", (aid, cap_id))
            total_ins += 1
    stats['abstract_strat_cap'] = total_ins
    print(f' inserted abstract strategy_capability: {total_ins}', flush=True)


def step4_rebuild_strategy_resource(cur, stats):
    """Replace concrete strategy_resource rows with per-pattern resource unions.

    Same shape as step 3: group resources by classified pattern, delete the
    concrete junction rows, insert one row per (abstract strategy, resource).
    """
    print('\n=== Step 4: 重建 strategy_resource(抽象 strategy ↔ resource 联合)===', flush=True)
    cur.execute("""SELECT sr.resource_id, s.id AS sid, rs.requirement_id, rs.is_selected
        FROM strategy_resource sr
        JOIN strategy s ON s.id = sr.strategy_id
        JOIN requirement_strategy rs ON rs.strategy_id = s.id
        WHERE s.version = 'howard_dedup'""")
    rows = cur.fetchall()
    pattern_res = {}  # pid -> set(resource_id)
    for r in rows:
        pid = classify(r['sid'], r['requirement_id'], r['is_selected'])
        if not pid:
            continue
        pattern_res.setdefault(pid, set()).add(r['resource_id'])

    cur.execute("""DELETE FROM strategy_resource
        WHERE strategy_id IN (SELECT id FROM strategy WHERE version='howard_dedup')""")
    print(' deleted concrete strategy_resource', flush=True)

    total_ins = 0
    for pid, res_set in pattern_res.items():
        if pid == 'P18':
            continue
        aid = abstract_strategy_id(pid)
        for rid in res_set:
            cur.execute("""INSERT INTO strategy_resource (strategy_id, resource_id)
                VALUES (%s, %s) ON CONFLICT DO NOTHING""", (aid, rid))
            total_ins += 1
    stats['abstract_strat_res'] = total_ins
    print(f' inserted abstract strategy_resource: {total_ins}', flush=True)


def step5_rebuild_requirement_strategy(cur, stats):
    """Collapse concrete requirement_strategy rows into (requirement, abstract strategy) rows.

    Aggregation per key: ``is_selected`` is True if any member was selected;
    ``coverage_score`` is the highest non-null member score and
    ``coverage_explanation`` is taken from that same member.
    """
    print('\n=== Step 5: 重建 requirement_strategy (req ↔ abstract, dedup) ===', flush=True)
    # Fetch every concrete requirement_strategy record first.
    cur.execute("""SELECT rs.requirement_id, rs.strategy_id, rs.is_selected,
               rs.coverage_score, rs.coverage_explanation, s.version
        FROM requirement_strategy rs
        JOIN strategy s ON s.id = rs.strategy_id
        WHERE s.version = 'howard_dedup'""")
    concrete_rows = cur.fetchall()

    # Aggregate onto (req, abstract_id): is_selected = ANY member selected;
    # coverage = max over members.
    agg = {}  # (req, abstract_id) -> {is_selected, coverage_score, coverage_explanation}
    for r in concrete_rows:
        pid = classify(r['strategy_id'], r['requirement_id'], r['is_selected'])
        if not pid:
            continue
        aid = abstract_strategy_id(pid)
        key = (r['requirement_id'], aid)
        cur_data = agg.get(key, {'is_selected': False,
                                 'coverage_score': None,
                                 'coverage_explanation': None})
        if r['is_selected']:
            cur_data['is_selected'] = True
        # coverage: keep the highest score and its explanation
        if r['coverage_score'] is not None:
            if (cur_data['coverage_score'] is None
                    or r['coverage_score'] > cur_data['coverage_score']):
                cur_data['coverage_score'] = r['coverage_score']
                cur_data['coverage_explanation'] = r['coverage_explanation']
        agg[key] = cur_data

    # Delete the old concrete requirement_strategy rows.
    cur.execute("""DELETE FROM requirement_strategy
        WHERE strategy_id IN (SELECT id FROM strategy WHERE version='howard_dedup')""")
    print(' deleted concrete requirement_strategy rows', flush=True)

    # Insert the abstract requirement_strategy rows.
    for (req_id, aid), m in agg.items():
        cur.execute("""DELETE FROM requirement_strategy
            WHERE requirement_id = %s AND strategy_id = %s""", (req_id, aid))
        cur.execute("""INSERT INTO requirement_strategy
            (requirement_id, strategy_id, is_selected, coverage_score, coverage_explanation)
            VALUES (%s, %s, %s, %s, %s)""",
                    (req_id, aid, m['is_selected'], m['coverage_score'],
                     m['coverage_explanation']))
    stats['abstract_req_strat'] = len(agg)
    print(f' inserted abstract requirement_strategy: {len(agg)}', flush=True)


def step6_delete_concrete_strategies(cur, stats):
    """Delete every 'howard_dedup' strategy row and record the count.

    Junction tables that still reference concrete strategies are reported as
    warnings; the delete is attempted regardless.
    """
    print('\n=== Step 6: 删除具体 strategy 行(howard_dedup version)===', flush=True)
    # First check for leftover junction rows. Table names come from this
    # fixed list, so interpolating them into the SQL is safe.
    for t in ['strategy_capability', 'strategy_resource', 'strategy_knowledge',
              'requirement_strategy']:
        cur.execute(f"""SELECT COUNT(*) c FROM {t}
            WHERE strategy_id IN (SELECT id FROM strategy WHERE version='howard_dedup')""")
        n = cur.fetchone()['c']
        if n > 0:
            print(f' ⚠️ {t} 还有 {n} 行指向具体 strategy,可能不干净', flush=True)

    cur.execute("DELETE FROM strategy WHERE version = 'howard_dedup'")
    stats['concrete_deleted'] = cur.rowcount
    print(f' deleted concrete strategies: {stats["concrete_deleted"]}', flush=True)


# ═══════════════════════════════════════════════════════════

def verify(cur):
    """Print post-migration row counts and the strat_cap ⊆ req_cap violation count."""
    print('\n=== 最终验证 ===', flush=True)
    cur.execute("""SELECT
        (SELECT COUNT(*) FROM strategy) s_total,
        (SELECT COUNT(*) FROM strategy WHERE version='howard_dedup') s_concrete,
        (SELECT COUNT(*) FROM strategy WHERE version=%s) s_abstract,
        (SELECT COUNT(*) FROM knowledge WHERE version='v0') k_v0,
        (SELECT COUNT(*) FROM knowledge WHERE version=%s) k_new,
        (SELECT COUNT(*) FROM requirement_strategy) rs_total,
        (SELECT COUNT(*) FROM requirement_knowledge
            WHERE knowledge_id IN (SELECT id FROM knowledge WHERE version=%s)) rk_new,
        (SELECT COUNT(*) FROM strategy_knowledge
            WHERE knowledge_id IN (SELECT id FROM knowledge WHERE version=%s)) sk_new,
        (SELECT COUNT(*) FROM knowledge_resource
            WHERE knowledge_id IN (SELECT id FROM knowledge WHERE version=%s)) kr_new,
        (SELECT COUNT(*) FROM strategy_capability
            WHERE strategy_id IN (SELECT id FROM strategy WHERE version=%s)) sc_new,
        (SELECT COUNT(*) FROM strategy_resource
            WHERE strategy_id IN (SELECT id FROM strategy WHERE version=%s)) sr_new
        """, (NEW_VERSION,)*7)  # one parameter per %s placeholder above
    r = cur.fetchone()
    for k, v in dict(r).items():
        print(f' {k}: {v}', flush=True)

    # Constraint: strat_cap ⊆ req_cap
    cur.execute("""SELECT COUNT(*) c
        FROM strategy_capability sc
        JOIN requirement_strategy rs ON rs.strategy_id = sc.strategy_id
        LEFT JOIN requirement_capability rc
            ON rc.requirement_id=rs.requirement_id AND rc.capability_id=sc.capability_id
        WHERE rc.capability_id IS NULL""")
    print(f'\n strat_cap ⊄ req_cap violations: {cur.fetchone()["c"]}', flush=True)


# ═══════════════════════════════════════════════════════════

def main():
    """Command-line entry point.

    Requires ``--execute`` or ``--dry-run`` (exits with status 1 otherwise).
    The preflight always runs; when any concrete strategy is unmapped the run
    aborts. ``--dry-run`` stops after the preflight, and takes precedence if
    both flags are given. The cursor and the store are closed on every exit
    path.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--execute', action='store_true')
    ap.add_argument('--dry-run', action='store_true')
    args = ap.parse_args()
    if not (args.execute or args.dry_run):
        print('need --execute or --dry-run')
        sys.exit(1)

    s = PostgreSQLCapabilityStore()
    try:
        cur = s._get_cursor()
        try:
            # Preflight
            cur.execute("""SELECT s.id, rs.requirement_id, rs.is_selected
                FROM strategy s
                JOIN requirement_strategy rs ON rs.strategy_id=s.id
                WHERE s.version='howard_dedup'""")
            all_rows = cur.fetchall()
            unmapped = [r for r in all_rows
                        if classify(r['id'], r['requirement_id'], r['is_selected']) is None]
            print(f'Preflight: {len(all_rows)} concrete strategies, {len(unmapped)} unmapped')
            if unmapped:
                for r in unmapped[:10]:
                    print(f' unmapped: {r}')
                print('Abort.')
                return
            if args.dry_run:
                print('DRY RUN OK — use --execute to run')
                return

            stats = {'abstract_inserted': 0, 'knowledge_inserted': 0,
                     'rk_inserted': 0, 'sk_inserted': 0, 'kr_inserted': 0,
                     'abstract_strat_cap': 0, 'abstract_strat_res': 0,
                     'abstract_req_strat': 0, 'concrete_deleted': 0,
                     'unmapped': []}

            step1_insert_abstract_strategies(cur, stats)
            step2_create_knowledge_and_junctions(cur, stats)
            step3_rebuild_strategy_capability(cur, stats)
            step4_rebuild_strategy_resource(cur, stats)
            step5_rebuild_requirement_strategy(cur, stats)
            step6_delete_concrete_strategies(cur, stats)

            print(f'\n{"="*60}\nStats:')
            for k, v in stats.items():
                if isinstance(v, list):
                    print(f' {k}: {len(v)}')
                else:
                    print(f' {k}: {v}')

            verify(cur)
        finally:
            cur.close()
    finally:
        # Runs even when obtaining or closing the cursor raises, so the
        # store's connection is not leaked.
        s.close()


if __name__ == '__main__':
    main()