#!/usr/bin/env python3
"""PostgreSQL-backed storage wrapper for the ``resources`` table."""

import json
import os
import re
import time
from typing import Dict, List, Optional, Tuple

import psycopg2
from dotenv import load_dotenv
from psycopg2.extras import RealDictCursor

load_dotenv()

# Column names cannot be sent as bound parameters, so any name that ends up
# in the SQL text must be a plain identifier.
_IDENTIFIER_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def _escape_like(value: str) -> str:
    """Escape LIKE metacharacters so that *value* is matched literally.

    Backslash is PostgreSQL's default LIKE escape character, so it is
    escaped first, followed by the ``%`` and ``_`` wildcards.
    """
    return (
        value.replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )


class PostgreSQLResourceStore:
    """CRUD and navigation helpers for rows of the ``resources`` table.

    Every public operation runs in its own transaction: it is committed when
    the operation succeeds and rolled back when it raises, so a failed
    statement never leaves the shared connection in an aborted state.
    """

    def __init__(self):
        """Open the PostgreSQL connection from environment variables.

        Reads ``KNOWHUB_DB`` (host), ``KNOWHUB_PORT`` (defaults to 5432),
        ``KNOWHUB_USER``, ``KNOWHUB_PASSWORD`` and ``KNOWHUB_DB_NAME``.
        """
        self.conn = psycopg2.connect(
            host=os.getenv('KNOWHUB_DB'),
            port=int(os.getenv('KNOWHUB_PORT', 5432)),
            user=os.getenv('KNOWHUB_USER'),
            password=os.getenv('KNOWHUB_PASSWORD'),
            database=os.getenv('KNOWHUB_DB_NAME')
        )
        self.conn.autocommit = False

    def _get_cursor(self):
        """Return a new cursor whose rows behave like dictionaries."""
        return self.conn.cursor(cursor_factory=RealDictCursor)

    def insert_or_update(self, resource: Dict):
        """Insert a resource, or overwrite the existing row with the same id.

        Args:
            resource: Must contain ``id``, ``title``, ``body`` and
                ``content_type``. Optional keys and their defaults:
                ``secure_body`` (``''``), ``metadata`` (``{}``, stored as a
                JSON string), ``sort_order`` (``0``), ``submitted_by``
                (``''``) and ``created_at`` (current Unix time).

        Raises:
            KeyError: If a required key is missing.

        On conflict the existing row keeps its ``submitted_by`` and
        ``created_at`` values; all other listed columns are replaced.
        """
        now_ts = int(time.time())
        # Build the parameters first so a missing key fails before any
        # database work is started.
        params = (
            resource['id'],
            resource['title'],
            resource['body'],
            resource.get('secure_body', ''),
            resource['content_type'],
            json.dumps(resource.get('metadata', {})),
            resource.get('sort_order', 0),
            resource.get('submitted_by', ''),
            resource.get('created_at', now_ts),
            now_ts,
        )
        # "with self.conn" commits on success and rolls back on error;
        # the cursor context manager closes the cursor.
        with self.conn, self._get_cursor() as cursor:
            cursor.execute("""
                INSERT INTO resources
                    (id, title, body, secure_body, content_type, metadata,
                     sort_order, submitted_by, created_at, updated_at)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (id) DO UPDATE SET
                    title = EXCLUDED.title,
                    body = EXCLUDED.body,
                    secure_body = EXCLUDED.secure_body,
                    content_type = EXCLUDED.content_type,
                    metadata = EXCLUDED.metadata,
                    sort_order = EXCLUDED.sort_order,
                    updated_at = EXCLUDED.updated_at
            """, params)

    def get_by_id(self, resource_id: str) -> Optional[Dict]:
        """Fetch a single resource by id.

        Returns:
            The row as a dict, or ``None`` when no row has that id. A
            ``metadata`` value that comes back as a string is decoded from
            JSON; any other type is returned as delivered by the driver.
        """
        with self.conn, self._get_cursor() as cursor:
            cursor.execute("""
                SELECT id, title, body, secure_body, content_type, metadata,
                       sort_order, created_at, updated_at
                FROM resources
                WHERE id = %s
            """, (resource_id,))
            row = cursor.fetchone()

        if not row:
            return None
        result = dict(row)
        metadata = result.get('metadata')
        if metadata and isinstance(metadata, str):
            result['metadata'] = json.loads(metadata)
        return result

    def list_resources(self, prefix: Optional[str] = None,
                       content_type: Optional[str] = None,
                       limit: int = 100, offset: int = 0) -> List[Dict]:
        """List resource summaries ordered by ``sort_order`` then ``id``.

        Args:
            prefix: When given, only ids starting with this literal text are
                returned (``%`` and ``_`` are not treated as wildcards).
            content_type: When given, only rows with this content type.
            limit: Maximum number of rows to return.
            offset: Number of rows to skip.

        Returns:
            A list of dicts with ``id``, ``title``, ``content_type``,
            ``metadata``, ``created_at`` and ``updated_at``.
        """
        conditions = []
        params = []
        if prefix:
            conditions.append("id LIKE %s")
            params.append(f"{_escape_like(prefix)}%")
        if content_type:
            conditions.append("content_type = %s")
            params.append(content_type)

        # Only fixed condition fragments are interpolated; values are bound.
        where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""
        sql = f"""
            SELECT id, title, content_type, metadata, created_at, updated_at
            FROM resources
            {where_clause}
            ORDER BY sort_order, id
            LIMIT %s OFFSET %s
        """
        params.extend([limit, offset])

        with self.conn, self._get_cursor() as cursor:
            cursor.execute(sql, params)
            return [dict(row) for row in cursor.fetchall()]

    def update(self, resource_id: str, updates: Dict):
        """Update selected columns of one resource.

        Args:
            resource_id: Id of the row to update.
            updates: Mapping of column name to new value. ``metadata`` is
                serialized to JSON. ``updated_at`` is set to the current Unix
                time unless the caller supplies it explicitly.

        Raises:
            ValueError: If a key is not a plain SQL identifier. Column names
                are placed in the SQL text, so anything else is rejected
                rather than risking SQL injection.
        """
        set_parts = []
        params = []
        for column, value in updates.items():
            if not isinstance(column, str) or not _IDENTIFIER_RE.fullmatch(column):
                raise ValueError(f"invalid column name: {column!r}")
            set_parts.append(f"{column} = %s")
            params.append(json.dumps(value) if column == 'metadata' else value)

        # Assigning the same column twice is rejected by PostgreSQL, so only
        # add the automatic timestamp when the caller did not provide one.
        if 'updated_at' not in updates:
            set_parts.append("updated_at = %s")
            params.append(int(time.time()))

        params.append(resource_id)
        sql = f"UPDATE resources SET {', '.join(set_parts)} WHERE id = %s"

        with self.conn, self._get_cursor() as cursor:
            cursor.execute(sql, params)

    def delete(self, resource_id: str):
        """Delete the resource with the given id, if it exists."""
        with self.conn, self._get_cursor() as cursor:
            cursor.execute("DELETE FROM resources WHERE id = %s", (resource_id,))

    def get_siblings(self, resource_id: str) -> Tuple[Optional[Dict], Optional[Dict]]:
        """Return the previous and next resources for navigation.

        The parent prefix is everything before the last ``/`` of
        *resource_id*. Candidates are all ids that start with that prefix
        followed by ``/``; the nearest id below and above *resource_id* in
        ``id`` order are returned.

        NOTE(review): the pattern also matches ids nested deeper under the
        same parent, and ordering uses ``id`` rather than ``sort_order`` -
        confirm this is the intended navigation order.

        Returns:
            ``(previous, next)`` where each element is a dict with ``id`` and
            ``title``, or ``None`` when there is no such neighbour. Ids
            without a ``/`` yield ``(None, None)``.
        """
        parts = resource_id.split("/")
        if len(parts) <= 1:
            # Top-level ids have no parent prefix to navigate within.
            return None, None

        parent_prefix = "/".join(parts[:-1])
        # Escape the prefix so "_" or "%" in an id cannot match other parents.
        pattern = f"{_escape_like(parent_prefix)}/%"

        with self.conn, self._get_cursor() as cursor:
            # Closest id before the current one.
            cursor.execute("""
                SELECT id, title FROM resources
                WHERE id LIKE %s AND id < %s
                ORDER BY id DESC
                LIMIT 1
            """, (pattern, resource_id))
            prev_row = cursor.fetchone()

            # Closest id after the current one.
            cursor.execute("""
                SELECT id, title FROM resources
                WHERE id LIKE %s AND id > %s
                ORDER BY id ASC
                LIMIT 1
            """, (pattern, resource_id))
            next_row = cursor.fetchone()

        return (dict(prev_row) if prev_row else None,
                dict(next_row) if next_row else None)

    def close(self):
        """Close the database connection."""
        if self.conn:
            self.conn.close()