import oss2 from app.config import settings import logging logger = logging.getLogger(__name__) class OSSClient: def __init__(self): self.auth = oss2.Auth( settings.OSS_ACCESS_KEY_ID, settings.OSS_ACCESS_KEY_SECRET ) self.bucket = oss2.Bucket( self.auth, settings.OSS_ENDPOINT, settings.OSS_BUCKET_NAME ) self.prefix = settings.OSS_PREFIX.strip("/") self.cdn_url = settings.OSS_CDN_URL.rstrip("/") def _build_key(self, project_name: str, stage: str, commit_id: str, relative_path: str) -> str: """Build OSS object key.""" return f"{self.prefix}/{project_name}/{stage}/{commit_id}/{relative_path}" def get_cdn_url(self, key: str) -> str: """Get CDN URL for the object.""" return f"{self.cdn_url}/{key}" def upload(self, key: str, content: bytes) -> bool: """Upload content to OSS.""" try: self.bucket.put_object(key, content) logger.info(f"Uploaded to OSS: {key}") return True except Exception as e: logger.error(f"Failed to upload to OSS: {e}") raise def download(self, key: str) -> bytes: """Download content from OSS.""" try: result = self.bucket.get_object(key) return result.read() except oss2.exceptions.NoSuchKey: logger.error(f"OSS key not found: {key}") return None except Exception as e: logger.error(f"Failed to download from OSS: {e}") raise def exists(self, key: str) -> bool: """Check if object exists.""" return self.bucket.object_exists(key) # Singleton instance oss_client = OSSClient() if settings.OSS_ACCESS_KEY_ID else None