migrate_add_version_and_patterns.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
#!/usr/bin/env python3
"""
Migration (continued):

1. Add a `version` column to requirement_node / requirement_pattern and fold
   `version` into the primary key.
   - Existing data (requirement_node, 210 rows) defaults to version='v0'.
2. Backfill requirement_pattern from frontend/public/requirements_planb.json.
   - version='ruotian'
   - execution_id=56

Following db-operations.md:
- autocommit=True
- SET statement_timeout='30s'
- ADD COLUMN NOT NULL DEFAULT is split into two steps: first add the DEFAULT
  (filling in existing rows), then SET NOT NULL.
- Replace the primary key with DROP CONSTRAINT + ADD PRIMARY KEY
  (no RENAME / DROP COLUMN).
"""
import json
import sys
import time
from pathlib import Path

# Put the directory three levels above this file on sys.path before importing
# from the `knowhub` package (presumably the repository root -- confirm).
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore

# execution_id written on every backfilled requirement_pattern row.
EXECUTION_ID = 56
# version value written on every backfilled requirement_pattern row.
PATTERN_VERSION = 'ruotian'
# JSON file the requirement_pattern backfill reads from.
# NOTE(review): this is an absolute path into one developer's home directory,
# so the script only runs unchanged on that machine. Consider deriving it from
# the repository location once the directory layout is confirmed.
PLANB_PATH = Path('/Users/sunlit/Code/Agent/knowhub/frontend/public/requirements_planb.json')
  24. def log(msg):
  25. print(f'[{time.strftime("%H:%M:%S")}] {msg}', flush=True)
  26. def kill_idle_in_tx(cur):
  27. cur.execute("""
  28. SELECT pid FROM pg_stat_activity
  29. WHERE state='idle in transaction'
  30. AND pid != pg_backend_pid()
  31. AND datname = current_database()
  32. """)
  33. pids = [r['pid'] for r in cur.fetchall()]
  34. for pid in pids:
  35. cur.execute('SELECT pg_terminate_backend(%s)', (pid,))
  36. log(f' killed idle-in-tx sessions: {pids or "none"}')
  37. def add_version_and_rebuild_pk(cur, table, pk_cols):
  38. log(f'[{table}] ADD COLUMN version VARCHAR(32) DEFAULT v0')
  39. cur.execute(f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS version VARCHAR(32) DEFAULT 'v0'")
  40. log(f' ✓ done')
  41. # 补齐空值(幂等)
  42. cur.execute(f"UPDATE {table} SET version = 'v0' WHERE version IS NULL")
  43. log(f' filled-in NULL -> v0: {cur.rowcount or 0} rows')
  44. log(f'[{table}] ALTER COLUMN version SET NOT NULL')
  45. cur.execute(f"ALTER TABLE {table} ALTER COLUMN version SET NOT NULL")
  46. log(f' ✓ done')
  47. log(f'[{table}] DROP existing PK constraint')
  48. cur.execute(f"ALTER TABLE {table} DROP CONSTRAINT IF EXISTS {table}_pkey")
  49. log(f' ✓ done')
  50. cols = ', '.join(pk_cols)
  51. log(f'[{table}] ADD PRIMARY KEY ({cols})')
  52. cur.execute(f"ALTER TABLE {table} ADD PRIMARY KEY ({cols})")
  53. log(f' ✓ done')
  54. def backfill_requirement_pattern(cur):
  55. data = json.loads(PLANB_PATH.read_text(encoding='utf-8'))
  56. reqs = data['requirements']
  57. log(f'Loaded planb: {len(reqs)} requirements (source: {data["source"]})')
  58. # 筛已在 DB 的 requirement_id
  59. cur.execute('SELECT id FROM requirement')
  60. valid_ids = {r['id'] for r in cur.fetchall()}
  61. inserted = 0
  62. missing_reqs = []
  63. total_pairs = 0
  64. reqs_with_rows = 0
  65. for r in reqs:
  66. rid = r.get('requirement_id')
  67. if rid not in valid_ids:
  68. missing_reqs.append(rid)
  69. continue
  70. patterns = r.get('patterns') or []
  71. pattern_ids = {p.get('pattern_id') for p in patterns if p.get('pattern_id') is not None}
  72. if pattern_ids:
  73. reqs_with_rows += 1
  74. total_pairs += len(pattern_ids)
  75. for pid in pattern_ids:
  76. cur.execute("""
  77. INSERT INTO requirement_pattern (requirement_id, itemset_id, execution_id, version)
  78. VALUES (%s, %s, %s, %s)
  79. ON CONFLICT DO NOTHING
  80. """, (rid, pid, EXECUTION_ID, PATTERN_VERSION))
  81. inserted += cur.rowcount or 0
  82. log(f'Pattern backfill complete:')
  83. log(f' reqs in planb: {len(reqs)}')
  84. log(f' reqs present in DB: {len(reqs) - len(missing_reqs)}')
  85. log(f' reqs with ≥1 pattern row: {reqs_with_rows}')
  86. log(f' total (req,pattern) distinct: {total_pairs}')
  87. log(f' rows inserted (after conflict):{inserted}')
  88. if missing_reqs:
  89. log(f' ⚠️ reqs in planb not in DB ({len(missing_reqs)}): {missing_reqs[:10]}')
  90. def verify(cur):
  91. for t in ('requirement_node', 'requirement_pattern'):
  92. cur.execute(f'SELECT COUNT(*) c FROM {t}')
  93. total = cur.fetchone()['c']
  94. cur.execute(f'SELECT version, COUNT(*) c FROM {t} GROUP BY version ORDER BY version')
  95. by_ver = [(r['version'], r['c']) for r in cur.fetchall()]
  96. log(f'{t}: total={total}, by version: {by_ver}')
  97. def main():
  98. s = PostgreSQLCapabilityStore()
  99. cur = s._get_cursor()
  100. try:
  101. cur.execute("SET statement_timeout = '30s'")
  102. kill_idle_in_tx(cur)
  103. add_version_and_rebuild_pk(
  104. cur, 'requirement_node',
  105. ['requirement_id', 'node_id', 'execution_id', 'version'],
  106. )
  107. add_version_and_rebuild_pk(
  108. cur, 'requirement_pattern',
  109. ['requirement_id', 'itemset_id', 'execution_id', 'version'],
  110. )
  111. backfill_requirement_pattern(cur)
  112. verify(cur)
  113. finally:
  114. cur.close()
  115. s.close()
  116. if __name__ == '__main__':
  117. main()