#!/usr/bin/env python3 """ Migration 续: 1. 给 requirement_node / requirement_pattern 加 `version` 列,并把 version 并入 PK - 老数据(requirement_node 210 行)默认 version='v0' 2. 从 frontend/public/requirements_planb.json 回填 requirement_pattern - version='ruotian' - execution_id=56 按 db-operations.md: - autocommit=True - SET statement_timeout='30s' - ADD COLUMN NOT NULL DEFAULT 拆两步:先 DEFAULT(补齐数据),再 SET NOT NULL - DROP CONSTRAINT + ADD PRIMARY KEY 替换主键(不使用 RENAME / DROP COLUMN) """ import json import sys import time from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent.parent)) from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore EXECUTION_ID = 56 PATTERN_VERSION = 'ruotian' PLANB_PATH = Path('/Users/sunlit/Code/Agent/knowhub/frontend/public/requirements_planb.json') def log(msg): print(f'[{time.strftime("%H:%M:%S")}] {msg}', flush=True) def kill_idle_in_tx(cur): cur.execute(""" SELECT pid FROM pg_stat_activity WHERE state='idle in transaction' AND pid != pg_backend_pid() AND datname = current_database() """) pids = [r['pid'] for r in cur.fetchall()] for pid in pids: cur.execute('SELECT pg_terminate_backend(%s)', (pid,)) log(f' killed idle-in-tx sessions: {pids or "none"}') def add_version_and_rebuild_pk(cur, table, pk_cols): log(f'[{table}] ADD COLUMN version VARCHAR(32) DEFAULT v0') cur.execute(f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS version VARCHAR(32) DEFAULT 'v0'") log(f' ✓ done') # 补齐空值(幂等) cur.execute(f"UPDATE {table} SET version = 'v0' WHERE version IS NULL") log(f' filled-in NULL -> v0: {cur.rowcount or 0} rows') log(f'[{table}] ALTER COLUMN version SET NOT NULL') cur.execute(f"ALTER TABLE {table} ALTER COLUMN version SET NOT NULL") log(f' ✓ done') log(f'[{table}] DROP existing PK constraint') cur.execute(f"ALTER TABLE {table} DROP CONSTRAINT IF EXISTS {table}_pkey") log(f' ✓ done') cols = ', '.join(pk_cols) log(f'[{table}] ADD PRIMARY KEY ({cols})') cur.execute(f"ALTER TABLE {table} ADD PRIMARY KEY ({cols})") log(f' ✓ done') def backfill_requirement_pattern(cur): data = json.loads(PLANB_PATH.read_text(encoding='utf-8')) reqs = data['requirements'] log(f'Loaded planb: {len(reqs)} requirements (source: {data["source"]})') # 筛已在 DB 的 requirement_id cur.execute('SELECT id FROM requirement') valid_ids = {r['id'] for r in cur.fetchall()} inserted = 0 missing_reqs = [] total_pairs = 0 reqs_with_rows = 0 for r in reqs: rid = r.get('requirement_id') if rid not in valid_ids: missing_reqs.append(rid) continue patterns = r.get('patterns') or [] pattern_ids = {p.get('pattern_id') for p in patterns if p.get('pattern_id') is not None} if pattern_ids: reqs_with_rows += 1 total_pairs += len(pattern_ids) for pid in pattern_ids: cur.execute(""" INSERT INTO requirement_pattern (requirement_id, itemset_id, execution_id, version) VALUES (%s, %s, %s, %s) ON CONFLICT DO NOTHING """, (rid, pid, EXECUTION_ID, PATTERN_VERSION)) inserted += cur.rowcount or 0 log(f'Pattern backfill complete:') log(f' reqs in planb: {len(reqs)}') log(f' reqs present in DB: {len(reqs) - len(missing_reqs)}') log(f' reqs with ≥1 pattern row: {reqs_with_rows}') log(f' total (req,pattern) distinct: {total_pairs}') log(f' rows inserted (after conflict):{inserted}') if missing_reqs: log(f' ⚠️ reqs in planb not in DB ({len(missing_reqs)}): {missing_reqs[:10]}') def verify(cur): for t in ('requirement_node', 'requirement_pattern'): cur.execute(f'SELECT COUNT(*) c FROM {t}') total = cur.fetchone()['c'] cur.execute(f'SELECT version, COUNT(*) c FROM {t} GROUP BY version ORDER BY version') by_ver = [(r['version'], r['c']) for r in cur.fetchall()] log(f'{t}: total={total}, by version: {by_ver}') def main(): s = PostgreSQLCapabilityStore() cur = s._get_cursor() try: cur.execute("SET statement_timeout = '30s'") kill_idle_in_tx(cur) add_version_and_rebuild_pk( cur, 'requirement_node', ['requirement_id', 'node_id', 'execution_id', 'version'], ) add_version_and_rebuild_pk( cur, 'requirement_pattern', ['requirement_id', 'itemset_id', 'execution_id', 'version'], ) backfill_requirement_pattern(cur) verify(cur) finally: cur.close() s.close() if __name__ == '__main__': main()