pg_resource_store.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. #!/usr/bin/env python3
  2. """
  3. PostgreSQL Resources 存储封装
  4. """
  5. import os
  6. import json
  7. import time
  8. import psycopg2
  9. from psycopg2.extras import RealDictCursor
  10. from typing import Optional, List, Dict
  11. from dotenv import load_dotenv
  12. load_dotenv()
  13. class PostgreSQLResourceStore:
  14. def __init__(self):
  15. """初始化 PostgreSQL 连接"""
  16. self.conn = psycopg2.connect(
  17. host=os.getenv('KNOWHUB_DB'),
  18. port=int(os.getenv('KNOWHUB_PORT', 5432)),
  19. user=os.getenv('KNOWHUB_USER'),
  20. password=os.getenv('KNOWHUB_PASSWORD'),
  21. database=os.getenv('KNOWHUB_DB_NAME')
  22. )
  23. self.conn.autocommit = False
  24. def _get_cursor(self):
  25. """获取游标"""
  26. return self.conn.cursor(cursor_factory=RealDictCursor)
  27. def insert_or_update(self, resource: Dict):
  28. """插入或更新资源"""
  29. cursor = self._get_cursor()
  30. try:
  31. now_ts = int(time.time())
  32. cursor.execute("""
  33. INSERT INTO resources (id, title, body, secure_body, content_type,
  34. metadata, sort_order, submitted_by, created_at, updated_at)
  35. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
  36. ON CONFLICT (id) DO UPDATE SET
  37. title = EXCLUDED.title,
  38. body = EXCLUDED.body,
  39. secure_body = EXCLUDED.secure_body,
  40. content_type = EXCLUDED.content_type,
  41. metadata = EXCLUDED.metadata,
  42. sort_order = EXCLUDED.sort_order,
  43. updated_at = EXCLUDED.updated_at
  44. """, (
  45. resource['id'],
  46. resource['title'],
  47. resource['body'],
  48. resource.get('secure_body', ''),
  49. resource['content_type'],
  50. json.dumps(resource.get('metadata', {})),
  51. resource.get('sort_order', 0),
  52. resource.get('submitted_by', ''),
  53. resource.get('created_at', now_ts),
  54. now_ts
  55. ))
  56. self.conn.commit()
  57. finally:
  58. cursor.close()
  59. def get_by_id(self, resource_id: str) -> Optional[Dict]:
  60. """根据ID获取资源"""
  61. cursor = self._get_cursor()
  62. try:
  63. cursor.execute("""
  64. SELECT id, title, body, secure_body, content_type, metadata, sort_order,
  65. created_at, updated_at
  66. FROM resources WHERE id = %s
  67. """, (resource_id,))
  68. row = cursor.fetchone()
  69. if not row:
  70. return None
  71. result = dict(row)
  72. if result.get('metadata'):
  73. result['metadata'] = json.loads(result['metadata']) if isinstance(result['metadata'], str) else result['metadata']
  74. return result
  75. finally:
  76. cursor.close()
  77. def list_resources(self, prefix: Optional[str] = None, content_type: Optional[str] = None,
  78. limit: int = 100, offset: int = 0) -> List[Dict]:
  79. """列出资源"""
  80. cursor = self._get_cursor()
  81. try:
  82. conditions = []
  83. params = []
  84. if prefix:
  85. conditions.append("id LIKE %s")
  86. params.append(f"{prefix}%")
  87. if content_type:
  88. conditions.append("content_type = %s")
  89. params.append(content_type)
  90. where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""
  91. sql = f"""
  92. SELECT id, title, content_type, metadata, created_at, updated_at
  93. FROM resources
  94. {where_clause}
  95. ORDER BY sort_order, id
  96. LIMIT %s OFFSET %s
  97. """
  98. params.extend([limit, offset])
  99. cursor.execute(sql, params)
  100. results = cursor.fetchall()
  101. return [dict(r) for r in results]
  102. finally:
  103. cursor.close()
  104. def update(self, resource_id: str, updates: Dict):
  105. """更新资源"""
  106. cursor = self._get_cursor()
  107. try:
  108. set_parts = []
  109. params = []
  110. for key, value in updates.items():
  111. if key == 'metadata':
  112. set_parts.append(f"{key} = %s")
  113. params.append(json.dumps(value))
  114. else:
  115. set_parts.append(f"{key} = %s")
  116. params.append(value)
  117. set_parts.append("updated_at = %s")
  118. params.append(int(time.time()))
  119. params.append(resource_id)
  120. sql = f"UPDATE resources SET {', '.join(set_parts)} WHERE id = %s"
  121. cursor.execute(sql, params)
  122. self.conn.commit()
  123. finally:
  124. cursor.close()
  125. def delete(self, resource_id: str):
  126. """删除资源"""
  127. cursor = self._get_cursor()
  128. try:
  129. cursor.execute("DELETE FROM resources WHERE id = %s", (resource_id,))
  130. self.conn.commit()
  131. finally:
  132. cursor.close()
  133. def get_siblings(self, resource_id: str) -> tuple:
  134. """获取同级资源(用于导航)"""
  135. cursor = self._get_cursor()
  136. try:
  137. # 获取父级前缀
  138. parts = resource_id.split("/")
  139. if len(parts) <= 1:
  140. return None, None
  141. parent_prefix = "/".join(parts[:-1])
  142. # 获取前一个
  143. cursor.execute("""
  144. SELECT id, title FROM resources
  145. WHERE id LIKE %s AND id < %s
  146. ORDER BY id DESC LIMIT 1
  147. """, (f"{parent_prefix}/%", resource_id))
  148. prev_row = cursor.fetchone()
  149. # 获取后一个
  150. cursor.execute("""
  151. SELECT id, title FROM resources
  152. WHERE id LIKE %s AND id > %s
  153. ORDER BY id ASC LIMIT 1
  154. """, (f"{parent_prefix}/%", resource_id))
  155. next_row = cursor.fetchone()
  156. return (dict(prev_row) if prev_row else None,
  157. dict(next_row) if next_row else None)
  158. finally:
  159. cursor.close()
  160. def close(self):
  161. """关闭连接"""
  162. if self.conn:
  163. self.conn.close()