| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- #!/usr/bin/env python3
- """
- Step 2: 为 dev_dedup 版本做严格冗余复制 + 从 bk_20260422_* 恢复原具体 strategy 数据。
- ID 命名规则:
- - 复制的 capability / resource 行 ID 加 '__dd' 后缀
- - strategy 从备份恢复,沿用原 ID(已从主表 DELETE,无冲突)
- 执行顺序(FK 依赖):
- 1. capability (332 → dev_dedup 新 ID)
- 2. resource (2539 → dev_dedup 新 ID)
- 3. requirement_capability (1106 → remap cap_id 到 dev_dedup)
- 4. requirement_resource (2736 → remap resource_id 到 dev_dedup)
- 5. strategy (99 from bk,version 改 dev_dedup)
- 6. requirement_strategy (99 from bk,is_selected 默认 TRUE,coverage 空)
- 7. strategy_capability (703 from bk,remap cap_id)
- 8. strategy_resource (2736 from bk,remap resource_id)
- """
- import sys, time
- from pathlib import Path
- import psycopg2.extras
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
# Suffix appended to the ID of every capability / resource row copied for dev_dedup.
SUFFIX = '__dd'


def remap(old_id: str) -> str:
    """Return the ID of the copied row: ``old_id`` followed by ``SUFFIX``."""
    return '{}{}'.format(old_id, SUFFIX)
def t(label, fn):
    """Run ``fn()``, print how long it took, and return its result.

    Args:
        label: Text printed after the elapsed-seconds prefix.
        fn: Zero-argument callable to execute and time.

    Returns:
        Whatever ``fn()`` returns. Nothing is printed if ``fn`` raises.
    """
    # perf_counter is monotonic, so the reported duration cannot go negative
    # or jump when the system clock is adjusted while fn is running.
    started = time.perf_counter()
    result = fn()
    print(f' [{time.perf_counter()-started:.1f}s] {label}', flush=True)
    return result
def _as_json(value):
    """Wrap ``value`` for a JSON column; ``None`` is passed through as SQL NULL."""
    if value is None:
        return None
    return psycopg2.extras.Json(value)


def _insert_rows(cur, sql, param_rows):
    """Execute ``sql`` once per parameter tuple and return the summed rowcount.

    Every statement passed here ends in ``ON CONFLICT ... DO NOTHING``, so the
    sum is the number of rows that were actually inserted.
    """
    inserted = 0
    for params in param_rows:
        cur.execute(sql, params)
        inserted += cur.rowcount or 0
    return inserted


def _migrate(cur):
    """Run the eight copy / restore steps in FK order, then print a summary."""
    # NOTE(review): nothing in this function commits; presumably the store's
    # connection runs in autocommit mode -- confirm before relying on it.
    cur.execute("SET statement_timeout = '120s'")

    # NOTE(review): this terminates every *other* backend of the current
    # database that is idle inside a transaction (presumably to release locks
    # that would block the inserts below) -- confirm this is acceptable.
    cur.execute("""SELECT pid FROM pg_stat_activity WHERE state='idle in transaction'
                   AND pid!=pg_backend_pid() AND datname=current_database()""")
    for backend in cur.fetchall():
        cur.execute("SELECT pg_terminate_backend(%s)", (backend['pid'],))

    # ===== 1. copy capability =====
    print('\n[1] 复制 capability 给 dev_dedup', flush=True)
    cur.execute("""SELECT id, name, criterion, description, effects
                   FROM capability WHERE version='dev_abstract'""")
    caps = cur.fetchall()
    print(f' 源 cap 行数: {len(caps)}', flush=True)
    inserted = _insert_rows(
        cur,
        """INSERT INTO capability (id, name, criterion, description, effects, version)
           VALUES (%s, %s, %s, %s, %s, 'dev_dedup') ON CONFLICT (id) DO NOTHING""",
        (
            (remap(c['id']), c['name'], c['criterion'], c['description'],
             _as_json(c['effects']))
            for c in caps
        ),
    )
    print(f' inserted: {inserted}', flush=True)

    # ===== 2. copy resource =====
    print('\n[2] 复制 resource 给 dev_dedup', flush=True)
    cur.execute("""SELECT id, title, body, secure_body, content_type, metadata,
                          sort_order, submitted_by, created_at, updated_at, images
                   FROM resource WHERE version='dev_abstract'""")
    ress = cur.fetchall()
    print(f' 源 resource 行数: {len(ress)}', flush=True)
    inserted = _insert_rows(
        cur,
        """INSERT INTO resource (id, title, body, secure_body, content_type,
                                 metadata, sort_order, submitted_by, created_at,
                                 updated_at, images, version)
           VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'dev_dedup')
           ON CONFLICT (id) DO NOTHING""",
        (
            (remap(r['id']), r['title'], r['body'], r['secure_body'],
             r['content_type'], _as_json(r['metadata']), r['sort_order'],
             r['submitted_by'], r['created_at'], r['updated_at'],
             _as_json(r['images']))
            for r in ress
        ),
    )
    print(f' inserted: {inserted}', flush=True)

    # ===== 3. copy requirement_capability (capability_id remapped) =====
    print('\n[3] 复制 requirement_capability(remap cap_id)', flush=True)
    cur.execute("""SELECT rc.requirement_id, rc.capability_id
                   FROM requirement_capability rc
                   JOIN capability c ON c.id=rc.capability_id
                   WHERE c.version='dev_abstract'""")
    rc_rows = cur.fetchall()
    print(f' 源 req_cap 行数: {len(rc_rows)}', flush=True)
    inserted = _insert_rows(
        cur,
        """INSERT INTO requirement_capability (requirement_id, capability_id)
           VALUES (%s, %s) ON CONFLICT DO NOTHING""",
        ((row['requirement_id'], remap(row['capability_id'])) for row in rc_rows),
    )
    print(f' inserted: {inserted}', flush=True)

    # ===== 4. copy requirement_resource (resource_id remapped) =====
    print('\n[4] 复制 requirement_resource(remap resource_id)', flush=True)
    cur.execute("""SELECT rr.requirement_id, rr.resource_id
                   FROM requirement_resource rr
                   JOIN resource r ON r.id=rr.resource_id
                   WHERE r.version='dev_abstract'""")
    rr_rows = cur.fetchall()
    print(f' 源 req_resource 行数: {len(rr_rows)}', flush=True)
    inserted = _insert_rows(
        cur,
        """INSERT INTO requirement_resource (requirement_id, resource_id)
           VALUES (%s, %s) ON CONFLICT DO NOTHING""",
        ((row['requirement_id'], remap(row['resource_id'])) for row in rr_rows),
    )
    print(f' inserted: {inserted}', flush=True)

    # ===== 5. restore strategy from the backup table =====
    print('\n[5] strategy 从 bk_20260422_strategy 恢复', flush=True)
    cur.execute("""SELECT id, name, description, body, status, created_at, updated_at
                   FROM bk_20260422_strategy""")
    bk_strats = cur.fetchall()
    print(f' 备份 strategy 行数: {len(bk_strats)}', flush=True)
    inserted = _insert_rows(
        cur,
        """INSERT INTO strategy (id, name, description, body, status,
                                 created_at, updated_at, version)
           VALUES (%s,%s,%s,%s,%s,%s,%s,'dev_dedup')
           ON CONFLICT (id) DO NOTHING""",
        (
            (st['id'], st['name'], st['description'], _as_json(st['body']),
             st['status'], st['created_at'], st['updated_at'])
            for st in bk_strats
        ),
    )
    print(f' inserted: {inserted}', flush=True)

    # ===== 6. restore requirement_strategy from the backup table =====
    print('\n[6] requirement_strategy 从备份恢复(is_selected=TRUE)', flush=True)
    # The column lists of the backup and live tables are only printed, for
    # the operator to compare; they do not drive the insert below.
    cur.execute("""SELECT column_name FROM information_schema.columns
                   WHERE table_name='bk_20260422_requirement_strategy' ORDER BY ordinal_position""")
    bk_rs_cols = [col['column_name'] for col in cur.fetchall()]
    print(f' bk_rs 列: {bk_rs_cols}', flush=True)
    cur.execute("SELECT * FROM bk_20260422_requirement_strategy")
    bk_rs = cur.fetchall()
    print(f' bk req_strategy 行数: {len(bk_rs)}', flush=True)
    cur.execute("""SELECT column_name FROM information_schema.columns
                   WHERE table_name='requirement_strategy' ORDER BY ordinal_position""")
    live_rs_cols = [col['column_name'] for col in cur.fetchall()]
    print(f' live_rs 列: {live_rs_cols}', flush=True)
    # Only requirement_id / strategy_id are taken from the backup; is_selected
    # is forced to TRUE and every other column is left at its default.
    inserted = _insert_rows(
        cur,
        """INSERT INTO requirement_strategy (requirement_id, strategy_id, is_selected)
           VALUES (%s, %s, TRUE) ON CONFLICT DO NOTHING""",
        ((row['requirement_id'], row['strategy_id']) for row in bk_rs),
    )
    print(f' inserted: {inserted}', flush=True)

    # ===== 7. restore strategy_capability (capability_id remapped) =====
    print('\n[7] strategy_capability 从备份恢复(remap cap_id)', flush=True)
    cur.execute("SELECT strategy_id, capability_id FROM bk_20260422_strategy_capability")
    bk_sc = cur.fetchall()
    print(f' bk strat_cap 行数: {len(bk_sc)}', flush=True)
    # Backup rows whose capability is not a current dev_abstract capability
    # are skipped and counted instead of inserted.
    cur.execute("SELECT id FROM capability WHERE version='dev_abstract'")
    valid_old_caps = {r['id'] for r in cur.fetchall()}
    kept_sc = [row for row in bk_sc if row['capability_id'] in valid_old_caps]
    skipped_nocap = len(bk_sc) - len(kept_sc)
    inserted = _insert_rows(
        cur,
        """INSERT INTO strategy_capability (strategy_id, capability_id)
           VALUES (%s, %s) ON CONFLICT DO NOTHING""",
        ((row['strategy_id'], remap(row['capability_id'])) for row in kept_sc),
    )
    print(f' inserted: {inserted} (skipped: {skipped_nocap} cap_id 不在当前 cap 表)', flush=True)

    # ===== 8. restore strategy_resource (resource_id remapped) =====
    print('\n[8] strategy_resource 从备份恢复(remap resource_id)', flush=True)
    cur.execute("SELECT strategy_id, resource_id FROM bk_20260422_strategy_resource")
    bk_sr = cur.fetchall()
    print(f' bk strat_resource 行数: {len(bk_sr)}', flush=True)
    cur.execute("SELECT id FROM resource WHERE version='dev_abstract'")
    valid_old_ress = {r['id'] for r in cur.fetchall()}
    kept_sr = [row for row in bk_sr if row['resource_id'] in valid_old_ress]
    skipped_nores = len(bk_sr) - len(kept_sr)
    inserted = _insert_rows(
        cur,
        """INSERT INTO strategy_resource (strategy_id, resource_id)
           VALUES (%s, %s) ON CONFLICT DO NOTHING""",
        ((row['strategy_id'], remap(row['resource_id'])) for row in kept_sr),
    )
    print(f' inserted: {inserted} (skipped: {skipped_nores} resource_id 不在当前 resource 表)', flush=True)

    # ===== summary =====
    print('\n=== 迁移后汇总 ===', flush=True)
    for tbl in ['capability', 'resource', 'strategy', 'knowledge']:
        cur.execute(f"SELECT version, COUNT(*) n FROM {tbl} GROUP BY version ORDER BY n DESC")
        parts = [f'{r["version"]}={r["n"]}' for r in cur.fetchall()]
        print(f' {tbl}: {", ".join(parts)}', flush=True)
    for tbl in ['requirement_capability', 'requirement_resource',
                'requirement_strategy', 'strategy_capability', 'strategy_resource']:
        cur.execute(f'SELECT COUNT(*) c FROM {tbl}')
        print(f' {tbl}: {cur.fetchone()["c"]}', flush=True)


def main():
    """Copy dev_abstract rows to dev_dedup and restore strategies from backup.

    Opens a ``PostgreSQLCapabilityStore``, runs the eight migration steps on
    one cursor, and prints progress plus a final per-table summary.
    """
    store = PostgreSQLCapabilityStore()
    try:
        cur = store._get_cursor()
        try:
            _migrate(cur)
        finally:
            cur.close()
    finally:
        # Separate finally: the store is closed even when acquiring or
        # closing the cursor fails.
        store.close()
# Run the migration only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|