| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- """
- KnowHub 版本冗余契约(Versioning Redundancy Contract)
- ===================================================
- **背景**:我们对核心实体(requirement / capability / resource / strategy)采用
- **严格冗余**的版本策略——每个版本都持有自己的一套完整行,而不是用别名 / 指针复用。
- 这样 `version='tao_dev'` 可以独立演化而不影响 `version='v0'`。
- 但"冗余"这件事在系统里的**落点分散**:
- - ingest 脚本(新版本从头入库)
- - duplicate 脚本(从已有版本复制出新版本)
- - migration 脚本(加表 / 加列 / 回填)
- 历史上栽过的坑:**新加了一张 junction 表,但 ingest/duplicate 脚本忘了同步**,导致
- 对应版本的 req 在那张表里**零覆盖**——dashboard 关联筛选直接失效。典型案例:
- `requirement_pattern` 和 `requirement_node`(2026-04 修复,见
- `scripts/backfill_requirement_pattern_versions.py`)。
- ---
- ## 本契约做三件事
- 1. **列表化** —— 单一真相源:所有 `requirement_id` 外键表在此集中声明。
- 以后加新 junction 表,**必须**往 `REQUIREMENT_JUNCTION_TABLES` 追加一行。
- 2. **分类冗余语义** —— 每张表标注 `copy_semantics`:
- - `'versioned'` : 有 `version` 列,**可以整批 bulk-copy**(remap requirement_id)
- - `'fresh-per-version'` : 无 version 列,payload 另一端是版本化 entity
- → 必须由 ingest / duplicate 脚本 **fresh 生成**,不能 bulk-copy
- 3. **工具函数** ——
- - `duplicate_versioned_junctions(cur, suffix, version)` :把所有 `'versioned'` 表的
- 基础行复制一份给带后缀的 req。幂等,可重跑。
- - `audit_req_junction_coverage(cur, version)` :诊断——
- 告诉你某版本在每张表里有多少 req 被覆盖、缺多少。
- ---
- ## 契约(Contract Rules)
- 任何人引入**新的 `requirement_id` 外键表**时必须:
- 1. 在 `REQUIREMENT_JUNCTION_TABLES` 里添加 `JunctionSpec` 条目
- 2. 决定其 `copy_semantics`:
- - 如果是"跨版本语义相同的外部引用"(比如 itemset_id、node_id 这种只是个 id),
- **加 `version` 列并标 `'versioned'`**——可以 bulk-copy,省事
- - 如果 payload 指向的 entity 本身是版本化的(比如 capability_id、resource_id),
- 则标 `'fresh-per-version'`——必须在 ingest/duplicate 脚本里自己管
- 3. 在 ingest / duplicate 脚本里加上对应的写入逻辑(如果是 fresh-per-version)
- ---
- ## 反面模式(DON'T)
- - 不要**只加表不标契约**。下次有人做新版本,这张表又漏一次
- - 不要给 `'versioned'` 表的 PK 少包含 `version` 列——会冲突覆盖
- - 不要把 `ON CONFLICT DO UPDATE` 用在加过新列的表上(AnalyticDB beam 限制,
- 见 `docs/db-operations.md §1`)——改用 `ON CONFLICT DO NOTHING`
- """
- from __future__ import annotations
- from dataclasses import dataclass, field
- from typing import List, Literal, Optional
- CopySemantics = Literal['versioned', 'fresh-per-version']
- @dataclass(frozen=True)
- class JunctionSpec:
- """某张 requirement 外键关系表的版本冗余规格。"""
- table: str
- """表名,如 'requirement_pattern'"""
- payload_columns: List[str]
- """除 requirement_id 和 version 之外,需要被原样保留的列(建 bulk-copy SQL 用)。
- 对 'fresh-per-version' 的表,此字段仅用于文档 / 诊断,不参与自动复制。"""
- has_version_column: bool
- """该表是否有 version 列。'versioned' 必为 True;'fresh-per-version' 通常为 False。"""
- copy_semantics: CopySemantics
- """决定 new-version 的行从哪来:
- - 'versioned' : 从 base-version 行直接复制
- - 'fresh-per-version' : 由 ingest/duplicate 脚本生成"""
- notes: str = ''
- """对维护者的说明:什么时候要 touch 这张表,payload 另一端怎么版本化等。"""
- # -----------------------------------------------------------------------------
- # 单一真相源:所有含 requirement_id 的 junction 表
- # -----------------------------------------------------------------------------
- REQUIREMENT_JUNCTION_TABLES: List[JunctionSpec] = [
- JunctionSpec(
- table='requirement_pattern',
- payload_columns=['itemset_id', 'execution_id'],
- has_version_column=True,
- copy_semantics='versioned',
- notes='pattern (itemset) 来自外部服务 (aiddit),跨 knowhub 版本语义相同,整批复制即可。',
- ),
- JunctionSpec(
- table='requirement_node',
- payload_columns=['node_id', 'execution_id', 'node_path'],
- has_version_column=True,
- copy_semantics='versioned',
- notes='category_tree 的节点 id 全局共享,整批复制即可。',
- ),
- JunctionSpec(
- table='requirement_capability',
- payload_columns=['capability_id'],
- has_version_column=False,
- copy_semantics='fresh-per-version',
- notes='capability 本身版本化(id 带 __<ver> 后缀)。由 ingest/duplicate 脚本 '
- '随 cap 一起生成(见 taodev_ingest.py / version_step2_duplicate_*.py)。',
- ),
- JunctionSpec(
- table='requirement_resource',
- payload_columns=['resource_id'],
- has_version_column=False,
- copy_semantics='fresh-per-version',
- notes='resource 本身版本化。随 resource 一起由 ingest/duplicate 脚本生成。',
- ),
- JunctionSpec(
- table='requirement_strategy',
- payload_columns=['strategy_id', 'is_selected', 'coverage_score', 'coverage_explanation'],
- has_version_column=False,
- copy_semantics='fresh-per-version',
- notes='strategy 本身版本化。随 strategy 一起生成。',
- ),
- JunctionSpec(
- table='requirement_knowledge',
- payload_columns=['knowledge_id', 'relation_type', 'is_selected',
- 'coverage_score', 'coverage_explanation'],
- has_version_column=False,
- copy_semantics='fresh-per-version',
- notes='knowledge 使用 v0 共享基层;关联关系由业务逻辑 / LLM pipeline 生成,'
- '非 bulk-copy 场景。',
- ),
- ]
- # -----------------------------------------------------------------------------
- # 工具函数
- # -----------------------------------------------------------------------------
- def duplicate_versioned_junctions(
- cur,
- suffix: str,
- version: str,
- *,
- req_table: str = 'requirement',
- dry_run: bool = False,
- on_progress=None,
- ) -> dict:
- """
- 把所有 `copy_semantics == 'versioned'` 的 junction 表的 base 行复制给带 suffix 的 reqs。
- 例:`duplicate_versioned_junctions(cur, '__td', 'tao_dev')`
- 会把 `requirement_pattern` / `requirement_node` 里 requirement_id 为 base 的行
- 全部复制一份给 requirement_id 为 base + '__td' 且 version='tao_dev' 的 reqs。
- 幂等(ON CONFLICT DO NOTHING)。
- Args:
- cur: psycopg2 cursor(autocommit=True 连接)
- suffix: req id 后缀,如 '__td'
- version: 目标 version 值,如 'tao_dev'
- req_table:通常固定 'requirement';可指定以便在测试 schema 上跑
- dry_run: True 时只打印 SQL,不执行
- on_progress: 可选回调 (table:str, inserted:int, elapsed:float) → None
- Returns:
- {table_name: inserted_rowcount, ...}
- """
- result: dict = {}
- import time
- for spec in REQUIREMENT_JUNCTION_TABLES:
- if spec.copy_semantics != 'versioned':
- continue
- payload_csv = ', '.join(spec.payload_columns)
- source_csv = ', '.join(f'src.{c}' for c in spec.payload_columns)
- sql = f"""
- INSERT INTO {spec.table} (requirement_id, {payload_csv}, version)
- SELECT r.id, {source_csv}, %s
- FROM {spec.table} src
- JOIN {req_table} r ON r.id = src.requirement_id || %s
- WHERE r.version = %s
- ON CONFLICT DO NOTHING
- """
- if dry_run:
- result[spec.table] = 0
- if on_progress:
- on_progress(spec.table, 0, 0.0)
- continue
- t0 = time.time()
- cur.execute(sql, (version, suffix, version))
- inserted = cur.rowcount or 0
- result[spec.table] = inserted
- if on_progress:
- on_progress(spec.table, inserted, time.time() - t0)
- return result
- def audit_req_junction_coverage(cur, version: str, *, req_table: str = 'requirement') -> dict:
- """
- 诊断:给出某版本下每张 junction 表的 req 覆盖情况。
- Returns:
- {
- 'version': 'tao_dev',
- 'total_reqs': 99,
- 'tables': {
- 'requirement_pattern': {'covered': 77, 'missing': 22, 'semantics': 'versioned'},
- 'requirement_node': {'covered': 99, 'missing': 0, 'semantics': 'versioned'},
- ...
- }
- }
- """
- cur.execute(f'SELECT COUNT(*) AS c FROM {req_table} WHERE version = %s', (version,))
- total = cur.fetchone()['c']
- out = {'version': version, 'total_reqs': total, 'tables': {}}
- for spec in REQUIREMENT_JUNCTION_TABLES:
- cur.execute(
- f"""
- SELECT COUNT(DISTINCT r.id) AS c
- FROM {req_table} r
- JOIN {spec.table} j ON j.requirement_id = r.id
- WHERE r.version = %s
- """,
- (version,),
- )
- covered = cur.fetchone()['c']
- out['tables'][spec.table] = {
- 'covered': covered,
- 'missing': total - covered,
- 'semantics': spec.copy_semantics,
- }
- return out
- def junction_table_names(semantics: Optional[CopySemantics] = None) -> List[str]:
- """便捷函数:列出所有(或指定语义的)junction 表名。"""
- if semantics is None:
- return [s.table for s in REQUIREMENT_JUNCTION_TABLES]
- return [s.table for s in REQUIREMENT_JUNCTION_TABLES if s.copy_semantics == semantics]
|