| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
#!/usr/bin/env python3
"""
Migration: add requirement_node / requirement_pattern junction tables.

- DDL: create both tables plus their reverse-lookup indexes.
- Backfill: requirement_node only (requirement_pattern is left empty for now).
- execution_id is fixed at 56.
- For each requirement, node_name paths are read from requirement.source_nodes.
- The '/root' prefix is stripped, then category.id is resolved against
  category_tree?execution_id=56.
- __meta__ / __abstract__ pseudo-nodes are skipped.

Conventions: autocommit=True, statement_timeout=30s, print with flush=True.
See knowhub/docs/db-operations.md.
"""
- import json
- import sys
- import time
- import urllib.request
- from pathlib import Path
- sys.path.insert(0, str(Path(__file__).parent.parent.parent))
- from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
# Execution run the junction rows belong to; deliberately hard-coded for this
# one-off migration (see module docstring).
EXECUTION_ID = 56
# Remote category tree used to resolve normalized node paths to category ids.
CATEGORY_TREE_URL = f'https://pattern.aiddit.com/api/pattern/category_tree?execution_id={EXECUTION_ID}'
# Leading prefix stripped from source-node paths before lookup ('/root/a/b' -> '/a/b').
ROOT_PREFIX = '/root'

# requirement <-> tree-node junction. The composite primary key plus
# ON CONFLICT DO NOTHING in the backfill makes the migration idempotent.
DDL_REQUIREMENT_NODE = """
CREATE TABLE IF NOT EXISTS requirement_node (
    requirement_id VARCHAR NOT NULL,
    node_id INTEGER NOT NULL,
    execution_id INTEGER NOT NULL,
    node_path TEXT,
    PRIMARY KEY (requirement_id, node_id, execution_id)
)
"""

# requirement <-> pattern junction; created now, backfilled later (see docstring).
DDL_REQUIREMENT_PATTERN = """
CREATE TABLE IF NOT EXISTS requirement_pattern (
    requirement_id VARCHAR NOT NULL,
    itemset_id INTEGER NOT NULL,
    execution_id INTEGER NOT NULL,
    PRIMARY KEY (requirement_id, itemset_id, execution_id)
)
"""

# Reverse-lookup indexes: the PKs serve requirement->node/pattern queries,
# these serve node/pattern->requirement queries.
DDL_IDX_NODE_REV = 'CREATE INDEX IF NOT EXISTS idx_req_node_rev ON requirement_node (node_id, execution_id)'
DDL_IDX_PATTERN_REV = 'CREATE INDEX IF NOT EXISTS idx_req_pattern_rev ON requirement_pattern (itemset_id, execution_id)'
def log(msg):
    """Print *msg* prefixed with an HH:MM:SS timestamp, flushing immediately."""
    stamp = time.strftime("%H:%M:%S")
    print(f'[{stamp}] {msg}', flush=True)
def fetch_category_tree():
    """Download the category tree for EXECUTION_ID and return {path: category_id}."""
    log(f'Fetching category_tree (execution_id={EXECUTION_ID})...')
    with urllib.request.urlopen(CATEGORY_TREE_URL, timeout=30) as resp:
        payload = json.loads(resp.read())
    categories = payload.get('categories', [])
    log(f' got {len(categories)} categories')
    mapping = {}
    for category in categories:
        mapping[category['path']] = category['id']
    return mapping
def kill_idle_in_tx(cur):
    """Terminate other sessions stuck 'idle in transaction' on this database.

    Leaves our own backend alone (pg_backend_pid) and only touches the
    current database. Logs what was killed, or that nothing needed cleanup.
    """
    cur.execute("""
        SELECT pid FROM pg_stat_activity
        WHERE state='idle in transaction'
        AND pid != pg_backend_pid()
        AND datname = current_database()
    """)
    stale_pids = [row['pid'] for row in cur.fetchall()]
    for stale in stale_pids:
        cur.execute('SELECT pg_terminate_backend(%s)', (stale,))
    if not stale_pids:
        log(' no idle-in-tx sessions to clean')
    else:
        log(f' killed {len(stale_pids)} idle-in-tx sessions: {stale_pids}')
def run_ddl(cur):
    """Create both junction tables and their reverse-lookup indexes (all idempotent)."""
    statements = (
        ('requirement_node', DDL_REQUIREMENT_NODE),
        ('requirement_pattern', DDL_REQUIREMENT_PATTERN),
        ('idx_req_node_rev', DDL_IDX_NODE_REV),
        ('idx_req_pattern_rev', DDL_IDX_PATTERN_REV),
    )
    for label, statement in statements:
        log(f'DDL: {label}')
        cur.execute(statement)
        log(' ✓ done')
def normalize_path(p, root_prefix='/root'):
    """Normalize a source-node path for category-tree lookup.

    Strips a leading *root_prefix* ('/root/a/b' -> '/a/b') and returns the
    resulting absolute path, or None when the value is unusable: not a
    string, not an absolute path, or a pseudo-node path containing a
    '__meta__' / '__abstract__' segment.

    Bug fixed: the original compared the whole path against '__meta__' /
    '__abstract__' *after* requiring a leading '/', so that check was
    unreachable and pseudo-node paths like '/root/x/__meta__' slipped
    through to the id lookup. The check now inspects path segments.
    *root_prefix* defaults to the historical '/root' so existing callers
    are unaffected.
    """
    if not isinstance(p, str):
        return None
    if not p.startswith('/'):
        return None
    # Skip pseudo-nodes: any segment named __meta__ or __abstract__.
    segments = p.split('/')
    if '__meta__' in segments or '__abstract__' in segments:
        return None
    if p.startswith(root_prefix + '/'):
        return p[len(root_prefix):]  # '/root/a/b' -> '/a/b'
    return p
def backfill_requirement_node(cur, path2id):
    """Backfill requirement_node from requirement.source_nodes.

    For every requirement, resolves each source node's node_name path
    (normalized via normalize_path) to a category id through *path2id*,
    de-duplicates per requirement, and inserts (idempotently via
    ON CONFLICT DO NOTHING). Logs summary counters at the end.
    """
    cur.execute('SELECT id, source_nodes FROM requirement')
    requirements = cur.fetchall()
    log(f'Scanning {len(requirements)} requirements...')

    inserted = 0
    skipped_meta = 0          # unusable node names (meta/abstract, non-path, non-dict)
    unresolved_paths = set()  # normalized paths absent from the category tree
    reqs_with_rows = 0

    for requirement in requirements:
        req_id = requirement['id']
        source_nodes = requirement['source_nodes'] or []
        resolved = set()  # (node_id, path) pairs, de-duplicated per requirement
        for node in source_nodes:
            raw_name = node.get('node_name') if isinstance(node, dict) else None
            path = normalize_path(raw_name)
            if path is None:
                skipped_meta += 1
                continue
            node_id = path2id.get(path)
            if node_id is None:
                unresolved_paths.add(path)
                continue
            resolved.add((node_id, path))
        if not resolved:
            continue
        reqs_with_rows += 1
        for node_id, path in resolved:
            cur.execute("""
                INSERT INTO requirement_node (requirement_id, node_id, execution_id, node_path)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT DO NOTHING
            """, (req_id, node_id, EXECUTION_ID, path))
            inserted += cur.rowcount or 0

    log(f'Backfill complete:')
    log(f' reqs with ≥1 row: {reqs_with_rows} / {len(requirements)}')
    log(f' rows inserted: {inserted}')
    log(f' meta/abstract skipped: {skipped_meta}')
    if not unresolved_paths:
        log(f' unresolved paths: 0')
    else:
        log(f' unresolved paths ({len(unresolved_paths)}): {sorted(unresolved_paths)[:10]}')
def verify(cur):
    """Log post-migration row counts for a quick sanity check."""
    checks = (
        ('SELECT COUNT(*) c FROM requirement_node', 'Post-check requirement_node rows'),
        ('SELECT COUNT(*) c FROM requirement_pattern', 'Post-check requirement_pattern rows'),
        ('SELECT COUNT(DISTINCT requirement_id) c FROM requirement_node', 'Distinct reqs with node refs'),
    )
    for sql, label in checks:
        cur.execute(sql)
        log(f'{label}: {cur.fetchone()["c"]}')
def main():
    """Run the migration: fetch tree, clean stale sessions, DDL, backfill, verify."""
    path2id = fetch_category_tree()
    store = PostgreSQLCapabilityStore()
    cursor = store._get_cursor()
    try:
        # Per knowhub/docs/db-operations.md: bound every statement at 30s.
        cursor.execute("SET statement_timeout = '30s'")
        kill_idle_in_tx(cursor)
        run_ddl(cursor)
        backfill_requirement_node(cursor, path2id)
        verify(cursor)
    finally:
        cursor.close()
        store.close()


if __name__ == '__main__':
    main()
|