migrate_add_external_refs.py

#!/usr/bin/env python3
"""
Migration: add requirement_node / requirement_pattern junction tables.
- DDL: create both tables + reverse-lookup indexes
- Backfill: requirement_node only (the pattern table stays empty for now)
- execution_id is fixed at 56
- Take the node_name path from each entry in requirement.source_nodes
- Strip the /root prefix, then resolve category.id via category_tree?execution_id=56
- Skip the __meta__ / __abstract__ pseudo-nodes
Conventions: autocommit=True, statement_timeout=30s, print with flush=True.
See knowhub/docs/db-operations.md.
"""
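
# Assumed shape of requirement.source_nodes, inferred from the parsing in
# backfill_requirement_node below (real rows may carry extra fields we ignore):
#   [{"node_name": "/root/topic/subtopic", ...}, {"node_name": "__meta__"}, ...]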
import json
import sys
import time
import urllib.request
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore

EXECUTION_ID = 56
CATEGORY_TREE_URL = f'https://pattern.aiddit.com/api/pattern/category_tree?execution_id={EXECUTION_ID}'
ROOT_PREFIX = '/root'
DDL_REQUIREMENT_NODE = """
CREATE TABLE IF NOT EXISTS requirement_node (
    requirement_id VARCHAR NOT NULL,
    node_id INTEGER NOT NULL,
    execution_id INTEGER NOT NULL,
    node_path TEXT,
    PRIMARY KEY (requirement_id, node_id, execution_id)
)
"""
DDL_REQUIREMENT_PATTERN = """
CREATE TABLE IF NOT EXISTS requirement_pattern (
    requirement_id VARCHAR NOT NULL,
    itemset_id INTEGER NOT NULL,
    execution_id INTEGER NOT NULL,
    PRIMARY KEY (requirement_id, itemset_id, execution_id)
)
"""
DDL_IDX_NODE_REV = 'CREATE INDEX IF NOT EXISTS idx_req_node_rev ON requirement_node (node_id, execution_id)'
DDL_IDX_PATTERN_REV = 'CREATE INDEX IF NOT EXISTS idx_req_pattern_rev ON requirement_pattern (itemset_id, execution_id)'
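# The composite primary keys index the requirement_id -> (node|itemset) direction;
# the two reverse-lookup indexes above serve the opposite direction
# (node_id / itemset_id -> requirement_id), which the PK column order cannot.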


def log(msg):
    print(f'[{time.strftime("%H:%M:%S")}] {msg}', flush=True)


def fetch_category_tree():
    log(f'Fetching category_tree (execution_id={EXECUTION_ID})...')
    with urllib.request.urlopen(CATEGORY_TREE_URL, timeout=30) as resp:
        data = json.loads(resp.read())
    cats = data.get('categories', [])
    log(f' got {len(cats)} categories')
    return {c['path']: c['id'] for c in cats}
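
# The category_tree response is assumed (from the parsing above) to look like
#   {"categories": [{"id": 123, "path": "/a/b", ...}, ...]}
# with any other fields ignored.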


def kill_idle_in_tx(cur):
    cur.execute("""
        SELECT pid FROM pg_stat_activity
        WHERE state = 'idle in transaction'
          AND pid != pg_backend_pid()
          AND datname = current_database()
    """)
    pids = [r['pid'] for r in cur.fetchall()]
    for pid in pids:
        cur.execute('SELECT pg_terminate_backend(%s)', (pid,))
    if pids:
        log(f' killed {len(pids)} idle-in-tx sessions: {pids}')
    else:
        log(' no idle-in-tx sessions to clean')


def run_ddl(cur):
    for label, sql in [
        ('requirement_node', DDL_REQUIREMENT_NODE),
        ('requirement_pattern', DDL_REQUIREMENT_PATTERN),
        ('idx_req_node_rev', DDL_IDX_NODE_REV),
        ('idx_req_pattern_rev', DDL_IDX_PATTERN_REV),
    ]:
        log(f'DDL: {label}')
        cur.execute(sql)
        log(' ✓ done')


def normalize_path(p):
    if not isinstance(p, str):
        return None
    # Pseudo-nodes carry bare names without a leading slash, so check them
    # before rejecting non-absolute paths (otherwise this check is unreachable).
    if p in ('__meta__', '__abstract__'):
        return None
    if not p.startswith('/'):
        return None
    if p.startswith(ROOT_PREFIX + '/'):
        return p[len(ROOT_PREFIX):]  # '/root/a/b' → '/a/b'
    return p
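
# A few worked examples of the normalization above:
#   normalize_path('/root/a/b')  → '/a/b'  (root prefix stripped)
#   normalize_path('/a/b')       → '/a/b'  (already rootless, kept as-is)
#   normalize_path('__meta__')   → None    (pseudo-node, skipped)
#   normalize_path(42)           → None    (non-string, skipped)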


def backfill_requirement_node(cur, path2id):
    cur.execute('SELECT id, source_nodes FROM requirement')
    rows = cur.fetchall()
    log(f'Scanning {len(rows)} requirements...')
    inserted = 0
    skipped_meta = 0
    unresolved_paths = set()
    reqs_with_rows = 0
    for r in rows:
        req_id = r['id']
        nodes = r['source_nodes'] or []
        pairs = set()
        for n in nodes:
            name = (n or {}).get('node_name') if isinstance(n, dict) else None
            norm = normalize_path(name)
            if norm is None:
                skipped_meta += 1
                continue
            node_id = path2id.get(norm)
            if node_id is None:
                unresolved_paths.add(norm)
                continue
            pairs.add((node_id, norm))
        if pairs:
            reqs_with_rows += 1
            for node_id, norm in pairs:
                cur.execute("""
                    INSERT INTO requirement_node (requirement_id, node_id, execution_id, node_path)
                    VALUES (%s, %s, %s, %s)
                    ON CONFLICT DO NOTHING
                """, (req_id, node_id, EXECUTION_ID, norm))
                inserted += cur.rowcount or 0
    log('Backfill complete:')
    log(f' reqs with ≥1 row: {reqs_with_rows} / {len(rows)}')
    log(f' rows inserted: {inserted}')
    log(f' meta/abstract skipped: {skipped_meta}')
    if unresolved_paths:
        log(f' unresolved paths ({len(unresolved_paths)}): {sorted(unresolved_paths)[:10]}')
    else:
        log(' unresolved paths: 0')


def verify(cur):
    cur.execute('SELECT COUNT(*) c FROM requirement_node')
    log(f'Post-check requirement_node rows: {cur.fetchone()["c"]}')
    cur.execute('SELECT COUNT(*) c FROM requirement_pattern')
    log(f'Post-check requirement_pattern rows: {cur.fetchone()["c"]}')
    cur.execute('SELECT COUNT(DISTINCT requirement_id) c FROM requirement_node')
    log(f'Distinct reqs with node refs: {cur.fetchone()["c"]}')


def main():
    path2id = fetch_category_tree()
    s = PostgreSQLCapabilityStore()
    cur = s._get_cursor()
    try:
        cur.execute("SET statement_timeout = '30s'")
        kill_idle_in_tx(cur)
        run_ddl(cur)
        backfill_requirement_node(cur, path2id)
        verify(cur)
    finally:
        cur.close()
        s.close()


if __name__ == '__main__':
    main()
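
# Usage sketch (invocation path is an assumption: the sys.path insert above
# points three directories up, so the script presumably lives at a depth like
# <repo>/knowhub/scripts/migrations/):
#   python3 migrate_add_external_refs.py
# Re-running is safe: all DDL uses IF NOT EXISTS and the backfill inserts use
# ON CONFLICT DO NOTHING, so the migration is idempotent.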