"""ODPS client factory.""" from __future__ import annotations from typing import Any from app.core.config import settings from app.hot_content.exceptions import HotContentFlowError def get_odps_client() -> Any: """Create a PyODPS client from environment-backed settings.""" if not settings.odps_access_id.strip(): raise HotContentFlowError("ODPS_ACCESS_ID is not configured") if not settings.odps_access_key.strip(): raise HotContentFlowError("ODPS_ACCESS_KEY is not configured") if not settings.odps_project.strip(): raise HotContentFlowError("ODPS_PROJECT is not configured") if not settings.odps_endpoint.strip(): raise HotContentFlowError("ODPS_ENDPOINT is not configured") from odps import ODPS kwargs: dict[str, Any] = { "access_id": settings.odps_access_id.strip(), "secret_access_key": settings.odps_access_key.strip(), "project": settings.odps_project.strip(), "endpoint": settings.odps_endpoint.strip(), } if settings.odps_tunnel_endpoint.strip(): kwargs["tunnel_endpoint"] = settings.odps_tunnel_endpoint.strip() return ODPS(**kwargs)