pg_resource_store.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. #!/usr/bin/env python3
  2. """
  3. PostgreSQL Resources 存储封装
  4. """
  5. import os
  6. import json
  7. import time
  8. import psycopg2
  9. from psycopg2.extras import RealDictCursor
  10. from typing import Optional, List, Dict
  11. from dotenv import load_dotenv
  12. from knowhub.knowhub_db.cascade import cascade_delete
  13. load_dotenv()
  14. class PostgreSQLResourceStore:
  15. def __init__(self):
  16. """初始化 PostgreSQL 连接"""
  17. self.conn = psycopg2.connect(
  18. host=os.getenv('KNOWHUB_DB'),
  19. port=int(os.getenv('KNOWHUB_PORT', 5432)),
  20. user=os.getenv('KNOWHUB_USER'),
  21. password=os.getenv('KNOWHUB_PASSWORD'),
  22. database=os.getenv('KNOWHUB_DB_NAME')
  23. )
  24. self.conn.autocommit = False
  25. def _reconnect(self):
  26. self.conn = psycopg2.connect(
  27. host=os.getenv('KNOWHUB_DB'),
  28. port=int(os.getenv('KNOWHUB_PORT', 5432)),
  29. user=os.getenv('KNOWHUB_USER'),
  30. password=os.getenv('KNOWHUB_PASSWORD'),
  31. database=os.getenv('KNOWHUB_DB_NAME')
  32. )
  33. self.conn.autocommit = False
  34. def _ensure_connection(self):
  35. if self.conn.closed != 0:
  36. self._reconnect()
  37. else:
  38. try:
  39. c = self.conn.cursor()
  40. c.execute("SELECT 1")
  41. c.close()
  42. except (psycopg2.OperationalError, psycopg2.InterfaceError):
  43. self._reconnect()
  44. def _get_cursor(self):
  45. """获取游标"""
  46. self._ensure_connection()
  47. return self.conn.cursor(cursor_factory=RealDictCursor)
  48. def insert_or_update(self, resource: Dict):
  49. """插入或更新资源"""
  50. cursor = self._get_cursor()
  51. try:
  52. now_ts = int(time.time())
  53. cursor.execute("""
  54. INSERT INTO resource (id, title, body, secure_body, content_type,
  55. metadata, sort_order, submitted_by, created_at, updated_at)
  56. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  57. ON CONFLICT (id) DO UPDATE SET
  58. title = EXCLUDED.title,
  59. body = EXCLUDED.body,
  60. secure_body = EXCLUDED.secure_body,
  61. content_type = EXCLUDED.content_type,
  62. metadata = EXCLUDED.metadata,
  63. sort_order = EXCLUDED.sort_order,
  64. updated_at = EXCLUDED.updated_at
  65. """, (
  66. resource['id'],
  67. resource['title'],
  68. resource['body'],
  69. resource.get('secure_body', ''),
  70. resource['content_type'],
  71. json.dumps(resource.get('metadata', {})),
  72. resource.get('sort_order', 0),
  73. resource.get('submitted_by', ''),
  74. resource.get('created_at', now_ts),
  75. now_ts
  76. ))
  77. self.conn.commit()
  78. finally:
  79. cursor.close()
  80. def get_by_id(self, resource_id: str) -> Optional[Dict]:
  81. """根据ID获取资源"""
  82. cursor = self._get_cursor()
  83. try:
  84. cursor.execute("""
  85. SELECT id, title, body, secure_body, content_type, metadata, sort_order,
  86. created_at, updated_at
  87. FROM resource WHERE id = %s
  88. """, (resource_id,))
  89. row = cursor.fetchone()
  90. if not row:
  91. return None
  92. result = dict(row)
  93. if result.get('metadata'):
  94. result['metadata'] = json.loads(result['metadata']) if isinstance(result['metadata'], str) else result['metadata']
  95. return result
  96. finally:
  97. cursor.close()
  98. def list_resources(self, prefix: Optional[str] = None, content_type: Optional[str] = None,
  99. limit: int = 100, offset: int = 0) -> List[Dict]:
  100. """列出资源"""
  101. cursor = self._get_cursor()
  102. try:
  103. conditions = []
  104. params = []
  105. if prefix:
  106. conditions.append("id LIKE %s")
  107. params.append(f"{prefix}%")
  108. if content_type:
  109. conditions.append("content_type = %s")
  110. params.append(content_type)
  111. where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""
  112. sql = f"""
  113. SELECT id, title, content_type, metadata, created_at, updated_at
  114. FROM resource
  115. {where_clause}
  116. ORDER BY sort_order, id
  117. LIMIT %s OFFSET %s
  118. """
  119. params.extend([limit, offset])
  120. cursor.execute(sql, params)
  121. results = cursor.fetchall()
  122. return [dict(r) for r in results]
  123. finally:
  124. cursor.close()
  125. def update(self, resource_id: str, updates: Dict):
  126. """更新资源"""
  127. cursor = self._get_cursor()
  128. try:
  129. set_parts = []
  130. params = []
  131. for key, value in updates.items():
  132. if key == 'metadata':
  133. set_parts.append(f"{key} = %s")
  134. params.append(json.dumps(value))
  135. else:
  136. set_parts.append(f"{key} = %s")
  137. params.append(value)
  138. set_parts.append("updated_at = %s")
  139. params.append(int(time.time()))
  140. params.append(resource_id)
  141. sql = f"UPDATE resource SET {', '.join(set_parts)} WHERE id = %s"
  142. cursor.execute(sql, params)
  143. self.conn.commit()
  144. finally:
  145. cursor.close()
  146. def delete(self, resource_id: str):
  147. """删除资源及其关联表记录"""
  148. cursor = self._get_cursor()
  149. try:
  150. cascade_delete(cursor, 'resource', resource_id)
  151. self.conn.commit()
  152. finally:
  153. cursor.close()
  154. def get_siblings(self, resource_id: str) -> tuple:
  155. """获取同级资源(用于导航)"""
  156. cursor = self._get_cursor()
  157. try:
  158. # 获取父级前缀
  159. parts = resource_id.split("/")
  160. if len(parts) <= 1:
  161. return None, None
  162. parent_prefix = "/".join(parts[:-1])
  163. # 获取前一个
  164. cursor.execute("""
  165. SELECT id, title FROM resource
  166. WHERE id LIKE %s AND id < %s
  167. ORDER BY id DESC LIMIT 1
  168. """, (f"{parent_prefix}/%", resource_id))
  169. prev_row = cursor.fetchone()
  170. # 获取后一个
  171. cursor.execute("""
  172. SELECT id, title FROM resource
  173. WHERE id LIKE %s AND id > %s
  174. ORDER BY id ASC LIMIT 1
  175. """, (f"{parent_prefix}/%", resource_id))
  176. next_row = cursor.fetchone()
  177. return (dict(prev_row) if prev_row else None,
  178. dict(next_row) if next_row else None)
  179. finally:
  180. cursor.close()
  181. def close(self):
  182. """关闭连接"""
  183. if self.conn:
  184. self.conn.close()