backfill_requirement_pattern_versions.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. #!/usr/bin/env python3
  2. """
  3. Backfill versioned junction tables for requirement(strict-redundancy policy)。
  4. 使用 knowhub.knowhub_db.versioning_contract 里声明的契约——
  5. 只处理 `copy_semantics='versioned'` 的表(当前:requirement_pattern / requirement_node),
  6. 其余表(cap / resource / strategy / knowledge)属于 'fresh-per-version',
  7. 由 ingest/duplicate 脚本自行生成,不在此处触碰。
  8. Suffix → version 映射:目前只有 tao_dev 版带独立 req 行。
  9. 执行安全(见 docs/db-operations.md):
  10. - autocommit=True
  11. - SET statement_timeout='120s'
  12. - 先 kill idle-in-tx
  13. - ON CONFLICT DO NOTHING(幂等)
  14. - 每步打印 + flush
  15. """
  16. import sys
  17. import time
  18. from pathlib import Path
  19. from dotenv import load_dotenv
# Bootstrap, which must run before the `knowhub` imports below:
# - PROJECT_ROOT is the parent of the directory that contains this script.
# - Its .env file is loaded into the process environment.
# - The root is put first on sys.path so `knowhub.*` resolves from this checkout.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
load_dotenv(PROJECT_ROOT / '.env')
sys.path.insert(0, str(PROJECT_ROOT))
  23. from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore # noqa: E402
  24. from knowhub.knowhub_db.versioning_contract import ( # noqa: E402
  25. duplicate_versioned_junctions,
  26. audit_req_junction_coverage,
  27. REQUIREMENT_JUNCTION_TABLES,
  28. )
  29. SUFFIX_TO_VERSION = {
  30. '__td': 'tao_dev',
  31. }
  32. def log(msg: str) -> None:
  33. print(f'[{time.strftime("%H:%M:%S")}] {msg}', flush=True)
  34. def _print_audit(audit: dict) -> None:
  35. log(f" version={audit['version']!r} reqs={audit['total_reqs']}")
  36. for table, info in audit['tables'].items():
  37. tag = ''
  38. if info['semantics'] == 'versioned':
  39. tag = ' ✅' if info['missing'] == 0 else f" ⚠️ 缺 {info['missing']}"
  40. log(f" {table:<24} covered {info['covered']}/{audit['total_reqs']} "
  41. f" [{info['semantics']}]{tag}")
  42. def main():
  43. log('== 启动 backfill_requirement_pattern_versions (via contract) ==')
  44. log(f'契约声明 junction 表 {len(REQUIREMENT_JUNCTION_TABLES)} 张:')
  45. for spec in REQUIREMENT_JUNCTION_TABLES:
  46. log(f' - {spec.table} [{spec.copy_semantics}]')
  47. log('连接 KnowHub DB...')
  48. store = PostgreSQLCapabilityStore()
  49. cur = store._get_cursor()
  50. log('连接成功')
  51. try:
  52. cur.execute("SET statement_timeout = '120s'")
  53. log('SET statement_timeout = 120s')
  54. # Kill idle-in-tx
  55. cur.execute("""
  56. SELECT pid FROM pg_stat_activity
  57. WHERE state = 'idle in transaction'
  58. AND pid != pg_backend_pid()
  59. AND datname = current_database()
  60. """)
  61. pids = [r['pid'] for r in cur.fetchall()]
  62. if pids:
  63. log(f'发现 {len(pids)} 个 idle-in-tx:{pids},terminate')
  64. for pid in pids:
  65. cur.execute('SELECT pg_terminate_backend(%s)', (pid,))
  66. else:
  67. log('无 idle-in-tx 会话')
  68. # ---- PRE-CHECK ---------------------------------------------------
  69. log('')
  70. log('========== PRE-CHECK ==========')
  71. for version in set(SUFFIX_TO_VERSION.values()):
  72. _print_audit(audit_req_junction_coverage(cur, version))
  73. # ---- BACKFILL ----------------------------------------------------
  74. log('')
  75. log('========== BACKFILL ==========')
  76. def _progress(table: str, inserted: int, elapsed: float) -> None:
  77. log(f' [{table}] ✓ +{inserted} 行 ({elapsed:.2f}s)')
  78. for suffix, version in SUFFIX_TO_VERSION.items():
  79. log(f'-- suffix={suffix!r} → version={version!r} --')
  80. result = duplicate_versioned_junctions(
  81. cur, suffix, version, on_progress=_progress,
  82. )
  83. log(f' 合计:{sum(result.values())} 行')
  84. # ---- POST-CHECK --------------------------------------------------
  85. log('')
  86. log('========== POST-CHECK ==========')
  87. for version in set(SUFFIX_TO_VERSION.values()):
  88. _print_audit(audit_req_junction_coverage(cur, version))
  89. log('')
  90. log('== DONE ==')
  91. except Exception as e:
  92. log(f'❌ 错误:{type(e).__name__}: {e}')
  93. raise
  94. finally:
  95. try:
  96. cur.close()
  97. except Exception:
  98. pass
# Run the backfill only when executed as a script, not when imported.
if __name__ == '__main__':
    main()