| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- #!/usr/bin/env python3
- """
- PostgreSQL Resources 存储封装
- """
- import os
- import json
- import time
- import psycopg2
- from psycopg2.extras import RealDictCursor
- from typing import Optional, List, Dict
- from dotenv import load_dotenv
- from knowhub.knowhub_db.cascade import cascade_delete
- load_dotenv()
class PostgreSQLResourceStore:
    """Persistence wrapper around the ``resource`` table in PostgreSQL.

    One psycopg2 connection is opened from the ``KNOWHUB_*`` environment
    variables and switched to autocommit. Each public method asks for a fresh
    ``RealDictCursor`` (the connection is probed and re-opened first if it has
    gone away) and closes that cursor before returning.
    """

    def __init__(self):
        """Open the PostgreSQL connection described by the environment."""
        self.conn = self._connect()

    @staticmethod
    def _connect():
        """Create a new autocommit connection from the ``KNOWHUB_*`` settings."""
        conn = psycopg2.connect(
            host=os.getenv('KNOWHUB_DB'),
            port=int(os.getenv('KNOWHUB_PORT', 5432)),
            user=os.getenv('KNOWHUB_USER'),
            password=os.getenv('KNOWHUB_PASSWORD'),
            database=os.getenv('KNOWHUB_DB_NAME'),
        )
        conn.autocommit = True
        return conn

    def _reconnect(self):
        """Replace ``self.conn`` with a freshly opened connection.

        The new connection is opened first, so a failed attempt leaves the
        previous ``self.conn`` untouched. The replaced connection is then
        closed on a best-effort basis so its socket is not leaked.
        """
        stale = getattr(self, 'conn', None)
        self.conn = self._connect()
        if stale is not None:
            try:
                stale.close()
            except psycopg2.Error:
                # The old connection is already unusable; nothing to release.
                pass

    def _ensure_connection(self):
        """Make sure ``self.conn`` is usable, reconnecting when it is not.

        A closed connection is replaced immediately. Otherwise a ``SELECT 1``
        probe is issued; an ``OperationalError`` or ``InterfaceError`` from the
        probe triggers a reconnect.
        """
        if self.conn.closed != 0:
            self._reconnect()
            return
        try:
            probe = self.conn.cursor()
            try:
                probe.execute("SELECT 1")
            finally:
                # Release the probe cursor even when the probe itself fails.
                probe.close()
        except (psycopg2.OperationalError, psycopg2.InterfaceError):
            self._reconnect()

    def _get_cursor(self):
        """Return a ``RealDictCursor`` on a verified connection."""
        self._ensure_connection()
        return self.conn.cursor(cursor_factory=RealDictCursor)

    @staticmethod
    def _escape_like(text: str) -> str:
        """Escape ``LIKE`` metacharacters so *text* is matched literally.

        Backslash is PostgreSQL's default ``LIKE`` escape character, so it is
        doubled first and then used to escape ``%`` and ``_``.
        """
        return (
            text.replace('\\', '\\\\')
            .replace('%', '\\%')
            .replace('_', '\\_')
        )

    @staticmethod
    def _decode_row(row) -> Dict:
        """Copy *row* into a dict and decode its JSON text columns.

        ``metadata`` and ``images`` are parsed when they are non-empty
        strings; an ``images`` value of ``None`` (or a missing key) becomes
        an empty list.
        """
        record = dict(row)
        metadata = record.get('metadata')
        if metadata and isinstance(metadata, str):
            record['metadata'] = json.loads(metadata)
        images = record.get('images')
        if images and isinstance(images, str):
            record['images'] = json.loads(images)
        elif images is None:
            record['images'] = []
        return record

    def insert_or_update(self, resource: Dict):
        """Insert a resource, replacing any existing row with the same id.

        Note: AnalyticDB beam tables do not support ``ON CONFLICT DO UPDATE``
        once the table has columns added via ALTER, so a DELETE followed by an
        INSERT is used instead. Junction tables are unaffected (they carry no
        FK, so the DELETE removes only the entity row).

        Args:
            resource: Must contain ``id``, ``title``, ``body`` and
                ``content_type``. ``secure_body``, ``metadata``,
                ``sort_order``, ``submitted_by``, ``created_at``, ``version``
                and ``images`` are optional and fall back to defaults.

        Raises:
            KeyError: A required key is missing. Raised before any SQL runs,
                so an existing row is never deleted for an invalid payload.
        """
        now_ts = int(time.time())
        # Build (and thereby validate) the full row BEFORE touching the
        # database: the connection is in autocommit mode, so a failure after
        # the DELETE would otherwise lose the existing row.
        row = (
            resource['id'],
            resource['title'],
            resource['body'],
            resource.get('secure_body', ''),
            resource['content_type'],
            json.dumps(resource.get('metadata', {})),
            resource.get('sort_order', 0),
            resource.get('submitted_by', ''),
            # NOTE(review): without an explicit created_at a replaced row gets
            # a new creation time -- confirm this is intended.
            resource.get('created_at', now_ts),
            now_ts,
            resource.get('version', 'v0'),
            json.dumps(resource.get('images', [])),
        )
        cursor = self._get_cursor()
        try:
            # NOTE: under autocommit these two statements are not atomic; a
            # database-side INSERT failure still leaves the row deleted.
            cursor.execute("DELETE FROM resource WHERE id = %s", (row[0],))
            cursor.execute("""
                INSERT INTO resource (id, title, body, secure_body, content_type,
                    metadata, sort_order, submitted_by, created_at, updated_at,
                    version, images)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """, row)
            self.conn.commit()
        finally:
            cursor.close()

    def get_by_id(self, resource_id: str) -> Optional[Dict]:
        """Fetch one resource by id, restricted by the active version filter.

        The version predicate and its parameters come from
        ``version_where()``; the predicate text is spliced into the statement
        while its values are passed as bound parameters.

        Returns:
            The decoded row as a dict, or ``None`` when nothing matches.
        """
        from knowhub.knowhub_db.version_context import version_where
        cursor = self._get_cursor()
        try:
            vf, vp = version_where()
            cursor.execute(f"""
                SELECT id, title, body, secure_body, content_type, metadata, sort_order,
                    created_at, updated_at, version, images
                FROM resource WHERE id = %s AND {vf}
            """, (resource_id, *vp))
            row = cursor.fetchone()
            if not row:
                return None
            return self._decode_row(row)
        finally:
            cursor.close()

    def list_resources(self, prefix: Optional[str] = None, content_type: Optional[str] = None,
                       version: Optional[str] = None,
                       limit: int = 100, offset: int = 0) -> List[Dict]:
        """List resources ordered by ``sort_order`` then ``id``.

        Args:
            prefix: When given, only ids starting with this literal prefix
                are returned (``LIKE`` wildcards in it are escaped).
            content_type: When given, only rows with this content type.
            version: Used as-is when passed explicitly; otherwise the value
                of ``get_version()`` for the current context is used.
            limit: Maximum number of rows.
            offset: Number of rows to skip.

        Returns:
            Decoded rows (without ``body``/``secure_body``) as dicts.
        """
        from knowhub.knowhub_db.version_context import get_version
        cursor = self._get_cursor()
        try:
            conditions = []
            params = []
            if prefix:
                conditions.append("id LIKE %s")
                params.append(f"{self._escape_like(prefix)}%")
            if content_type:
                conditions.append("content_type = %s")
                params.append(content_type)
            effective_version = version if version is not None else get_version()
            # The version filter is always present, so the WHERE clause is
            # never empty.
            conditions.append("version = %s")
            params.append(effective_version)
            where_clause = f"WHERE {' AND '.join(conditions)}"
            sql = f"""
                SELECT id, title, content_type, metadata, images, version, created_at, updated_at
                FROM resource
                {where_clause}
                ORDER BY sort_order, id
                LIMIT %s OFFSET %s
            """
            params.extend([limit, offset])
            cursor.execute(sql, params)
            return [self._decode_row(row) for row in cursor.fetchall()]
        finally:
            cursor.close()

    def update(self, resource_id: str, updates: Dict):
        """Update selected columns of a resource and bump ``updated_at``.

        Args:
            resource_id: Id of the row to update.
            updates: Mapping of column name to new value. ``metadata`` is
                always JSON-encoded; ``images`` is JSON-encoded unless it is
                already a string, matching how ``insert_or_update`` stores it.

        Raises:
            ValueError: A key is not a plain identifier. Column names are
                spliced into the SQL text, so anything else is rejected
                before a statement is built.
        """
        assignments = []
        params = []
        for column, value in updates.items():
            if not (isinstance(column, str) and column.isidentifier()):
                raise ValueError(f"invalid column name: {column!r}")
            if column == 'metadata' or (column == 'images' and not isinstance(value, str)):
                value = json.dumps(value)
            assignments.append(f"{column} = %s")
            params.append(value)
        assignments.append("updated_at = %s")
        params.append(int(time.time()))
        params.append(resource_id)
        sql = f"UPDATE resource SET {', '.join(assignments)} WHERE id = %s"

        cursor = self._get_cursor()
        try:
            cursor.execute(sql, params)
            self.conn.commit()
        finally:
            cursor.close()

    def delete(self, resource_id: str):
        """Delete a resource together with its rows in associated tables.

        The actual work is delegated to ``cascade_delete``.
        """
        cursor = self._get_cursor()
        try:
            cascade_delete(cursor, 'resource', resource_id)
            self.conn.commit()
        finally:
            cursor.close()

    def get_siblings(self, resource_id: str) -> tuple:
        """Return the previous and next resource under the same parent path.

        The parent is everything before the last ``/`` of *resource_id*.
        Neighbours are the closest ids (by string order) that start with
        ``<parent>/``; the parent path is matched literally.

        Returns:
            ``(previous, next)`` where each item is a dict with ``id`` and
            ``title`` or ``None``. Ids without a ``/`` yield ``(None, None)``
            without querying the database.
        """
        # Derive the parent prefix; ids without a parent have no siblings.
        parts = resource_id.split("/")
        if len(parts) <= 1:
            return None, None
        pattern = f"{self._escape_like('/'.join(parts[:-1]))}/%"

        cursor = self._get_cursor()
        try:
            # Closest id before the current one.
            cursor.execute("""
                SELECT id, title FROM resource
                WHERE id LIKE %s AND id < %s
                ORDER BY id DESC LIMIT 1
            """, (pattern, resource_id))
            prev_row = cursor.fetchone()
            # Closest id after the current one.
            cursor.execute("""
                SELECT id, title FROM resource
                WHERE id LIKE %s AND id > %s
                ORDER BY id ASC LIMIT 1
            """, (pattern, resource_id))
            next_row = cursor.fetchone()
            return (dict(prev_row) if prev_row else None,
                    dict(next_row) if next_row else None)
        finally:
            cursor.close()

    def close(self):
        """Close the underlying connection, if there is one."""
        if self.conn:
            self.conn.close()
|