| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- import yaml
- import logging
- import fnmatch
- from sqlalchemy.orm import Session
- from app.models import Project, DataVersion
- from app.services.gogs_client import GogsClient
- from app.services.storage_service import StorageService
- logger = logging.getLogger(__name__)
def normalize_path(path: str) -> str:
    """Return *path* stripped of surrounding whitespace and any leading "./"."""
    cleaned = path.strip()
    return cleaned[2:] if cleaned.startswith("./") else cleaned
def is_directory_pattern(path: str) -> bool:
    """Tell whether *path* uses the trailing-slash convention for a directory."""
    return path[-1:] == "/"
class WebhookService:
    """Handle Gogs push webhooks: read the repo manifest and snapshot data files.

    For each configured stage in ``manifest.yaml`` at the pushed commit, a new
    ``DataVersion`` is created and the stage's output rules are processed; a
    version with no actual data changes (code-only push) is rolled back.
    """

    def __init__(self, db: Session):
        self.db = db
        self.gogs = GogsClient()
        self.storage = StorageService(db, self.gogs)

    async def process_webhook(self, payload: dict):
        """Process a push-event webhook payload.

        Parameters
        ----------
        payload : dict
            Decoded Gogs push payload. Must contain ``ref``, ``after``, and
            ``repository`` (with ``name`` and ``owner.username``); ``pusher``
            is optional and defaults the author to ``"unknown"``.

        Invalid payloads, missing manifests, and malformed manifests are
        logged and ignored rather than raised — webhook delivery should not
        error back to the sender for expected conditions.
        """
        # 1. Parse payload
        ref = payload.get("ref")
        if not ref:
            logger.warning("No ref in payload")
            return
        after_sha = payload.get("after")
        repo = payload.get("repository", {})
        repo_name = repo.get("name")
        owner = repo.get("owner", {}).get("username")
        pusher = payload.get("pusher", {})
        author_name = pusher.get("username", "unknown")
        if not after_sha or not repo_name or not owner:
            logger.error("Invalid payload: missing essential fields")
            return
        logger.info(f"Processing push for {owner}/{repo_name} commit {after_sha}")
        # 2. Get manifest
        manifest_content = await self.gogs.get_manifest(owner, repo_name, after_sha)
        if not manifest_content:
            logger.info("No manifest.yaml found. Skipping.")
            return
        try:
            manifest = yaml.safe_load(manifest_content)
        except yaml.YAMLError as e:
            logger.error(f"Failed to parse manifest: {e}")
            return
        # FIX: a syntactically valid YAML document may parse to a scalar, a
        # list, or None — calling .get() on those would raise AttributeError.
        if not isinstance(manifest, dict):
            logger.error("Manifest is not a mapping; ignoring")
            return
        # 3. Validation
        project_name = manifest.get("project_name")
        if not project_name:
            logger.error("Manifest missing project_name")
            return
        # 4. Get or create project
        project = self.storage.get_or_create_project(project_name)
        # 5. Process stages (support both old and new format)
        stages = manifest.get("stages", [])
        # Backward compatibility: old single-stage format used top-level
        # "stage" + "outputs" keys instead of a "stages" list.
        if not stages and manifest.get("stage"):
            stages = [{
                "name": manifest.get("stage"),
                "outputs": manifest.get("outputs", [])
            }]
        if not stages:
            logger.error("Manifest missing stages configuration")
            return
        for stage_config in stages:
            stage_name = stage_config.get("name")
            outputs = stage_config.get("outputs", [])
            if not stage_name:
                logger.warning("Stage missing name, skipping")
                continue
            # Idempotency: a stage+commit pair is processed at most once, so
            # webhook redeliveries do not create duplicate versions.
            existing_version = self.db.query(DataVersion).filter(
                DataVersion.project_id == project.id,
                DataVersion.stage == stage_name,
                DataVersion.commit_id == after_sha
            ).first()
            if existing_version:
                logger.info(f"Stage '{stage_name}' already processed. Skipping.")
                continue
            # Create version for this stage
            version = self.storage.create_version(
                project.id, stage_name, after_sha, author_name, manifest_content
            )
            logger.info(f"Processing stage '{stage_name}' with {len(outputs)} output rules")
            # Process outputs and check if any file actually changed
            has_changes = await self._process_outputs(
                version, outputs, owner, repo_name, after_sha
            )
            if not has_changes:
                # No data files changed — this was a code-only push, discard the snapshot
                self.storage.rollback_version(version)
                logger.info(
                    f"Stage '{stage_name}': no data file changes detected. "
                    f"Version discarded (code-only push)."
                )

    async def _process_outputs(
        self, version, outputs: list, owner: str, repo_name: str, commit_id: str
    ) -> bool:
        """Process output rules, create snapshot records for ALL matching files.

        An output rule is a dict with ``path`` (a file path, or a directory
        when it ends with "/") and an optional fnmatch ``pattern`` (default
        ``"*"``) applied to paths relative to the directory.

        Returns
        -------
        bool
            ``True`` if at least one file had actual content changes,
            ``False`` if every file was unchanged.
        """
        has_changes = False
        for output in outputs:
            raw_path_pattern = output.get("path", "")
            match_pattern = output.get("pattern", "*")
            path_pattern = normalize_path(raw_path_pattern)
            # Directory-ness is decided on the raw value: normalize_path never
            # touches the trailing slash.
            is_dir = is_directory_pattern(raw_path_pattern)
            if is_dir:
                # Directory pattern: fetch only this directory's files
                dir_path = path_pattern.rstrip("/")
                logger.info(f"Fetching directory: {dir_path} with pattern: {match_pattern}")
                files = await self.gogs.get_directory_tree(owner, repo_name, commit_id, dir_path)
                for file_info in files:
                    file_path = file_info.get("path")
                    # FIX: tree entries without a path would crash
                    # startswith()/fnmatch() below — skip them.
                    if not file_path:
                        logger.warning("Tree entry missing path, skipping")
                        continue
                    # Match the pattern against the path relative to dir_path.
                    rel_name = file_path[len(dir_path) + 1:] if file_path.startswith(dir_path + "/") else file_path
                    if fnmatch.fnmatch(rel_name, match_pattern):
                        try:
                            changed = await self.storage.process_file_with_sha(
                                version, file_path, file_info.get("sha"), owner, repo_name
                            )
                            if changed:
                                has_changes = True
                        except Exception as e:
                            # Best-effort per file: one bad file must not abort
                            # the rest of the snapshot.
                            logger.error(f"Failed to process file {file_path}: {e}")
            else:
                # Single file: fetch only this file's info
                logger.info(f"Fetching single file: {path_pattern}")
                file_info = await self.gogs.get_file_info(owner, repo_name, commit_id, path_pattern)
                if file_info:
                    try:
                        changed = await self.storage.process_file_with_sha(
                            version, path_pattern, file_info.get("sha"), owner, repo_name
                        )
                        if changed:
                            has_changes = True
                    except Exception as e:
                        # Best-effort per file: log and continue with the next rule.
                        logger.error(f"Failed to process file {path_pattern}: {e}")
                else:
                    logger.warning(f"File not found: {path_pattern}")
        return has_changes
|