| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467 |
- #!/usr/bin/env python3
- """
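Phase 4 + Phase 5 migration: concrete strategies -> abstract patterns + knowledge instances.

Steps (the log banners "Step 1".."Step 6" correspond to items 2..7 below):
  1. Preflight: verify that the classifier covers every concrete strategy.
  2. Insert the 26 abstract strategies (P01..P27; P18 is skipped because it has 0 members).
  3. For each concrete strategy, generate a knowledge instance and write three junction tables:
     - requirement_knowledge (req, knowledge_id, is_selected, coverage_score, coverage_explanation)
     - strategy_knowledge (abstract_strategy_id, knowledge_id)
     - knowledge_resource (knowledge_id, resource_id)  <- taken from the union of the requirement's resources
  4. Rebuild strategy_capability: abstract strategy <-> union of its members' capabilities.
  5. Rebuild strategy_resource: abstract strategy <-> union of its members' resources.
  6. Rebuild requirement_strategy: req <-> abstract strategy (with metadata, de-duplicated).
  7. Delete the concrete strategies and any junction rows still pointing at them.

Idempotency and safety:
  - The bk_20260422_* snapshots already exist.
  - autocommit=True
  - Each stage completes on its own, followed by verification; a failed run can be
    re-run (rows that were already processed are deleted before being re-inserted).
  - version='howard_strategy_instance' is the unique marker; these rows are kept
    physically separate from the old knowledge rows (v0).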
- """
- import argparse
- import hashlib
- import json
- import sys
- import time
- from pathlib import Path
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
- from knowhub.scripts.abstract_patterns import (
- PATTERNS, PATTERN_NAME, PATTERN_DESC, classify,
- )
# Version tag written to every strategy and knowledge row created by this script.
NEW_VERSION: str = 'howard_strategy_instance'
# Date string recorded in the metadata of the rows created by this script.
MIGRATION_DATE: str = '2026-04-22'
def hash8(text):
    """Return the first 8 hex characters of the SHA-256 digest of *text*.

    The text is encoded as UTF-8 before it is hashed.
    """
    digest = hashlib.sha256()
    digest.update(text.encode('utf-8'))
    return digest.hexdigest()[:8]
def abstract_strategy_id(pid):
    """Build the abstract strategy row id for a pattern id.

    Example: ``P01`` -> ``strategy-abstract-p01``.
    """
    return 'strategy-abstract-' + pid.lower()
def gen_knowledge_id(req_id, original_strat_id):
    """Derive the knowledge id for a (requirement, concrete strategy) pair.

    The id is ``knowledge-stratinst-`` followed by ``hash8`` of
    ``"<req_id>|<original_strat_id>"``, so the same pair always yields the
    same id.
    """
    return 'knowledge-stratinst-' + hash8(f'{req_id}|{original_strat_id}')
- # ═══════════════════════════════════════════════════════════
def coverage_to_eval_score(cov):
    """Translate a coverage_score into the integer stored as ``eval.score``.

    Returns ``None`` when *cov* is ``None``; otherwise the score of the first
    threshold that *cov* reaches (>= 0.9 -> 6, >= 0.8 -> 5, >= 0.6 -> 4,
    >= 0.4 -> 3), and 2 when it reaches none of them.
    """
    if cov is None:
        return None
    # Thresholds are listed from highest to lowest; the first match wins.
    for floor, score in ((0.9, 6), (0.8, 5), (0.6, 4), (0.4, 3)):
        if cov >= floor:
            return score
    return 2
- # ═══════════════════════════════════════════════════════════
def _parse_strategy_body(raw):
    """Return the strategy body as a dict, decoding JSON text when needed."""
    if isinstance(raw, dict):
        return raw
    decoded = json.loads(raw or '{}')
    # A JSON ``null`` / list / scalar carries no named fields; treat it as an
    # empty body instead of failing later on ``.get``.
    return decoded if isinstance(decoded, dict) else {}


def _append_bullets(parts, heading, items):
    """Append *heading*, one ``- item`` line per entry and a blank line.

    Nothing is appended unless *items* is a non-empty list.
    """
    if isinstance(items, list) and items:
        parts.append(heading)
        parts.extend(f'- {item}' for item in items)
        parts.append('')


def _append_workflow(parts, outline):
    """Append the workflow section: one paragraph per dict-shaped phase."""
    if not (isinstance(outline, list) and outline):
        return
    parts.append('## 执行步骤(req-specific phases)')
    # Numbering follows the position in the outline, so a skipped non-dict
    # entry leaves a gap in the phase numbers.
    for idx, phase in enumerate(outline, 1):
        if not isinstance(phase, dict):
            continue
        parts.append(f'**Phase {idx}:{phase.get("phase", "")}**')
        parts.append(f'{phase.get("description", "")}')
        cap_names = [
            f'{cap.get("id", "?")} {cap.get("name", "")}'
            for cap in phase.get('capabilities') or []
            if isinstance(cap, dict)
        ]
        if cap_names:
            parts.append(f'_能力使用:{"、".join(cap_names)}_')
        parts.append('')


def build_knowledge_content(strat_row, req_desc, pattern_id, pattern_name):
    """Assemble the Markdown content of one requirement-specific knowledge row.

    Args:
        strat_row: Mapping with at least ``'name'`` and ``'body'``. ``'body'``
            may be a dict, a JSON string, or empty/``None``.
        req_desc: Requirement description shown in the first section.
        pattern_id: Id of the abstract pattern the strategy belongs to.
        pattern_name: Display name of that pattern.

    Returns:
        The sections joined with newlines. Optional sections (reasoning,
        why_not, could_switch_if, highlight/baseline coverage, workflow
        outline, at most 12 source references, coverage score) are emitted
        only when the body provides them.

    Raises:
        json.JSONDecodeError: If ``'body'`` is a string that is not valid JSON.
    """
    body = _parse_strategy_body(strat_row['body'])
    parts = [
        f'## 需求\n{req_desc}\n',
        f'## 本具体方案\n**{strat_row["name"]}**',
        f'(归属抽象套路:{pattern_id} · {pattern_name})\n',
    ]

    for key, heading in (
        ('reasoning', '## 选择理由'),
        ('why_not', '## 为何不选备选'),
        ('could_switch_if', '## 可切换到备选的条件'),
    ):
        if body.get(key):
            parts.append(f'{heading}\n{body[key]}\n')

    _append_bullets(parts, '## 独特优势(highlight_coverage)',
                    body.get('highlight_coverage'))
    _append_bullets(parts, '## 基础覆盖(baseline_coverage)',
                    body.get('baseline_coverage'))
    _append_workflow(parts, body.get('workflow_outline'))

    sources = body.get('source')
    if isinstance(sources, list) and sources:
        parts.append('## 源案例引用')
        # Only the first 12 references are listed.
        for ref in sources[:12]:
            if isinstance(ref, dict):
                parts.append(f'- {ref.get("id", "")}: {ref.get("title", "")}')
            elif isinstance(ref, str):
                parts.append(f'- {ref}')
        parts.append('')

    cov = body.get('coverage_score')
    if cov is not None:
        parts.append(f'## 覆盖度评分\ncoverage_score = **{cov}**')
        explanation = body.get('coverage_explanation')
        if explanation:
            parts.append(f'\nLLM 解释:{explanation}')
    return '\n'.join(parts)
- # ═══════════════════════════════════════════════════════════
def step1_insert_abstract_strategies(cur, stats):
    """Insert one abstract strategy row for every entry of ``PATTERNS`` except P18.

    Each row is deleted by id right before it is inserted, so a re-run
    re-creates the row.

    Args:
        cur: Database cursor used to execute the statements.
        stats: Mutable counters dict; ``stats['abstract_inserted']`` is
            incremented once per inserted row.
    """
    print('\n=== Step 1: 插入 26 个抽象 strategy ===', flush=True)
    timestamp = int(time.time())
    insert_sql = """INSERT INTO strategy (id, name, description, body, status,
                                          created_at, updated_at, version)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"""
    for pattern_id, pattern_name, category, description in PATTERNS:
        # P18 is skipped (it has 0 members).
        if pattern_id == 'P18':
            continue
        strategy_id = abstract_strategy_id(pattern_id)
        payload = json.dumps(
            {
                'pattern_id': pattern_id,
                'pattern_category': category,
                'abstract_name': pattern_name,
                'description': description,
                'migration_note': '抽象套路 pattern; 在 Phase 4/5 迁移中从具体 strategy 聚合而来',
                'migrated_at': MIGRATION_DATE,
            },
            ensure_ascii=False,
        )
        # Idempotent: DELETE + INSERT.
        cur.execute('DELETE FROM strategy WHERE id = %s', (strategy_id,))
        cur.execute(
            insert_sql,
            (strategy_id, pattern_name, description, payload,
             'approved', timestamp, timestamp, NEW_VERSION),
        )
        stats['abstract_inserted'] += 1
    print(f' inserted: {stats["abstract_inserted"]}', flush=True)
def step2_create_knowledge_and_junctions(cur, stats):
    """Create a knowledge row plus junction rows for every concrete strategy link.

    For each (concrete strategy, requirement) pair whose strategy has
    ``version = 'howard_dedup'`` the function:

    * classifies the pair into a pattern (pairs that ``classify`` cannot map
      are appended to ``stats['unmapped']`` and skipped),
    * writes the knowledge row (delete by id, then insert),
    * writes ``requirement_knowledge`` and ``strategy_knowledge`` (delete, then
      insert),
    * links the knowledge row to every resource of the requirement in
      ``knowledge_resource`` (conflicts ignored).

    Args:
        cur: Database cursor whose rows support access by column name.
        stats: Mutable counters dict; updates ``knowledge_inserted``,
            ``rk_inserted``, ``sk_inserted``, ``kr_inserted`` and ``unmapped``.
    """
    print('\n=== Step 2: 为每条具体 strategy 生成 knowledge + junctions ===', flush=True)
    # Only concrete strategies are selected; abstract ones carry NEW_VERSION.
    cur.execute("""SELECT s.id, s.name, s.body, rs.requirement_id,
                          rs.is_selected, rs.coverage_score, rs.coverage_explanation,
                          r.description AS req_desc
                   FROM strategy s
                   JOIN requirement_strategy rs ON rs.strategy_id = s.id
                   JOIN requirement r ON r.id = rs.requirement_id
                   WHERE s.version = 'howard_dedup'""")
    rows = cur.fetchall()
    print(f' concrete strategies to migrate: {len(rows)}', flush=True)
    now = int(time.time())
    # requirement_id -> resource ids. This function does not modify
    # requirement_resource, so one lookup per requirement is enough.
    resources_by_req = {}
    for i, row in enumerate(rows):
        sid = row['id']
        req_id = row['requirement_id']
        is_sel = row['is_selected']
        cov = row['coverage_score']
        cov_exp = row['coverage_explanation']

        pid = classify(sid, req_id, is_sel)
        if pid is None:
            stats['unmapped'].append(sid)
            continue
        aid = abstract_strategy_id(pid)
        kid = gen_knowledge_id(req_id, sid)

        # A NULL requirement description must not break the slice below or
        # show up as the literal text "None" in the content.
        req_desc = row['req_desc'] or ''
        content = build_knowledge_content(row, req_desc, pid, PATTERN_NAME[pid])
        task = f'{req_desc[:80]} — {PATTERN_NAME[pid]}'
        tags = {
            'kind': 'req_specific_strategy_execution',
            'original_strategy_name': row['name'],
            'pattern_id': pid,
            'pattern_name': PATTERN_NAME[pid],
        }
        source = {
            'category': 'strategy_migration_req_specific',
            'agent_id': 'strategy_migration_2026-04-22',
            'timestamp': f'{MIGRATION_DATE}T00:00:00+00:00',
            'original_strategy_id': sid,
        }
        eval_obj = {
            'score': coverage_to_eval_score(cov),
            'harmful': 0,
            'helpful': 1,
            # Only a missing score falls back to 0.5; a real 0.0 is kept.
            'confidence': 0.5 if cov is None else cov,
        }

        # knowledge row (idempotent: DELETE + INSERT)
        cur.execute('DELETE FROM knowledge WHERE id = %s', (kid,))
        cur.execute("""INSERT INTO knowledge
                           (id, message_id, task, content, types, tags, tag_keys,
                            scopes, owner, source, eval, created_at, updated_at,
                            status, version)
                       VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                               %s, %s, %s, %s)""",
                    (kid, '', task, content,
                     ['strategy', 'execution_instance'],
                     json.dumps(tags, ensure_ascii=False),
                     list(tags.keys()),
                     ['org:cybertogether'],
                     'agent:strategy_migration',
                     json.dumps(source, ensure_ascii=False),
                     json.dumps(eval_obj, ensure_ascii=False),
                     now, now, 'approved', NEW_VERSION))
        stats['knowledge_inserted'] += 1

        # requirement_knowledge
        cur.execute("""DELETE FROM requirement_knowledge
                       WHERE requirement_id = %s AND knowledge_id = %s""", (req_id, kid))
        cur.execute("""INSERT INTO requirement_knowledge
                           (requirement_id, knowledge_id, is_selected,
                            coverage_score, coverage_explanation)
                       VALUES (%s, %s, %s, %s, %s)""",
                    (req_id, kid, is_sel, cov, cov_exp))
        stats['rk_inserted'] += 1

        # strategy_knowledge (abstract -> knowledge)
        cur.execute("""DELETE FROM strategy_knowledge
                       WHERE strategy_id = %s AND knowledge_id = %s""", (aid, kid))
        cur.execute("""INSERT INTO strategy_knowledge
                           (strategy_id, knowledge_id, relation_type)
                       VALUES (%s, %s, 'execution_instance')""", (aid, kid))
        stats['sk_inserted'] += 1

        # knowledge_resource: every resource linked to this knowledge row's
        # requirement. This is a requirement-level union, not a strict
        # knowledge-specific reference; a finer per-case match against
        # body.source is left to a later phase.
        if req_id not in resources_by_req:
            cur.execute("""SELECT resource_id FROM requirement_resource
                           WHERE requirement_id = %s""", (req_id,))
            resources_by_req[req_id] = [r['resource_id'] for r in cur.fetchall()]
        res_ids = resources_by_req[req_id]
        for rid in res_ids:
            cur.execute("""INSERT INTO knowledge_resource (knowledge_id, resource_id)
                           VALUES (%s, %s) ON CONFLICT DO NOTHING""", (kid, rid))
        # Counts attempted links; conflicting rows are included in the count.
        stats['kr_inserted'] += len(res_ids)

        if (i + 1) % 20 == 0:
            print(f' processed {i+1}/{len(rows)}', flush=True)
def step3_rebuild_strategy_capability(cur, stats):
    """Re-point ``strategy_capability`` from concrete to abstract strategies.

    Gathers, per pattern id returned by ``classify``, the set of capability
    ids linked to concrete (``howard_dedup``) strategies, deletes the concrete
    links, then inserts one ``'compose'`` link per (abstract strategy,
    capability) pair with conflicts ignored.

    Args:
        cur: Database cursor whose rows support access by column name.
        stats: Mutable counters dict; ``stats['abstract_strat_cap']`` is set
            to the number of insert statements issued.
    """
    print('\n=== Step 3: 重建 strategy_capability(抽象 strategy ↔ cap 联合)===', flush=True)
    # Collect the capability union of each pattern first.
    cur.execute("""SELECT sc.capability_id, sc.strategy_id, rs.requirement_id, rs.is_selected, s.id AS sid
                   FROM strategy_capability sc
                   JOIN strategy s ON s.id = sc.strategy_id
                   JOIN requirement_strategy rs ON rs.strategy_id = s.id
                   WHERE s.version = 'howard_dedup'""")
    caps_by_pattern = {}  # pattern id -> set of capability ids
    for link in cur.fetchall():
        pattern_id = classify(link['sid'], link['requirement_id'], link['is_selected'])
        if not pattern_id:
            continue
        if pattern_id not in caps_by_pattern:
            caps_by_pattern[pattern_id] = set()
        caps_by_pattern[pattern_id].add(link['capability_id'])

    # Remove the old links that belong to concrete strategies.
    cur.execute("""DELETE FROM strategy_capability
                   WHERE strategy_id IN (SELECT id FROM strategy WHERE version='howard_dedup')""")
    print(' deleted concrete strategy_capability', flush=True)

    # Insert the links for the abstract strategies.
    issued = 0
    insert_sql = """INSERT INTO strategy_capability
                        (strategy_id, capability_id, relation_type)
                    VALUES (%s, %s, 'compose') ON CONFLICT DO NOTHING"""
    for pattern_id, capability_ids in caps_by_pattern.items():
        if pattern_id == 'P18':
            continue
        strategy_id = abstract_strategy_id(pattern_id)
        for capability_id in capability_ids:
            cur.execute(insert_sql, (strategy_id, capability_id))
            issued += 1
    stats['abstract_strat_cap'] = issued
    print(f' inserted abstract strategy_capability: {issued}', flush=True)
def step4_rebuild_strategy_resource(cur, stats):
    """Re-point ``strategy_resource`` from concrete to abstract strategies.

    Gathers, per pattern id returned by ``classify``, the set of resource ids
    linked to concrete (``howard_dedup``) strategies, deletes the concrete
    links, then inserts one link per (abstract strategy, resource) pair with
    conflicts ignored.

    Args:
        cur: Database cursor whose rows support access by column name.
        stats: Mutable counters dict; ``stats['abstract_strat_res']`` is set
            to the number of insert statements issued.
    """
    print('\n=== Step 4: 重建 strategy_resource(抽象 strategy ↔ resource 联合)===', flush=True)
    cur.execute("""SELECT sr.resource_id, s.id AS sid, rs.requirement_id, rs.is_selected
                   FROM strategy_resource sr
                   JOIN strategy s ON s.id = sr.strategy_id
                   JOIN requirement_strategy rs ON rs.strategy_id = s.id
                   WHERE s.version = 'howard_dedup'""")
    resources_by_pattern = {}  # pattern id -> set of resource ids
    for link in cur.fetchall():
        pattern_id = classify(link['sid'], link['requirement_id'], link['is_selected'])
        if not pattern_id:
            continue
        if pattern_id not in resources_by_pattern:
            resources_by_pattern[pattern_id] = set()
        resources_by_pattern[pattern_id].add(link['resource_id'])

    cur.execute("""DELETE FROM strategy_resource
                   WHERE strategy_id IN (SELECT id FROM strategy WHERE version='howard_dedup')""")
    print(' deleted concrete strategy_resource', flush=True)

    issued = 0
    insert_sql = """INSERT INTO strategy_resource (strategy_id, resource_id)
                    VALUES (%s, %s) ON CONFLICT DO NOTHING"""
    for pattern_id, resource_ids in resources_by_pattern.items():
        if pattern_id == 'P18':
            continue
        strategy_id = abstract_strategy_id(pattern_id)
        for resource_id in resource_ids:
            cur.execute(insert_sql, (strategy_id, resource_id))
            issued += 1
    stats['abstract_strat_res'] = issued
    print(f' inserted abstract strategy_resource: {issued}', flush=True)
def step5_rebuild_requirement_strategy(cur, stats):
    """Replace concrete ``requirement_strategy`` rows with de-duplicated abstract ones.

    Concrete rows are grouped by (requirement, abstract strategy). A group is
    selected when any member is selected, and it takes the coverage score and
    explanation of the member with the highest non-null score. The concrete
    rows are then deleted and one row per group is written (delete, then
    insert).

    Args:
        cur: Database cursor whose rows support access by column name.
        stats: Mutable counters dict; ``stats['abstract_req_strat']`` is set
            to the number of groups written.
    """
    print('\n=== Step 5: 重建 requirement_strategy (req ↔ abstract, dedup) ===', flush=True)
    # Fetch every concrete requirement_strategy record first.
    cur.execute("""SELECT rs.requirement_id, rs.strategy_id, rs.is_selected,
                          rs.coverage_score, rs.coverage_explanation,
                          s.version
                   FROM requirement_strategy rs
                   JOIN strategy s ON s.id = rs.strategy_id
                   WHERE s.version = 'howard_dedup'""")
    concrete_rows = cur.fetchall()

    # (requirement id, abstract strategy id) -> merged link attributes
    merged = {}
    for rec in concrete_rows:
        pattern_id = classify(rec['strategy_id'], rec['requirement_id'], rec['is_selected'])
        if not pattern_id:
            continue
        key = (rec['requirement_id'], abstract_strategy_id(pattern_id))
        entry = merged.setdefault(key, {'is_selected': False,
                                        'coverage_score': None,
                                        'coverage_explanation': None})
        if rec['is_selected']:
            entry['is_selected'] = True
        # Keep the score and explanation of the best-scoring member.
        score = rec['coverage_score']
        if score is not None and (entry['coverage_score'] is None
                                  or score > entry['coverage_score']):
            entry['coverage_score'] = score
            entry['coverage_explanation'] = rec['coverage_explanation']

    # Remove the old concrete rows.
    cur.execute("""DELETE FROM requirement_strategy
                   WHERE strategy_id IN (SELECT id FROM strategy WHERE version='howard_dedup')""")
    print(' deleted concrete requirement_strategy rows', flush=True)

    # Write the abstract rows.
    for (requirement_id, strategy_id), entry in merged.items():
        cur.execute("""DELETE FROM requirement_strategy
                       WHERE requirement_id = %s AND strategy_id = %s""",
                    (requirement_id, strategy_id))
        cur.execute("""INSERT INTO requirement_strategy
                           (requirement_id, strategy_id, is_selected,
                            coverage_score, coverage_explanation)
                       VALUES (%s, %s, %s, %s, %s)""",
                    (requirement_id, strategy_id, entry['is_selected'],
                     entry['coverage_score'], entry['coverage_explanation']))
    group_count = len(merged)
    stats['abstract_req_strat'] = group_count
    print(f' inserted abstract requirement_strategy: {group_count}', flush=True)
def step6_delete_concrete_strategies(cur, stats):
    """Delete every strategy row whose version is ``howard_dedup``.

    Before deleting, each junction table is checked for rows that still point
    at a concrete strategy. Leftovers only produce a warning; the delete runs
    regardless.

    Args:
        cur: Database cursor whose rows support access by column name.
        stats: Mutable counters dict; ``stats['concrete_deleted']`` is set to
            the cursor's ``rowcount`` after the delete.
    """
    print('\n=== Step 6: 删除具体 strategy 行(howard_dedup version)===', flush=True)
    # Check for leftover junction rows first.
    junction_tables = ('strategy_capability', 'strategy_resource',
                       'strategy_knowledge', 'requirement_strategy')
    for table in junction_tables:
        # The table name comes from the fixed tuple above, never from input.
        cur.execute(f"""SELECT COUNT(*) c FROM {table}
                        WHERE strategy_id IN (SELECT id FROM strategy WHERE version='howard_dedup')""")
        leftover = cur.fetchone()['c']
        if leftover > 0:
            print(f' ⚠️ {table} 还有 {leftover} 行指向具体 strategy,可能不干净', flush=True)
    cur.execute("DELETE FROM strategy WHERE version = 'howard_dedup'")
    deleted = cur.rowcount
    stats['concrete_deleted'] = deleted
    print(f' deleted concrete strategies: {deleted}', flush=True)
- # ═══════════════════════════════════════════════════════════
def verify(cur):
    """Print post-migration row counts and the strat_cap / req_cap violation count.

    The first query reports totals for strategy, knowledge and the junction
    tables, split by version marker. The second counts strategy_capability
    rows whose capability is not linked to a requirement that uses the same
    strategy.

    Args:
        cur: Database cursor whose rows support access by column name.
    """
    print('\n=== 最终验证 ===', flush=True)
    counts_sql = """SELECT
        (SELECT COUNT(*) FROM strategy) s_total,
        (SELECT COUNT(*) FROM strategy WHERE version='howard_dedup') s_concrete,
        (SELECT COUNT(*) FROM strategy WHERE version=%s) s_abstract,
        (SELECT COUNT(*) FROM knowledge WHERE version='v0') k_v0,
        (SELECT COUNT(*) FROM knowledge WHERE version=%s) k_new,
        (SELECT COUNT(*) FROM requirement_strategy) rs_total,
        (SELECT COUNT(*) FROM requirement_knowledge
            WHERE knowledge_id IN (SELECT id FROM knowledge WHERE version=%s)) rk_new,
        (SELECT COUNT(*) FROM strategy_knowledge
            WHERE knowledge_id IN (SELECT id FROM knowledge WHERE version=%s)) sk_new,
        (SELECT COUNT(*) FROM knowledge_resource
            WHERE knowledge_id IN (SELECT id FROM knowledge WHERE version=%s)) kr_new,
        (SELECT COUNT(*) FROM strategy_capability
            WHERE strategy_id IN (SELECT id FROM strategy WHERE version=%s)) sc_new,
        (SELECT COUNT(*) FROM strategy_resource
            WHERE strategy_id IN (SELECT id FROM strategy WHERE version=%s)) sr_new
    """
    # The query has seven %s placeholders, all bound to the new version marker.
    cur.execute(counts_sql, (NEW_VERSION,) * 7)
    counts = cur.fetchone()
    for label, value in dict(counts).items():
        print(f' {label}: {value}', flush=True)

    # Constraint: strat_cap ⊆ req_cap
    cur.execute("""SELECT COUNT(*) c FROM strategy_capability sc
                   JOIN requirement_strategy rs ON rs.strategy_id = sc.strategy_id
                   LEFT JOIN requirement_capability rc
                       ON rc.requirement_id=rs.requirement_id AND rc.capability_id=sc.capability_id
                   WHERE rc.capability_id IS NULL""")
    violations = cur.fetchone()['c']
    print(f'\n strat_cap ⊄ req_cap violations: {violations}', flush=True)
- # ═══════════════════════════════════════════════════════════
def main():
    """Command-line entry point: preflight check, then the six migration steps.

    One of ``--execute`` or ``--dry-run`` is required. When both are given the
    dry run wins and nothing is written. The preflight classifies every
    concrete (``howard_dedup``) strategy link; if any cannot be mapped the
    process exits with status 1 before any write happens.

    Raises:
        SystemExit: With status 1 when no mode flag is given or when the
            preflight finds unmapped strategies.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--execute', action='store_true')
    ap.add_argument('--dry-run', action='store_true')
    args = ap.parse_args()
    if not (args.execute or args.dry_run):
        print('need --execute or --dry-run')
        sys.exit(1)

    s = PostgreSQLCapabilityStore()
    cur = None
    try:
        # Acquired inside the try so the store is closed even if this raises.
        cur = s._get_cursor()

        # Preflight
        cur.execute("""SELECT s.id, rs.requirement_id, rs.is_selected
                       FROM strategy s JOIN requirement_strategy rs ON rs.strategy_id=s.id
                       WHERE s.version='howard_dedup'""")
        all_rows = cur.fetchall()
        unmapped = [r for r in all_rows
                    if classify(r['id'], r['requirement_id'], r['is_selected']) is None]
        print(f'Preflight: {len(all_rows)} concrete strategies, {len(unmapped)} unmapped')
        if unmapped:
            for r in unmapped[:10]:
                print(f' unmapped: {r}')
            print('Abort.')
            # Non-zero status so calling scripts can tell the migration did
            # not run; the finally block below still releases the resources.
            sys.exit(1)
        if args.dry_run:
            print('DRY RUN OK — use --execute to run')
            return

        stats = {'abstract_inserted': 0, 'knowledge_inserted': 0,
                 'rk_inserted': 0, 'sk_inserted': 0, 'kr_inserted': 0,
                 'abstract_strat_cap': 0, 'abstract_strat_res': 0,
                 'abstract_req_strat': 0, 'concrete_deleted': 0,
                 'unmapped': []}
        step1_insert_abstract_strategies(cur, stats)
        step2_create_knowledge_and_junctions(cur, stats)
        step3_rebuild_strategy_capability(cur, stats)
        step4_rebuild_strategy_resource(cur, stats)
        step5_rebuild_requirement_strategy(cur, stats)
        step6_delete_concrete_strategies(cur, stats)

        print(f'\n{"="*60}\nStats:')
        for k, v in stats.items():
            # List-valued stats are reported by length only.
            if isinstance(v, list):
                print(f' {k}: {len(v)}')
            else:
                print(f' {k}: {v}')
        verify(cur)
    finally:
        if cur is not None:
            cur.close()
        s.close()
# Run the migration only when this file is executed as a script.
if __name__ == '__main__':
    main()
|