version_context.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. """
  2. 数据版本上下文:通过 ContextVar 传递"当前查询使用哪个版本的数据"。
  3. - 读路径:store 层的 list/search/count/get_by_id 从 ACTIVE_VERSION 读当前版本,加到 WHERE
  4. - 写路径:insert 时使用业务显式传入的 version(不从 contextvar 推断),避免误写入
  5. - knowledge 表默认带 'v0'(共享基层),其他表不带
  6. 典型调用链:
  7. FastAPI middleware 从请求头 X-KnowHub-Version 或 ?version=xxx 读取
  8. → set_version('dev_dedup')
  9. → store.list_all() 内部自动过滤 WHERE version = 'dev_dedup'
  10. """
  11. from contextvars import ContextVar
  12. from typing import Tuple, List
  13. DEFAULT_VERSION = 'tao_dev'
  14. KNOWLEDGE_BASE_VERSION = 'v0' # knowledge 共享基层,永远包含
  15. ACTIVE_VERSION: ContextVar[str] = ContextVar('knowhub_active_version', default=DEFAULT_VERSION)
  16. def get_version() -> str:
  17. return ACTIVE_VERSION.get()
  18. def set_version(v: str) -> None:
  19. ACTIVE_VERSION.set(v)
  20. def requirement_version() -> str:
  21. """
  22. reqs 的映射规则(非对称):
  23. - active='tao_dev' → 'tao_dev'(tao_dev 有自己的 req 行,带 __td 后缀)
  24. - active='dev_abstract' / 'dev_dedup' → 'v0'(这两版共享 v0 reqs,从未复制)
  25. - 其它 → 'v0'
  26. """
  27. v = ACTIVE_VERSION.get()
  28. return v if v == 'tao_dev' else 'v0'
  29. def req_version_where(alias: str = '') -> Tuple[str, List[str]]:
  30. col = f'{alias}.version' if alias else 'version'
  31. return f'{col} = %s', [requirement_version()]
  32. def version_where(alias: str = '', include_v0: bool = False) -> Tuple[str, List[str]]:
  33. """
  34. 构造 WHERE 版本过滤片段。
  35. Args:
  36. alias: 表别名(如 'k' / 'strategy'),空则不加前缀
  37. include_v0: 是否把 'v0' 也纳入(knowledge 用)
  38. Returns:
  39. (sql_fragment, params) — sql_fragment 用 %s 占位,params 给 cursor.execute
  40. """
  41. v = get_version()
  42. col = f'{alias}.version' if alias else 'version'
  43. if include_v0 and v != KNOWLEDGE_BASE_VERSION:
  44. return f'{col} = ANY(%s)', [[v, KNOWLEDGE_BASE_VERSION]]
  45. return f'{col} = %s', [v]