#!/usr/bin/env python3
"""
Diagnose requirement-junction coverage for every known version.

Lists, for each known version, how many requirements are covered in each
requirement junction table. The script performs no writes, which makes it
suitable for CI, periodic inspection, or a quick first check when something
looks off.

Exit status:
    0  no redundancy gap detected between versions
    1  there were no versions to scan
    2  at least one ``versioned`` junction table differs between versions

Usage:
    python3 scripts/audit_req_junctions.py
    python3 scripts/audit_req_junctions.py --versions tao_dev,v0
"""
import argparse
import sys
import time
from pathlib import Path

from dotenv import load_dotenv

# Load the project's .env and make the project root importable *before* the
# knowhub imports below; this ordering is why those imports carry ``# noqa``.
PROJECT_ROOT = Path(__file__).resolve().parent.parent
load_dotenv(PROJECT_ROOT / '.env')
sys.path.insert(0, str(PROJECT_ROOT))

from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore  # noqa
from knowhub.knowhub_db.versioning_contract import (  # noqa
    audit_req_junction_coverage,
    REQUIREMENT_JUNCTION_TABLES,
)


def log(m):
    """Print ``m`` to stdout with an ``[HH:MM:SS]`` prefix, flushing immediately."""
    print(f'[{time.strftime("%H:%M:%S")}] {m}', flush=True)


def _resolve_versions(cur, raw_versions):
    """Return the ordered list of versions to audit.

    Args:
        cur: Open database cursor; rows are read by column name.
        raw_versions: Comma-separated version names from ``--versions``, or a
            falsy value to read the distinct versions from ``requirement``.

    Returns:
        Version names with blanks dropped and duplicates removed (first
        occurrence wins). May be empty.
    """
    if raw_versions:
        stripped = (part.strip() for part in raw_versions.split(','))
        # dict.fromkeys de-duplicates while preserving the order given on the
        # command line, so a repeated name is not reported twice.
        return list(dict.fromkeys(name for name in stripped if name))
    cur.execute("SELECT DISTINCT version FROM requirement ORDER BY version")
    return [row['version'] for row in cur.fetchall()]


def _tables_with_semantics(audit, semantics):
    """Return the table names in ``audit['tables']`` whose semantics match."""
    return [
        table
        for table, info in audit['tables'].items()
        if info['semantics'] == semantics
    ]


def main():
    """Run the coverage audit and print a report.

    Raises:
        SystemExit: with status 1 when there is no version to scan, or with
            status 2 when a ``versioned`` table is inconsistent across versions.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument('--versions', default=None,
                    help='逗号分隔的版本列表;默认自动从 requirement 表取 DISTINCT version')
    args = ap.parse_args()

    log('== req junction 覆盖诊断 ==')
    s = PostgreSQLCapabilityStore()
    cur = s._get_cursor()
    cur.execute("SET statement_timeout = '30s'")

    versions = _resolve_versions(cur, args.versions)
    log(f'扫描版本:{versions}')
    if not versions:
        # Without this guard the cross-version comparison below would fail
        # with an IndexError on versions[0]. Exit non-zero, but with a status
        # distinct from the "redundancy gap" status 2.
        log('❌ 没有可扫描的版本(requirement 表为空,或 --versions 未包含有效值)')
        sys.exit(1)

    log(f'契约声明的 junction 表 {len(REQUIREMENT_JUNCTION_TABLES)} 张:')
    for spec in REQUIREMENT_JUNCTION_TABLES:
        log(f' - {spec.table} [{spec.copy_semantics}]')

    # Collect the full audit of every version first, then compare across versions.
    audits = {v: audit_req_junction_coverage(cur, v) for v in versions}

    for v in versions:
        audit = audits[v]
        log('')
        log(f'>> version={v!r} total reqs = {audit["total_reqs"]}')
        for table, info in audit['tables'].items():
            log(f' {table:<24} covered {info["covered"]}/{audit["total_reqs"]:<4} '
                f'[{info["semantics"]}]')

    # The first version's audit supplies the list of tables to compare.
    reference = audits[versions[0]]
    per_version_total = {v: audits[v]['total_reqs'] for v in versions}
    max_total = max(per_version_total.values())

    # Cross-version comparison: for versioned tables the covered count should
    # be equal in every version. A difference means the redundant copy missed
    # rows; agreement (even below full coverage) means a gap in the source
    # data, which is not a problem with this contract.
    log('')
    log('========== 版本冗余一致性检查 ==========')
    any_critical = False
    any_info = False
    for spec_table in _tables_with_semantics(reference, 'versioned'):
        per_version_covered = {v: audits[v]['tables'][spec_table]['covered']
                               for v in versions}
        max_cov = max(per_version_covered.values())
        lagging = {v for v, c in per_version_covered.items() if c < max_cov}
        if lagging:
            any_critical = True
            log(f' 🚨 {spec_table}: versioned 表在版本间不一致')
            for v, c in per_version_covered.items():
                marker = ' ← 少' if v in lagging else ''
                log(f' {v:<15} covered {c}/{per_version_total[v]}{marker}')
            log(' 建议:跑 scripts/backfill_requirement_pattern_versions.py')
            continue

        # Consistent across versions: incomplete coverage here is not this
        # contract's problem.
        data_gap = max_total - max_cov
        if data_gap > 0:
            any_info = True
            log(f' ℹ️ {spec_table}: 版本间一致 (所有版本都 {max_cov}/{max_total}),'
                f'但源数据本身缺 {data_gap} 条——非冗余问题')
        else:
            log(f' ✅ {spec_table}: 所有版本覆盖一致 {max_cov}/{max_total}')

    # fresh-per-version tables: coverage differences between versions are
    # decided by ingest, so they are only reported and never fail the run.
    log('')
    log('========== fresh-per-version 表(仅供参考)==========')
    for spec_table in _tables_with_semantics(reference, 'fresh-per-version'):
        per_version_covered = {v: audits[v]['tables'][spec_table]['covered']
                               for v in versions}
        line = ' '.join(f'{v}={per_version_covered[v]}/{per_version_total[v]}'
                        for v in versions)
        log(f' {spec_table:<24} {line}')

    log('')
    if any_critical:
        log('❌ 检测到版本冗余缺口')
        sys.exit(2)
    if any_info:
        log('✅ 冗余契约无问题(个别 versioned 表有源数据缺口,见 ℹ️ 行)')
    else:
        log('✅ 冗余契约无问题')


if __name__ == '__main__':
    main()