import logging
import os
from pathlib import Path
from typing import Optional

try:
    import alibabacloud_oss_v2 as oss
except Exception:  # pragma: no cover - import guard for optional dependency
    oss = None

logger = logging.getLogger("oss.upload")


class OssUploadError(RuntimeError):
    """Raised when an HTML upload to OSS cannot be prepared or completed."""


def _get_env(*keys: str, required: bool = False) -> Optional[str]:
    """Return the first non-empty environment variable among ``keys``.

    Args:
        keys: Environment variable names, tried in order.
        required: When true, raise instead of returning ``None`` if none of
            the variables holds a non-empty value.

    Returns:
        The first non-empty value found, or ``None`` when nothing is set and
        ``required`` is false.

    Raises:
        OssUploadError: If ``required`` is true and no variable is set. The
            message names only the first key.
    """
    for key in keys:
        value = os.getenv(key)
        if value:
            return value
    if required:
        raise OssUploadError(f"missing env: {keys[0]}")
    return None


def _build_object_key(
    html_path: Path,
    object_key: Optional[str],
    prefix: Optional[str],
    task_id: Optional[str],
) -> str:
    """Compute the object key under which the file is stored.

    The base key is chosen in priority order: the explicit ``object_key``,
    then ``"<task_id>.html"``, then the file name of ``html_path``. Leading
    slashes are removed from the base key, and a non-blank ``prefix``
    (stripped of whitespace and surrounding slashes) is prepended with a
    single ``/`` separator.
    """
    if object_key:
        key = object_key
    elif task_id:
        key = f"{task_id}.html"
    else:
        key = html_path.name

    if prefix:
        prefix_clean = prefix.strip().strip("/")
        if prefix_clean:
            return f"{prefix_clean}/{key.lstrip('/')}"
    return key.lstrip("/")


def upload_html_to_oss(
    html_path: Path,
    *,
    object_key: Optional[str] = None,
    task_id: Optional[str] = None,
) -> str:
    """Upload a generated HTML file to OSS and return a public URL.

    Credentials and target settings are read from the environment variables
    ``ALIYUN_OSS_ACCESS_KEY_ID``, ``ALIYUN_OSS_ACCESS_KEY_SECRET``,
    ``ALIYUN_OSS_BUCKET``, ``ALIYUN_OSS_REGION``, ``ALIYUN_OSS_PREFIX`` and
    ``ALIYUN_OSS_PUBLIC_BASE_URL``; every one of them must be non-empty.

    Args:
        html_path: Path of the local file to upload; must be an existing
            regular file whose suffix is ``.html`` (case-insensitive).
        object_key: Explicit object key; takes precedence over ``task_id``.
        task_id: When given (and ``object_key`` is not), the object is named
            ``"<task_id>.html"``.

    Returns:
        ``ALIYUN_OSS_PUBLIC_BASE_URL`` (without trailing slashes) joined to
        the object key with ``/``.

    Raises:
        OssUploadError: If the file is missing or not a regular file, the
            suffix is wrong, the OSS SDK is not installed, a required
            environment variable is missing, or the upload response has a
            status code of 300 or above.
    """
    # is_file() rather than exists(): a directory that happens to be named
    # "*.html" must be rejected here instead of failing later in the SDK.
    if not html_path.is_file():
        raise OssUploadError(f"html not found: {html_path}")
    if html_path.suffix.lower() != ".html":
        raise OssUploadError(f"invalid html suffix: {html_path}")
    if oss is None:
        raise OssUploadError("alibabacloud-oss-v2 not installed")

    # Let the helper enforce presence so the check and the error message
    # live in one place; failures surface in the order listed here.
    access_key_id = _get_env("ALIYUN_OSS_ACCESS_KEY_ID", required=True)
    access_key_secret = _get_env("ALIYUN_OSS_ACCESS_KEY_SECRET", required=True)
    bucket_name = _get_env("ALIYUN_OSS_BUCKET", required=True)
    region = _get_env("ALIYUN_OSS_REGION", required=True)
    # NOTE(review): the prefix is mandatory here although _build_object_key
    # accepts a missing prefix - confirm whether root uploads should be allowed.
    prefix = _get_env("ALIYUN_OSS_PREFIX", required=True)
    public_base_url = _get_env("ALIYUN_OSS_PUBLIC_BASE_URL", required=True)

    key = _build_object_key(html_path, object_key, prefix, task_id)
    log_context = {"bucket": bucket_name, "region": region, "key": key}

    logger.info(
        "oss upload start",
        extra={**log_context, "html_path": str(html_path)},
    )

    credentials_provider = oss.credentials.StaticCredentialsProvider(
        access_key_id=access_key_id,
        access_key_secret=access_key_secret,
    )
    cfg = oss.config.load_default()
    cfg.credentials_provider = credentials_provider
    cfg.region = region

    client = oss.Client(cfg)
    request = oss.PutObjectRequest(bucket=bucket_name, key=key)
    result = client.put_object_from_file(request, str(html_path))

    if result.status_code >= 300:
        logger.error(
            "oss upload failed",
            extra={**log_context, "status_code": result.status_code},
        )
        raise OssUploadError(f"oss upload failed: status={result.status_code}")

    logger.info(
        "oss upload success",
        extra={**log_context, "status_code": result.status_code},
    )

    base = public_base_url.rstrip("/")
    return f"{base}/{key}"