# pg_resource_store.py
  1. #!/usr/bin/env python3
  2. """
  3. PostgreSQL Resources 存储封装
  4. """
  5. import os
  6. import json
  7. import time
  8. import psycopg2
  9. from psycopg2.extras import RealDictCursor
  10. from typing import Optional, List, Dict
  11. from dotenv import load_dotenv
  12. from knowhub.knowhub_db.cascade import cascade_delete
  13. load_dotenv()
  14. class PostgreSQLResourceStore:
  15. def __init__(self):
  16. """初始化 PostgreSQL 连接"""
  17. self.conn = psycopg2.connect(
  18. host=os.getenv('KNOWHUB_DB'),
  19. port=int(os.getenv('KNOWHUB_PORT', 5432)),
  20. user=os.getenv('KNOWHUB_USER'),
  21. password=os.getenv('KNOWHUB_PASSWORD'),
  22. database=os.getenv('KNOWHUB_DB_NAME')
  23. )
  24. self.conn.autocommit = True
  25. def _reconnect(self):
  26. self.conn = psycopg2.connect(
  27. host=os.getenv('KNOWHUB_DB'),
  28. port=int(os.getenv('KNOWHUB_PORT', 5432)),
  29. user=os.getenv('KNOWHUB_USER'),
  30. password=os.getenv('KNOWHUB_PASSWORD'),
  31. database=os.getenv('KNOWHUB_DB_NAME')
  32. )
  33. self.conn.autocommit = True
  34. def _ensure_connection(self):
  35. if self.conn.closed != 0:
  36. self._reconnect()
  37. else:
  38. try:
  39. c = self.conn.cursor()
  40. c.execute("SELECT 1")
  41. c.close()
  42. except (psycopg2.OperationalError, psycopg2.InterfaceError):
  43. self._reconnect()
  44. def _get_cursor(self):
  45. """获取游标"""
  46. self._ensure_connection()
  47. return self.conn.cursor(cursor_factory=RealDictCursor)
  48. def insert_or_update(self, resource: Dict):
  49. """插入或更新资源。
  50. 注:AnalyticDB beam 表不支持 ON CONFLICT DO UPDATE 当含 ALTER 新增列,改用 DELETE+INSERT。
  51. junction 不受影响(不带 FK,DELETE 仅删实体行)。
  52. """
  53. cursor = self._get_cursor()
  54. try:
  55. now_ts = int(time.time())
  56. cursor.execute("DELETE FROM resource WHERE id = %s", (resource['id'],))
  57. cursor.execute("""
  58. INSERT INTO resource (id, title, body, secure_body, content_type,
  59. metadata, sort_order, submitted_by, created_at, updated_at,
  60. version, images)
  61. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  62. """, (
  63. resource['id'],
  64. resource['title'],
  65. resource['body'],
  66. resource.get('secure_body', ''),
  67. resource['content_type'],
  68. json.dumps(resource.get('metadata', {})),
  69. resource.get('sort_order', 0),
  70. resource.get('submitted_by', ''),
  71. resource.get('created_at', now_ts),
  72. now_ts,
  73. resource.get('version', 'v0'),
  74. json.dumps(resource.get('images', [])),
  75. ))
  76. self.conn.commit()
  77. finally:
  78. cursor.close()
  79. def get_by_id(self, resource_id: str) -> Optional[Dict]:
  80. """根据ID获取资源"""
  81. cursor = self._get_cursor()
  82. try:
  83. cursor.execute("""
  84. SELECT id, title, body, secure_body, content_type, metadata, sort_order,
  85. created_at, updated_at, version, images
  86. FROM resource WHERE id = %s
  87. """, (resource_id,))
  88. row = cursor.fetchone()
  89. if not row:
  90. return None
  91. result = dict(row)
  92. if result.get('metadata') and isinstance(result['metadata'], str):
  93. result['metadata'] = json.loads(result['metadata'])
  94. if result.get('images') and isinstance(result['images'], str):
  95. result['images'] = json.loads(result['images'])
  96. elif result.get('images') is None:
  97. result['images'] = []
  98. return result
  99. finally:
  100. cursor.close()
  101. def list_resources(self, prefix: Optional[str] = None, content_type: Optional[str] = None,
  102. version: Optional[str] = None,
  103. limit: int = 100, offset: int = 0) -> List[Dict]:
  104. """列出资源。version=None 返回所有版本。"""
  105. cursor = self._get_cursor()
  106. try:
  107. conditions = []
  108. params = []
  109. if prefix:
  110. conditions.append("id LIKE %s")
  111. params.append(f"{prefix}%")
  112. if content_type:
  113. conditions.append("content_type = %s")
  114. params.append(content_type)
  115. if version is not None:
  116. conditions.append("version = %s")
  117. params.append(version)
  118. where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""
  119. sql = f"""
  120. SELECT id, title, content_type, metadata, images, version, created_at, updated_at
  121. FROM resource
  122. {where_clause}
  123. ORDER BY sort_order, id
  124. LIMIT %s OFFSET %s
  125. """
  126. params.extend([limit, offset])
  127. cursor.execute(sql, params)
  128. results = cursor.fetchall()
  129. out = []
  130. for r in results:
  131. d = dict(r)
  132. if d.get('images') and isinstance(d['images'], str):
  133. d['images'] = json.loads(d['images'])
  134. elif d.get('images') is None:
  135. d['images'] = []
  136. if d.get('metadata') and isinstance(d['metadata'], str):
  137. d['metadata'] = json.loads(d['metadata'])
  138. out.append(d)
  139. return out
  140. finally:
  141. cursor.close()
  142. def update(self, resource_id: str, updates: Dict):
  143. """更新资源"""
  144. cursor = self._get_cursor()
  145. try:
  146. set_parts = []
  147. params = []
  148. for key, value in updates.items():
  149. if key == 'metadata':
  150. set_parts.append(f"{key} = %s")
  151. params.append(json.dumps(value))
  152. else:
  153. set_parts.append(f"{key} = %s")
  154. params.append(value)
  155. set_parts.append("updated_at = %s")
  156. params.append(int(time.time()))
  157. params.append(resource_id)
  158. sql = f"UPDATE resource SET {', '.join(set_parts)} WHERE id = %s"
  159. cursor.execute(sql, params)
  160. self.conn.commit()
  161. finally:
  162. cursor.close()
  163. def delete(self, resource_id: str):
  164. """删除资源及其关联表记录"""
  165. cursor = self._get_cursor()
  166. try:
  167. cascade_delete(cursor, 'resource', resource_id)
  168. self.conn.commit()
  169. finally:
  170. cursor.close()
  171. def get_siblings(self, resource_id: str) -> tuple:
  172. """获取同级资源(用于导航)"""
  173. cursor = self._get_cursor()
  174. try:
  175. # 获取父级前缀
  176. parts = resource_id.split("/")
  177. if len(parts) <= 1:
  178. return None, None
  179. parent_prefix = "/".join(parts[:-1])
  180. # 获取前一个
  181. cursor.execute("""
  182. SELECT id, title FROM resource
  183. WHERE id LIKE %s AND id < %s
  184. ORDER BY id DESC LIMIT 1
  185. """, (f"{parent_prefix}/%", resource_id))
  186. prev_row = cursor.fetchone()
  187. # 获取后一个
  188. cursor.execute("""
  189. SELECT id, title FROM resource
  190. WHERE id LIKE %s AND id > %s
  191. ORDER BY id ASC LIMIT 1
  192. """, (f"{parent_prefix}/%", resource_id))
  193. next_row = cursor.fetchone()
  194. return (dict(prev_row) if prev_row else None,
  195. dict(next_row) if next_row else None)
  196. finally:
  197. cursor.close()
  198. def close(self):
  199. """关闭连接"""
  200. if self.conn:
  201. self.conn.close()