phase2_schema_migration.py 4.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. #!/usr/bin/env python3
  2. """
  3. Phase 2 schema migration:
  4. 1. ALTER requirement_strategy 加 3 列(is_selected / coverage_score / coverage_explanation)
  5. 2. CREATE TABLE requirement_knowledge(带 metadata)
  6. 3. CREATE TABLE knowledge_resource(纯 junction)
  7. 幂等:IF NOT EXISTS / 检查列存在。autocommit=True 安全。
  8. """
  9. import sys
  10. from pathlib import Path
  11. sys.path.insert(0, str(Path(__file__).parent.parent.parent))
  12. from knowhub.knowhub_db.pg_capability_store import PostgreSQLCapabilityStore
  13. def column_exists(cur, table, col):
  14. cur.execute("""SELECT EXISTS (SELECT 1 FROM information_schema.columns
  15. WHERE table_name=%s AND column_name=%s)""", (table, col))
  16. return cur.fetchone()['exists']
  17. def table_exists(cur, table):
  18. cur.execute("""SELECT EXISTS (SELECT 1 FROM information_schema.tables
  19. WHERE table_name=%s)""", (table,))
  20. return cur.fetchone()['exists']
  21. def main():
  22. s = PostgreSQLCapabilityStore()
  23. cur = s._get_cursor()
  24. try:
  25. # ─────────────────────────────────────────────────
  26. # Step 1: ALTER requirement_strategy
  27. print('=== Step 1: ALTER requirement_strategy ===', flush=True)
  28. for col, typ in [('is_selected', 'BOOLEAN'),
  29. ('coverage_score', 'FLOAT'),
  30. ('coverage_explanation', 'TEXT')]:
  31. if column_exists(cur, 'requirement_strategy', col):
  32. print(f' {col} 已存在,跳过', flush=True)
  33. else:
  34. cur.execute(f'ALTER TABLE requirement_strategy ADD COLUMN {col} {typ}')
  35. print(f' ✓ 添加 {col} {typ}', flush=True)
  36. # ─────────────────────────────────────────────────
  37. # Step 2: CREATE requirement_knowledge
  38. print('\n=== Step 2: CREATE requirement_knowledge ===', flush=True)
  39. if table_exists(cur, 'requirement_knowledge'):
  40. print(' 已存在,跳过', flush=True)
  41. else:
  42. cur.execute("""
  43. CREATE TABLE requirement_knowledge (
  44. requirement_id VARCHAR NOT NULL,
  45. knowledge_id VARCHAR NOT NULL,
  46. is_selected BOOLEAN,
  47. coverage_score FLOAT,
  48. coverage_explanation TEXT,
  49. PRIMARY KEY (requirement_id, knowledge_id)
  50. ) DISTRIBUTED BY (requirement_id)
  51. """)
  52. print(' ✓ 已建表', flush=True)
  53. # ─────────────────────────────────────────────────
  54. # Step 3: CREATE knowledge_resource
  55. print('\n=== Step 3: CREATE knowledge_resource ===', flush=True)
  56. if table_exists(cur, 'knowledge_resource'):
  57. print(' 已存在,跳过', flush=True)
  58. else:
  59. cur.execute("""
  60. CREATE TABLE knowledge_resource (
  61. knowledge_id VARCHAR NOT NULL,
  62. resource_id VARCHAR NOT NULL,
  63. PRIMARY KEY (knowledge_id, resource_id)
  64. ) DISTRIBUTED BY (knowledge_id)
  65. """)
  66. print(' ✓ 已建表', flush=True)
  67. # ─────────────────────────────────────────────────
  68. # 验证
  69. print('\n=== 验证 ===', flush=True)
  70. cur.execute("""SELECT column_name, data_type FROM information_schema.columns
  71. WHERE table_name='requirement_strategy' ORDER BY ordinal_position""")
  72. print('requirement_strategy 列:')
  73. for r in cur.fetchall():
  74. print(f' {r["column_name"]:28s} {r["data_type"]}')
  75. for t in ['requirement_knowledge', 'knowledge_resource']:
  76. cur.execute("""SELECT column_name, data_type FROM information_schema.columns
  77. WHERE table_name=%s ORDER BY ordinal_position""", (t,))
  78. print(f'\n{t} 列:')
  79. for r in cur.fetchall():
  80. print(f' {r["column_name"]:28s} {r["data_type"]}')
  81. finally:
  82. cur.close()
  83. s.close()
  84. if __name__ == '__main__':
  85. main()