| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- """
- Scheduled Task — Repository Scanner
- ====================================
- Periodically scans all admin-accessible Gogs repositories,
- detects repos with a ``manifest.yaml``, and auto-configures
- the Data Nexus webhook so data pushes are captured automatically.
- Usage
- -----
- Run directly::
- python -m app.tasks.scan_repos # single run
- python -m app.tasks.scan_repos --loop # loop with interval
- Or import and call programmatically::
- from app.tasks.scan_repos import run_once
- await run_once()
- """
- import asyncio
- import argparse
- import logging
- import sys
- from app.services.repo_scanner import RepoScanner
- # ── Logging ──────────────────────────────────────────────────────────
- logging.basicConfig(
- level=logging.INFO,
- format="%(asctime)s %(levelname)-8s %(name)s %(message)s",
- datefmt="%Y-%m-%d %H:%M:%S",
- )
- logger = logging.getLogger("scan_repos")
- # ── Default scan interval (seconds) ─────────────────────────────────
- DEFAULT_INTERVAL_SECONDS = 60 * 60 # 1 hour
- # ── Core routines ───────────────────────────────────────────────────
- async def run_once() -> None:
- """Execute a single scan‑and‑configure cycle."""
- scanner = RepoScanner()
- result = await scanner.scan_and_configure()
- logger.info("=" * 60)
- logger.info(" Scan Summary")
- logger.info("-" * 60)
- logger.info(f" Total repos discovered : {result.total_repos}")
- logger.info(f" Admin repos : {result.admin_repos}")
- logger.info(f" With manifest.yaml : {result.manifest_repos}")
- logger.info(f" Webhooks created : {result.webhooks_created}")
- logger.info(f" Webhooks skipped (dup) : {result.webhooks_skipped}")
- logger.info(f" Errors : {result.errors}")
- logger.info("=" * 60)
- async def run_loop(interval: int = DEFAULT_INTERVAL_SECONDS) -> None:
- """Run the scan repeatedly with a fixed delay between cycles."""
- logger.info(f"Starting scan loop (interval={interval}s)")
- while True:
- try:
- await run_once()
- except Exception as exc:
- logger.error(f"Scan cycle failed: {exc}", exc_info=True)
- logger.info(f"Next scan in {interval} seconds …")
- await asyncio.sleep(interval)
- # ── CLI entry‑point ─────────────────────────────────────────────────
- def main() -> None:
- parser = argparse.ArgumentParser(
- description="Scan Gogs repos and auto-configure Data Nexus webhooks.",
- )
- parser.add_argument(
- "--loop",
- action="store_true",
- help="Run continuously with a fixed interval (default: 1 hour).",
- )
- parser.add_argument(
- "--interval",
- type=int,
- default=DEFAULT_INTERVAL_SECONDS,
- help=f"Interval in seconds between scans (default: {DEFAULT_INTERVAL_SECONDS}).",
- )
- args = parser.parse_args()
- if args.loop:
- asyncio.run(run_loop(interval=args.interval))
- else:
- asyncio.run(run_once())
- if __name__ == "__main__":
- main()
|