gogs_client.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
import logging
from typing import Optional
from urllib.parse import quote

import httpx

from app.config import settings
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)

# Default timeout for API requests (seconds); passed as ``timeout`` to the
# httpx clients created in this module.
_DEFAULT_TIMEOUT = 30.0
  8. class GogsClient:
  9. def __init__(self):
  10. self.base_url = settings.GOGS_URL.rstrip('/')
  11. self.token = settings.GOGS_TOKEN
  12. self.headers = {"Authorization": f"token {self.token}"}
  13. # ------------------------------------------------------------------
  14. # Repository discovery
  15. # ------------------------------------------------------------------
  16. async def list_user_repos(self) -> list[dict]:
  17. """Fetch *all* repositories visible to the authenticated user.
  18. Gogs paginates with `?page=N` (default 20 per page).
  19. We iterate until an empty page is returned.
  20. """
  21. repos: list[dict] = []
  22. page = 1
  23. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  24. while True:
  25. url = f"{self.base_url}/api/v1/user/repos?page={page}&limit=50"
  26. resp = await client.get(url, headers=self.headers)
  27. resp.raise_for_status()
  28. batch = resp.json()
  29. if not batch:
  30. break
  31. repos.extend(batch)
  32. page += 1
  33. logger.info(f"Fetched {len(repos)} repositories in total")
  34. return repos
  35. # ------------------------------------------------------------------
  36. # Webhook management
  37. # ------------------------------------------------------------------
  38. async def list_repo_webhooks(self, owner: str, repo: str) -> list[dict]:
  39. """List all webhooks configured on a repository."""
  40. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/hooks"
  41. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  42. resp = await client.get(url, headers=self.headers)
  43. resp.raise_for_status()
  44. return resp.json()
  45. async def create_repo_webhook(
  46. self,
  47. owner: str,
  48. repo: str,
  49. webhook_url: str,
  50. secret: str = "",
  51. events: Optional[list[str]] = None,
  52. ) -> dict:
  53. """Create a push webhook on a repository.
  54. Returns the created webhook payload from Gogs.
  55. """
  56. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/hooks"
  57. payload = {
  58. "type": "gogs",
  59. "config": {
  60. "url": webhook_url,
  61. "content_type": "json",
  62. "secret": secret,
  63. },
  64. "events": events or ["push"],
  65. "active": True,
  66. }
  67. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  68. resp = await client.post(url, headers=self.headers, json=payload)
  69. resp.raise_for_status()
  70. return resp.json()
  71. # ------------------------------------------------------------------
  72. # Manifest / file operations (existing)
  73. # ------------------------------------------------------------------
  74. async def get_manifest(self, owner: str, repo: str, ref: str) -> str | None:
  75. """Fetch manifest.yaml raw content from a given ref (commit / branch)."""
  76. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/raw/{ref}/manifest.yaml"
  77. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  78. resp = await client.get(url, headers=self.headers)
  79. if resp.status_code == 404:
  80. return None
  81. resp.raise_for_status()
  82. return resp.text
  83. async def get_tree(self, owner: str, repo: str, commit_id: str, path: str = "") -> list:
  84. """Get the file tree of a repository."""
  85. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{path}?ref={commit_id}"
  86. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  87. resp = await client.get(url, headers=self.headers)
  88. resp.raise_for_status()
  89. return resp.json()
  90. async def get_file_info(self, owner: str, repo: str, commit_id: str, file_path: str) -> dict | None:
  91. """Get single file info including SHA.
  92. Returns dict with 'sha', 'size', 'path' or None if not found.
  93. """
  94. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{file_path}?ref={commit_id}"
  95. try:
  96. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  97. resp = await client.get(url, headers=self.headers)
  98. if resp.status_code == 404:
  99. return None
  100. resp.raise_for_status()
  101. data = resp.json()
  102. # contents API returns file info directly for single file
  103. if isinstance(data, dict) and data.get("type") == "file":
  104. return {
  105. "path": file_path,
  106. "sha": data.get("sha"),
  107. "size": data.get("size", 0),
  108. "type": "blob"
  109. }
  110. return None
  111. except httpx.HTTPStatusError as e:
  112. logger.error(f"Failed to get file info for {file_path}: {e}")
  113. return None
  114. async def get_directory_tree(self, owner: str, repo: str, commit_id: str, dir_path: str) -> list:
  115. """Get all files under a specific directory (recursive).
  116. Args:
  117. dir_path: Directory path without trailing slash (e.g., "data/output")
  118. Returns:
  119. List of file info dicts with 'path', 'sha', 'size', 'type'
  120. """
  121. all_files = []
  122. async def fetch_contents(path: str):
  123. """Recursively fetch directory contents using contents API."""
  124. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{path}?ref={commit_id}"
  125. try:
  126. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  127. resp = await client.get(url, headers=self.headers)
  128. if resp.status_code == 404:
  129. logger.warning(f"Directory not found: {path}")
  130. return
  131. resp.raise_for_status()
  132. data = resp.json()
  133. # contents API returns list for directories
  134. if isinstance(data, list):
  135. for item in data:
  136. if item.get("type") == "file":
  137. all_files.append({
  138. "path": item.get("path"),
  139. "sha": item.get("sha"),
  140. "size": item.get("size", 0),
  141. "type": "blob"
  142. })
  143. elif item.get("type") == "dir":
  144. # Recursively fetch subdirectory
  145. await fetch_contents(item.get("path"))
  146. except httpx.HTTPStatusError as e:
  147. logger.error(f"Failed to get contents for {path}: {e}")
  148. await fetch_contents(dir_path)
  149. return all_files
  150. async def get_file_content(self, owner: str, repo: str, commit_id: str, file_path: str) -> bytes:
  151. """Download raw file content."""
  152. # Gogs raw file URL format: /{owner}/{repo}/raw/{ref}/{path}
  153. url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/raw/{commit_id}/{file_path}"
  154. async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
  155. resp = await client.get(url, headers=self.headers)
  156. resp.raise_for_status()
  157. return resp.content