#!/usr/bin/env python3
"""
Phase 5 patch: create the knowledge_capability junction table so that the
"which caps did this req-specific execution use" granularity is preserved.

Background: after the Phase 4/5 migration, strategy_capability holds the union
of caps of an abstract pattern and is no longer strictly consistent with the
research of any single req. The original constraint strat_cap ⊆ req_cap no
longer holds, so an equivalent constraint has to be rebuilt at the knowledge
layer: knowledge_cap ⊆ req_cap (via requirement_knowledge).

Data source: bk_20260422_strategy_capability (snapshot of the concrete
strategies taken before the migration)
           + knowledge.source.original_strategy_id (points at the original
             concrete strategy)

Verification: every newly created knowledge_capability row must satisfy
(knowledge.req_id, cap_id) ∈ requirement_capability.
"""

import json
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent.parent))

from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore


def _ensure_table(cur):
    """Create knowledge_capability if it is missing, else print its row count."""
    cur.execute(
        "SELECT EXISTS (SELECT 1 FROM information_schema.tables "
        "WHERE table_name='knowledge_capability')"
    )
    if not cur.fetchone()['exists']:
        print('创建 knowledge_capability 表...', flush=True)
        cur.execute(
            " CREATE TABLE knowledge_capability ( "
            "knowledge_id VARCHAR NOT NULL, "
            "capability_id VARCHAR NOT NULL, "
            "PRIMARY KEY (knowledge_id, capability_id) ) "
            "DISTRIBUTED BY (knowledge_id) "
        )
        print(' ✓ 已建表', flush=True)
    else:
        cur.execute('SELECT COUNT(*) c FROM knowledge_capability')
        print(f'knowledge_capability 已存在 ({cur.fetchone()["c"]} rows)', flush=True)


def _restore_links(cur):
    """Rebuild knowledge <-> capability rows from the backup table.

    Returns the number of rows actually written to knowledge_capability.
    """
    print('\n=== 从备份还原 knowledge-cap 关系 ===', flush=True)
    cur.execute(
        "SELECT id, source FROM knowledge "
        "WHERE version='howard_strategy_instance'"
    )
    knowledge_rows = cur.fetchall()
    print(f' new knowledge: {len(knowledge_rows)}', flush=True)

    inserted = 0
    for row in knowledge_rows:
        raw_source = row['source']
        # The source column may arrive already decoded (dict) or as JSON text.
        source = raw_source if isinstance(raw_source, dict) else json.loads(raw_source or '{}')
        # JSON text such as "null" or a list decodes to a non-dict; such rows
        # carry no original strategy id, so skip them instead of crashing.
        if not isinstance(source, dict):
            continue
        original_strategy_id = source.get('original_strategy_id')
        if not original_strategy_id:
            continue

        # Look up the caps of the original concrete strategy (from the backup).
        cur.execute(
            "SELECT capability_id FROM bk_20260422_strategy_capability "
            "WHERE strategy_id = %s",
            (original_strategy_id,),
        )
        cap_ids = [r['capability_id'] for r in cur.fetchall()]
        for cap_id in cap_ids:
            cur.execute(
                "INSERT INTO knowledge_capability (knowledge_id, capability_id) "
                "VALUES (%s, %s) ON CONFLICT DO NOTHING",
                (row['id'], cap_id),
            )
            # ON CONFLICT DO NOTHING may write zero rows, so count what the
            # cursor reports instead of counting every attempt. If the cursor
            # does not expose a usable rowcount, fall back to one per attempt.
            affected = getattr(cur, 'rowcount', -1)
            inserted += affected if affected >= 0 else 1

    print(f' inserted knowledge_capability: {inserted}', flush=True)
    return inserted


def _verify_subset_constraint(cur):
    """Count and print rows violating knowledge_cap ⊆ req_cap; return the count."""
    print('\n=== 验证 knowledge_cap ⊆ req_cap ===', flush=True)
    # A violation is a (requirement, capability) pair reachable through
    # requirement_knowledge that has no matching requirement_capability row.
    cur.execute(
        "SELECT COUNT(*) c FROM knowledge_capability kc "
        "JOIN requirement_knowledge rk ON rk.knowledge_id = kc.knowledge_id "
        "LEFT JOIN requirement_capability rc "
        "ON rc.requirement_id = rk.requirement_id "
        "AND rc.capability_id = kc.capability_id "
        "WHERE rc.capability_id IS NULL"
    )
    violations = cur.fetchone()['c']
    print(f' 违规行数: {violations} {"❌" if violations>0 else "✓"}', flush=True)
    return violations


def _print_cap_distribution(cur):
    """Print how many migrated knowledge rows have each number of caps."""
    cur.execute(
        "SELECT n_caps, COUNT(*) knowledge_n FROM "
        "(SELECT knowledge_id, COUNT(*) n_caps FROM knowledge_capability "
        "WHERE knowledge_id IN "
        "(SELECT id FROM knowledge WHERE version='howard_strategy_instance') "
        "GROUP BY knowledge_id) t "
        "GROUP BY n_caps ORDER BY n_caps"
    )
    print('\n 每 knowledge cap 数量分布:', flush=True)
    for bucket in cur.fetchall():
        print(f' {bucket["n_caps"]} caps: {bucket["knowledge_n"]} knowledge', flush=True)


def _print_summary(cur):
    """Print row counts of the strategy/knowledge/requirement tables."""
    print('\n=== 最终汇总 ===', flush=True)
    cur.execute(
        "SELECT (SELECT COUNT(*) FROM strategy) s_total, "
        "(SELECT COUNT(*) FROM knowledge WHERE version='howard_strategy_instance') new_k, "
        "(SELECT COUNT(*) FROM requirement_strategy) rs, "
        "(SELECT COUNT(*) FROM requirement_knowledge WHERE knowledge_id IN "
        "(SELECT id FROM knowledge WHERE version='howard_strategy_instance')) new_rk, "
        "(SELECT COUNT(*) FROM strategy_knowledge WHERE knowledge_id IN "
        "(SELECT id FROM knowledge WHERE version='howard_strategy_instance')) new_sk, "
        "(SELECT COUNT(*) FROM knowledge_resource WHERE knowledge_id IN "
        "(SELECT id FROM knowledge WHERE version='howard_strategy_instance')) new_kr, "
        "(SELECT COUNT(*) FROM knowledge_capability WHERE knowledge_id IN "
        "(SELECT id FROM knowledge WHERE version='howard_strategy_instance')) new_kc, "
        "(SELECT COUNT(*) FROM strategy_capability) sc, "
        "(SELECT COUNT(*) FROM strategy_resource) sr, "
        "(SELECT COUNT(*) FROM requirement_capability) req_cap"
    )
    for label, value in dict(cur.fetchone()).items():
        print(f' {label}: {value}', flush=True)


def main():
    """Run the patch: ensure the table, restore links, verify, then report.

    Steps, in order:
      1. Check whether knowledge_capability exists (it may already be an
         empty table) and create it if not.
      2. Rebuild knowledge <-> cap rows from the backup table plus
         knowledge.source.
      3. Verify the constraint knowledge_cap ⊆ req_cap
         (via requirement_knowledge).
      4. Show the distribution of cap counts per knowledge.
      5. Summarise the final database state.

    NOTE(review): no explicit commit is issued here; presumably the store's
    connection autocommits -- confirm against PostgreSQLCapabilityStore.
    """
    store = PostgreSQLCapabilityStore()
    try:
        # Acquire the cursor inside the outer try so the store is still closed
        # if cursor creation fails.
        cur = store._get_cursor()
        try:
            _ensure_table(cur)
            _restore_links(cur)
            _verify_subset_constraint(cur)
            _print_cap_distribution(cur)
            _print_summary(cur)
        finally:
            cur.close()
    finally:
        store.close()


if __name__ == '__main__':
    main()