| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- #!/usr/bin/env python3
"""
Backfill versioned junction tables for requirement (strict-redundancy policy).

使用 knowhub.knowhub_db.versioning_contract 里声明的契约——
只处理 `copy_semantics='versioned'` 的表(当前:requirement_pattern / requirement_node),
其余表(cap / resource / strategy / knowledge)属于 'fresh-per-version',
由 ingest/duplicate 脚本自行生成,不在此处触碰。

Suffix → version 映射:目前只有 tao_dev 版带独立 req 行(`__td` → `tao_dev`)。

执行安全(见 docs/db-operations.md):
- autocommit=True(本脚本未显式设置、也不调用 commit,需由 store 提供的连接保证)
- SET statement_timeout='120s'
- 先 kill idle-in-tx
- ON CONFLICT DO NOTHING(幂等)
- 每步打印 + flush
"""
- import sys
- import time
- from pathlib import Path
- from dotenv import load_dotenv
# Project root = the directory one level above this script's directory.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Load environment variables from <project root>/.env before anything reads them
# (presumably the DB connection settings used by PostgreSQLCapabilityStore — confirm).
load_dotenv(PROJECT_ROOT / '.env')
# Make `knowhub` importable from the project root. This must run before the
# knowhub imports that follow, which is why those carry `# noqa: E402`.
sys.path.insert(0, str(PROJECT_ROOT))
- from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore # noqa: E402
- from knowhub.knowhub_db.versioning_contract import ( # noqa: E402
- duplicate_versioned_junctions,
- audit_req_junction_coverage,
- REQUIREMENT_JUNCTION_TABLES,
- )
# Suffix -> target version name; each pair is passed to
# duplicate_versioned_junctions() by main(). Only tao_dev currently has its own
# requirement rows. (How the suffix is applied is defined in
# versioning_contract — presumably appended to ids; confirm there.)
SUFFIX_TO_VERSION = {'__td': 'tao_dev'}
def log(msg: str) -> None:
    """Write *msg* to stdout prefixed with the local wall-clock time.

    Output looks like ``[HH:MM:SS] msg``. stdout is flushed on every call so
    progress is visible immediately, even when output is piped.
    """
    stamp = time.strftime("%H:%M:%S")
    line = f'[{stamp}] {msg}'
    print(line, flush=True)
def _print_audit(audit: dict) -> None:
    """Log one version's junction-coverage audit, one line per table.

    ``audit`` is the dict returned by ``audit_req_junction_coverage``. It is read
    as: ``version``, ``total_reqs``, and ``tables`` (table name -> dict with
    ``semantics``, ``covered`` and, for ``'versioned'`` tables, ``missing``).
    Versioned tables get a ✅ / "missing N" marker; other tables get none.
    """
    log(f" version={audit['version']!r} reqs={audit['total_reqs']}")
    for table_name, stats in audit['tables'].items():
        semantics = stats['semantics']
        # Only versioned tables are expected to be fully covered, so only
        # they carry a status marker.
        if semantics != 'versioned':
            marker = ''
        elif stats['missing'] == 0:
            marker = ' ✅'
        else:
            marker = f" ⚠️ 缺 {stats['missing']}"
        log(f" {table_name:<24} covered {stats['covered']}/{audit['total_reqs']} "
            f" [{semantics}]{marker}")
def _target_versions() -> list:
    """Return the distinct versions in SUFFIX_TO_VERSION, in declaration order."""
    # dict.fromkeys de-duplicates while keeping insertion order. Iterating a
    # set of str would depend on the hash seed, so the order would vary between runs.
    return list(dict.fromkeys(SUFFIX_TO_VERSION.values()))


def _terminate_idle_in_tx(cur) -> None:
    """Terminate other 'idle in transaction' sessions on the current database.

    Presumably done so that abandoned transactions cannot hold locks that would
    block the backfill (the module docstring lists it as a safety step).
    """
    # Filter and terminate in ONE statement. With a separate SELECT followed by
    # per-pid pg_terminate_backend() calls, a session could leave the
    # 'idle in transaction' state in between and be killed mid-query. A single
    # statement narrows (but cannot fully close) that window.
    cur.execute("""
        SELECT pid, pg_terminate_backend(pid) AS terminated
        FROM pg_stat_activity
        WHERE state = 'idle in transaction'
          AND pid != pg_backend_pid()
          AND datname = current_database()
    """)
    pids = [row['pid'] for row in cur.fetchall()]
    if pids:
        log(f'发现 {len(pids)} 个 idle-in-tx:{pids},terminate')
    else:
        log('无 idle-in-tx 会话')


def _audit_all(cur, title: str) -> None:
    """Log a section banner, then a coverage audit for every target version."""
    log('')
    log(f'========== {title} ==========')
    for version in _target_versions():
        _print_audit(audit_req_junction_coverage(cur, version))


def _log_backfill_progress(table: str, inserted: int, elapsed: float) -> None:
    """Per-table progress callback passed to duplicate_versioned_junctions."""
    log(f' [{table}] ✓ +{inserted} 行 ({elapsed:.2f}s)')


def _backfill_all(cur) -> None:
    """Duplicate the versioned junction rows for every suffix/version pair."""
    log('')
    log('========== BACKFILL ==========')
    for suffix, version in SUFFIX_TO_VERSION.items():
        log(f'-- suffix={suffix!r} → version={version!r} --')
        result = duplicate_versioned_junctions(
            cur, suffix, version, on_progress=_log_backfill_progress,
        )
        # result maps table -> inserted row count (summed for the total line).
        log(f' 合计:{sum(result.values())} 行')


def main():
    """Backfill the versioned requirement junction tables declared by the contract.

    Steps:
      1. set a 120s statement_timeout on this session;
      2. terminate other idle-in-transaction sessions on this database;
      3. audit junction coverage per target version (PRE-CHECK);
      4. duplicate versioned junctions for each suffix/version pair (BACKFILL);
      5. audit again (POST-CHECK).

    Raises:
        Exception: any error from the DB or the contract helpers is logged
            and re-raised unchanged.
    """
    log('== 启动 backfill_requirement_pattern_versions (via contract) ==')
    log(f'契约声明 junction 表 {len(REQUIREMENT_JUNCTION_TABLES)} 张:')
    for spec in REQUIREMENT_JUNCTION_TABLES:
        log(f' - {spec.table} [{spec.copy_semantics}]')

    log('连接 KnowHub DB...')
    store = PostgreSQLCapabilityStore()
    cur = store._get_cursor()
    log('连接成功')
    # NOTE(review): nothing here enables autocommit or calls commit(). The
    # inserts persist only if the store's connection is already in autocommit
    # mode, as the module docstring states. Confirm in PostgreSQLCapabilityStore.
    # NOTE(review): `store` itself is never closed. If it owns a connection,
    # consider closing it too (its API is not visible here).
    try:
        cur.execute("SET statement_timeout = '120s'")
        log('SET statement_timeout = 120s')

        _terminate_idle_in_tx(cur)
        _audit_all(cur, 'PRE-CHECK')
        _backfill_all(cur)
        _audit_all(cur, 'POST-CHECK')

        log('')
        log('== DONE ==')
    except Exception as e:
        log(f'❌ 错误:{type(e).__name__}: {e}')
        raise
    finally:
        try:
            cur.close()
        except Exception as close_err:
            # Best-effort cleanup: report the failure, but never let it mask
            # the original error (or fail an otherwise successful run).
            log(f'⚠️ cur.close() 失败(已忽略):{type(close_err).__name__}: {close_err}')
# Run the backfill only when executed as a script, not when imported.
# Exceptions from main() propagate, so a failed run exits with a non-zero status.
if __name__ == '__main__':
    main()
|