#!/usr/bin/env python3
"""
Migration: add requirement_node / requirement_pattern junction tables.

- DDL: create both tables + reverse-lookup indexes
- Backfill: requirement_node only (the pattern table is left empty for now)
- execution_id is pinned to 56
- Take the node_name path from each requirement's source_nodes
- Strip the /root prefix, then resolve category.id via
  category_tree?execution_id=56
- Skip the __meta__ / __abstract__ pseudo-nodes

Conventions: autocommit=True, statement_timeout=30s, print with flush=True.
See knowhub/docs/db-operations.md.
"""
import json
import sys
import time
import urllib.request
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore

# Snapshot of the category tree this backfill is pinned against.
EXECUTION_ID = 56
CATEGORY_TREE_URL = f'https://pattern.aiddit.com/api/pattern/category_tree?execution_id={EXECUTION_ID}'
ROOT_PREFIX = '/root'

DDL_REQUIREMENT_NODE = """
CREATE TABLE IF NOT EXISTS requirement_node (
    requirement_id VARCHAR NOT NULL,
    node_id INTEGER NOT NULL,
    execution_id INTEGER NOT NULL,
    node_path TEXT,
    PRIMARY KEY (requirement_id, node_id, execution_id)
)
"""

DDL_REQUIREMENT_PATTERN = """
CREATE TABLE IF NOT EXISTS requirement_pattern (
    requirement_id VARCHAR NOT NULL,
    itemset_id INTEGER NOT NULL,
    execution_id INTEGER NOT NULL,
    PRIMARY KEY (requirement_id, itemset_id, execution_id)
)
"""

DDL_IDX_NODE_REV = 'CREATE INDEX IF NOT EXISTS idx_req_node_rev ON requirement_node (node_id, execution_id)'
DDL_IDX_PATTERN_REV = 'CREATE INDEX IF NOT EXISTS idx_req_pattern_rev ON requirement_pattern (itemset_id, execution_id)'


def log(msg):
    """Print a timestamped progress line, flushed immediately (per db-operations.md)."""
    print(f'[{time.strftime("%H:%M:%S")}] {msg}', flush=True)


def fetch_category_tree():
    """Fetch the category tree for EXECUTION_ID and return {path: category_id}."""
    log(f'Fetching category_tree (execution_id={EXECUTION_ID})...')
    with urllib.request.urlopen(CATEGORY_TREE_URL, timeout=30) as resp:
        data = json.loads(resp.read())
    cats = data.get('categories', [])
    log(f' got {len(cats)} categories')
    return {c['path']: c['id'] for c in cats}


def kill_idle_in_tx(cur):
    """Terminate idle-in-transaction sessions on the current database.

    Stale idle-in-tx sessions hold locks that would block the DDL below.
    Skips our own backend and sessions on other databases.
    """
    # NOTE: the state literal must stay on one line — a line break inside
    # the string would make the equality filter never match.
    cur.execute("""
        SELECT pid FROM pg_stat_activity
        WHERE state = 'idle in transaction'
          AND pid != pg_backend_pid()
          AND datname = current_database()
    """)
    pids = [r['pid'] for r in cur.fetchall()]  # assumes a dict-style cursor — TODO confirm store config
    for pid in pids:
        cur.execute('SELECT pg_terminate_backend(%s)', (pid,))
    if pids:
        log(f' killed {len(pids)} idle-in-tx sessions: {pids}')
    else:
        log(' no idle-in-tx sessions to clean')


def run_ddl(cur):
    """Execute all DDL statements; each is IF NOT EXISTS, so reruns are safe."""
    for label, sql in [
        ('requirement_node', DDL_REQUIREMENT_NODE),
        ('requirement_pattern', DDL_REQUIREMENT_PATTERN),
        ('idx_req_node_rev', DDL_IDX_NODE_REV),
        ('idx_req_pattern_rev', DDL_IDX_PATTERN_REV),
    ]:
        log(f'DDL: {label}')
        cur.execute(sql)
        log(' ✓ done')


def normalize_path(p):
    """Normalize a source-node name to a category-tree path, or None if unusable.

    Returns None for non-strings, the __meta__/__abstract__ pseudo-nodes,
    and names that are not absolute paths; strips a leading '/root'.
    """
    if not isinstance(p, str):
        return None
    # Checked before the '/' guard so bare pseudo-node names are rejected
    # explicitly (previously this test sat after the guard and was dead code).
    if p in ('__meta__', '__abstract__'):
        return None
    if not p.startswith('/'):
        return None
    if p.startswith(ROOT_PREFIX + '/'):
        return p[len(ROOT_PREFIX):]  # '/root/a/b' → '/a/b'
    return p


def backfill_requirement_node(cur, path2id):
    """Populate requirement_node from each requirement's source_nodes.

    path2id maps normalized category paths to category ids (from
    fetch_category_tree). Idempotent via ON CONFLICT DO NOTHING.
    """
    cur.execute('SELECT id, source_nodes FROM requirement')
    rows = cur.fetchall()
    log(f'Scanning {len(rows)} requirements...')
    inserted = 0
    # Counts every unusable node_name (non-dict entries, non-absolute paths),
    # not only the __meta__/__abstract__ pseudo-nodes.
    skipped_meta = 0
    unresolved_paths = set()
    reqs_with_rows = 0
    for r in rows:
        req_id = r['id']
        nodes = r['source_nodes'] or []
        pairs = set()  # de-duplicate (node_id, path) within one requirement
        for n in nodes:
            name = (n or {}).get('node_name') if isinstance(n, dict) else None
            norm = normalize_path(name)
            if norm is None:
                skipped_meta += 1
                continue
            node_id = path2id.get(norm)
            if node_id is None:
                unresolved_paths.add(norm)
                continue
            pairs.add((node_id, norm))
        if pairs:
            reqs_with_rows += 1
        for node_id, norm in pairs:
            cur.execute("""
                INSERT INTO requirement_node (requirement_id, node_id, execution_id, node_path)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT DO NOTHING
            """, (req_id, node_id, EXECUTION_ID, norm))
            # rowcount is 0 when the row already existed; clamp so a driver
            # reporting the DB-API -1 "unknown" sentinel can't decrement.
            inserted += max(cur.rowcount or 0, 0)
    log('Backfill complete:')
    log(f' reqs with ≥1 row: {reqs_with_rows} / {len(rows)}')
    log(f' rows inserted: {inserted}')
    log(f' meta/abstract skipped: {skipped_meta}')
    if unresolved_paths:
        log(f' unresolved paths ({len(unresolved_paths)}): {sorted(unresolved_paths)[:10]}')
    else:
        log(' unresolved paths: 0')
def verify(cur):
    """Log post-migration row counts as a sanity check."""
    cur.execute('SELECT COUNT(*) c FROM requirement_node')
    log(f'Post-check requirement_node rows: {cur.fetchone()["c"]}')
    cur.execute('SELECT COUNT(*) c FROM requirement_pattern')
    log(f'Post-check requirement_pattern rows: {cur.fetchone()["c"]}')
    cur.execute('SELECT COUNT(DISTINCT requirement_id) c FROM requirement_node')
    log(f'Distinct reqs with node refs: {cur.fetchone()["c"]}')


def main():
    """Run the migration: fetch the tree, clean sessions, DDL, backfill, verify."""
    path2id = fetch_category_tree()
    store = PostgreSQLCapabilityStore()
    cur = store._get_cursor()  # project-private API; sanctioned by db-operations.md
    try:
        cur.execute("SET statement_timeout = '30s'")
        kill_idle_in_tx(cur)
        run_ddl(cur)
        backfill_requirement_node(cur, path2id)
        verify(cur)
    finally:
        # Nested so the store connection is released even if closing the
        # cursor raises (previously a cursor-close error leaked the store).
        try:
            cur.close()
        finally:
            store.close()


if __name__ == '__main__':
    main()