# gogs_client.py
  1. import asyncio
  2. import httpx
  3. from app.config import settings
  4. import logging
  5. from typing import Optional
  6. logger = logging.getLogger(__name__)
  7. # Default timeout for API requests (seconds)
  8. _DEFAULT_TIMEOUT = 30.0
  9. # Maximum concurrent requests to Gogs to avoid 401 rate-limiting
  10. _MAX_CONCURRENT = 3
  11. class GogsClient:
  12. def __init__(self):
  13. self.base_url = settings.GOGS_URL.rstrip('/')
  14. self.token = settings.GOGS_TOKEN
  15. self.headers = {"Authorization": f"token {self.token}"}
  16. self._semaphore = asyncio.Semaphore(_MAX_CONCURRENT)
  17. self._client: httpx.AsyncClient | None = None
  18. # ---- shared client & throttled helpers --------------------------------
  19. def _get_client(self) -> httpx.AsyncClient:
  20. """Lazily create a shared httpx client with connection pooling."""
  21. if self._client is None or self._client.is_closed:
  22. self._client = httpx.AsyncClient(
  23. timeout=_DEFAULT_TIMEOUT,
  24. headers=self.headers,
  25. )
  26. return self._client
  27. async def _get(self, url: str) -> httpx.Response:
  28. """Throttled GET via shared client."""
  29. async with self._semaphore:
  30. return await self._get_client().get(url)
  31. async def _post(self, url: str, **kwargs) -> httpx.Response:
  32. """Throttled POST via shared client."""
  33. async with self._semaphore:
  34. return await self._get_client().post(url, **kwargs)
  35. async def aclose(self):
  36. """Close the underlying HTTP client (connection pool)."""
  37. if self._client and not self._client.is_closed:
  38. await self._client.aclose()
  39. self._client = None
  40. # ------------------------------------------------------------------
  41. # Repository discovery
  42. # ------------------------------------------------------------------
  43. async def list_user_repos(self) -> list[dict]:
  44. """Fetch *all* repositories visible to the authenticated user.
  45. Gogs paginates with `?page=N` (default 20 per page).
  46. We iterate until an empty page is returned.
  47. """
  48. repos: list[dict] = []
  49. url = f"{self.base_url}/api/v1/user/repos"
  50. resp = await self._get(url)
  51. resp.raise_for_status()
  52. batch = resp.json()
  53. repos.extend(batch)
  54. logger.info(f"Fetched {len(repos)} repositories in total")
  55. return repos
  56. # ------------------------------------------------------------------
  57. # Webhook management
  58. # ------------------------------------------------------------------
  59. async def list_repo_webhooks(self, owner: str, repo: str) -> list[dict]:
  60. """List all webhooks configured on a repository."""
  61. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/hooks"
  62. resp = await self._get(url)
  63. resp.raise_for_status()
  64. return resp.json()
  65. async def create_repo_webhook(
  66. self,
  67. owner: str,
  68. repo: str,
  69. webhook_url: str,
  70. secret: str = "",
  71. events: Optional[list[str]] = None,
  72. ) -> dict:
  73. """Create a push webhook on a repository.
  74. Returns the created webhook payload from Gogs.
  75. """
  76. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/hooks"
  77. payload = {
  78. "type": "gogs",
  79. "config": {
  80. "url": webhook_url,
  81. "content_type": "json",
  82. "secret": secret,
  83. },
  84. "events": events or ["push"],
  85. "active": True,
  86. }
  87. resp = await self._post(url, json=payload)
  88. resp.raise_for_status()
  89. return resp.json()
  90. # ------------------------------------------------------------------
  91. # Manifest / file operations (existing)
  92. # ------------------------------------------------------------------
  93. async def get_manifest(self, owner: str, repo: str, ref: str) -> str | None:
  94. """Fetch manifest.yaml raw content from a given ref (commit / branch)."""
  95. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/raw/{ref}/manifest.yaml"
  96. resp = await self._get(url)
  97. if resp.status_code == 404:
  98. return None
  99. resp.raise_for_status()
  100. return resp.text
  101. @staticmethod
  102. def _candidate_refs(ref: str, fallback_ref: str | None = None) -> list[str]:
  103. """Build ordered unique refs for fallback lookup."""
  104. refs = [ref]
  105. if fallback_ref:
  106. refs.append(fallback_ref)
  107. # Preserve order while deduplicating
  108. return list(dict.fromkeys(refs))
  109. async def get_tree(self, owner: str, repo: str, path: str = "", *, ref: str) -> list:
  110. """Get the file tree of a repository."""
  111. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{path}?ref={ref}"
  112. resp = await self._get(url)
  113. resp.raise_for_status()
  114. return resp.json()
  115. async def get_file_info(
  116. self,
  117. owner: str,
  118. repo: str,
  119. file_path: str,
  120. *,
  121. ref: str,
  122. fallback_ref: str | None = None,
  123. ) -> dict | None:
  124. """Get single file info including SHA.
  125. Returns dict with 'sha', 'size', 'path' or None if not found.
  126. """
  127. refs = self._candidate_refs(ref, fallback_ref)
  128. for i, candidate_ref in enumerate(refs):
  129. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{file_path}?ref={candidate_ref}"
  130. try:
  131. resp = await self._get(url)
  132. if resp.status_code == 404:
  133. continue
  134. resp.raise_for_status()
  135. data = resp.json()
  136. # contents API returns file info directly for single file
  137. if isinstance(data, dict) and data.get("type") == "file":
  138. if i > 0:
  139. logger.info(
  140. f"File info fallback hit for {file_path}: "
  141. f"primary ref '{ref}' -> '{candidate_ref}'"
  142. )
  143. return {
  144. "path": file_path,
  145. "sha": data.get("sha"),
  146. "size": data.get("size", 0),
  147. "type": "blob",
  148. "ref": candidate_ref,
  149. }
  150. except httpx.HTTPStatusError as e:
  151. logger.error(
  152. f"Failed to get file info for {file_path} at ref '{candidate_ref}': {e}"
  153. )
  154. return None
  155. async def get_directory_tree(self, owner: str, repo: str, dir_path: str, *, ref: str) -> list:
  156. """Get all files under a specific directory (recursive) using concurrency."""
  157. all_files = []
  158. async def fetch_contents(path: str):
  159. """Recursively fetch directory contents using contents API in parallel."""
  160. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{path}?ref={ref}"
  161. try:
  162. resp = await self._get(url)
  163. if resp.status_code == 404:
  164. logger.warning(f"Directory not found: {path}")
  165. return
  166. resp.raise_for_status()
  167. data = resp.json()
  168. if isinstance(data, list):
  169. tasks = []
  170. for item in data:
  171. if item.get("type") == "file":
  172. all_files.append({
  173. "path": item.get("path"),
  174. "sha": item.get("sha"),
  175. "size": item.get("size", 0),
  176. "type": "blob"
  177. })
  178. elif item.get("type") == "dir":
  179. tasks.append(fetch_contents(item.get("path")))
  180. if tasks:
  181. await asyncio.gather(*tasks)
  182. except Exception as e:
  183. logger.error(f"Failed to get contents for {path}: {e}")
  184. await fetch_contents(dir_path)
  185. return all_files
  186. async def get_file_content(
  187. self,
  188. owner: str,
  189. repo: str,
  190. file_path: str,
  191. *,
  192. ref: str,
  193. fallback_ref: str | None = None,
  194. ) -> bytes:
  195. """Download raw file content."""
  196. # Gogs raw file URL format: /{owner}/{repo}/raw/{ref}/{path}
  197. refs = self._candidate_refs(ref, fallback_ref)
  198. last_resp = None
  199. for i, candidate_ref in enumerate(refs):
  200. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/raw/{candidate_ref}/{file_path}"
  201. resp = await self._get(url)
  202. last_resp = resp
  203. if resp.status_code == 404:
  204. continue
  205. resp.raise_for_status()
  206. if i > 0:
  207. logger.info(
  208. f"File content fallback hit for {file_path}: "
  209. f"primary ref '{ref}' -> '{candidate_ref}'"
  210. )
  211. return resp.content
  212. if last_resp is not None:
  213. last_resp.raise_for_status()
  214. raise httpx.HTTPError(f"Failed to download content for {file_path}")
  215. async def main():
  216. gog = GogsClient()
  217. r = await gog.get_file_info(
  218. owner="nieqi",
  219. repo="aigc_how_decode_base_project_0916_1125",
  220. file_path="examples/糯米和Kilala/待解构帖子.json",
  221. ref="what_decode_0104_tanjingyu",)
  222. print(r)
  223. await gog.aclose()
  224. import asyncio
  225. if __name__ == '__main__':
  226. asyncio.run(main())