#!/usr/bin/env python3
"""
Backfill versioned junction tables for requirements (strict-redundancy policy).

Uses the contract declared in ``knowhub.knowhub_db.versioning_contract``:
only tables with ``copy_semantics='versioned'`` are handled here (currently
requirement_pattern / requirement_node). The remaining tables (cap / resource /
strategy / knowledge) are 'fresh-per-version'; they are generated by the
ingest/duplicate scripts themselves and are not touched here.

Suffix -> version mapping: currently only the tao_dev version carries its own
requirement rows.

Execution safety (see docs/db-operations.md):
- autocommit=True
- SET statement_timeout='120s'
- kill idle-in-transaction sessions first
- ON CONFLICT DO NOTHING (idempotent)
- print + flush at every step
"""

import sys
import time
from pathlib import Path

from dotenv import load_dotenv

# The project root must be on sys.path (and .env loaded) before the knowhub
# imports below, hence the late imports marked with noqa: E402.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
load_dotenv(PROJECT_ROOT / '.env')
sys.path.insert(0, str(PROJECT_ROOT))

from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore  # noqa: E402
from knowhub.knowhub_db.versioning_contract import (  # noqa: E402
    duplicate_versioned_junctions,
    audit_req_junction_coverage,
    REQUIREMENT_JUNCTION_TABLES,
)

# Maps the id suffix passed to duplicate_versioned_junctions to the version
# name it belongs to.
SUFFIX_TO_VERSION = {
    '__td': 'tao_dev',
}


def log(msg: str) -> None:
    """Print *msg* prefixed with the current HH:MM:SS time, flushing at once."""
    # Per-call flush is deliberate: every step must be visible immediately.
    print(f'[{time.strftime("%H:%M:%S")}] {msg}', flush=True)


def _print_audit(audit: dict) -> None:
    """Log one coverage audit: a summary line, then one line per table.

    Reads ``audit['version']``, ``audit['total_reqs']`` and, for each entry of
    ``audit['tables']``, the keys ``semantics``, ``covered`` and ``missing``.
    Only tables whose semantics is 'versioned' get an ok/missing marker.
    """
    log(f" version={audit['version']!r} reqs={audit['total_reqs']}")
    for table, info in audit['tables'].items():
        tag = ''
        if info['semantics'] == 'versioned':
            tag = ' ✅' if info['missing'] == 0 else f" ⚠️ 缺 {info['missing']}"
        log(f" {table:<24} covered {info['covered']}/{audit['total_reqs']} "
            f" [{info['semantics']}]{tag}")


def _log_progress(table: str, inserted: int, elapsed: float) -> None:
    """Progress callback handed to duplicate_versioned_junctions."""
    log(f' [{table}] ✓ +{inserted} 行 ({elapsed:.2f}s)')


def _terminate_idle_in_tx(cur) -> None:
    """Terminate other idle-in-transaction sessions on the current database."""
    cur.execute("""
        SELECT pid
        FROM pg_stat_activity
        WHERE state = 'idle in transaction'
          AND pid != pg_backend_pid()
          AND datname = current_database()
    """)
    # Rows are accessed by column name, so the cursor must yield mappings.
    pids = [row['pid'] for row in cur.fetchall()]
    if not pids:
        log('无 idle-in-tx 会话')
        return
    log(f'发现 {len(pids)} 个 idle-in-tx:{pids},terminate')
    for pid in pids:
        cur.execute('SELECT pg_terminate_backend(%s)', (pid,))


def _audit_all_versions(cur, title: str) -> None:
    """Log a section header, then the junction-coverage audit of each version."""
    log('')
    log(f'========== {title} ==========')
    # sorted() keeps the output order stable between runs; the set removes
    # versions that are reachable through more than one suffix.
    for version in sorted(set(SUFFIX_TO_VERSION.values())):
        _print_audit(audit_req_junction_coverage(cur, version))


def main() -> None:
    """Run the backfill: pre-check audit, junction duplication, post-check audit.

    Raises:
        Exception: any error raised while talking to the database is logged
            and then re-raised unchanged.
    """
    log('== 启动 backfill_requirement_pattern_versions (via contract) ==')
    log(f'契约声明 junction 表 {len(REQUIREMENT_JUNCTION_TABLES)} 张:')
    for spec in REQUIREMENT_JUNCTION_TABLES:
        log(f' - {spec.table} [{spec.copy_semantics}]')

    log('连接 KnowHub DB...')
    store = PostgreSQLCapabilityStore()
    # NOTE(review): the module docstring requires autocommit=True, but it is
    # not set here -- presumably the store configures it; confirm.
    # NOTE(review): only the cursor is closed below; whether the store's
    # connection needs an explicit close is not visible from this file.
    cur = store._get_cursor()
    log('连接成功')

    try:
        cur.execute("SET statement_timeout = '120s'")
        log('SET statement_timeout = 120s')

        _terminate_idle_in_tx(cur)

        _audit_all_versions(cur, 'PRE-CHECK')

        log('')
        log('========== BACKFILL ==========')
        for suffix, version in SUFFIX_TO_VERSION.items():
            log(f'-- suffix={suffix!r} → version={version!r} --')
            result = duplicate_versioned_junctions(
                cur, suffix, version, on_progress=_log_progress,
            )
            log(f' 合计:{sum(result.values())} 行')

        _audit_all_versions(cur, 'POST-CHECK')

        log('')
        log('== DONE ==')
    except Exception as exc:
        log(f'❌ 错误:{type(exc).__name__}: {exc}')
        raise
    finally:
        # Best-effort cleanup: a failing close must not mask the real error,
        # but it should not vanish silently either.
        try:
            cur.close()
        except Exception as close_exc:
            log(f'⚠️ cursor close failed: {type(close_exc).__name__}: {close_exc}')


if __name__ == '__main__':
    main()