gogs_client.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
import asyncio
import logging
from typing import Optional
from urllib.parse import quote

import httpx

from app.config import settings
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)

# Default timeout for API requests (seconds). Passed as ``timeout=`` to the
# httpx.AsyncClient instances created by GogsClient.
_DEFAULT_TIMEOUT = 30.0
  8. class GogsClient:
  9. def __init__(self):
  10. self.base_url = settings.GOGS_URL.rstrip('/')
  11. self.token = settings.GOGS_TOKEN
  12. self.headers = {"Authorization": f"token {self.token}"}
  13. # ------------------------------------------------------------------
  14. # Repository discovery
  15. # ------------------------------------------------------------------
  16. async def list_user_repos(self) -> list[dict]:
  17. """Fetch *all* repositories visible to the authenticated user.
  18. Gogs paginates with `?page=N` (default 20 per page).
  19. We iterate until an empty page is returned.
  20. """
  21. repos: list[dict] = []
  22. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  23. url = f"{self.base_url}/api/v1/user/repos"
  24. resp = await client.get(url, headers=self.headers)
  25. resp.raise_for_status()
  26. batch = resp.json()
  27. repos.extend(batch)
  28. logger.info(f"Fetched {len(repos)} repositories in total")
  29. return repos
  30. # ------------------------------------------------------------------
  31. # Webhook management
  32. # ------------------------------------------------------------------
  33. async def list_repo_webhooks(self, owner: str, repo: str) -> list[dict]:
  34. """List all webhooks configured on a repository."""
  35. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/hooks"
  36. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  37. resp = await client.get(url, headers=self.headers)
  38. resp.raise_for_status()
  39. return resp.json()
  40. async def create_repo_webhook(
  41. self,
  42. owner: str,
  43. repo: str,
  44. webhook_url: str,
  45. secret: str = "",
  46. events: Optional[list[str]] = None,
  47. ) -> dict:
  48. """Create a push webhook on a repository.
  49. Returns the created webhook payload from Gogs.
  50. """
  51. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/hooks"
  52. payload = {
  53. "type": "gogs",
  54. "config": {
  55. "url": webhook_url,
  56. "content_type": "json",
  57. "secret": secret,
  58. },
  59. "events": events or ["push"],
  60. "active": True,
  61. }
  62. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  63. resp = await client.post(url, headers=self.headers, json=payload)
  64. resp.raise_for_status()
  65. return resp.json()
  66. # ------------------------------------------------------------------
  67. # Manifest / file operations (existing)
  68. # ------------------------------------------------------------------
  69. async def get_manifest(self, owner: str, repo: str, ref: str) -> str | None:
  70. """Fetch manifest.yaml raw content from a given ref (commit / branch)."""
  71. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/raw/{ref}/manifest.yaml"
  72. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  73. resp = await client.get(url, headers=self.headers)
  74. if resp.status_code == 404:
  75. return None
  76. resp.raise_for_status()
  77. return resp.text
  78. async def get_tree(self, owner: str, repo: str, commit_id: str, path: str = "") -> list:
  79. """Get the file tree of a repository."""
  80. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{path}?ref={commit_id}"
  81. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  82. resp = await client.get(url, headers=self.headers)
  83. resp.raise_for_status()
  84. return resp.json()
  85. async def get_file_info(self, owner: str, repo: str, commit_id: str, file_path: str) -> dict | None:
  86. """Get single file info including SHA.
  87. Returns dict with 'sha', 'size', 'path' or None if not found.
  88. """
  89. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{file_path}?ref={commit_id}"
  90. try:
  91. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  92. resp = await client.get(url, headers=self.headers)
  93. if resp.status_code == 404:
  94. return None
  95. resp.raise_for_status()
  96. data = resp.json()
  97. # contents API returns file info directly for single file
  98. if isinstance(data, dict) and data.get("type") == "file":
  99. return {
  100. "path": file_path,
  101. "sha": data.get("sha"),
  102. "size": data.get("size", 0),
  103. "type": "blob"
  104. }
  105. return None
  106. except httpx.HTTPStatusError as e:
  107. logger.error(f"Failed to get file info for {file_path}: {e}")
  108. return None
  109. async def get_directory_tree(self, owner: str, repo: str, commit_id: str, dir_path: str) -> list:
  110. """Get all files under a specific directory (recursive) using concurrency."""
  111. import asyncio
  112. all_files = []
  113. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT, headers=self.headers) as client:
  114. async def fetch_contents(path: str):
  115. """Recursively fetch directory contents using contents API in parallel."""
  116. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{path}?ref={commit_id}"
  117. try:
  118. resp = await client.get(url)
  119. if resp.status_code == 404:
  120. logger.warning(f"Directory not found: {path}")
  121. return
  122. resp.raise_for_status()
  123. data = resp.json()
  124. if isinstance(data, list):
  125. tasks = []
  126. for item in data:
  127. if item.get("type") == "file":
  128. all_files.append({
  129. "path": item.get("path"),
  130. "sha": item.get("sha"),
  131. "size": item.get("size", 0),
  132. "type": "blob"
  133. })
  134. elif item.get("type") == "dir":
  135. tasks.append(fetch_contents(item.get("path")))
  136. if tasks:
  137. await asyncio.gather(*tasks)
  138. except Exception as e:
  139. logger.error(f"Failed to get contents for {path}: {e}")
  140. await fetch_contents(dir_path)
  141. return all_files
  142. async def get_file_content(self, owner: str, repo: str, commit_id: str, file_path: str) -> bytes:
  143. """Download raw file content."""
  144. # Gogs raw file URL format: /{owner}/{repo}/raw/{ref}/{path}
  145. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/raw/{commit_id}/{file_path}"
  146. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  147. resp = await client.get(url, headers=self.headers)
  148. resp.raise_for_status()
  149. return resp.content