import asyncio
import logging
from typing import Optional

import httpx

from app.config import settings

logger = logging.getLogger(__name__)

# Default timeout for API requests (seconds)
_DEFAULT_TIMEOUT = 30.0
# Maximum concurrent in-flight requests to Gogs (throttles the shared pool so
# we don't overwhelm the server / trip its rate limiting)
_MAX_CONCURRENT = 3


class GogsClient:
    """Minimal async client for the Gogs v1 REST API.

    A single shared ``httpx.AsyncClient`` is created lazily (connection
    pooling); an ``asyncio.Semaphore`` caps in-flight requests at
    ``_MAX_CONCURRENT``. Call :meth:`aclose` when done to release the pool.
    """

    def __init__(self):
        self.base_url = settings.GOGS_URL.rstrip('/')
        self.token = settings.GOGS_TOKEN
        self.headers = {"Authorization": f"token {self.token}"}
        self._semaphore = asyncio.Semaphore(_MAX_CONCURRENT)
        self._client: httpx.AsyncClient | None = None

    # ---- shared client & throttled helpers --------------------------------

    def _get_client(self) -> httpx.AsyncClient:
        """Lazily create (or re-create after close) the shared httpx client."""
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                timeout=_DEFAULT_TIMEOUT,
                headers=self.headers,
            )
        return self._client

    async def _get(self, url: str) -> httpx.Response:
        """Throttled GET via the shared client."""
        async with self._semaphore:
            return await self._get_client().get(url)

    async def _post(self, url: str, **kwargs) -> httpx.Response:
        """Throttled POST via the shared client."""
        async with self._semaphore:
            return await self._get_client().post(url, **kwargs)

    async def aclose(self):
        """Close the underlying HTTP client (connection pool)."""
        if self._client and not self._client.is_closed:
            await self._client.aclose()
        self._client = None

    # ------------------------------------------------------------------
    # Repository discovery
    # ------------------------------------------------------------------

    async def list_user_repos(self) -> list[dict]:
        """Fetch *all* repositories visible to the authenticated user.

        Gogs paginates with ``?page=N`` (default 20 per page). We iterate
        until an empty page is returned.
        """
        # BUG FIX: the previous implementation fetched only the first page,
        # contradicting this docstring. We now walk pages until one comes
        # back empty. Some older Gogs builds ignore ``page`` on this
        # endpoint and return the full list every time; the first-id guard
        # below prevents an infinite loop in that case.
        repos: list[dict] = []
        page = 1
        prev_first_id = None
        while True:
            url = f"{self.base_url}/api/v1/user/repos?page={page}"
            resp = await self._get(url)
            resp.raise_for_status()
            batch = resp.json()
            if not batch:
                break
            first_id = batch[0].get("id") if isinstance(batch[0], dict) else None
            if first_id is not None and first_id == prev_first_id:
                # Server ignored the page parameter; we already have everything.
                break
            prev_first_id = first_id
            repos.extend(batch)
            page += 1
        logger.info(f"Fetched {len(repos)} repositories in total")
        return repos

    # ------------------------------------------------------------------
    # Webhook management
    # ------------------------------------------------------------------

    async def list_repo_webhooks(self, owner: str, repo: str) -> list[dict]:
        """List all webhooks configured on a repository."""
        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/hooks"
        resp = await self._get(url)
        resp.raise_for_status()
        return resp.json()

    async def create_repo_webhook(
        self,
        owner: str,
        repo: str,
        webhook_url: str,
        secret: str = "",
        events: Optional[list[str]] = None,
    ) -> dict:
        """Create a push webhook on a repository.

        Args:
            owner: Repository owner (user or org name).
            repo: Repository name.
            webhook_url: URL Gogs will POST events to.
            secret: Optional shared secret for payload signing.
            events: Event names to subscribe to; defaults to ``["push"]``.

        Returns the created webhook payload from Gogs.
        """
        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/hooks"
        payload = {
            "type": "gogs",
            "config": {
                "url": webhook_url,
                "content_type": "json",
                "secret": secret,
            },
            "events": events or ["push"],
            "active": True,
        }
        resp = await self._post(url, json=payload)
        resp.raise_for_status()
        return resp.json()

    # ------------------------------------------------------------------
    # Manifest / file operations (existing)
    # ------------------------------------------------------------------

    async def get_manifest(self, owner: str, repo: str, ref: str) -> str | None:
        """Fetch manifest.yaml raw content from a given ref (commit / branch).

        Returns None when the file does not exist at that ref.
        """
        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/raw/{ref}/manifest.yaml"
        resp = await self._get(url)
        if resp.status_code == 404:
            return None
        resp.raise_for_status()
        return resp.text

    @staticmethod
    def _candidate_refs(ref: str, fallback_ref: str | None = None) -> list[str]:
        """Build ordered unique refs for fallback lookup."""
        refs = [ref]
        if fallback_ref:
            refs.append(fallback_ref)
        # Preserve order while deduplicating
        return list(dict.fromkeys(refs))

    async def get_tree(self, owner: str, repo: str, path: str = "", *, ref: str) -> list:
        """Get the file tree of a repository."""
        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{path}?ref={ref}"
        resp = await self._get(url)
        resp.raise_for_status()
        return resp.json()

    async def get_file_info(
        self,
        owner: str,
        repo: str,
        file_path: str,
        *,
        ref: str,
        fallback_ref: str | None = None,
    ) -> dict | None:
        """Get single file info including SHA.

        Tries ``ref`` first, then ``fallback_ref`` (if given). Returns a dict
        with 'path', 'sha', 'size', 'type', 'ref' — or None if not found on
        any candidate ref.
        """
        refs = self._candidate_refs(ref, fallback_ref)
        for i, candidate_ref in enumerate(refs):
            url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{file_path}?ref={candidate_ref}"
            try:
                resp = await self._get(url)
                if resp.status_code == 404:
                    continue
                resp.raise_for_status()
                data = resp.json()
                # contents API returns file info directly for single file
                if isinstance(data, dict) and data.get("type") == "file":
                    if i > 0:
                        logger.info(
                            f"File info fallback hit for {file_path}: "
                            f"primary ref '{ref}' -> '{candidate_ref}'"
                        )
                    return {
                        "path": file_path,
                        "sha": data.get("sha"),
                        "size": data.get("size", 0),
                        "type": "blob",
                        "ref": candidate_ref,
                    }
            except httpx.HTTPStatusError as e:
                logger.error(
                    f"Failed to get file info for {file_path} at ref '{candidate_ref}': {e}"
                )
        return None

    async def get_directory_tree(self, owner: str, repo: str, dir_path: str, *, ref: str) -> list:
        """Get all files under a specific directory (recursive) using concurrency.

        Subdirectories are fetched in parallel with ``asyncio.gather``;
        appending to ``all_files`` from the tasks is safe because asyncio
        runs them on a single thread. Errors are logged, not raised, so a
        partial tree may be returned.
        """
        all_files = []

        async def fetch_contents(path: str):
            """Recursively fetch directory contents using contents API in parallel."""
            url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{path}?ref={ref}"
            try:
                resp = await self._get(url)
                if resp.status_code == 404:
                    logger.warning(f"Directory not found: {path}")
                    return
                resp.raise_for_status()
                data = resp.json()
                if isinstance(data, list):
                    tasks = []
                    for item in data:
                        if item.get("type") == "file":
                            all_files.append({
                                "path": item.get("path"),
                                "sha": item.get("sha"),
                                "size": item.get("size", 0),
                                "type": "blob",
                            })
                        elif item.get("type") == "dir":
                            tasks.append(fetch_contents(item.get("path")))
                    if tasks:
                        await asyncio.gather(*tasks)
            except Exception as e:
                logger.error(f"Failed to get contents for {path}: {e}")

        await fetch_contents(dir_path)
        return all_files

    async def get_file_content(
        self,
        owner: str,
        repo: str,
        file_path: str,
        *,
        ref: str,
        fallback_ref: str | None = None,
    ) -> bytes:
        """Download raw file content.

        Tries ``ref`` then ``fallback_ref``. Raises httpx.HTTPStatusError
        (via raise_for_status on the last response) or httpx.HTTPError if
        no candidate ref yields the file.
        """
        # Gogs raw file URL format: /{owner}/{repo}/raw/{ref}/{path}
        refs = self._candidate_refs(ref, fallback_ref)
        last_resp = None
        for i, candidate_ref in enumerate(refs):
            url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/raw/{candidate_ref}/{file_path}"
            resp = await self._get(url)
            last_resp = resp
            if resp.status_code == 404:
                continue
            resp.raise_for_status()
            if i > 0:
                logger.info(
                    f"File content fallback hit for {file_path}: "
                    f"primary ref '{ref}' -> '{candidate_ref}'"
                )
            return resp.content
        if last_resp is not None:
            last_resp.raise_for_status()
        raise httpx.HTTPError(f"Failed to download content for {file_path}")


async def main():
    """Ad-hoc manual smoke test against a live Gogs instance."""
    gog = GogsClient()
    r = await gog.get_file_info(
        owner="nieqi",
        repo="aigc_how_decode_base_project_0916_1125",
        file_path="examples/糯米和Kilala/待解构帖子.json",
        ref="what_decode_0104_tanjingyu",
    )
    print(r)
    await gog.aclose()


# NOTE: the redundant second `import asyncio` that used to sit here was
# removed; asyncio is already imported at the top of the file.
if __name__ == '__main__':
    asyncio.run(main())