| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- #!/usr/bin/env python3
- """
- Diagnose: 列出每个已知版本在所有 requirement junction 表里的覆盖情况。
- 无写入。适合在 CI / 定期巡检 / "感觉哪里不对时先跑一下" 的场景。
- 用法:
- python3 scripts/audit_req_junctions.py
- python3 scripts/audit_req_junctions.py --versions tao_dev,v0
- """
- import argparse
- import sys
- import time
- from pathlib import Path
- from dotenv import load_dotenv
# Project root = the parent of the directory that contains this file
# (the usage examples in the module docstring run it as scripts/audit_req_junctions.py).
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Load variables from the project's .env file before the project imports below.
# NOTE(review): presumably the store reads its connection settings from the
# environment -- confirm against PostgreSQLCapabilityStore.
load_dotenv(PROJECT_ROOT / '.env')
# Put the project root first on sys.path so the `knowhub` imports that follow
# resolve when this file is run directly as a script.
sys.path.insert(0, str(PROJECT_ROOT))
- from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore # noqa
- from knowhub.knowhub_db.versioning_contract import ( # noqa
- audit_req_junction_coverage,
- REQUIREMENT_JUNCTION_TABLES,
- )
def log(m):
    """Print ``m`` to stdout prefixed with the local wall-clock time.

    The emitted line has the form ``[HH:MM:SS] <m>`` and stdout is flushed
    on every call.
    """
    stamp = time.strftime('%H:%M:%S')
    line = '[{}] {}'.format(stamp, m)
    print(line, flush=True)
def main():
    """Audit requirement-junction coverage per version and flag cross-version gaps.

    Command line:
        --versions  comma-separated version names. When omitted, the distinct
                    ``version`` values of the ``requirement`` table are used.

    Steps:
        1. Resolve the version list (blank entries and duplicates are dropped,
           first-seen order is kept).
        2. Call ``audit_req_junction_coverage`` once per version and print the
           per-table ``covered/total`` numbers.
        3. For tables whose semantics is ``'versioned'``, compare the covered
           COUNT across versions. Only counts are compared, not which rows are
           covered. Any version below the maximum is reported as critical.
        4. For ``'fresh-per-version'`` tables, print the numbers for reference
           only; they never affect the exit status.

    Exit status:
        0  no cross-version gap found (source-data gaps are only informational)
        1  there is no version to scan
        2  at least one versioned table differs between versions

    The only statements this function issues itself are a session-level
    ``SET statement_timeout`` and a ``SELECT``.
    NOTE(review): the cursor/store are never closed explicitly here; confirm
    whether PostgreSQLCapabilityStore expects an explicit close.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--versions', default=None,
                    help='逗号分隔的版本列表;默认自动从 requirement 表取 DISTINCT version')
    args = ap.parse_args()

    log('== req junction 覆盖诊断 ==')
    s = PostgreSQLCapabilityStore()
    cur = s._get_cursor()
    # Bound every statement of this session so a diagnostic run cannot hang.
    cur.execute("SET statement_timeout = '30s'")

    if args.versions:
        requested = (v.strip() for v in args.versions.split(','))
        # dict.fromkeys de-duplicates while preserving the order given on the
        # command line, so a repeated name is not reported twice.
        versions = list(dict.fromkeys(v for v in requested if v))
    else:
        cur.execute("SELECT DISTINCT version FROM requirement ORDER BY version")
        versions = [r['version'] for r in cur.fetchall()]
    log(f'扫描版本:{versions}')

    if not versions:
        # Without at least one version there is nothing to compare; previously
        # this surfaced later as a bare IndexError on versions[0]. Exit status 1
        # matches what that crash produced, and stays distinct from 2.
        log('⚠️ 没有可扫描的版本(requirement 表为空,或 --versions 未包含有效版本名)')
        sys.exit(1)

    log(f'契约声明的 junction 表 {len(REQUIREMENT_JUNCTION_TABLES)} 张:')
    for spec in REQUIREMENT_JUNCTION_TABLES:
        log(f' - {spec.table} [{spec.copy_semantics}]')

    # Collect every version's audit first, then compare across versions.
    audits = {v: audit_req_junction_coverage(cur, v) for v in versions}
    for v in versions:
        audit = audits[v]
        log('')
        log(f'>> version={v!r} total reqs = {audit["total_reqs"]}')
        for table, info in audit['tables'].items():
            log(f' {table:<24} covered {info["covered"]}/{audit["total_reqs"]:<4} '
                f'[{info["semantics"]}]')

    # The first version's audit defines which tables exist and their semantics.
    reference_tables = audits[versions[0]]['tables']
    versioned_tables = [t for t, info in reference_tables.items()
                        if info['semantics'] == 'versioned']
    fresh_tables = [t for t, info in reference_tables.items()
                    if info['semantics'] == 'fresh-per-version']
    totals = {v: audits[v]['total_reqs'] for v in versions}
    max_total = max(totals.values())  # loop-invariant, computed once

    # Cross-version comparison: a versioned table should have the same covered
    # count in every version. A difference means a redundancy copy was missed;
    # equal-but-incomplete coverage is a source-data gap, not a contract issue.
    log('')
    log('========== 版本冗余一致性检查 ==========')
    any_critical = False
    any_info = False
    for spec_table in versioned_tables:
        per_version_covered = {v: audits[v]['tables'][spec_table]['covered']
                               for v in versions}
        max_cov = max(per_version_covered.values())
        lagging = {v for v, c in per_version_covered.items() if c < max_cov}
        if lagging:
            any_critical = True
            log(f' 🚨 {spec_table}: versioned 表在版本间不一致')
            for v, c in per_version_covered.items():
                marker = ' ← 少' if v in lagging else ''
                # !s keeps the padding working even if a version value is None.
                log(f' {v!s:<15} covered {c}/{totals[v]}{marker}')
            log(' 建议:跑 scripts/backfill_requirement_pattern_versions.py')
            continue

        # Consistent across versions; incomplete coverage is informational only.
        data_gap = max_total - max_cov
        if data_gap > 0:
            any_info = True
            log(f' ℹ️ {spec_table}: 版本间一致 (所有版本都 {max_cov}/{max_total}),'
                f'但源数据本身缺 {data_gap} 条——非冗余问题')
        else:
            log(f' ✅ {spec_table}: 所有版本覆盖一致 {max_cov}/{max_total}')

    # fresh-per-version tables: coverage differences come from ingest, so they
    # are only reported and never raise the critical flag.
    log('')
    log('========== fresh-per-version 表(仅供参考)==========')
    for spec_table in fresh_tables:
        line = ' '.join(
            f'{v}={audits[v]["tables"][spec_table]["covered"]}/{totals[v]}'
            for v in versions
        )
        log(f' {spec_table:<24} {line}')

    log('')
    if any_critical:
        log('❌ 检测到版本冗余缺口')
        sys.exit(2)
    if any_info:
        log('✅ 冗余契约无问题(个别 versioned 表有源数据缺口,见 ℹ️ 行)')
    else:
        log('✅ 冗余契约无问题')
# Run the audit only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()
|