| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- #!/usr/bin/env python3
- """
- Migration 续:
- 1. 给 requirement_node / requirement_pattern 加 `version` 列,并把 version 并入 PK
- - 老数据(requirement_node 210 行)默认 version='v0'
- 2. 从 frontend/public/requirements_planb.json 回填 requirement_pattern
- - version='ruotian'
- - execution_id=56
- 按 db-operations.md:
- - autocommit=True
- - SET statement_timeout='30s'
- - ADD COLUMN NOT NULL DEFAULT 拆两步:先 DEFAULT(补齐数据),再 SET NOT NULL
- - DROP CONSTRAINT + ADD PRIMARY KEY 替换主键(不使用 RENAME / DROP COLUMN)
- """
- import json
- import sys
- import time
- from pathlib import Path
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
# execution_id written on every requirement_pattern row inserted by the backfill.
EXECUTION_ID: int = 56

# version label written on every requirement_pattern row inserted by the backfill.
PATTERN_VERSION: str = 'ruotian'

# JSON file the backfill reads its requirement -> pattern pairs from.
# NOTE(review): this is an absolute, machine-specific path; the script only
# runs unmodified on a host where this exact file exists.
PLANB_PATH: Path = Path(
    '/Users/sunlit/Code/Agent/knowhub/frontend/public/requirements_planb.json'
)
def log(msg):
    """Print *msg* to stdout, prefixed with the local time as ``[HH:MM:SS]``.

    Output is flushed on every call so progress lines appear immediately.
    """
    stamp = time.strftime("%H:%M:%S")
    print(f'[{stamp}] {msg}', flush=True)
def kill_idle_in_tx(cur):
    """Terminate other sessions that are idle inside an open transaction.

    Queries ``pg_stat_activity`` for backends in the current database whose
    state is ``idle in transaction`` (excluding this session), calls
    ``pg_terminate_backend`` on each one, and logs the list of pids (or
    ``none``).

    Args:
        cur: Database cursor whose rows can be indexed by column name.
    """
    cur.execute("""
        SELECT pid FROM pg_stat_activity
        WHERE state='idle in transaction'
        AND pid != pg_backend_pid()
        AND datname = current_database()
    """)
    # Collect every pid first: the same cursor is reused for the terminate
    # calls, which would discard any rows not yet fetched.
    victims = []
    for row in cur.fetchall():
        victims.append(row['pid'])

    for victim in victims:
        cur.execute('SELECT pg_terminate_backend(%s)', (victim,))

    log(f' killed idle-in-tx sessions: {victims or "none"}')
def add_version_and_rebuild_pk(cur, table, pk_cols):
    """Add a ``version`` column to *table* and rebuild its primary key.

    Steps, each issued as its own statement:
      1. ``ADD COLUMN IF NOT EXISTS version VARCHAR(32) DEFAULT 'v0'``.
      2. Set any remaining NULL ``version`` values to ``'v0'`` (safe to re-run).
      3. ``ALTER COLUMN version SET NOT NULL``.
      4. Drop the constraint named ``<table>_pkey`` if it exists.
      5. ``ADD PRIMARY KEY`` over *pk_cols*.

    Args:
        cur: Database cursor used to execute the DDL/DML.
        table: Table name. Interpolated directly into the SQL text, so it
            must be a trusted identifier.
        pk_cols: Column names for the new primary key, in order. Also
            interpolated directly into the SQL text.

    NOTE(review): step 4 only drops a key named ``<table>_pkey``; if the
    existing primary key has a different name it is left in place and step 5
    will presumably fail - confirm against the live schema.
    """
    cols = ', '.join(pk_cols)

    def run_step(label, sql):
        # Announce the step, execute it, then confirm completion.
        log(label)
        cur.execute(sql)
        log(f' ✓ done')

    run_step(
        f'[{table}] ADD COLUMN version VARCHAR(32) DEFAULT v0',
        f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS version VARCHAR(32) DEFAULT 'v0'",
    )

    # Fill in any NULLs left from an earlier partial run.
    cur.execute(f"UPDATE {table} SET version = 'v0' WHERE version IS NULL")
    log(f' filled-in NULL -> v0: {cur.rowcount or 0} rows')

    run_step(
        f'[{table}] ALTER COLUMN version SET NOT NULL',
        f"ALTER TABLE {table} ALTER COLUMN version SET NOT NULL",
    )
    run_step(
        f'[{table}] DROP existing PK constraint',
        f"ALTER TABLE {table} DROP CONSTRAINT IF EXISTS {table}_pkey",
    )
    run_step(
        f'[{table}] ADD PRIMARY KEY ({cols})',
        f"ALTER TABLE {table} ADD PRIMARY KEY ({cols})",
    )
def backfill_requirement_pattern(cur):
    """Insert requirement_pattern rows from the JSON file at ``PLANB_PATH``.

    The file must contain a ``requirements`` list and a ``source`` key. For
    each requirement whose ``requirement_id`` exists in the ``requirement``
    table, one row is inserted per distinct non-null ``pattern_id`` found in
    its ``patterns`` list, using ``EXECUTION_ID`` and ``PATTERN_VERSION``.
    Rows that conflict with existing ones are skipped
    (``ON CONFLICT DO NOTHING``). A summary is logged at the end, including
    up to ten requirement ids that were absent from the database.

    Args:
        cur: Database cursor whose rows can be indexed by column name.

    Raises:
        KeyError: If the JSON lacks the ``requirements`` or ``source`` key.
    """
    payload = json.loads(PLANB_PATH.read_text(encoding='utf-8'))
    reqs = payload['requirements']
    log(f'Loaded planb: {len(reqs)} requirements (source: {payload["source"]})')

    # Only requirements that already exist in the DB are eligible.
    cur.execute('SELECT id FROM requirement')
    known_ids = {row['id'] for row in cur.fetchall()}

    insert_sql = """
        INSERT INTO requirement_pattern (requirement_id, itemset_id, execution_id, version)
        VALUES (%s, %s, %s, %s)
        ON CONFLICT DO NOTHING
    """

    inserted = 0
    total_pairs = 0
    reqs_with_rows = 0
    missing_reqs = []

    for req in reqs:
        rid = req.get('requirement_id')
        if rid not in known_ids:
            missing_reqs.append(rid)
            continue

        # De-duplicate pattern ids within one requirement; skip null ids.
        pattern_ids = set()
        for pattern in req.get('patterns') or []:
            pattern_id = pattern.get('pattern_id')
            if pattern_id is not None:
                pattern_ids.add(pattern_id)
        if not pattern_ids:
            continue

        reqs_with_rows += 1
        total_pairs += len(pattern_ids)
        for pattern_id in pattern_ids:
            cur.execute(insert_sql, (rid, pattern_id, EXECUTION_ID, PATTERN_VERSION))
            # rowcount is 0 when the row already existed and was skipped.
            inserted += cur.rowcount or 0

    present_count = len(reqs) - len(missing_reqs)
    log(f'Pattern backfill complete:')
    log(f' reqs in planb: {len(reqs)}')
    log(f' reqs present in DB: {present_count}')
    log(f' reqs with ≥1 pattern row: {reqs_with_rows}')
    log(f' total (req,pattern) distinct: {total_pairs}')
    log(f' rows inserted (after conflict):{inserted}')
    if missing_reqs:
        log(f' ⚠️ reqs in planb not in DB ({len(missing_reqs)}): {missing_reqs[:10]}')
def verify(cur):
    """Log the total row count and per-version counts for both migrated tables.

    Covers ``requirement_node`` and ``requirement_pattern``; per-version
    counts are ordered by ``version``.

    Args:
        cur: Database cursor whose rows can be indexed by column name.
    """
    for table in ('requirement_node', 'requirement_pattern'):
        cur.execute(f'SELECT COUNT(*) c FROM {table}')
        total = cur.fetchone()['c']

        cur.execute(f'SELECT version, COUNT(*) c FROM {table} GROUP BY version ORDER BY version')
        by_ver = []
        for row in cur.fetchall():
            by_ver.append((row['version'], row['c']))

        log(f'{table}: total={total}, by version: {by_ver}')
def main():
    """Run the migration end to end.

    Order of operations:
      1. Open the capability store and obtain a cursor.
      2. Set ``statement_timeout`` to 30 seconds for this session.
      3. Terminate other idle-in-transaction sessions.
      4. Add the ``version`` column and rebuild the primary key on
         ``requirement_node`` and then ``requirement_pattern``.
      5. Backfill ``requirement_pattern`` from the JSON file.
      6. Log per-table verification counts.

    The cursor and the store are closed in separate ``finally`` clauses so
    the store is released even if obtaining or closing the cursor raises.

    NOTE(review): the module docstring calls for autocommit=True, but this
    function does not set it; presumably the store's connection is already
    configured that way - confirm.
    """
    store = PostgreSQLCapabilityStore()
    try:
        cur = store._get_cursor()
        try:
            cur.execute("SET statement_timeout = '30s'")
            kill_idle_in_tx(cur)
            add_version_and_rebuild_pk(
                cur, 'requirement_node',
                ['requirement_id', 'node_id', 'execution_id', 'version'],
            )
            add_version_and_rebuild_pk(
                cur, 'requirement_pattern',
                ['requirement_id', 'itemset_id', 'execution_id', 'version'],
            )
            backfill_requirement_pattern(cur)
            verify(cur)
        finally:
            cur.close()
    finally:
        # Always release the store, even when cursor setup or teardown fails.
        store.close()


if __name__ == '__main__':
    main()
|