""" KnowHub 版本冗余契约(Versioning Redundancy Contract) =================================================== **背景**:我们对核心实体(requirement / capability / resource / strategy)采用 **严格冗余**的版本策略——每个版本都持有自己的一套完整行,而不是用别名 / 指针复用。 这样 `version='tao_dev'` 可以独立演化而不影响 `version='v0'`。 但"冗余"这件事在系统里的**落点分散**: - ingest 脚本(新版本从头入库) - duplicate 脚本(从已有版本复制出新版本) - migration 脚本(加表 / 加列 / 回填) 历史上栽过的坑:**新加了一张 junction 表,但 ingest/duplicate 脚本忘了同步**,导致 对应版本的 req 在那张表里**零覆盖**——dashboard 关联筛选直接失效。典型案例: `requirement_pattern` 和 `requirement_node`(2026-04 修复,见 `scripts/backfill_requirement_pattern_versions.py`)。 --- ## 本契约做三件事 1. **列表化** —— 单一真相源:所有 `requirement_id` 外键表在此集中声明。 以后加新 junction 表,**必须**往 `REQUIREMENT_JUNCTION_TABLES` 追加一行。 2. **分类冗余语义** —— 每张表标注 `copy_semantics`: - `'versioned'` : 有 `version` 列,**可以整批 bulk-copy**(remap requirement_id) - `'fresh-per-version'` : 无 version 列,payload 另一端是版本化 entity → 必须由 ingest / duplicate 脚本 **fresh 生成**,不能 bulk-copy 3. **工具函数** —— - `duplicate_versioned_junctions(cur, suffix, version)` :把所有 `'versioned'` 表的 基础行复制一份给带后缀的 req。幂等,可重跑。 - `audit_req_junction_coverage(cur, version)` :诊断—— 告诉你某版本在每张表里有多少 req 被覆盖、缺多少。 --- ## 契约(Contract Rules) 任何人引入**新的 `requirement_id` 外键表**时必须: 1. 在 `REQUIREMENT_JUNCTION_TABLES` 里添加 `JunctionSpec` 条目 2. 决定其 `copy_semantics`: - 如果是"跨版本语义相同的外部引用"(比如 itemset_id、node_id 这种只是个 id), **加 `version` 列并标 `'versioned'`**——可以 bulk-copy,省事 - 如果 payload 指向的 entity 本身是版本化的(比如 capability_id、resource_id), 则标 `'fresh-per-version'`——必须在 ingest/duplicate 脚本里自己管 3. 在 ingest / duplicate 脚本里加上对应的写入逻辑(如果是 fresh-per-version) --- ## 反面模式(DON'T) - 不要**只加表不标契约**。下次有人做新版本,这张表又漏一次 - 不要给 `'versioned'` 表的 PK 少包含 `version` 列——会冲突覆盖 - 不要把 `ON CONFLICT DO UPDATE` 用在加过新列的表上(AnalyticDB beam 限制, 见 `docs/db-operations.md §1`)——改用 `ON CONFLICT DO NOTHING` """ from __future__ import annotations from dataclasses import dataclass, field from typing import List, Literal, Optional CopySemantics = Literal['versioned', 'fresh-per-version'] @dataclass(frozen=True) class JunctionSpec: """某张 requirement 外键关系表的版本冗余规格。""" table: str """表名,如 'requirement_pattern'""" payload_columns: List[str] """除 requirement_id 和 version 之外,需要被原样保留的列(建 bulk-copy SQL 用)。 对 'fresh-per-version' 的表,此字段仅用于文档 / 诊断,不参与自动复制。""" has_version_column: bool """该表是否有 version 列。'versioned' 必为 True;'fresh-per-version' 通常为 False。""" copy_semantics: CopySemantics """决定 new-version 的行从哪来: - 'versioned' : 从 base-version 行直接复制 - 'fresh-per-version' : 由 ingest/duplicate 脚本生成""" notes: str = '' """对维护者的说明:什么时候要 touch 这张表,payload 另一端怎么版本化等。""" # ----------------------------------------------------------------------------- # 单一真相源:所有含 requirement_id 的 junction 表 # ----------------------------------------------------------------------------- REQUIREMENT_JUNCTION_TABLES: List[JunctionSpec] = [ JunctionSpec( table='requirement_pattern', payload_columns=['itemset_id', 'execution_id'], has_version_column=True, copy_semantics='versioned', notes='pattern (itemset) 来自外部服务 (aiddit),跨 knowhub 版本语义相同,整批复制即可。', ), JunctionSpec( table='requirement_node', payload_columns=['node_id', 'execution_id', 'node_path'], has_version_column=True, copy_semantics='versioned', notes='category_tree 的节点 id 全局共享,整批复制即可。', ), JunctionSpec( table='requirement_capability', payload_columns=['capability_id'], has_version_column=False, copy_semantics='fresh-per-version', notes='capability 本身版本化(id 带 __ 后缀)。由 ingest/duplicate 脚本 ' '随 cap 一起生成(见 taodev_ingest.py / version_step2_duplicate_*.py)。', ), JunctionSpec( table='requirement_resource', payload_columns=['resource_id'], has_version_column=False, copy_semantics='fresh-per-version', notes='resource 本身版本化。随 resource 一起由 ingest/duplicate 脚本生成。', ), JunctionSpec( table='requirement_strategy', payload_columns=['strategy_id', 'is_selected', 'coverage_score', 'coverage_explanation'], has_version_column=False, copy_semantics='fresh-per-version', notes='strategy 本身版本化。随 strategy 一起生成。', ), JunctionSpec( table='requirement_knowledge', payload_columns=['knowledge_id', 'relation_type', 'is_selected', 'coverage_score', 'coverage_explanation'], has_version_column=False, copy_semantics='fresh-per-version', notes='knowledge 使用 v0 共享基层;关联关系由业务逻辑 / LLM pipeline 生成,' '非 bulk-copy 场景。', ), ] # ----------------------------------------------------------------------------- # 工具函数 # ----------------------------------------------------------------------------- def duplicate_versioned_junctions( cur, suffix: str, version: str, *, req_table: str = 'requirement', dry_run: bool = False, on_progress=None, ) -> dict: """ 把所有 `copy_semantics == 'versioned'` 的 junction 表的 base 行复制给带 suffix 的 reqs。 例:`duplicate_versioned_junctions(cur, '__td', 'tao_dev')` 会把 `requirement_pattern` / `requirement_node` 里 requirement_id 为 base 的行 全部复制一份给 requirement_id 为 base + '__td' 且 version='tao_dev' 的 reqs。 幂等(ON CONFLICT DO NOTHING)。 Args: cur: psycopg2 cursor(autocommit=True 连接) suffix: req id 后缀,如 '__td' version: 目标 version 值,如 'tao_dev' req_table:通常固定 'requirement';可指定以便在测试 schema 上跑 dry_run: True 时只打印 SQL,不执行 on_progress: 可选回调 (table:str, inserted:int, elapsed:float) → None Returns: {table_name: inserted_rowcount, ...} """ result: dict = {} import time for spec in REQUIREMENT_JUNCTION_TABLES: if spec.copy_semantics != 'versioned': continue payload_csv = ', '.join(spec.payload_columns) source_csv = ', '.join(f'src.{c}' for c in spec.payload_columns) sql = f""" INSERT INTO {spec.table} (requirement_id, {payload_csv}, version) SELECT r.id, {source_csv}, %s FROM {spec.table} src JOIN {req_table} r ON r.id = src.requirement_id || %s WHERE r.version = %s ON CONFLICT DO NOTHING """ if dry_run: result[spec.table] = 0 if on_progress: on_progress(spec.table, 0, 0.0) continue t0 = time.time() cur.execute(sql, (version, suffix, version)) inserted = cur.rowcount or 0 result[spec.table] = inserted if on_progress: on_progress(spec.table, inserted, time.time() - t0) return result def audit_req_junction_coverage(cur, version: str, *, req_table: str = 'requirement') -> dict: """ 诊断:给出某版本下每张 junction 表的 req 覆盖情况。 Returns: { 'version': 'tao_dev', 'total_reqs': 99, 'tables': { 'requirement_pattern': {'covered': 77, 'missing': 22, 'semantics': 'versioned'}, 'requirement_node': {'covered': 99, 'missing': 0, 'semantics': 'versioned'}, ... } } """ cur.execute(f'SELECT COUNT(*) AS c FROM {req_table} WHERE version = %s', (version,)) total = cur.fetchone()['c'] out = {'version': version, 'total_reqs': total, 'tables': {}} for spec in REQUIREMENT_JUNCTION_TABLES: cur.execute( f""" SELECT COUNT(DISTINCT r.id) AS c FROM {req_table} r JOIN {spec.table} j ON j.requirement_id = r.id WHERE r.version = %s """, (version,), ) covered = cur.fetchone()['c'] out['tables'][spec.table] = { 'covered': covered, 'missing': total - covered, 'semantics': spec.copy_semantics, } return out def junction_table_names(semantics: Optional[CopySemantics] = None) -> List[str]: """便捷函数:列出所有(或指定语义的)junction 表名。""" if semantics is None: return [s.table for s in REQUIREMENT_JUNCTION_TABLES] return [s.table for s in REQUIREMENT_JUNCTION_TABLES if s.copy_semantics == semantics]