version_step2_duplicate_dev_dedup.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. #!/usr/bin/env python3
  2. """
  3. Step 2: 为 dev_dedup 版本做严格冗余复制 + 从 bk_20260422_* 恢复原具体 strategy 数据。
  4. ID 命名规则:
  5. - 复制的 capability / resource 行 ID 加 '__dd' 后缀
  6. - strategy 从备份恢复,沿用原 ID(已从主表 DELETE,无冲突)
  7. 执行顺序(FK 依赖):
  8. 1. capability (332 → dev_dedup 新 ID)
  9. 2. resource (2539 → dev_dedup 新 ID)
  10. 3. requirement_capability (1106 → remap cap_id 到 dev_dedup)
  11. 4. requirement_resource (2736 → remap resource_id 到 dev_dedup)
  12. 5. strategy (99 from bk,version 改 dev_dedup)
  13. 6. requirement_strategy (99 from bk,is_selected 默认 TRUE,coverage 空)
  14. 7. strategy_capability (703 from bk,remap cap_id)
  15. 8. strategy_resource (2736 from bk,remap resource_id)
  16. """
  17. import sys, time
  18. from pathlib import Path
  19. import psycopg2.extras
  20. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  21. from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
  22. SUFFIX = '__dd'
  23. def remap(old_id: str) -> str:
  24. return f'{old_id}{SUFFIX}'
  25. def t(label, fn):
  26. t0 = time.time()
  27. r = fn()
  28. print(f' [{time.time()-t0:.1f}s] {label}', flush=True)
  29. return r
  30. def main():
  31. s = PostgreSQLCapabilityStore()
  32. cur = s._get_cursor()
  33. try:
  34. cur.execute("SET statement_timeout = '120s'")
  35. cur.execute("""SELECT pid FROM pg_stat_activity WHERE state='idle in transaction'
  36. AND pid!=pg_backend_pid() AND datname=current_database()""")
  37. for r in cur.fetchall():
  38. cur.execute("SELECT pg_terminate_backend(%s)", (r['pid'],))
  39. # ===== 1. capability 复制 =====
  40. print('\n[1] 复制 capability 给 dev_dedup', flush=True)
  41. cur.execute("""SELECT id, name, criterion, description, effects
  42. FROM capability WHERE version='dev_abstract'""")
  43. caps = cur.fetchall()
  44. print(f' 源 cap 行数: {len(caps)}', flush=True)
  45. inserted = 0
  46. for c in caps:
  47. new_id = remap(c['id'])
  48. cur.execute("""INSERT INTO capability (id, name, criterion, description, effects, version)
  49. VALUES (%s, %s, %s, %s, %s, 'dev_dedup') ON CONFLICT (id) DO NOTHING""",
  50. (new_id, c['name'], c['criterion'], c['description'],
  51. psycopg2.extras.Json(c['effects']) if c['effects'] is not None else None))
  52. inserted += cur.rowcount or 0
  53. print(f' inserted: {inserted}', flush=True)
  54. # ===== 2. resource 复制 =====
  55. print('\n[2] 复制 resource 给 dev_dedup', flush=True)
  56. cur.execute("""SELECT id, title, body, secure_body, content_type, metadata,
  57. sort_order, submitted_by, created_at, updated_at, images
  58. FROM resource WHERE version='dev_abstract'""")
  59. ress = cur.fetchall()
  60. print(f' 源 resource 行数: {len(ress)}', flush=True)
  61. inserted = 0
  62. for r in ress:
  63. new_id = remap(r['id'])
  64. cur.execute("""INSERT INTO resource (id, title, body, secure_body, content_type,
  65. metadata, sort_order, submitted_by, created_at,
  66. updated_at, images, version)
  67. VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,'dev_dedup')
  68. ON CONFLICT (id) DO NOTHING""",
  69. (new_id, r['title'], r['body'], r['secure_body'], r['content_type'],
  70. psycopg2.extras.Json(r['metadata']) if r['metadata'] is not None else None,
  71. r['sort_order'], r['submitted_by'], r['created_at'], r['updated_at'],
  72. psycopg2.extras.Json(r['images']) if r['images'] is not None else None))
  73. inserted += cur.rowcount or 0
  74. print(f' inserted: {inserted}', flush=True)
  75. # ===== 3. requirement_capability 复制(remap cap_id)=====
  76. print('\n[3] 复制 requirement_capability(remap cap_id)', flush=True)
  77. cur.execute("""SELECT rc.requirement_id, rc.capability_id
  78. FROM requirement_capability rc
  79. JOIN capability c ON c.id=rc.capability_id
  80. WHERE c.version='dev_abstract'""")
  81. rc_rows = cur.fetchall()
  82. print(f' 源 req_cap 行数: {len(rc_rows)}', flush=True)
  83. inserted = 0
  84. for row in rc_rows:
  85. cur.execute("""INSERT INTO requirement_capability (requirement_id, capability_id)
  86. VALUES (%s, %s) ON CONFLICT DO NOTHING""",
  87. (row['requirement_id'], remap(row['capability_id'])))
  88. inserted += cur.rowcount or 0
  89. print(f' inserted: {inserted}', flush=True)
  90. # ===== 4. requirement_resource 复制(remap resource_id)=====
  91. print('\n[4] 复制 requirement_resource(remap resource_id)', flush=True)
  92. cur.execute("""SELECT rr.requirement_id, rr.resource_id
  93. FROM requirement_resource rr
  94. JOIN resource r ON r.id=rr.resource_id
  95. WHERE r.version='dev_abstract'""")
  96. rr_rows = cur.fetchall()
  97. print(f' 源 req_resource 行数: {len(rr_rows)}', flush=True)
  98. inserted = 0
  99. for row in rr_rows:
  100. cur.execute("""INSERT INTO requirement_resource (requirement_id, resource_id)
  101. VALUES (%s, %s) ON CONFLICT DO NOTHING""",
  102. (row['requirement_id'], remap(row['resource_id'])))
  103. inserted += cur.rowcount or 0
  104. print(f' inserted: {inserted}', flush=True)
  105. # ===== 5. strategy 从 backup 恢复 =====
  106. print('\n[5] strategy 从 bk_20260422_strategy 恢复', flush=True)
  107. cur.execute("""SELECT id, name, description, body, status, created_at, updated_at
  108. FROM bk_20260422_strategy""")
  109. bk_strats = cur.fetchall()
  110. print(f' 备份 strategy 行数: {len(bk_strats)}', flush=True)
  111. inserted = 0
  112. for st in bk_strats:
  113. cur.execute("""INSERT INTO strategy (id, name, description, body, status,
  114. created_at, updated_at, version)
  115. VALUES (%s,%s,%s,%s,%s,%s,%s,'dev_dedup')
  116. ON CONFLICT (id) DO NOTHING""",
  117. (st['id'], st['name'], st['description'],
  118. psycopg2.extras.Json(st['body']) if st['body'] is not None else None,
  119. st['status'], st['created_at'], st['updated_at']))
  120. inserted += cur.rowcount or 0
  121. print(f' inserted: {inserted}', flush=True)
  122. # ===== 6. requirement_strategy 从 backup 恢复 =====
  123. print('\n[6] requirement_strategy 从备份恢复(is_selected=TRUE)', flush=True)
  124. cur.execute("""SELECT column_name FROM information_schema.columns
  125. WHERE table_name='bk_20260422_requirement_strategy' ORDER BY ordinal_position""")
  126. bk_rs_cols = [r['column_name'] for r in cur.fetchall()]
  127. print(f' bk_rs 列: {bk_rs_cols}', flush=True)
  128. cur.execute(f"SELECT * FROM bk_20260422_requirement_strategy")
  129. bk_rs = cur.fetchall()
  130. print(f' bk req_strategy 行数: {len(bk_rs)}', flush=True)
  131. inserted = 0
  132. # live req_strategy 目前有 (req_id, strat_id, [role], is_selected, coverage_score, coverage_explanation)
  133. cur.execute("""SELECT column_name FROM information_schema.columns
  134. WHERE table_name='requirement_strategy' ORDER BY ordinal_position""")
  135. live_rs_cols = [r['column_name'] for r in cur.fetchall()]
  136. print(f' live_rs 列: {live_rs_cols}', flush=True)
  137. for row in bk_rs:
  138. # 只取 requirement_id / strategy_id;is_selected 默认 TRUE(备份都是选中的)
  139. # 其他 coverage 字段留 NULL
  140. cur.execute("""INSERT INTO requirement_strategy (requirement_id, strategy_id, is_selected)
  141. VALUES (%s, %s, TRUE) ON CONFLICT DO NOTHING""",
  142. (row['requirement_id'], row['strategy_id']))
  143. inserted += cur.rowcount or 0
  144. print(f' inserted: {inserted}', flush=True)
  145. # ===== 7. strategy_capability 从 backup 恢复(remap cap_id)=====
  146. print('\n[7] strategy_capability 从备份恢复(remap cap_id)', flush=True)
  147. cur.execute("SELECT strategy_id, capability_id FROM bk_20260422_strategy_capability")
  148. bk_sc = cur.fetchall()
  149. print(f' bk strat_cap 行数: {len(bk_sc)}', flush=True)
  150. inserted = 0
  151. skipped_nocap = 0
  152. # 检查哪些 backup cap_id 存在于当前 capability(dev_abstract)
  153. cur.execute("SELECT id FROM capability WHERE version='dev_abstract'")
  154. valid_old_caps = {r['id'] for r in cur.fetchall()}
  155. for row in bk_sc:
  156. if row['capability_id'] not in valid_old_caps:
  157. skipped_nocap += 1; continue
  158. cur.execute("""INSERT INTO strategy_capability (strategy_id, capability_id)
  159. VALUES (%s, %s) ON CONFLICT DO NOTHING""",
  160. (row['strategy_id'], remap(row['capability_id'])))
  161. inserted += cur.rowcount or 0
  162. print(f' inserted: {inserted} (skipped: {skipped_nocap} cap_id 不在当前 cap 表)', flush=True)
  163. # ===== 8. strategy_resource 从 backup 恢复(remap resource_id)=====
  164. print('\n[8] strategy_resource 从备份恢复(remap resource_id)', flush=True)
  165. cur.execute("SELECT strategy_id, resource_id FROM bk_20260422_strategy_resource")
  166. bk_sr = cur.fetchall()
  167. print(f' bk strat_resource 行数: {len(bk_sr)}', flush=True)
  168. inserted = 0; skipped_nores = 0
  169. cur.execute("SELECT id FROM resource WHERE version='dev_abstract'")
  170. valid_old_ress = {r['id'] for r in cur.fetchall()}
  171. for row in bk_sr:
  172. if row['resource_id'] not in valid_old_ress:
  173. skipped_nores += 1; continue
  174. cur.execute("""INSERT INTO strategy_resource (strategy_id, resource_id)
  175. VALUES (%s, %s) ON CONFLICT DO NOTHING""",
  176. (row['strategy_id'], remap(row['resource_id'])))
  177. inserted += cur.rowcount or 0
  178. print(f' inserted: {inserted} (skipped: {skipped_nores} resource_id 不在当前 resource 表)', flush=True)
  179. # ===== 汇总 =====
  180. print('\n=== 迁移后汇总 ===', flush=True)
  181. for tbl in ['capability', 'resource', 'strategy', 'knowledge']:
  182. cur.execute(f"SELECT version, COUNT(*) n FROM {tbl} GROUP BY version ORDER BY n DESC")
  183. parts = [f'{r["version"]}={r["n"]}' for r in cur.fetchall()]
  184. print(f' {tbl}: {", ".join(parts)}', flush=True)
  185. for tbl in ['requirement_capability', 'requirement_resource',
  186. 'requirement_strategy', 'strategy_capability', 'strategy_resource']:
  187. cur.execute(f'SELECT COUNT(*) c FROM {tbl}')
  188. print(f' {tbl}: {cur.fetchone()["c"]}', flush=True)
  189. finally:
  190. cur.close(); s.close()
# Run the migration only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()