pg_resource_store.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. #!/usr/bin/env python3
  2. """
  3. PostgreSQL Resources 存储封装
  4. """
  5. import os
  6. import json
  7. import time
  8. import psycopg2
  9. from psycopg2.extras import RealDictCursor
  10. from typing import Optional, List, Dict
  11. from dotenv import load_dotenv
  12. load_dotenv()
  13. class PostgreSQLResourceStore:
  14. def __init__(self):
  15. """初始化 PostgreSQL 连接"""
  16. self.conn = psycopg2.connect(
  17. host=os.getenv('KNOWHUB_DB'),
  18. port=int(os.getenv('KNOWHUB_PORT', 5432)),
  19. user=os.getenv('KNOWHUB_USER'),
  20. password=os.getenv('KNOWHUB_PASSWORD'),
  21. database=os.getenv('KNOWHUB_DB_NAME')
  22. )
  23. self.conn.autocommit = False
  24. def _reconnect(self):
  25. self.conn = psycopg2.connect(
  26. host=os.getenv('KNOWHUB_DB'),
  27. port=int(os.getenv('KNOWHUB_PORT', 5432)),
  28. user=os.getenv('KNOWHUB_USER'),
  29. password=os.getenv('KNOWHUB_PASSWORD'),
  30. database=os.getenv('KNOWHUB_DB_NAME')
  31. )
  32. self.conn.autocommit = False
  33. def _ensure_connection(self):
  34. if self.conn.closed != 0:
  35. self._reconnect()
  36. else:
  37. try:
  38. c = self.conn.cursor()
  39. c.execute("SELECT 1")
  40. c.close()
  41. except (psycopg2.OperationalError, psycopg2.InterfaceError):
  42. self._reconnect()
  43. def _get_cursor(self):
  44. """获取游标"""
  45. self._ensure_connection()
  46. return self.conn.cursor(cursor_factory=RealDictCursor)
  47. def insert_or_update(self, resource: Dict):
  48. """插入或更新资源"""
  49. cursor = self._get_cursor()
  50. try:
  51. now_ts = int(time.time())
  52. cursor.execute("""
  53. INSERT INTO resources (id, title, body, secure_body, content_type,
  54. metadata, sort_order, submitted_by, created_at, updated_at)
  55. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  56. ON CONFLICT (id) DO UPDATE SET
  57. title = EXCLUDED.title,
  58. body = EXCLUDED.body,
  59. secure_body = EXCLUDED.secure_body,
  60. content_type = EXCLUDED.content_type,
  61. metadata = EXCLUDED.metadata,
  62. sort_order = EXCLUDED.sort_order,
  63. updated_at = EXCLUDED.updated_at
  64. """, (
  65. resource['id'],
  66. resource['title'],
  67. resource['body'],
  68. resource.get('secure_body', ''),
  69. resource['content_type'],
  70. json.dumps(resource.get('metadata', {})),
  71. resource.get('sort_order', 0),
  72. resource.get('submitted_by', ''),
  73. resource.get('created_at', now_ts),
  74. now_ts
  75. ))
  76. self.conn.commit()
  77. finally:
  78. cursor.close()
  79. def get_by_id(self, resource_id: str) -> Optional[Dict]:
  80. """根据ID获取资源"""
  81. cursor = self._get_cursor()
  82. try:
  83. cursor.execute("""
  84. SELECT id, title, body, secure_body, content_type, metadata, sort_order,
  85. created_at, updated_at
  86. FROM resources WHERE id = %s
  87. """, (resource_id,))
  88. row = cursor.fetchone()
  89. if not row:
  90. return None
  91. result = dict(row)
  92. if result.get('metadata'):
  93. result['metadata'] = json.loads(result['metadata']) if isinstance(result['metadata'], str) else result['metadata']
  94. return result
  95. finally:
  96. cursor.close()
  97. def list_resources(self, prefix: Optional[str] = None, content_type: Optional[str] = None,
  98. limit: int = 100, offset: int = 0) -> List[Dict]:
  99. """列出资源"""
  100. cursor = self._get_cursor()
  101. try:
  102. conditions = []
  103. params = []
  104. if prefix:
  105. conditions.append("id LIKE %s")
  106. params.append(f"{prefix}%")
  107. if content_type:
  108. conditions.append("content_type = %s")
  109. params.append(content_type)
  110. where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""
  111. sql = f"""
  112. SELECT id, title, content_type, metadata, created_at, updated_at
  113. FROM resources
  114. {where_clause}
  115. ORDER BY sort_order, id
  116. LIMIT %s OFFSET %s
  117. """
  118. params.extend([limit, offset])
  119. cursor.execute(sql, params)
  120. results = cursor.fetchall()
  121. return [dict(r) for r in results]
  122. finally:
  123. cursor.close()
  124. def update(self, resource_id: str, updates: Dict):
  125. """更新资源"""
  126. cursor = self._get_cursor()
  127. try:
  128. set_parts = []
  129. params = []
  130. for key, value in updates.items():
  131. if key == 'metadata':
  132. set_parts.append(f"{key} = %s")
  133. params.append(json.dumps(value))
  134. else:
  135. set_parts.append(f"{key} = %s")
  136. params.append(value)
  137. set_parts.append("updated_at = %s")
  138. params.append(int(time.time()))
  139. params.append(resource_id)
  140. sql = f"UPDATE resources SET {', '.join(set_parts)} WHERE id = %s"
  141. cursor.execute(sql, params)
  142. self.conn.commit()
  143. finally:
  144. cursor.close()
  145. def delete(self, resource_id: str):
  146. """删除资源"""
  147. cursor = self._get_cursor()
  148. try:
  149. cursor.execute("DELETE FROM resources WHERE id = %s", (resource_id,))
  150. self.conn.commit()
  151. finally:
  152. cursor.close()
  153. def get_siblings(self, resource_id: str) -> tuple:
  154. """获取同级资源(用于导航)"""
  155. cursor = self._get_cursor()
  156. try:
  157. # 获取父级前缀
  158. parts = resource_id.split("/")
  159. if len(parts) <= 1:
  160. return None, None
  161. parent_prefix = "/".join(parts[:-1])
  162. # 获取前一个
  163. cursor.execute("""
  164. SELECT id, title FROM resources
  165. WHERE id LIKE %s AND id < %s
  166. ORDER BY id DESC LIMIT 1
  167. """, (f"{parent_prefix}/%", resource_id))
  168. prev_row = cursor.fetchone()
  169. # 获取后一个
  170. cursor.execute("""
  171. SELECT id, title FROM resources
  172. WHERE id LIKE %s AND id > %s
  173. ORDER BY id ASC LIMIT 1
  174. """, (f"{parent_prefix}/%", resource_id))
  175. next_row = cursor.fetchone()
  176. return (dict(prev_row) if prev_row else None,
  177. dict(next_row) if next_row else None)
  178. finally:
  179. cursor.close()
  180. def close(self):
  181. """关闭连接"""
  182. if self.conn:
  183. self.conn.close()