# gogs_client.py
import asyncio
import logging
from typing import Optional

import httpx

from app.config import settings
  5. logger = logging.getLogger(__name__)
  6. # Default timeout for API requests (seconds)
  7. _DEFAULT_TIMEOUT = 30.0
  8. class GogsClient:
  9. def __init__(self):
  10. self.base_url = settings.GOGS_URL.rstrip('/')
  11. self.token = settings.GOGS_TOKEN
  12. self.headers = {"Authorization": f"token {self.token}"}
  13. # ------------------------------------------------------------------
  14. # Repository discovery
  15. # ------------------------------------------------------------------
  16. async def list_user_repos(self) -> list[dict]:
  17. """Fetch *all* repositories visible to the authenticated user.
  18. Gogs paginates with `?page=N` (default 20 per page).
  19. We iterate until an empty page is returned.
  20. """
  21. repos: list[dict] = []
  22. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  23. url = f"{self.base_url}/api/v1/user/repos"
  24. resp = await client.get(url, headers=self.headers)
  25. resp.raise_for_status()
  26. batch = resp.json()
  27. repos.extend(batch)
  28. logger.info(f"Fetched {len(repos)} repositories in total")
  29. return repos
  30. # ------------------------------------------------------------------
  31. # Webhook management
  32. # ------------------------------------------------------------------
  33. async def list_repo_webhooks(self, owner: str, repo: str) -> list[dict]:
  34. """List all webhooks configured on a repository."""
  35. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/hooks"
  36. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  37. resp = await client.get(url, headers=self.headers)
  38. resp.raise_for_status()
  39. return resp.json()
  40. async def create_repo_webhook(
  41. self,
  42. owner: str,
  43. repo: str,
  44. webhook_url: str,
  45. secret: str = "",
  46. events: Optional[list[str]] = None,
  47. ) -> dict:
  48. """Create a push webhook on a repository.
  49. Returns the created webhook payload from Gogs.
  50. """
  51. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/hooks"
  52. payload = {
  53. "type": "gogs",
  54. "config": {
  55. "url": webhook_url,
  56. "content_type": "json",
  57. "secret": secret,
  58. },
  59. "events": events or ["push"],
  60. "active": True,
  61. }
  62. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  63. resp = await client.post(url, headers=self.headers, json=payload)
  64. resp.raise_for_status()
  65. return resp.json()
  66. # ------------------------------------------------------------------
  67. # Manifest / file operations (existing)
  68. # ------------------------------------------------------------------
  69. async def get_manifest(self, owner: str, repo: str, ref: str) -> str | None:
  70. """Fetch manifest.yaml raw content from a given ref (commit / branch)."""
  71. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/raw/{ref}/manifest.yaml"
  72. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  73. resp = await client.get(url, headers=self.headers)
  74. if resp.status_code == 404:
  75. return None
  76. resp.raise_for_status()
  77. return resp.text
  78. @staticmethod
  79. def _candidate_refs(ref: str, fallback_ref: str | None = None) -> list[str]:
  80. """Build ordered unique refs for fallback lookup."""
  81. refs = [ref]
  82. if fallback_ref:
  83. refs.append(fallback_ref)
  84. # Preserve order while deduplicating
  85. return list(dict.fromkeys(refs))
  86. async def get_tree(self, owner: str, repo: str, path: str = "", *, ref: str) -> list:
  87. """Get the file tree of a repository."""
  88. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{path}?ref={ref}"
  89. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  90. resp = await client.get(url, headers=self.headers)
  91. resp.raise_for_status()
  92. return resp.json()
  93. async def get_file_info(
  94. self,
  95. owner: str,
  96. repo: str,
  97. file_path: str,
  98. *,
  99. ref: str,
  100. fallback_ref: str | None = None,
  101. ) -> dict | None:
  102. """Get single file info including SHA.
  103. Returns dict with 'sha', 'size', 'path' or None if not found.
  104. """
  105. refs = self._candidate_refs(ref, fallback_ref)
  106. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  107. for i, candidate_ref in enumerate(refs):
  108. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{file_path}?ref={candidate_ref}"
  109. try:
  110. resp = await client.get(url, headers=self.headers)
  111. if resp.status_code == 404:
  112. continue
  113. resp.raise_for_status()
  114. data = resp.json()
  115. # contents API returns file info directly for single file
  116. if isinstance(data, dict) and data.get("type") == "file":
  117. if i > 0:
  118. logger.info(
  119. f"File info fallback hit for {file_path}: "
  120. f"primary ref '{ref}' -> '{candidate_ref}'"
  121. )
  122. return {
  123. "path": file_path,
  124. "sha": data.get("sha"),
  125. "size": data.get("size", 0),
  126. "type": "blob",
  127. "ref": candidate_ref,
  128. }
  129. except httpx.HTTPStatusError as e:
  130. logger.error(
  131. f"Failed to get file info for {file_path} at ref '{candidate_ref}': {e}"
  132. )
  133. return None
  134. async def get_directory_tree(self, owner: str, repo: str, dir_path: str, *, ref: str) -> list:
  135. """Get all files under a specific directory (recursive) using concurrency."""
  136. import asyncio
  137. all_files = []
  138. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT, headers=self.headers) as client:
  139. async def fetch_contents(path: str):
  140. """Recursively fetch directory contents using contents API in parallel."""
  141. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{path}?ref={ref}"
  142. try:
  143. resp = await client.get(url)
  144. if resp.status_code == 404:
  145. logger.warning(f"Directory not found: {path}")
  146. return
  147. resp.raise_for_status()
  148. data = resp.json()
  149. if isinstance(data, list):
  150. tasks = []
  151. for item in data:
  152. if item.get("type") == "file":
  153. all_files.append({
  154. "path": item.get("path"),
  155. "sha": item.get("sha"),
  156. "size": item.get("size", 0),
  157. "type": "blob"
  158. })
  159. elif item.get("type") == "dir":
  160. tasks.append(fetch_contents(item.get("path")))
  161. if tasks:
  162. await asyncio.gather(*tasks)
  163. except Exception as e:
  164. logger.error(f"Failed to get contents for {path}: {e}")
  165. await fetch_contents(dir_path)
  166. return all_files
  167. async def get_file_content(
  168. self,
  169. owner: str,
  170. repo: str,
  171. file_path: str,
  172. *,
  173. ref: str,
  174. fallback_ref: str | None = None,
  175. ) -> bytes:
  176. """Download raw file content."""
  177. # Gogs raw file URL format: /{owner}/{repo}/raw/{ref}/{path}
  178. refs = self._candidate_refs(ref, fallback_ref)
  179. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  180. last_resp = None
  181. for i, candidate_ref in enumerate(refs):
  182. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/raw/{candidate_ref}/{file_path}"
  183. resp = await client.get(url, headers=self.headers)
  184. last_resp = resp
  185. if resp.status_code == 404:
  186. continue
  187. resp.raise_for_status()
  188. if i > 0:
  189. logger.info(
  190. f"File content fallback hit for {file_path}: "
  191. f"primary ref '{ref}' -> '{candidate_ref}'"
  192. )
  193. return resp.content
  194. if last_resp is not None:
  195. last_resp.raise_for_status()
  196. raise httpx.HTTPError(f"Failed to download content for {file_path}")