| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- import asyncio
- import httpx
- from app.config import settings
- import logging
- from typing import Optional
- logger = logging.getLogger(__name__)
- # Default timeout for API requests (seconds)
- _DEFAULT_TIMEOUT = 30.0
- # Maximum concurrent requests to Gogs to avoid 401 rate-limiting
- _MAX_CONCURRENT = 3
class GogsClient:
    """Asynchronous client for a subset of the Gogs v1 REST API.

    Every request is sent through one lazily created ``httpx.AsyncClient``
    and is throttled by a semaphore so that at most ``_MAX_CONCURRENT``
    requests are in flight at once.

    Release the connection pool with :meth:`aclose`, or use the instance as
    an async context manager::

        async with GogsClient() as client:
            repos = await client.list_user_repos()
    """

    def __init__(self):
        """Read the server URL and token from ``settings`` and set up throttling."""
        self.base_url = settings.GOGS_URL.rstrip('/')
        self.token = settings.GOGS_TOKEN
        self.headers = {"Authorization": f"token {self.token}"}
        self._semaphore = asyncio.Semaphore(_MAX_CONCURRENT)
        # Created on first use by _get_client(); reset to None by aclose().
        self._client: httpx.AsyncClient | None = None

    async def __aenter__(self) -> "GogsClient":
        """Return the client itself; the HTTP client is still created lazily."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the connection pool when leaving the ``async with`` block."""
        await self.aclose()

    # ---- shared client & throttled helpers --------------------------------

    def _get_client(self) -> httpx.AsyncClient:
        """Lazily create a shared httpx client with connection pooling.

        A new client is also created if the previous one has been closed.
        """
        if self._client is None or self._client.is_closed:
            self._client = httpx.AsyncClient(
                timeout=_DEFAULT_TIMEOUT,
                headers=self.headers,
            )
        return self._client

    def _repo_api_url(self, owner: str, repo: str) -> str:
        """Return the API base URL for one repository (no trailing slash)."""
        return f"{self.base_url}/api/v1/repos/{owner}/{repo}"

    async def _get(self, url: str, **kwargs) -> httpx.Response:
        """Throttled GET via shared client.

        Extra keyword arguments (e.g. ``params``) are forwarded to
        ``httpx.AsyncClient.get``.
        """
        async with self._semaphore:
            return await self._get_client().get(url, **kwargs)

    async def _post(self, url: str, **kwargs) -> httpx.Response:
        """Throttled POST via shared client.

        Keyword arguments (e.g. ``json``) are forwarded to
        ``httpx.AsyncClient.post``.
        """
        async with self._semaphore:
            return await self._get_client().post(url, **kwargs)

    async def aclose(self):
        """Close the underlying HTTP client (connection pool).

        Safe to call repeatedly; a later request transparently creates a
        fresh client.
        """
        if self._client and not self._client.is_closed:
            await self._client.aclose()
        self._client = None

    # ------------------------------------------------------------------
    # Repository discovery
    # ------------------------------------------------------------------

    async def list_user_repos(self) -> list[dict]:
        """Fetch the repositories visible to the authenticated user.

        Issues a single GET to ``/api/v1/user/repos`` and returns the decoded
        JSON items. No ``page`` parameter is sent, so if the server paginates
        this endpoint only the first page is returned.

        Raises:
            httpx.HTTPStatusError: if the server answers with an error status.
        """
        url = f"{self.base_url}/api/v1/user/repos"
        resp = await self._get(url)
        resp.raise_for_status()
        repos: list[dict] = list(resp.json())
        logger.info(f"Fetched {len(repos)} repositories in total")
        return repos

    # ------------------------------------------------------------------
    # Webhook management
    # ------------------------------------------------------------------

    async def list_repo_webhooks(self, owner: str, repo: str) -> list[dict]:
        """List all webhooks configured on a repository.

        Raises:
            httpx.HTTPStatusError: if the server answers with an error status.
        """
        url = f"{self._repo_api_url(owner, repo)}/hooks"
        resp = await self._get(url)
        resp.raise_for_status()
        return resp.json()

    async def create_repo_webhook(
        self,
        owner: str,
        repo: str,
        webhook_url: str,
        secret: str = "",
        events: Optional[list[str]] = None,
    ) -> dict:
        """Create an active JSON webhook of type ``gogs`` on a repository.

        Args:
            owner: Repository owner.
            repo: Repository name.
            webhook_url: URL the hook should deliver to.
            secret: Secret stored in the hook configuration.
            events: Event names to subscribe to; ``["push"]`` when omitted
                or empty.

        Returns:
            The decoded JSON body of the server's response.

        Raises:
            httpx.HTTPStatusError: if the server answers with an error status.
        """
        url = f"{self._repo_api_url(owner, repo)}/hooks"
        payload = {
            "type": "gogs",
            "config": {
                "url": webhook_url,
                "content_type": "json",
                "secret": secret,
            },
            "events": events or ["push"],
            "active": True,
        }
        resp = await self._post(url, json=payload)
        resp.raise_for_status()
        return resp.json()

    # ------------------------------------------------------------------
    # Manifest / file operations (existing)
    # ------------------------------------------------------------------

    async def get_manifest(self, owner: str, repo: str, ref: str) -> str | None:
        """Fetch manifest.yaml raw content from a given ref (commit / branch).

        Returns:
            The response text, or ``None`` when the server answers 404.

        Raises:
            httpx.HTTPStatusError: for any other error status.
        """
        url = f"{self._repo_api_url(owner, repo)}/raw/{ref}/manifest.yaml"
        resp = await self._get(url)
        if resp.status_code == 404:
            return None
        resp.raise_for_status()
        return resp.text

    @staticmethod
    def _candidate_refs(ref: str, fallback_ref: str | None = None) -> list[str]:
        """Build ordered unique refs for fallback lookup.

        The primary ``ref`` always comes first; a falsy ``fallback_ref`` or
        one equal to ``ref`` is dropped.
        """
        refs = [ref]
        if fallback_ref:
            refs.append(fallback_ref)
        # dict preserves insertion order, so this deduplicates in place.
        return list(dict.fromkeys(refs))

    async def get_tree(self, owner: str, repo: str, path: str = "", *, ref: str) -> list:
        """Return the contents-API listing for ``path`` at ``ref``.

        Raises:
            httpx.HTTPStatusError: if the server answers with an error status.
        """
        url = f"{self._repo_api_url(owner, repo)}/contents/{path}"
        # Pass ref via params so characters such as '+', '&' or '#' are
        # percent-encoded instead of corrupting the query string.
        resp = await self._get(url, params={"ref": ref})
        resp.raise_for_status()
        return resp.json()

    async def get_file_info(
        self,
        owner: str,
        repo: str,
        file_path: str,
        *,
        ref: str,
        fallback_ref: str | None = None,
    ) -> dict | None:
        """Get single file info including SHA.

        ``ref`` is tried first, then ``fallback_ref`` (if given and
        different). A ref is skipped when the server answers 404, answers
        with an error status (which is logged), or returns something other
        than a single file entry.

        Returns:
            A dict with ``path``, ``sha``, ``size``, ``type`` (always
            ``"blob"``) and ``ref`` (the ref that matched), or ``None`` if no
            candidate ref yielded a file.
        """
        url = f"{self._repo_api_url(owner, repo)}/contents/{file_path}"
        refs = self._candidate_refs(ref, fallback_ref)
        for i, candidate_ref in enumerate(refs):
            try:
                # params= percent-encodes the ref (e.g. '+', '&', '#').
                resp = await self._get(url, params={"ref": candidate_ref})
                if resp.status_code == 404:
                    continue
                resp.raise_for_status()
                data = resp.json()
                # contents API returns file info directly for single file
                if isinstance(data, dict) and data.get("type") == "file":
                    if i > 0:
                        logger.info(
                            f"File info fallback hit for {file_path}: "
                            f"primary ref '{ref}' -> '{candidate_ref}'"
                        )
                    return {
                        "path": file_path,
                        "sha": data.get("sha"),
                        "size": data.get("size", 0),
                        "type": "blob",
                        "ref": candidate_ref,
                    }
            except httpx.HTTPStatusError as e:
                logger.error(
                    f"Failed to get file info for {file_path} at ref '{candidate_ref}': {e}"
                )
        return None

    async def get_directory_tree(self, owner: str, repo: str, dir_path: str, *, ref: str) -> list:
        """Get all files under a specific directory (recursive) using concurrency.

        Sub-directories are fetched concurrently, so the order of the
        returned entries is not guaranteed. The walk is best-effort: a
        directory that is missing or fails to load is logged and skipped
        rather than aborting the whole traversal.

        Returns:
            A list of dicts with ``path``, ``sha``, ``size`` and ``type``
            (always ``"blob"``), one per file found.
        """
        all_files = []
        contents_base = f"{self._repo_api_url(owner, repo)}/contents"

        async def fetch_contents(path: str):
            """Recursively fetch directory contents using contents API in parallel."""
            url = f"{contents_base}/{path}"
            try:
                # params= percent-encodes the ref (e.g. '+', '&', '#').
                resp = await self._get(url, params={"ref": ref})
                if resp.status_code == 404:
                    logger.warning(f"Directory not found: {path}")
                    return
                resp.raise_for_status()
                data = resp.json()
                if isinstance(data, list):
                    tasks = []
                    for item in data:
                        if item.get("type") == "file":
                            all_files.append({
                                "path": item.get("path"),
                                "sha": item.get("sha"),
                                "size": item.get("size", 0),
                                "type": "blob",
                            })
                        elif item.get("type") == "dir":
                            tasks.append(fetch_contents(item.get("path")))
                    if tasks:
                        await asyncio.gather(*tasks)
            except Exception as e:
                # Deliberately broad: one bad directory must not abort the walk.
                logger.error(f"Failed to get contents for {path}: {e}")

        await fetch_contents(dir_path)
        return all_files

    async def get_file_content(
        self,
        owner: str,
        repo: str,
        file_path: str,
        *,
        ref: str,
        fallback_ref: str | None = None,
    ) -> bytes:
        """Download raw file content.

        ``ref`` is tried first, then ``fallback_ref`` (if given and
        different); a 404 moves on to the next candidate.

        Raises:
            httpx.HTTPStatusError: if a candidate answers with a non-404
                error status, or if every candidate answers 404.
        """
        # Raw endpoint used here: /api/v1/repos/{owner}/{repo}/raw/{ref}/{path}
        refs = self._candidate_refs(ref, fallback_ref)
        last_resp = None
        for i, candidate_ref in enumerate(refs):
            url = f"{self._repo_api_url(owner, repo)}/raw/{candidate_ref}/{file_path}"
            resp = await self._get(url)
            last_resp = resp
            if resp.status_code == 404:
                continue
            resp.raise_for_status()
            if i > 0:
                logger.info(
                    f"File content fallback hit for {file_path}: "
                    f"primary ref '{ref}' -> '{candidate_ref}'"
                )
            return resp.content
        # Every candidate was a 404: surface the last response's error.
        if last_resp is not None:
            last_resp.raise_for_status()
        # Defensive fallback; not expected to be reached.
        raise httpx.HTTPError(f"Failed to download content for {file_path}")
async def main():
    """Ad-hoc smoke test: look up one file's metadata and print it.

    The client is closed even when the lookup raises, so the connection
    pool is never leaked.
    """
    gog = GogsClient()
    try:
        r = await gog.get_file_info(
            owner="nieqi",
            repo="aigc_how_decode_base_project_0916_1125",
            file_path="examples/糯米和Kilala/待解构帖子.json",
            ref="what_decode_0104_tanjingyu",
        )
        print(r)
    finally:
        await gog.aclose()
- import asyncio
if __name__ == "__main__":
    # Only run the smoke test when this file is executed as a script.
    asyncio.run(main())
|