| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184 |
- #!/usr/bin/env python3
- """
- PostgreSQL Resources 存储封装
- """
- import os
- import json
- import time
- import psycopg2
- from psycopg2.extras import RealDictCursor
- from typing import Optional, List, Dict
- from dotenv import load_dotenv
- load_dotenv()
- class PostgreSQLResourceStore:
- def __init__(self):
- """初始化 PostgreSQL 连接"""
- self.conn = psycopg2.connect(
- host=os.getenv('KNOWHUB_DB'),
- port=int(os.getenv('KNOWHUB_PORT', 5432)),
- user=os.getenv('KNOWHUB_USER'),
- password=os.getenv('KNOWHUB_PASSWORD'),
- database=os.getenv('KNOWHUB_DB_NAME')
- )
- self.conn.autocommit = False
- def _get_cursor(self):
- """获取游标"""
- return self.conn.cursor(cursor_factory=RealDictCursor)
- def insert_or_update(self, resource: Dict):
- """插入或更新资源"""
- cursor = self._get_cursor()
- try:
- now_ts = int(time.time())
- cursor.execute("""
- INSERT INTO resources (id, title, body, secure_body, content_type,
- metadata, sort_order, submitted_by, created_at, updated_at)
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
- ON CONFLICT (id) DO UPDATE SET
- title = EXCLUDED.title,
- body = EXCLUDED.body,
- secure_body = EXCLUDED.secure_body,
- content_type = EXCLUDED.content_type,
- metadata = EXCLUDED.metadata,
- sort_order = EXCLUDED.sort_order,
- updated_at = EXCLUDED.updated_at
- """, (
- resource['id'],
- resource['title'],
- resource['body'],
- resource.get('secure_body', ''),
- resource['content_type'],
- json.dumps(resource.get('metadata', {})),
- resource.get('sort_order', 0),
- resource.get('submitted_by', ''),
- resource.get('created_at', now_ts),
- now_ts
- ))
- self.conn.commit()
- finally:
- cursor.close()
- def get_by_id(self, resource_id: str) -> Optional[Dict]:
- """根据ID获取资源"""
- cursor = self._get_cursor()
- try:
- cursor.execute("""
- SELECT id, title, body, secure_body, content_type, metadata, sort_order,
- created_at, updated_at
- FROM resources WHERE id = %s
- """, (resource_id,))
- row = cursor.fetchone()
- if not row:
- return None
- result = dict(row)
- if result.get('metadata'):
- result['metadata'] = json.loads(result['metadata']) if isinstance(result['metadata'], str) else result['metadata']
- return result
- finally:
- cursor.close()
- def list_resources(self, prefix: Optional[str] = None, content_type: Optional[str] = None,
- limit: int = 100, offset: int = 0) -> List[Dict]:
- """列出资源"""
- cursor = self._get_cursor()
- try:
- conditions = []
- params = []
- if prefix:
- conditions.append("id LIKE %s")
- params.append(f"{prefix}%")
- if content_type:
- conditions.append("content_type = %s")
- params.append(content_type)
- where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else ""
- sql = f"""
- SELECT id, title, content_type, metadata, created_at, updated_at
- FROM resources
- {where_clause}
- ORDER BY sort_order, id
- LIMIT %s OFFSET %s
- """
- params.extend([limit, offset])
- cursor.execute(sql, params)
- results = cursor.fetchall()
- return [dict(r) for r in results]
- finally:
- cursor.close()
- def update(self, resource_id: str, updates: Dict):
- """更新资源"""
- cursor = self._get_cursor()
- try:
- set_parts = []
- params = []
- for key, value in updates.items():
- if key == 'metadata':
- set_parts.append(f"{key} = %s")
- params.append(json.dumps(value))
- else:
- set_parts.append(f"{key} = %s")
- params.append(value)
- set_parts.append("updated_at = %s")
- params.append(int(time.time()))
- params.append(resource_id)
- sql = f"UPDATE resources SET {', '.join(set_parts)} WHERE id = %s"
- cursor.execute(sql, params)
- self.conn.commit()
- finally:
- cursor.close()
- def delete(self, resource_id: str):
- """删除资源"""
- cursor = self._get_cursor()
- try:
- cursor.execute("DELETE FROM resources WHERE id = %s", (resource_id,))
- self.conn.commit()
- finally:
- cursor.close()
- def get_siblings(self, resource_id: str) -> tuple:
- """获取同级资源(用于导航)"""
- cursor = self._get_cursor()
- try:
- # 获取父级前缀
- parts = resource_id.split("/")
- if len(parts) <= 1:
- return None, None
- parent_prefix = "/".join(parts[:-1])
- # 获取前一个
- cursor.execute("""
- SELECT id, title FROM resources
- WHERE id LIKE %s AND id < %s
- ORDER BY id DESC LIMIT 1
- """, (f"{parent_prefix}/%", resource_id))
- prev_row = cursor.fetchone()
- # 获取后一个
- cursor.execute("""
- SELECT id, title FROM resources
- WHERE id LIKE %s AND id > %s
- ORDER BY id ASC LIMIT 1
- """, (f"{parent_prefix}/%", resource_id))
- next_row = cursor.fetchone()
- return (dict(prev_row) if prev_row else None,
- dict(next_row) if next_row else None)
- finally:
- cursor.close()
- def close(self):
- """关闭连接"""
- if self.conn:
- self.conn.close()
|