# audit_req_junctions.py (4.7 KB)
  1. #!/usr/bin/env python3
  2. """
  3. Diagnose: 列出每个已知版本在所有 requirement junction 表里的覆盖情况。
  4. 无写入。适合在 CI / 定期巡检 / "感觉哪里不对时先跑一下" 的场景。
  5. 用法:
  6. python3 scripts/audit_req_junctions.py
  7. python3 scripts/audit_req_junctions.py --versions tao_dev,v0
  8. """
  9. import argparse
  10. import sys
  11. import time
  12. from pathlib import Path
  13. from dotenv import load_dotenv
  14. PROJECT_ROOT = Path(__file__).resolve().parent.parent
  15. load_dotenv(PROJECT_ROOT / '.env')
  16. sys.path.insert(0, str(PROJECT_ROOT))
  17. from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore # noqa
  18. from knowhub.knowhub_db.versioning_contract import ( # noqa
  19. audit_req_junction_coverage,
  20. REQUIREMENT_JUNCTION_TABLES,
  21. )
  def log(m):
      """Print *m* to stdout, prefixed with the current local time as ``[HH:MM:SS]``.

      The stream is flushed on every call so output appears immediately.
      """
      stamp = time.strftime("%H:%M:%S")
      print(f'[{stamp}] {m}', flush=True)
  def _resolve_versions(cur, raw_versions):
      """Return the ordered list of versions to audit.

      When *raw_versions* (the ``--versions`` string) is non-empty it is split
      on commas, blank entries are dropped and duplicates are removed while
      keeping first-seen order. Otherwise the distinct versions are read from
      the ``requirement`` table through *cur*.
      """
      if raw_versions:
          stripped = (v.strip() for v in raw_versions.split(','))
          # dict.fromkeys de-duplicates while preserving order, so a repeated
          # version is not reported twice.
          return list(dict.fromkeys(v for v in stripped if v))
      cur.execute("SELECT DISTINCT version FROM requirement ORDER BY version")
      return [r['version'] for r in cur.fetchall()]


  def _tables_with_semantics(audits, versions, semantics):
      """List tables whose 'semantics' equals *semantics* in the first version's audit."""
      reference = audits[versions[0]]['tables']
      return [t for t, info in reference.items() if info['semantics'] == semantics]


  def _check_versioned_consistency(audits, versions):
      """Log a per-table verdict for 'versioned' tables; return ``(any_critical, any_info)``.

      A table is critical when its 'covered' count differs between versions.
      It is informational when all versions agree but the count is below the
      largest 'total_reqs' seen across versions.
      """
      any_critical = False
      any_info = False
      # Independent of the table being inspected, so computed once up front.
      max_total = max(audits[v]['total_reqs'] for v in versions)
      for spec_table in _tables_with_semantics(audits, versions, 'versioned'):
          per_version_covered = {v: audits[v]['tables'][spec_table]['covered'] for v in versions}
          max_cov = max(per_version_covered.values())
          lagging = {v for v, c in per_version_covered.items() if c < max_cov}
          if lagging:
              any_critical = True
              log(f' 🚨 {spec_table}: versioned 表在版本间不一致')
              for v, c in per_version_covered.items():
                  marker = ' ← 少' if v in lagging else ''
                  # !s keeps the padding spec valid even for a non-str version (e.g. None).
                  log(f' {v!s:<15} covered {c}/{audits[v]["total_reqs"]}{marker}')
              log(' 建议:跑 scripts/backfill_requirement_pattern_versions.py')
              continue
          # Versions agree: incomplete coverage is reported as a source-data
          # gap, not as a redundancy failure.
          data_gap = max_total - max_cov
          if data_gap > 0:
              any_info = True
              log(f' ℹ️ {spec_table}: 版本间一致 (所有版本都 {max_cov}/{max_total}),'
                  f'但源数据本身缺 {data_gap} 条——非冗余问题')
          else:
              log(f' ✅ {spec_table}: 所有版本覆盖一致 {max_cov}/{max_total}')
      return any_critical, any_info


  def _report_fresh_tables(audits, versions):
      """Log covered/total per version for 'fresh-per-version' tables (report only)."""
      for spec_table in _tables_with_semantics(audits, versions, 'fresh-per-version'):
          line = ' '.join(
              f'{v}={audits[v]["tables"][spec_table]["covered"]}/{audits[v]["total_reqs"]}'
              for v in versions
          )
          log(f' {spec_table:<24} {line}')


  def main():
      """Audit requirement junction-table coverage for each version and log a report.

      Versions come from ``--versions`` (comma separated) or, by default, from
      the distinct ``version`` values of the ``requirement`` table. The script
      issues only ``SET statement_timeout`` and ``SELECT`` statements itself.

      Exits with status 2 when a 'versioned' table has differing coverage
      between versions; otherwise returns normally. When there are no versions
      to audit it logs that fact and returns instead of failing.
      """
      ap = argparse.ArgumentParser()
      ap.add_argument('--versions', default=None,
                      help='逗号分隔的版本列表;默认自动从 requirement 表取 DISTINCT version')
      args = ap.parse_args()

      log('== req junction 覆盖诊断 ==')
      s = PostgreSQLCapabilityStore()
      cur = s._get_cursor()
      # Bound every query so a slow audit cannot hang a CI job.
      cur.execute("SET statement_timeout = '30s'")

      versions = _resolve_versions(cur, args.versions)
      log(f'扫描版本:{versions}')
      log(f'契约声明的 junction 表 {len(REQUIREMENT_JUNCTION_TABLES)} 张:')
      for spec in REQUIREMENT_JUNCTION_TABLES:
          log(f' - {spec.table} [{spec.copy_semantics}]')

      if not versions:
          # Nothing to compare; the cross-version checks below index
          # versions[0] and would raise IndexError on an empty list.
          log('未找到任何版本,无可检查内容')
          return

      # Collect every version's audit first, then compare across versions.
      audits = {v: audit_req_junction_coverage(cur, v) for v in versions}
      for v in versions:
          audit = audits[v]
          log('')
          log(f'>> version={v!r} total reqs = {audit["total_reqs"]}')
          for table, info in audit['tables'].items():
              log(f' {table:<24} covered {info["covered"]}/{audit["total_reqs"]:<4} '
                  f'[{info["semantics"]}]')

      # Cross-version comparison: 'versioned' tables should have the same
      # covered count in every version.
      log('')
      log('========== 版本冗余一致性检查 ==========')
      any_critical, any_info = _check_versioned_consistency(audits, versions)

      # 'fresh-per-version' tables: differences are only reported and never
      # affect the exit status.
      log('')
      log('========== fresh-per-version 表(仅供参考)==========')
      _report_fresh_tables(audits, versions)

      log('')
      if any_critical:
          log('❌ 检测到版本冗余缺口')
          sys.exit(2)
      if any_info:
          log('✅ 冗余契约无问题(个别 versioned 表有源数据缺口,见 ℹ️ 行)')
      else:
          log('✅ 冗余契约无问题')
  # Run the audit only when executed as a script, not when imported.
  if __name__ == "__main__":
      main()