|
@@ -0,0 +1,244 @@
|
|
|
|
|
+"""
|
|
|
|
|
+KnowHub 版本冗余契约(Versioning Redundancy Contract)
|
|
|
|
|
+===================================================
|
|
|
|
|
+
|
|
|
|
|
+**背景**:我们对核心实体(requirement / capability / resource / strategy)采用
|
|
|
|
|
+**严格冗余**的版本策略——每个版本都持有自己的一套完整行,而不是用别名 / 指针复用。
|
|
|
|
|
+这样 `version='tao_dev'` 可以独立演化而不影响 `version='v0'`。
|
|
|
|
|
+
|
|
|
|
|
+但"冗余"这件事在系统里的**落点分散**:
|
|
|
|
|
+ - ingest 脚本(新版本从头入库)
|
|
|
|
|
+ - duplicate 脚本(从已有版本复制出新版本)
|
|
|
|
|
+ - migration 脚本(加表 / 加列 / 回填)
|
|
|
|
|
+
|
|
|
|
|
+历史上栽过的坑:**新加了一张 junction 表,但 ingest/duplicate 脚本忘了同步**,导致
|
|
|
|
|
+对应版本的 req 在那张表里**零覆盖**——dashboard 关联筛选直接失效。典型案例:
|
|
|
|
|
+`requirement_pattern` 和 `requirement_node`(2026-04 修复,见
|
|
|
|
|
+`scripts/backfill_requirement_pattern_versions.py`)。
|
|
|
|
|
+
|
|
|
|
|
+---
|
|
|
|
|
+
|
|
|
|
|
+## 本契约做三件事
|
|
|
|
|
+
|
|
|
|
|
+1. **列表化** —— 单一真相源:所有 `requirement_id` 外键表在此集中声明。
|
|
|
|
|
+ 以后加新 junction 表,**必须**往 `REQUIREMENT_JUNCTION_TABLES` 追加一行。
|
|
|
|
|
+
|
|
|
|
|
+2. **分类冗余语义** —— 每张表标注 `copy_semantics`:
|
|
|
|
|
+ - `'versioned'` : 有 `version` 列,**可以整批 bulk-copy**(remap requirement_id)
|
|
|
|
|
+ - `'fresh-per-version'` : 无 version 列,payload 另一端是版本化 entity
|
|
|
|
|
+ → 必须由 ingest / duplicate 脚本 **fresh 生成**,不能 bulk-copy
|
|
|
|
|
+
|
|
|
|
|
+3. **工具函数** ——
|
|
|
|
|
+ - `duplicate_versioned_junctions(cur, suffix, version)` :把所有 `'versioned'` 表的
|
|
|
|
|
+ 基础行复制一份给带后缀的 req。幂等,可重跑。
|
|
|
|
|
+ - `audit_req_junction_coverage(cur, version)` :诊断——
|
|
|
|
|
+ 告诉你某版本在每张表里有多少 req 被覆盖、缺多少。
|
|
|
|
|
+
|
|
|
|
|
+---
|
|
|
|
|
+
|
|
|
|
|
+## 契约(Contract Rules)
|
|
|
|
|
+
|
|
|
|
|
+任何人引入**新的 `requirement_id` 外键表**时必须:
|
|
|
|
|
+
|
|
|
|
|
+1. 在 `REQUIREMENT_JUNCTION_TABLES` 里添加 `JunctionSpec` 条目
|
|
|
|
|
+2. 决定其 `copy_semantics`:
|
|
|
|
|
+ - 如果是"跨版本语义相同的外部引用"(比如 itemset_id、node_id 这种只是个 id),
|
|
|
|
|
+ **加 `version` 列并标 `'versioned'`**——可以 bulk-copy,省事
|
|
|
|
|
+ - 如果 payload 指向的 entity 本身是版本化的(比如 capability_id、resource_id),
|
|
|
|
|
+ 则标 `'fresh-per-version'`——必须在 ingest/duplicate 脚本里自己管
|
|
|
|
|
+3. 在 ingest / duplicate 脚本里加上对应的写入逻辑(如果是 fresh-per-version)
|
|
|
|
|
+
|
|
|
|
|
+---
|
|
|
|
|
+
|
|
|
|
|
+## 反面模式(DON'T)
|
|
|
|
|
+
|
|
|
|
|
+- 不要**只加表不标契约**。下次有人做新版本,这张表又漏一次
|
|
|
|
|
+- 不要给 `'versioned'` 表的 PK 少包含 `version` 列——会冲突覆盖
|
|
|
|
|
+- 不要把 `ON CONFLICT DO UPDATE` 用在加过新列的表上(AnalyticDB beam 限制,
|
|
|
|
|
+ 见 `docs/db-operations.md §1`)——改用 `ON CONFLICT DO NOTHING`
|
|
|
|
|
+"""
|
|
|
|
|
+from __future__ import annotations
|
|
|
|
|
+
|
|
|
|
|
+from dataclasses import dataclass, field
|
|
|
|
|
+from typing import List, Literal, Optional
|
|
|
|
|
+
|
|
|
|
|
+CopySemantics = Literal['versioned', 'fresh-per-version']
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+@dataclass(frozen=True)
|
|
|
|
|
+class JunctionSpec:
|
|
|
|
|
+ """某张 requirement 外键关系表的版本冗余规格。"""
|
|
|
|
|
+ table: str
|
|
|
|
|
+ """表名,如 'requirement_pattern'"""
|
|
|
|
|
+
|
|
|
|
|
+ payload_columns: List[str]
|
|
|
|
|
+ """除 requirement_id 和 version 之外,需要被原样保留的列(建 bulk-copy SQL 用)。
|
|
|
|
|
+ 对 'fresh-per-version' 的表,此字段仅用于文档 / 诊断,不参与自动复制。"""
|
|
|
|
|
+
|
|
|
|
|
+ has_version_column: bool
|
|
|
|
|
+ """该表是否有 version 列。'versioned' 必为 True;'fresh-per-version' 通常为 False。"""
|
|
|
|
|
+
|
|
|
|
|
+ copy_semantics: CopySemantics
|
|
|
|
|
+ """决定 new-version 的行从哪来:
|
|
|
|
|
+ - 'versioned' : 从 base-version 行直接复制
|
|
|
|
|
+ - 'fresh-per-version' : 由 ingest/duplicate 脚本生成"""
|
|
|
|
|
+
|
|
|
|
|
+ notes: str = ''
|
|
|
|
|
+ """对维护者的说明:什么时候要 touch 这张表,payload 另一端怎么版本化等。"""
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+# -----------------------------------------------------------------------------
|
|
|
|
|
+# 单一真相源:所有含 requirement_id 的 junction 表
|
|
|
|
|
+# -----------------------------------------------------------------------------
|
|
|
|
|
+REQUIREMENT_JUNCTION_TABLES: List[JunctionSpec] = [
|
|
|
|
|
+ JunctionSpec(
|
|
|
|
|
+ table='requirement_pattern',
|
|
|
|
|
+ payload_columns=['itemset_id', 'execution_id'],
|
|
|
|
|
+ has_version_column=True,
|
|
|
|
|
+ copy_semantics='versioned',
|
|
|
|
|
+ notes='pattern (itemset) 来自外部服务 (aiddit),跨 knowhub 版本语义相同,整批复制即可。',
|
|
|
|
|
+ ),
|
|
|
|
|
+ JunctionSpec(
|
|
|
|
|
+ table='requirement_node',
|
|
|
|
|
+ payload_columns=['node_id', 'execution_id', 'node_path'],
|
|
|
|
|
+ has_version_column=True,
|
|
|
|
|
+ copy_semantics='versioned',
|
|
|
|
|
+ notes='category_tree 的节点 id 全局共享,整批复制即可。',
|
|
|
|
|
+ ),
|
|
|
|
|
+ JunctionSpec(
|
|
|
|
|
+ table='requirement_capability',
|
|
|
|
|
+ payload_columns=['capability_id'],
|
|
|
|
|
+ has_version_column=False,
|
|
|
|
|
+ copy_semantics='fresh-per-version',
|
|
|
|
|
+ notes='capability 本身版本化(id 带 __<ver> 后缀)。由 ingest/duplicate 脚本 '
|
|
|
|
|
+ '随 cap 一起生成(见 taodev_ingest.py / version_step2_duplicate_*.py)。',
|
|
|
|
|
+ ),
|
|
|
|
|
+ JunctionSpec(
|
|
|
|
|
+ table='requirement_resource',
|
|
|
|
|
+ payload_columns=['resource_id'],
|
|
|
|
|
+ has_version_column=False,
|
|
|
|
|
+ copy_semantics='fresh-per-version',
|
|
|
|
|
+ notes='resource 本身版本化。随 resource 一起由 ingest/duplicate 脚本生成。',
|
|
|
|
|
+ ),
|
|
|
|
|
+ JunctionSpec(
|
|
|
|
|
+ table='requirement_strategy',
|
|
|
|
|
+ payload_columns=['strategy_id', 'is_selected', 'coverage_score', 'coverage_explanation'],
|
|
|
|
|
+ has_version_column=False,
|
|
|
|
|
+ copy_semantics='fresh-per-version',
|
|
|
|
|
+ notes='strategy 本身版本化。随 strategy 一起生成。',
|
|
|
|
|
+ ),
|
|
|
|
|
+ JunctionSpec(
|
|
|
|
|
+ table='requirement_knowledge',
|
|
|
|
|
+ payload_columns=['knowledge_id', 'relation_type', 'is_selected',
|
|
|
|
|
+ 'coverage_score', 'coverage_explanation'],
|
|
|
|
|
+ has_version_column=False,
|
|
|
|
|
+ copy_semantics='fresh-per-version',
|
|
|
|
|
+ notes='knowledge 使用 v0 共享基层;关联关系由业务逻辑 / LLM pipeline 生成,'
|
|
|
|
|
+ '非 bulk-copy 场景。',
|
|
|
|
|
+ ),
|
|
|
|
|
+]
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+# -----------------------------------------------------------------------------
|
|
|
|
|
+# 工具函数
|
|
|
|
|
+# -----------------------------------------------------------------------------
|
|
|
|
|
+def duplicate_versioned_junctions(
|
|
|
|
|
+ cur,
|
|
|
|
|
+ suffix: str,
|
|
|
|
|
+ version: str,
|
|
|
|
|
+ *,
|
|
|
|
|
+ req_table: str = 'requirement',
|
|
|
|
|
+ dry_run: bool = False,
|
|
|
|
|
+ on_progress=None,
|
|
|
|
|
+) -> dict:
|
|
|
|
|
+ """
|
|
|
|
|
+ 把所有 `copy_semantics == 'versioned'` 的 junction 表的 base 行复制给带 suffix 的 reqs。
|
|
|
|
|
+
|
|
|
|
|
+ 例:`duplicate_versioned_junctions(cur, '__td', 'tao_dev')`
|
|
|
|
|
+ 会把 `requirement_pattern` / `requirement_node` 里 requirement_id 为 base 的行
|
|
|
|
|
+ 全部复制一份给 requirement_id 为 base + '__td' 且 version='tao_dev' 的 reqs。
|
|
|
|
|
+
|
|
|
|
|
+ 幂等(ON CONFLICT DO NOTHING)。
|
|
|
|
|
+
|
|
|
|
|
+ Args:
|
|
|
|
|
+ cur: psycopg2 cursor(autocommit=True 连接)
|
|
|
|
|
+ suffix: req id 后缀,如 '__td'
|
|
|
|
|
+ version: 目标 version 值,如 'tao_dev'
|
|
|
|
|
+ req_table:通常固定 'requirement';可指定以便在测试 schema 上跑
|
|
|
|
|
+ dry_run: True 时只打印 SQL,不执行
|
|
|
|
|
+ on_progress: 可选回调 (table:str, inserted:int, elapsed:float) → None
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ {table_name: inserted_rowcount, ...}
|
|
|
|
|
+ """
|
|
|
|
|
+ result: dict = {}
|
|
|
|
|
+ import time
|
|
|
|
|
+ for spec in REQUIREMENT_JUNCTION_TABLES:
|
|
|
|
|
+ if spec.copy_semantics != 'versioned':
|
|
|
|
|
+ continue
|
|
|
|
|
+ payload_csv = ', '.join(spec.payload_columns)
|
|
|
|
|
+ source_csv = ', '.join(f'src.{c}' for c in spec.payload_columns)
|
|
|
|
|
+ sql = f"""
|
|
|
|
|
+ INSERT INTO {spec.table} (requirement_id, {payload_csv}, version)
|
|
|
|
|
+ SELECT r.id, {source_csv}, %s
|
|
|
|
|
+ FROM {spec.table} src
|
|
|
|
|
+ JOIN {req_table} r ON r.id = src.requirement_id || %s
|
|
|
|
|
+ WHERE r.version = %s
|
|
|
|
|
+ ON CONFLICT DO NOTHING
|
|
|
|
|
+ """
|
|
|
|
|
+ if dry_run:
|
|
|
|
|
+ result[spec.table] = 0
|
|
|
|
|
+ if on_progress:
|
|
|
|
|
+ on_progress(spec.table, 0, 0.0)
|
|
|
|
|
+ continue
|
|
|
|
|
+ t0 = time.time()
|
|
|
|
|
+ cur.execute(sql, (version, suffix, version))
|
|
|
|
|
+ inserted = cur.rowcount or 0
|
|
|
|
|
+ result[spec.table] = inserted
|
|
|
|
|
+ if on_progress:
|
|
|
|
|
+ on_progress(spec.table, inserted, time.time() - t0)
|
|
|
|
|
+ return result
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def audit_req_junction_coverage(cur, version: str, *, req_table: str = 'requirement') -> dict:
|
|
|
|
|
+ """
|
|
|
|
|
+ 诊断:给出某版本下每张 junction 表的 req 覆盖情况。
|
|
|
|
|
+
|
|
|
|
|
+ Returns:
|
|
|
|
|
+ {
|
|
|
|
|
+ 'version': 'tao_dev',
|
|
|
|
|
+ 'total_reqs': 99,
|
|
|
|
|
+ 'tables': {
|
|
|
|
|
+ 'requirement_pattern': {'covered': 77, 'missing': 22, 'semantics': 'versioned'},
|
|
|
|
|
+ 'requirement_node': {'covered': 99, 'missing': 0, 'semantics': 'versioned'},
|
|
|
|
|
+ ...
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ """
|
|
|
|
|
+ cur.execute(f'SELECT COUNT(*) AS c FROM {req_table} WHERE version = %s', (version,))
|
|
|
|
|
+ total = cur.fetchone()['c']
|
|
|
|
|
+ out = {'version': version, 'total_reqs': total, 'tables': {}}
|
|
|
|
|
+ for spec in REQUIREMENT_JUNCTION_TABLES:
|
|
|
|
|
+ cur.execute(
|
|
|
|
|
+ f"""
|
|
|
|
|
+ SELECT COUNT(DISTINCT r.id) AS c
|
|
|
|
|
+ FROM {req_table} r
|
|
|
|
|
+ JOIN {spec.table} j ON j.requirement_id = r.id
|
|
|
|
|
+ WHERE r.version = %s
|
|
|
|
|
+ """,
|
|
|
|
|
+ (version,),
|
|
|
|
|
+ )
|
|
|
|
|
+ covered = cur.fetchone()['c']
|
|
|
|
|
+ out['tables'][spec.table] = {
|
|
|
|
|
+ 'covered': covered,
|
|
|
|
|
+ 'missing': total - covered,
|
|
|
|
|
+ 'semantics': spec.copy_semantics,
|
|
|
|
|
+ }
|
|
|
|
|
+ return out
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def junction_table_names(semantics: Optional[CopySemantics] = None) -> List[str]:
|
|
|
|
|
+ """便捷函数:列出所有(或指定语义的)junction 表名。"""
|
|
|
|
|
+ if semantics is None:
|
|
|
|
|
+ return [s.table for s in REQUIREMENT_JUNCTION_TABLES]
|
|
|
|
|
+ return [s.table for s in REQUIREMENT_JUNCTION_TABLES if s.copy_semantics == semantics]
|