#!/usr/bin/env python3
"""
Step 2: build strict redundant copies for the dev_dedup version and restore
the original concrete strategy data from the bk_20260422_* backup tables.

ID naming rules:
- copied capability / resource rows get the '__dd' suffix appended to their ID
- strategies are restored from backup and keep their original IDs
  (already DELETEd from the main table, so no conflict)

Execution order (FK dependencies):
1. capability (332 → new dev_dedup IDs)
2. resource (2539 → new dev_dedup IDs)
3. requirement_capability (1106 → cap_id remapped to dev_dedup)
4. requirement_resource (2736 → resource_id remapped to dev_dedup)
5. strategy (99 from bk, version changed to dev_dedup)
6. requirement_strategy (99 from bk, is_selected defaults to TRUE, coverage empty)
7. strategy_capability (703 from bk, cap_id remapped)
8. strategy_resource (2736 from bk, resource_id remapped)
"""
import sys
import time
from pathlib import Path

import psycopg2.extras

sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore

# Suffix appended to every copied capability / resource ID.
SUFFIX = '__dd'


def remap(old_id: str) -> str:
    """Return the dev_dedup ID for *old_id* (the original ID plus SUFFIX)."""
    return f'{old_id}{SUFFIX}'


def t(label, fn):
    """Call *fn* with no arguments, print its wall-clock duration, return its result."""
    t0 = time.time()
    r = fn()
    print(f' [{time.time()-t0:.1f}s] {label}', flush=True)
    return r


def _json(value):
    """Wrap a non-None value in psycopg2's Json adapter; pass None through as NULL."""
    return psycopg2.extras.Json(value) if value is not None else None


def _insert_rows(cur, sql, param_rows):
    """Execute *sql* once per parameter tuple and return the summed cursor rowcount."""
    inserted = 0
    for params in param_rows:
        cur.execute(sql, params)
        # With ON CONFLICT ... DO NOTHING a skipped row reports rowcount 0.
        inserted += cur.rowcount or 0
    return inserted


def _prepare_session(cur):
    """Set the statement timeout and terminate other idle-in-transaction sessions."""
    cur.execute("SET statement_timeout = '120s'")
    # NOTE(review): this terminates EVERY other backend of the current database
    # that is idle in a transaction, not only sessions left by this script.
    # Presumably intended to release stale locks - confirm before running
    # against a shared database.
    cur.execute("""
        SELECT pid FROM pg_stat_activity
        WHERE state='idle in transaction' AND pid!=pg_backend_pid()
          AND datname=current_database()
    """)
    for r in cur.fetchall():
        cur.execute("SELECT pg_terminate_backend(%s)", (r['pid'],))


def _copy_capabilities(cur):
    """Step 1: copy every dev_abstract capability to a '__dd' row in dev_dedup."""
    print('\n[1] 复制 capability 给 dev_dedup', flush=True)
    cur.execute("""
        SELECT id, name, criterion, description, effects
        FROM capability WHERE version='dev_abstract'
    """)
    caps = cur.fetchall()
    print(f' 源 cap 行数: {len(caps)}', flush=True)
    inserted = _insert_rows(
        cur,
        """
        INSERT INTO capability (id, name, criterion, description, effects, version)
        VALUES (%s, %s, %s, %s, %s, 'dev_dedup')
        ON CONFLICT (id) DO NOTHING
        """,
        (
            (remap(c['id']), c['name'], c['criterion'], c['description'],
             _json(c['effects']))
            for c in caps
        ),
    )
    print(f' inserted: {inserted}', flush=True)


def _copy_resources(cur):
    """Step 2: copy every dev_abstract resource to a '__dd' row in dev_dedup."""
    print('\n[2] 复制 resource 给 dev_dedup', flush=True)
    cur.execute("""
        SELECT id, title, body, secure_body, content_type, metadata,
               sort_order, submitted_by, created_at, updated_at, images
        FROM resource WHERE version='dev_abstract'
    """)
    ress = cur.fetchall()
    print(f' 源 resource 行数: {len(ress)}', flush=True)
    inserted = _insert_rows(
        cur,
        """
        INSERT INTO resource (id, title, body, secure_body, content_type, metadata,
                              sort_order, submitted_by, created_at, updated_at,
                              images, version)
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'dev_dedup')
        ON CONFLICT (id) DO NOTHING
        """,
        (
            (remap(r['id']), r['title'], r['body'], r['secure_body'],
             r['content_type'], _json(r['metadata']), r['sort_order'],
             r['submitted_by'], r['created_at'], r['updated_at'],
             _json(r['images']))
            for r in ress
        ),
    )
    print(f' inserted: {inserted}', flush=True)


def _copy_requirement_capability(cur):
    """Step 3: duplicate requirement_capability links, pointing at remapped cap IDs."""
    print('\n[3] 复制 requirement_capability（remap cap_id）', flush=True)
    cur.execute("""
        SELECT rc.requirement_id, rc.capability_id
        FROM requirement_capability rc
        JOIN capability c ON c.id=rc.capability_id
        WHERE c.version='dev_abstract'
    """)
    rc_rows = cur.fetchall()
    print(f' 源 req_cap 行数: {len(rc_rows)}', flush=True)
    inserted = _insert_rows(
        cur,
        """
        INSERT INTO requirement_capability (requirement_id, capability_id)
        VALUES (%s, %s) ON CONFLICT DO NOTHING
        """,
        ((row['requirement_id'], remap(row['capability_id'])) for row in rc_rows),
    )
    print(f' inserted: {inserted}', flush=True)


def _copy_requirement_resource(cur):
    """Step 4: duplicate requirement_resource links, pointing at remapped resource IDs."""
    print('\n[4] 复制 requirement_resource（remap resource_id）', flush=True)
    cur.execute("""
        SELECT rr.requirement_id, rr.resource_id
        FROM requirement_resource rr
        JOIN resource r ON r.id=rr.resource_id
        WHERE r.version='dev_abstract'
    """)
    rr_rows = cur.fetchall()
    print(f' 源 req_resource 行数: {len(rr_rows)}', flush=True)
    inserted = _insert_rows(
        cur,
        """
        INSERT INTO requirement_resource (requirement_id, resource_id)
        VALUES (%s, %s) ON CONFLICT DO NOTHING
        """,
        ((row['requirement_id'], remap(row['resource_id'])) for row in rr_rows),
    )
    print(f' inserted: {inserted}', flush=True)


def _restore_strategies(cur):
    """Step 5: re-insert backed-up strategies under their original IDs as dev_dedup."""
    print('\n[5] strategy 从 bk_20260422_strategy 恢复', flush=True)
    cur.execute("""
        SELECT id, name, description, body, status, created_at, updated_at
        FROM bk_20260422_strategy
    """)
    bk_strats = cur.fetchall()
    print(f' 备份 strategy 行数: {len(bk_strats)}', flush=True)
    inserted = _insert_rows(
        cur,
        """
        INSERT INTO strategy (id, name, description, body, status,
                              created_at, updated_at, version)
        VALUES (%s,%s,%s,%s,%s,%s,%s,'dev_dedup')
        ON CONFLICT (id) DO NOTHING
        """,
        (
            (st['id'], st['name'], st['description'], _json(st['body']),
             st['status'], st['created_at'], st['updated_at'])
            for st in bk_strats
        ),
    )
    print(f' inserted: {inserted}', flush=True)


def _table_columns(cur, table_name):
    """Return the column names information_schema reports for *table_name*, in order."""
    cur.execute(
        """
        SELECT column_name FROM information_schema.columns
        WHERE table_name=%s ORDER BY ordinal_position
        """,
        (table_name,),
    )
    return [r['column_name'] for r in cur.fetchall()]


def _restore_requirement_strategy(cur):
    """Step 6: restore requirement_strategy links from backup with is_selected=TRUE."""
    print('\n[6] requirement_strategy 从备份恢复（is_selected=TRUE）', flush=True)
    # The column lists are printed for the operator only; they do not drive the insert.
    bk_rs_cols = _table_columns(cur, 'bk_20260422_requirement_strategy')
    print(f' bk_rs 列: {bk_rs_cols}', flush=True)
    cur.execute("SELECT * FROM bk_20260422_requirement_strategy")
    bk_rs = cur.fetchall()
    print(f' bk req_strategy 行数: {len(bk_rs)}', flush=True)
    # The live requirement_strategy table currently has
    # (req_id, strat_id, [role], is_selected, coverage_score, coverage_explanation).
    live_rs_cols = _table_columns(cur, 'requirement_strategy')
    print(f' live_rs 列: {live_rs_cols}', flush=True)
    # Only requirement_id / strategy_id are taken from the backup; is_selected
    # defaults to TRUE (every backed-up row was selected) and the remaining
    # coverage columns are left NULL.
    inserted = _insert_rows(
        cur,
        """
        INSERT INTO requirement_strategy (requirement_id, strategy_id, is_selected)
        VALUES (%s, %s, TRUE) ON CONFLICT DO NOTHING
        """,
        ((row['requirement_id'], row['strategy_id']) for row in bk_rs),
    )
    print(f' inserted: {inserted}', flush=True)


def _restore_strategy_capability(cur):
    """Step 7: restore strategy_capability links from backup with remapped cap IDs."""
    print('\n[7] strategy_capability 从备份恢复（remap cap_id）', flush=True)
    cur.execute("SELECT strategy_id, capability_id FROM bk_20260422_strategy_capability")
    bk_sc = cur.fetchall()
    print(f' bk strat_cap 行数: {len(bk_sc)}', flush=True)
    # Only backup cap_ids that still exist as dev_abstract capabilities can be remapped.
    cur.execute("SELECT id FROM capability WHERE version='dev_abstract'")
    valid_old_caps = {r['id'] for r in cur.fetchall()}
    usable = [row for row in bk_sc if row['capability_id'] in valid_old_caps]
    skipped_nocap = len(bk_sc) - len(usable)
    inserted = _insert_rows(
        cur,
        """
        INSERT INTO strategy_capability (strategy_id, capability_id)
        VALUES (%s, %s) ON CONFLICT DO NOTHING
        """,
        ((row['strategy_id'], remap(row['capability_id'])) for row in usable),
    )
    print(f' inserted: {inserted} (skipped: {skipped_nocap} cap_id 不在当前 cap 表)',
          flush=True)


def _restore_strategy_resource(cur):
    """Step 8: restore strategy_resource links from backup with remapped resource IDs."""
    print('\n[8] strategy_resource 从备份恢复（remap resource_id）', flush=True)
    cur.execute("SELECT strategy_id, resource_id FROM bk_20260422_strategy_resource")
    bk_sr = cur.fetchall()
    print(f' bk strat_resource 行数: {len(bk_sr)}', flush=True)
    # Only backup resource_ids that still exist as dev_abstract resources can be remapped.
    cur.execute("SELECT id FROM resource WHERE version='dev_abstract'")
    valid_old_ress = {r['id'] for r in cur.fetchall()}
    usable = [row for row in bk_sr if row['resource_id'] in valid_old_ress]
    skipped_nores = len(bk_sr) - len(usable)
    inserted = _insert_rows(
        cur,
        """
        INSERT INTO strategy_resource (strategy_id, resource_id)
        VALUES (%s, %s) ON CONFLICT DO NOTHING
        """,
        ((row['strategy_id'], remap(row['resource_id'])) for row in usable),
    )
    print(f' inserted: {inserted} (skipped: {skipped_nores} resource_id 不在当前 resource 表)',
          flush=True)


def _print_summary(cur):
    """Print per-version row counts of the entity tables and totals of the link tables."""
    print('\n=== 迁移后汇总 ===', flush=True)
    # Table names below are hard-coded constants, so f-string SQL is safe here.
    for tbl in ['capability', 'resource', 'strategy', 'knowledge']:
        cur.execute(f"SELECT version, COUNT(*) n FROM {tbl} GROUP BY version ORDER BY n DESC")
        parts = [f'{r["version"]}={r["n"]}' for r in cur.fetchall()]
        print(f' {tbl}: {", ".join(parts)}', flush=True)
    for tbl in ['requirement_capability', 'requirement_resource', 'requirement_strategy',
                'strategy_capability', 'strategy_resource']:
        cur.execute(f'SELECT COUNT(*) c FROM {tbl}')
        print(f' {tbl}: {cur.fetchone()["c"]}', flush=True)


def main():
    """Run the eight migration steps in FK-dependency order, then print a summary.

    Every INSERT uses ON CONFLICT ... DO NOTHING, so rows that already exist
    are left untouched and counted as not inserted.

    NOTE(review): no explicit commit is issued anywhere in this script;
    presumably the store's connection autocommits - confirm, otherwise the
    inserted rows are discarded when the connection closes.
    """
    s = PostgreSQLCapabilityStore()
    try:
        # Acquire the cursor inside the try so the store is closed even when
        # cursor creation fails.
        cur = s._get_cursor()
        try:
            _prepare_session(cur)
            _copy_capabilities(cur)
            _copy_resources(cur)
            _copy_requirement_capability(cur)
            _copy_requirement_resource(cur)
            _restore_strategies(cur)
            _restore_requirement_strategy(cur)
            _restore_strategy_capability(cur)
            _restore_strategy_resource(cur)
            _print_summary(cur)
        finally:
            cur.close()
    finally:
        # Separate finally: a failing cur.close() must not skip closing the store.
        s.close()


if __name__ == '__main__':
    main()