versioning_contract.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. """
  2. KnowHub 版本冗余契约(Versioning Redundancy Contract)
  3. ===================================================
  4. **背景**:我们对核心实体(requirement / capability / resource / strategy)采用
  5. **严格冗余**的版本策略——每个版本都持有自己的一套完整行,而不是用别名 / 指针复用。
  6. 这样 `version='tao_dev'` 可以独立演化而不影响 `version='v0'`。
  7. 但"冗余"这件事在系统里的**落点分散**:
  8. - ingest 脚本(新版本从头入库)
  9. - duplicate 脚本(从已有版本复制出新版本)
  10. - migration 脚本(加表 / 加列 / 回填)
  11. 历史上栽过的坑:**新加了一张 junction 表,但 ingest/duplicate 脚本忘了同步**,导致
  12. 对应版本的 req 在那张表里**零覆盖**——dashboard 关联筛选直接失效。典型案例:
  13. `requirement_pattern` 和 `requirement_node`(2026-04 修复,见
  14. `scripts/backfill_requirement_pattern_versions.py`)。
  15. ---
  16. ## 本契约做三件事
  17. 1. **列表化** —— 单一真相源:所有 `requirement_id` 外键表在此集中声明。
  18. 以后加新 junction 表,**必须**往 `REQUIREMENT_JUNCTION_TABLES` 追加一行。
  19. 2. **分类冗余语义** —— 每张表标注 `copy_semantics`:
  20. - `'versioned'` : 有 `version` 列,**可以整批 bulk-copy**(remap requirement_id)
  21. - `'fresh-per-version'` : 无 version 列,payload 另一端是版本化 entity
  22. → 必须由 ingest / duplicate 脚本 **fresh 生成**,不能 bulk-copy
  23. 3. **工具函数** ——
  24. - `duplicate_versioned_junctions(cur, suffix, version)` :把所有 `'versioned'` 表的
  25. 基础行复制一份给带后缀的 req。幂等,可重跑。
  26. - `audit_req_junction_coverage(cur, version)` :诊断——
  27. 告诉你某版本在每张表里有多少 req 被覆盖、缺多少。
  28. ---
  29. ## 契约(Contract Rules)
  30. 任何人引入**新的 `requirement_id` 外键表**时必须:
  31. 1. 在 `REQUIREMENT_JUNCTION_TABLES` 里添加 `JunctionSpec` 条目
  32. 2. 决定其 `copy_semantics`:
  33. - 如果是"跨版本语义相同的外部引用"(比如 itemset_id、node_id 这种只是个 id),
  34. **加 `version` 列并标 `'versioned'`**——可以 bulk-copy,省事
  35. - 如果 payload 指向的 entity 本身是版本化的(比如 capability_id、resource_id),
  36. 则标 `'fresh-per-version'`——必须在 ingest/duplicate 脚本里自己管
  37. 3. 在 ingest / duplicate 脚本里加上对应的写入逻辑(如果是 fresh-per-version)
  38. ---
  39. ## 反面模式(DON'T)
  40. - 不要**只加表不标契约**。下次有人做新版本,这张表又漏一次
  41. - 不要给 `'versioned'` 表的 PK 少包含 `version` 列——会冲突覆盖
  42. - 不要把 `ON CONFLICT DO UPDATE` 用在加过新列的表上(AnalyticDB beam 限制,
  43. 见 `docs/db-operations.md §1`)——改用 `ON CONFLICT DO NOTHING`
  44. """
  45. from __future__ import annotations
  46. from dataclasses import dataclass, field
  47. from typing import List, Literal, Optional
  48. CopySemantics = Literal['versioned', 'fresh-per-version']
  49. @dataclass(frozen=True)
  50. class JunctionSpec:
  51. """某张 requirement 外键关系表的版本冗余规格。"""
  52. table: str
  53. """表名,如 'requirement_pattern'"""
  54. payload_columns: List[str]
  55. """除 requirement_id 和 version 之外,需要被原样保留的列(建 bulk-copy SQL 用)。
  56. 对 'fresh-per-version' 的表,此字段仅用于文档 / 诊断,不参与自动复制。"""
  57. has_version_column: bool
  58. """该表是否有 version 列。'versioned' 必为 True;'fresh-per-version' 通常为 False。"""
  59. copy_semantics: CopySemantics
  60. """决定 new-version 的行从哪来:
  61. - 'versioned' : 从 base-version 行直接复制
  62. - 'fresh-per-version' : 由 ingest/duplicate 脚本生成"""
  63. notes: str = ''
  64. """对维护者的说明:什么时候要 touch 这张表,payload 另一端怎么版本化等。"""
  65. # -----------------------------------------------------------------------------
  66. # 单一真相源:所有含 requirement_id 的 junction 表
  67. # -----------------------------------------------------------------------------
  68. REQUIREMENT_JUNCTION_TABLES: List[JunctionSpec] = [
  69. JunctionSpec(
  70. table='requirement_pattern',
  71. payload_columns=['itemset_id', 'execution_id'],
  72. has_version_column=True,
  73. copy_semantics='versioned',
  74. notes='pattern (itemset) 来自外部服务 (aiddit),跨 knowhub 版本语义相同,整批复制即可。',
  75. ),
  76. JunctionSpec(
  77. table='requirement_node',
  78. payload_columns=['node_id', 'execution_id', 'node_path'],
  79. has_version_column=True,
  80. copy_semantics='versioned',
  81. notes='category_tree 的节点 id 全局共享,整批复制即可。',
  82. ),
  83. JunctionSpec(
  84. table='requirement_capability',
  85. payload_columns=['capability_id'],
  86. has_version_column=False,
  87. copy_semantics='fresh-per-version',
  88. notes='capability 本身版本化(id 带 __<ver> 后缀)。由 ingest/duplicate 脚本 '
  89. '随 cap 一起生成(见 taodev_ingest.py / version_step2_duplicate_*.py)。',
  90. ),
  91. JunctionSpec(
  92. table='requirement_resource',
  93. payload_columns=['resource_id'],
  94. has_version_column=False,
  95. copy_semantics='fresh-per-version',
  96. notes='resource 本身版本化。随 resource 一起由 ingest/duplicate 脚本生成。',
  97. ),
  98. JunctionSpec(
  99. table='requirement_strategy',
  100. payload_columns=['strategy_id', 'is_selected', 'coverage_score', 'coverage_explanation'],
  101. has_version_column=False,
  102. copy_semantics='fresh-per-version',
  103. notes='strategy 本身版本化。随 strategy 一起生成。',
  104. ),
  105. JunctionSpec(
  106. table='requirement_knowledge',
  107. payload_columns=['knowledge_id', 'relation_type', 'is_selected',
  108. 'coverage_score', 'coverage_explanation'],
  109. has_version_column=False,
  110. copy_semantics='fresh-per-version',
  111. notes='knowledge 使用 v0 共享基层;关联关系由业务逻辑 / LLM pipeline 生成,'
  112. '非 bulk-copy 场景。',
  113. ),
  114. ]
  115. # -----------------------------------------------------------------------------
  116. # 工具函数
  117. # -----------------------------------------------------------------------------
  118. def duplicate_versioned_junctions(
  119. cur,
  120. suffix: str,
  121. version: str,
  122. *,
  123. req_table: str = 'requirement',
  124. dry_run: bool = False,
  125. on_progress=None,
  126. ) -> dict:
  127. """
  128. 把所有 `copy_semantics == 'versioned'` 的 junction 表的 base 行复制给带 suffix 的 reqs。
  129. 例:`duplicate_versioned_junctions(cur, '__td', 'tao_dev')`
  130. 会把 `requirement_pattern` / `requirement_node` 里 requirement_id 为 base 的行
  131. 全部复制一份给 requirement_id 为 base + '__td' 且 version='tao_dev' 的 reqs。
  132. 幂等(ON CONFLICT DO NOTHING)。
  133. Args:
  134. cur: psycopg2 cursor(autocommit=True 连接)
  135. suffix: req id 后缀,如 '__td'
  136. version: 目标 version 值,如 'tao_dev'
  137. req_table:通常固定 'requirement';可指定以便在测试 schema 上跑
  138. dry_run: True 时只打印 SQL,不执行
  139. on_progress: 可选回调 (table:str, inserted:int, elapsed:float) → None
  140. Returns:
  141. {table_name: inserted_rowcount, ...}
  142. """
  143. result: dict = {}
  144. import time
  145. for spec in REQUIREMENT_JUNCTION_TABLES:
  146. if spec.copy_semantics != 'versioned':
  147. continue
  148. payload_csv = ', '.join(spec.payload_columns)
  149. source_csv = ', '.join(f'src.{c}' for c in spec.payload_columns)
  150. sql = f"""
  151. INSERT INTO {spec.table} (requirement_id, {payload_csv}, version)
  152. SELECT r.id, {source_csv}, %s
  153. FROM {spec.table} src
  154. JOIN {req_table} r ON r.id = src.requirement_id || %s
  155. WHERE r.version = %s
  156. ON CONFLICT DO NOTHING
  157. """
  158. if dry_run:
  159. result[spec.table] = 0
  160. if on_progress:
  161. on_progress(spec.table, 0, 0.0)
  162. continue
  163. t0 = time.time()
  164. cur.execute(sql, (version, suffix, version))
  165. inserted = cur.rowcount or 0
  166. result[spec.table] = inserted
  167. if on_progress:
  168. on_progress(spec.table, inserted, time.time() - t0)
  169. return result
  170. def audit_req_junction_coverage(cur, version: str, *, req_table: str = 'requirement') -> dict:
  171. """
  172. 诊断:给出某版本下每张 junction 表的 req 覆盖情况。
  173. Returns:
  174. {
  175. 'version': 'tao_dev',
  176. 'total_reqs': 99,
  177. 'tables': {
  178. 'requirement_pattern': {'covered': 77, 'missing': 22, 'semantics': 'versioned'},
  179. 'requirement_node': {'covered': 99, 'missing': 0, 'semantics': 'versioned'},
  180. ...
  181. }
  182. }
  183. """
  184. cur.execute(f'SELECT COUNT(*) AS c FROM {req_table} WHERE version = %s', (version,))
  185. total = cur.fetchone()['c']
  186. out = {'version': version, 'total_reqs': total, 'tables': {}}
  187. for spec in REQUIREMENT_JUNCTION_TABLES:
  188. cur.execute(
  189. f"""
  190. SELECT COUNT(DISTINCT r.id) AS c
  191. FROM {req_table} r
  192. JOIN {spec.table} j ON j.requirement_id = r.id
  193. WHERE r.version = %s
  194. """,
  195. (version,),
  196. )
  197. covered = cur.fetchone()['c']
  198. out['tables'][spec.table] = {
  199. 'covered': covered,
  200. 'missing': total - covered,
  201. 'semantics': spec.copy_semantics,
  202. }
  203. return out
  204. def junction_table_names(semantics: Optional[CopySemantics] = None) -> List[str]:
  205. """便捷函数:列出所有(或指定语义的)junction 表名。"""
  206. if semantics is None:
  207. return [s.table for s in REQUIREMENT_JUNCTION_TABLES]
  208. return [s.table for s in REQUIREMENT_JUNCTION_TABLES if s.copy_semantics == semantics]