repo_scanner.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. """
  2. Repo Scanner Service
  3. ====================
  4. Scans all Gogs repositories accessible to the authenticated user and
  5. identifies the repos that meet both of these conditions:
  6. 1. The authenticated user has admin permissions on the repo.
  7. 2. The repo contains a `manifest.yaml` on its default branch.
  8. For each qualifying repo, it ensures a Data Nexus webhook is configured
  9. (idempotent — repos that already have the webhook are skipped).
  10. """
  11. import logging
  12. from dataclasses import dataclass
  13. from app.config import settings
  14. from app.services.gogs_client import GogsClient
  15. logger = logging.getLogger(__name__)
  16. @dataclass(frozen=True)
  17. class ScanResult:
  18. """Lightweight value object summarising one scan run."""
  19. total_repos: int
  20. admin_repos: int
  21. manifest_repos: int
  22. webhooks_created: int
  23. webhooks_skipped: int
  24. errors: int
  25. class RepoScanner:
  26. """Orchestrates the repo‑scan → webhook‑setup pipeline."""
  27. def __init__(self, gogs: GogsClient | None = None):
  28. self.gogs = gogs or GogsClient()
  29. self.webhook_url: str = settings.GOGS_WEBHOOK_URL
  30. self.webhook_secret: str = settings.GOGS_WEBHOOK_SECRET
  31. # ------------------------------------------------------------------
  32. # Public API
  33. # ------------------------------------------------------------------
  34. async def scan_and_configure(self) -> ScanResult:
  35. """Run a full scan cycle.
  36. Steps
  37. -----
  38. 1. Fetch all repos visible to the token owner.
  39. 2. Filter repos where ``permissions.admin == True``.
  40. 3. For each admin repo, check if ``manifest.yaml`` exists.
  41. 4. If manifest exists, ensure our webhook is present.
  42. Returns a :class:`ScanResult` summarising what happened.
  43. """
  44. if not self.webhook_url:
  45. raise ValueError(
  46. "GOGS_WEBHOOK_URL is not configured. "
  47. "Please set it in .env before running the scanner."
  48. )
  49. all_repos = await self.gogs.list_user_repos()
  50. admin_repos = self._filter_admin_repos(all_repos)
  51. logger.info(
  52. f"Found {len(all_repos)} repos total, "
  53. f"{len(admin_repos)} with admin permissions"
  54. )
  55. manifest_count = 0
  56. created = 0
  57. skipped = 0
  58. errors = 0
  59. for repo in admin_repos:
  60. owner = repo["owner"]["username"]
  61. name = repo["name"]
  62. default_branch = repo.get("default_branch", "master")
  63. try:
  64. has_manifest = await self._has_manifest(owner, name, default_branch)
  65. if not has_manifest:
  66. logger.debug(f"[{owner}/{name}] No manifest.yaml — skipping")
  67. continue
  68. manifest_count += 1
  69. logger.info(f"[{owner}/{name}] manifest.yaml found ✔")
  70. already_configured = await self._webhook_already_exists(owner, name)
  71. if already_configured:
  72. logger.info(f"[{owner}/{name}] Webhook already configured — skipping")
  73. skipped += 1
  74. continue
  75. await self._create_webhook(owner, name)
  76. created += 1
  77. logger.info(f"[{owner}/{name}] Webhook created ✔")
  78. except Exception as exc:
  79. errors += 1
  80. logger.error(f"[{owner}/{name}] Error: {exc}", exc_info=True)
  81. result = ScanResult(
  82. total_repos=len(all_repos),
  83. admin_repos=len(admin_repos),
  84. manifest_repos=manifest_count,
  85. webhooks_created=created,
  86. webhooks_skipped=skipped,
  87. errors=errors,
  88. )
  89. logger.info(f"Scan complete: {result}")
  90. return result
  91. # ------------------------------------------------------------------
  92. # Internal helpers
  93. # ------------------------------------------------------------------
  94. @staticmethod
  95. def _filter_admin_repos(repos: list[dict]) -> list[dict]:
  96. """Return repos where the authenticated user has admin permissions."""
  97. return [
  98. r for r in repos
  99. if r.get("permissions", {}).get("admin") is True
  100. ]
  101. async def _has_manifest(self, owner: str, repo: str, ref: str) -> bool:
  102. """Check whether `manifest.yaml` exists in the repo."""
  103. content = await self.gogs.get_manifest(owner, repo, ref)
  104. return content is not None
  105. async def _webhook_already_exists(self, owner: str, repo: str) -> bool:
  106. """Return True if our webhook URL is already registered on the repo."""
  107. hooks = await self.gogs.list_repo_webhooks(owner, repo)
  108. return any(
  109. hook.get("config", {}).get("url") == self.webhook_url
  110. for hook in hooks
  111. )
  112. async def _create_webhook(self, owner: str, repo: str) -> dict:
  113. """Create our Data Nexus push webhook on the repo."""
  114. return await self.gogs.create_repo_webhook(
  115. owner=owner,
  116. repo=repo,
  117. webhook_url=self.webhook_url,
  118. secret=self.webhook_secret,
  119. events=["push"],
  120. )