| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- """
- Repo Scanner Service
- ====================
- Scans all Gogs repositories accessible to the authenticated user,
- identifies repos that:
- 1. We have admin permissions on
- 2. Contain a `manifest.yaml` on the default branch
- For qualifying repos, it ensures a Data Nexus webhook is configured
- (idempotent — skips repos that already have the webhook).
- """
- import logging
- from dataclasses import dataclass
- from app.config import settings
- from app.services.gogs_client import GogsClient
- logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class ScanResult:
    """Immutable tally describing the outcome of a single scan run.

    All fields are plain counters; an instance is built once at the end of
    ``RepoScanner.scan_and_configure`` and cannot be modified afterwards.
    """

    # Number of repos returned by the repo listing call.
    total_repos: int
    # Subset of those repos that passed the admin-permission filter.
    admin_repos: int
    # Admin repos in which a manifest.yaml was found.
    manifest_repos: int
    # Repos on which a new webhook was created during this run.
    webhooks_created: int
    # Repos left alone because the webhook was already registered.
    webhooks_skipped: int
    # Repos whose processing raised an exception.
    errors: int
class RepoScanner:
    """Orchestrates the repo-scan → webhook-setup pipeline.

    Attributes
    ----------
    gogs:
        Client used for every Gogs API call made by the scanner.
    webhook_url:
        Target URL of the Data Nexus webhook (``settings.GOGS_WEBHOOK_URL``).
        It is also the value used to recognise an already-registered hook.
    webhook_secret:
        Secret forwarded when a webhook is created
        (``settings.GOGS_WEBHOOK_SECRET``).
    """

    def __init__(self, gogs: GogsClient | None = None):
        """Bind the API client and read the webhook settings.

        Parameters
        ----------
        gogs:
            Client to use. When omitted, a default ``GogsClient()`` is built.
        """
        # Test against None instead of truthiness so that an injected client
        # which happens to evaluate as falsy is not silently replaced.
        self.gogs = gogs if gogs is not None else GogsClient()
        self.webhook_url: str = settings.GOGS_WEBHOOK_URL
        self.webhook_secret: str = settings.GOGS_WEBHOOK_SECRET

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def scan_and_configure(self) -> ScanResult:
        """Run a full scan cycle.

        Steps
        -----
        1. Fetch all repos visible to the token owner.
        2. Filter repos where ``permissions.admin == True``.
        3. For each admin repo, check if ``manifest.yaml`` exists.
        4. If manifest exists, ensure our webhook is present.

        A failure while processing one repo (including a malformed repo
        entry) is logged and counted in ``errors``; the remaining repos are
        still processed. A failure of the initial repo listing is not caught
        here and propagates to the caller.

        Returns a :class:`ScanResult` summarising what happened.

        Raises
        ------
        ValueError
            If no webhook URL is configured.
        """
        if not self.webhook_url:
            raise ValueError(
                "GOGS_WEBHOOK_URL is not configured. "
                "Please set it in .env before running the scanner."
            )

        all_repos = await self.gogs.list_user_repos()
        admin_repos = self._filter_admin_repos(all_repos)
        logger.info(
            f"Found {len(all_repos)} repos total, "
            f"{len(admin_repos)} with admin permissions"
        )

        manifest_count = 0
        created = 0
        skipped = 0
        errors = 0

        for repo in admin_repos:
            # Fallback label for the error log in case the entry is missing
            # the owner/name keys; replaced below once those are read.
            label = repo.get("full_name") or "<unknown repo>"
            try:
                # Key extraction lives inside the try so that one malformed
                # entry is counted as an error instead of aborting the scan.
                owner = repo["owner"]["username"]
                name = repo["name"]
                label = f"{owner}/{name}"
                default_branch = repo.get("default_branch", "master")

                has_manifest = await self._has_manifest(owner, name, default_branch)
                if not has_manifest:
                    logger.debug(f"[{owner}/{name}] No manifest.yaml — skipping")
                    continue

                manifest_count += 1
                logger.info(f"[{owner}/{name}] manifest.yaml found ✔")

                already_configured = await self._webhook_already_exists(owner, name)
                if already_configured:
                    logger.info(f"[{owner}/{name}] Webhook already configured — skipping")
                    skipped += 1
                    continue

                await self._create_webhook(owner, name)
                created += 1
                logger.info(f"[{owner}/{name}] Webhook created ✔")
            except Exception as exc:
                # Per-repo boundary: record the failure and move on.
                errors += 1
                logger.error(f"[{label}] Error: {exc}", exc_info=True)

        result = ScanResult(
            total_repos=len(all_repos),
            admin_repos=len(admin_repos),
            manifest_repos=manifest_count,
            webhooks_created=created,
            webhooks_skipped=skipped,
            errors=errors,
        )
        logger.info(f"Scan complete: {result}")
        return result

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _filter_admin_repos(repos: list[dict]) -> list[dict]:
        """Return repos where the authenticated user has admin permissions.

        A repo qualifies only when its ``permissions`` value is a dict whose
        ``admin`` entry is exactly ``True``. Repos with a missing, ``None``
        or non-dict ``permissions`` value are excluded rather than raising.
        """
        return [
            r for r in repos
            if isinstance(r.get("permissions"), dict)
            and r["permissions"].get("admin") is True
        ]

    async def _has_manifest(self, owner: str, repo: str, ref: str) -> bool:
        """Check whether `manifest.yaml` exists in the repo.

        Returns ``True`` when ``GogsClient.get_manifest`` yields anything
        other than ``None`` for the given owner, repo and ref.
        """
        content = await self.gogs.get_manifest(owner, repo, ref)
        return content is not None

    async def _webhook_already_exists(self, owner: str, repo: str) -> bool:
        """Return True if our webhook URL is already registered on the repo.

        A hook matches when its ``config.url`` equals ``self.webhook_url``
        exactly. Hooks whose ``config`` is missing or ``None`` never match.
        """
        hooks = await self.gogs.list_repo_webhooks(owner, repo)
        return any(
            # ``or {}`` also covers an explicit null config, which a plain
            # ``.get("config", {})`` default would not.
            (hook.get("config") or {}).get("url") == self.webhook_url
            for hook in hooks
        )

    async def _create_webhook(self, owner: str, repo: str) -> dict:
        """Create our Data Nexus push webhook on the repo.

        Returns whatever ``GogsClient.create_repo_webhook`` returns.
        """
        return await self.gogs.create_repo_webhook(
            owner=owner,
            repo=repo,
            webhook_url=self.webhook_url,
            secret=self.webhook_secret,
            events=["push"],
        )
|