# cascade.py (3.3 KB)
"""
Cascade delete: Greenplum does not support FK ON DELETE CASCADE, so the
application layer has to guarantee it.

Usage:
    from knowhub.knowhub_db.cascade import cascade_delete
    cascade_delete(cursor, 'knowledge', knowledge_id)
"""
  7. # 每个实体表涉及的关联表及其外键列名
  8. _JUNCTIONS = {
  9. 'knowledge': [
  10. ('requirement_knowledge', 'knowledge_id'),
  11. ('capability_knowledge', 'knowledge_id'),
  12. ('tool_knowledge', 'knowledge_id'),
  13. ('knowledge_resource', 'knowledge_id'),
  14. ('knowledge_relation', 'source_id'),
  15. ('knowledge_relation', 'target_id'),
  16. ('strategy_knowledge', 'knowledge_id'),
  17. ],
  18. 'tool': [
  19. ('capability_tool', 'tool_id'),
  20. ('tool_knowledge', 'tool_id'),
  21. ('tool_provider', 'tool_id'),
  22. ],
  23. 'capability': [
  24. ('requirement_capability', 'capability_id'),
  25. ('capability_tool', 'capability_id'),
  26. ('capability_knowledge', 'capability_id'),
  27. ('capability_resource', 'capability_id'),
  28. ('strategy_capability', 'capability_id'),
  29. ],
  30. 'requirement': [
  31. ('requirement_capability', 'requirement_id'),
  32. ('requirement_knowledge', 'requirement_id'),
  33. ('requirement_resource', 'requirement_id'),
  34. ('requirement_strategy', 'requirement_id'),
  35. ],
  36. 'resource': [
  37. ('knowledge_resource', 'resource_id'),
  38. ('capability_resource', 'resource_id'),
  39. ('requirement_resource', 'resource_id'),
  40. ('strategy_resource', 'resource_id'),
  41. ],
  42. 'strategy': [
  43. ('strategy_capability', 'strategy_id'),
  44. ('strategy_knowledge', 'strategy_id'),
  45. ('strategy_resource', 'strategy_id'),
  46. ('requirement_strategy', 'strategy_id'),
  47. ],
  48. }
  49. def cascade_delete(cursor, entity_table: str, entity_id: str):
  50. """先删除关联表中的引用行,再删除实体本身"""
  51. for junction_table, fk_column in _JUNCTIONS.get(entity_table, []):
  52. cursor.execute(
  53. f"DELETE FROM {junction_table} WHERE {fk_column} = %s",
  54. (entity_id,))
  55. cursor.execute(
  56. f"DELETE FROM {entity_table} WHERE id = %s",
  57. (entity_id,))
  58. # 带 version 字段的实体表(tool 不在其中——它的 version 是工具自身发布版本,不是多租户标签)
  59. _VERSIONED_ENTITIES = ['knowledge', 'resource', 'requirement', 'capability', 'strategy']
  60. def purge_version(cursor, version: str) -> dict:
  61. """
  62. 批量清除指定 version 的所有实体(及其 junction)。
  63. 常用于测试数据迭代——整批抹掉 tao_dev_1,重新入库。
  64. Returns: {table: row_count_deleted, ...}
  65. """
  66. if not version or version == 'v0':
  67. raise ValueError(f"Refusing to purge protected version: {version!r}(v0 和空字符串受保护)")
  68. stats = {}
  69. for table in _VERSIONED_ENTITIES:
  70. # 先清掉所有指向这个版本实体的 junction 行
  71. for junction, fk_col in _JUNCTIONS.get(table, []):
  72. cursor.execute(
  73. f"DELETE FROM {junction} WHERE {fk_col} IN "
  74. f"(SELECT id FROM {table} WHERE version = %s)",
  75. (version,))
  76. # 再删实体本身
  77. cursor.execute(f"DELETE FROM {table} WHERE version = %s", (version,))
  78. stats[table] = cursor.rowcount if cursor.rowcount is not None else 0
  79. return stats