pg_resource_store.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. #!/usr/bin/env python3
  2. """
  3. PostgreSQL Resources 存储封装
  4. """
  5. import os
  6. import json
  7. import time
  8. import psycopg2
  9. from psycopg2.extras import RealDictCursor
  10. from typing import Optional, List, Dict
  11. from dotenv import load_dotenv
  12. from knowhub.knowhub_db.cascade import cascade_delete
  13. load_dotenv()
  14. class PostgreSQLResourceStore:
  15. def __init__(self):
  16. """初始化 PostgreSQL 连接"""
  17. self.conn = psycopg2.connect(
  18. host=os.getenv('KNOWHUB_DB'),
  19. port=int(os.getenv('KNOWHUB_PORT', 5432)),
  20. user=os.getenv('KNOWHUB_USER'),
  21. password=os.getenv('KNOWHUB_PASSWORD'),
  22. database=os.getenv('KNOWHUB_DB_NAME')
  23. )
  24. self.conn.autocommit = True
  25. def _reconnect(self):
  26. self.conn = psycopg2.connect(
  27. host=os.getenv('KNOWHUB_DB'),
  28. port=int(os.getenv('KNOWHUB_PORT', 5432)),
  29. user=os.getenv('KNOWHUB_USER'),
  30. password=os.getenv('KNOWHUB_PASSWORD'),
  31. database=os.getenv('KNOWHUB_DB_NAME')
  32. )
  33. self.conn.autocommit = True
  34. def _ensure_connection(self):
  35. if self.conn.closed != 0:
  36. self._reconnect()
  37. else:
  38. try:
  39. c = self.conn.cursor()
  40. c.execute("SELECT 1")
  41. c.close()
  42. except (psycopg2.OperationalError, psycopg2.InterfaceError):
  43. self._reconnect()
  44. def _get_cursor(self):
  45. """获取游标"""
  46. self._ensure_connection()
  47. return self.conn.cursor(cursor_factory=RealDictCursor)
  48. def insert_or_update(self, resource: Dict):
  49. """插入或更新资源。
  50. 注:AnalyticDB beam 表不支持 ON CONFLICT DO UPDATE 当含 ALTER 新增列,改用 DELETE+INSERT。
  51. junction 不受影响(不带 FK,DELETE 仅删实体行)。
  52. """
  53. cursor = self._get_cursor()
  54. try:
  55. now_ts = int(time.time())
  56. cursor.execute("DELETE FROM resource WHERE id = %s", (resource['id'],))
  57. cursor.execute("""
  58. INSERT INTO resource (id, title, body, secure_body, content_type,
  59. metadata, sort_order, submitted_by, created_at, updated_at,
  60. version, images)
  61. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  62. """, (
  63. resource['id'],
  64. resource['title'],
  65. resource['body'],
  66. resource.get('secure_body', ''),
  67. resource['content_type'],
  68. json.dumps(resource.get('metadata', {})),
  69. resource.get('sort_order', 0),
  70. resource.get('submitted_by', ''),
  71. resource.get('created_at', now_ts),
  72. now_ts,
  73. resource.get('version', 'v0'),
  74. json.dumps(resource.get('images', [])),
  75. ))
  76. self.conn.commit()
  77. finally:
  78. cursor.close()
  79. def get_by_id(self, resource_id: str) -> Optional[Dict]:
  80. """根据ID获取资源"""
  81. from knowhub.knowhub_db.version_context import version_where
  82. cursor = self._get_cursor()
  83. try:
  84. vf, vp = version_where()
  85. cursor.execute(f"""
  86. SELECT id, title, body, secure_body, content_type, metadata, sort_order,
  87. created_at, updated_at, version, images
  88. FROM resource WHERE id = %s AND {vf}
  89. """, (resource_id, *vp))
  90. row = cursor.fetchone()
  91. if not row:
  92. return None
  93. result = dict(row)
  94. if result.get('metadata') and isinstance(result['metadata'], str):
  95. result['metadata'] = json.loads(result['metadata'])
  96. if result.get('images') and isinstance(result['images'], str):
  97. result['images'] = json.loads(result['images'])
  98. elif result.get('images') is None:
  99. result['images'] = []
  100. return result
  101. finally:
  102. cursor.close()
  103. def list_resources(self, prefix: Optional[str] = None, content_type: Optional[str] = None,
  104. version: Optional[str] = None,
  105. limit: int = 100, offset: int = 0) -> List[Dict]:
  106. """列出资源。version 显式传入时用传入值;未传时使用当前 contextvar 版本。"""
  107. from knowhub.knowhub_db.version_context import get_version
  108. cursor = self._get_cursor()
  109. try:
  110. conditions = []
  111. params = []
  112. if prefix:
  113. conditions.append("id LIKE %s")
  114. params.append(f"{prefix}%")
  115. if content_type:
  116. conditions.append("content_type = %s")
  117. params.append(content_type)
  118. effective_version = version if version is not None else get_version()
  119. conditions.append("version = %s")
  120. params.append(effective_version)
  121. where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""
  122. sql = f"""
  123. SELECT id, title, content_type, metadata, images, version, created_at, updated_at
  124. FROM resource
  125. {where_clause}
  126. ORDER BY sort_order, id
  127. LIMIT %s OFFSET %s
  128. """
  129. params.extend([limit, offset])
  130. cursor.execute(sql, params)
  131. results = cursor.fetchall()
  132. out = []
  133. for r in results:
  134. d = dict(r)
  135. if d.get('images') and isinstance(d['images'], str):
  136. d['images'] = json.loads(d['images'])
  137. elif d.get('images') is None:
  138. d['images'] = []
  139. if d.get('metadata') and isinstance(d['metadata'], str):
  140. d['metadata'] = json.loads(d['metadata'])
  141. out.append(d)
  142. return out
  143. finally:
  144. cursor.close()
  145. def update(self, resource_id: str, updates: Dict):
  146. """更新资源"""
  147. cursor = self._get_cursor()
  148. try:
  149. set_parts = []
  150. params = []
  151. for key, value in updates.items():
  152. if key == 'metadata':
  153. set_parts.append(f"{key} = %s")
  154. params.append(json.dumps(value))
  155. else:
  156. set_parts.append(f"{key} = %s")
  157. params.append(value)
  158. set_parts.append("updated_at = %s")
  159. params.append(int(time.time()))
  160. params.append(resource_id)
  161. sql = f"UPDATE resource SET {', '.join(set_parts)} WHERE id = %s"
  162. cursor.execute(sql, params)
  163. self.conn.commit()
  164. finally:
  165. cursor.close()
  166. def delete(self, resource_id: str):
  167. """删除资源及其关联表记录"""
  168. cursor = self._get_cursor()
  169. try:
  170. cascade_delete(cursor, 'resource', resource_id)
  171. self.conn.commit()
  172. finally:
  173. cursor.close()
  174. def get_siblings(self, resource_id: str) -> tuple:
  175. """获取同级资源(用于导航)"""
  176. cursor = self._get_cursor()
  177. try:
  178. # 获取父级前缀
  179. parts = resource_id.split("/")
  180. if len(parts) <= 1:
  181. return None, None
  182. parent_prefix = "/".join(parts[:-1])
  183. # 获取前一个
  184. cursor.execute("""
  185. SELECT id, title FROM resource
  186. WHERE id LIKE %s AND id < %s
  187. ORDER BY id DESC LIMIT 1
  188. """, (f"{parent_prefix}/%", resource_id))
  189. prev_row = cursor.fetchone()
  190. # 获取后一个
  191. cursor.execute("""
  192. SELECT id, title FROM resource
  193. WHERE id LIKE %s AND id > %s
  194. ORDER BY id ASC LIMIT 1
  195. """, (f"{parent_prefix}/%", resource_id))
  196. next_row = cursor.fetchone()
  197. return (dict(prev_row) if prev_row else None,
  198. dict(next_row) if next_row else None)
  199. finally:
  200. cursor.close()
  201. def close(self):
  202. """关闭连接"""
  203. if self.conn:
  204. self.conn.close()