| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314 |
import fnmatch
import logging
import os

import yaml
from sqlalchemy.orm import Session

from app.models import Project, DataVersion
from app.services.gogs_client import GogsClient
from app.services.storage_service import StorageService
- logger = logging.getLogger(__name__)
def normalize_path(path: str) -> str:
    """Return *path* with surrounding whitespace and any leading './' removed."""
    return path.strip().removeprefix("./")
def is_directory_pattern(path: str) -> bool:
    """Return True when the pattern denotes a directory (trailing '/')."""
    return path[-1:] == "/"
- class WebhookService:
- def __init__(self, db: Session):
- self.db = db
- self.gogs = GogsClient()
- self.storage = StorageService(db, self.gogs)
- async def process_webhook(self, payload: dict):
- # 1. Parse payload
- ref = payload.get("ref")
- if not ref:
- logger.warning("No ref in payload")
- return
- after_sha = payload.get("after")
- repo = payload.get("repository", {})
- repo_name = repo.get("name")
- owner = repo.get("owner", {}).get("username")
- pusher = payload.get("pusher", {})
- author_name = pusher.get("username", "unknown")
- if not after_sha or not repo_name or not owner:
- logger.error("Invalid payload: missing essential fields")
- return
- logger.info(f"Processing push for {owner}/{repo_name} commit {after_sha}")
- # 2. Get manifest
- manifest_content = await self.gogs.get_manifest(owner, repo_name, after_sha)
- if not manifest_content:
- logger.info("No manifest.yaml found. Skipping.")
- return
- try:
- manifest = yaml.safe_load(manifest_content)
- except yaml.YAMLError as e:
- logger.error(f"Failed to parse manifest: {e}")
- return
- # 3. Validation
- project_name = manifest.get("project_name")
- if not project_name:
- logger.error("Manifest missing project_name")
- return
- # 4. Get or create project
- project = self.storage.get_or_create_project(project_name)
- # 5. Get all changed files from payload for pre-filtering
- all_changed_files = self._get_all_changed_files(payload)
- manifest_changed = "manifest.yaml" in all_changed_files
- logger.info(f"Detected {len(all_changed_files)} changed files. Manifest changed: {manifest_changed}")
- # 6. Process stages
- stages = manifest.get("stages", [])
- # Backward compatibility: old single-stage format
- if not stages and manifest.get("stage"):
- stages = [{
- "name": manifest.get("stage"),
- "outputs": manifest.get("outputs", [])
- }]
- if not stages:
- logger.error("Manifest missing stages configuration")
- return
- for stage_config in stages:
- stage_name = stage_config.get("name")
- outputs = stage_config.get("outputs", [])
- if not stage_name:
- logger.warning("Stage missing name, skipping")
- continue
- # --- PRE-FILTERING LOGIC ---
- # Skip if manifest hasn't changed AND no files in this stage's scope have changed
- if not manifest_changed and not self._is_stage_affected(outputs, all_changed_files):
- logger.info(f"Stage '{stage_name}': No relevant files changed. Skipping processing.")
- continue
- # Check if this stage+commit already processed (idempotency)
- existing_version = self.db.query(DataVersion).filter(
- DataVersion.project_id == project.id,
- DataVersion.stage == stage_name,
- DataVersion.commit_id == after_sha
- ).first()
- if existing_version:
- logger.info(f"Stage '{stage_name}' already processed. Skipping.")
- continue
- # Get commit message from payload if available
- commit_msg = None
- commits = payload.get("commits", [])
- if commits:
- commit_msg = commits[0].get("message")
- # Create version for this stage
- version = self.storage.create_version(
- project.id, stage_name, after_sha, author_name, manifest=manifest_content, commit_message=commit_msg
- )
- if not version:
- logger.info(f"Stage '{stage_name}' (commit {after_sha[:8]}) is already being processed. Skipping.")
- continue
- logger.info(f"Processing stage '{stage_name}' with {len(outputs)} output rules")
- # Process outputs and check if any file actually changed
- has_new_uploads = await self._process_outputs(
- version, outputs, owner, repo_name, after_sha
- )
- # Check if this version represents a real change (content OR file set)
- if not self.storage.is_snapshot_changed(version, has_new_uploads):
- # No changes detected — this was a code-only push, discard the snapshot
- self.storage.rollback_version(version)
- logger.info(
- f"Stage '{stage_name}': no data changes detected (content and file set same). "
- f"Version discarded."
- )
- else:
- self.storage.aggregate_version_records(version)
- def _get_all_changed_files(self, payload: dict) -> set[str]:
- """Extract all added, modified, and removed files from all commits in payload."""
- files = set()
- commits = payload.get("commits", [])
- for commit in commits:
- for key in ["added", "modified", "removed"]:
- for f in (commit.get(key) or []):
- files.add(normalize_path(f))
- return files
- def _is_stage_affected(self, outputs: list, changed_files: set[str]) -> bool:
- """Check if any of the changed files fall under the scope of the stage's outputs."""
- if not changed_files:
- return True # Fallback: if we can't tell what changed, process it
- for output in outputs:
- path_pattern = normalize_path(output.get("path", ""))
- is_dir = is_directory_pattern(output.get("path", ""))
- for f in changed_files:
- if is_dir:
- # If it's a directory output, any change inside that directory counts
- dir_path = path_pattern.rstrip("/")
- if '*' in dir_path:
- import fnmatch
- if fnmatch.fnmatch(f, dir_path + "/*") or fnmatch.fnmatch(f, dir_path):
- return True
- else:
- if f == dir_path or f.startswith(dir_path + "/"):
- return True
- else:
- # Single file output: exact match
- if f == path_pattern:
- return True
- return False
- async def _process_outputs(
- self, version, outputs: list, owner: str, repo_name: str, commit_id: str
- ) -> bool:
- """Process output rules, create snapshot records for ALL matching files.
- Returns
- -------
- bool
- ``True`` if at least one file had actual content changes,
- ``False`` if every file was unchanged.
- """
- has_changes = False
- for output in outputs:
- raw_path_pattern = output.get("path", "")
- # Support both string and list for pattern and exclude
- patterns = output.get("pattern", "*")
- excludes = output.get("exclude")
- direction = output.get("direction")
- label = output.get("label")
- extract_json_key = output.get("extract_json_key")
- directory_depth = output.get("directory_depth")
- path_pattern = normalize_path(raw_path_pattern)
- is_dir = is_directory_pattern(raw_path_pattern)
- dir_path = path_pattern.rstrip("/")
- if is_dir:
- # Directory pattern: fetch files from the closest static parent directory
- # For `data/*/test/`, that is `data/`
- import re
-
- # Split by first wildcard chunk path
- wildcard_idx = dir_path.find('*')
- if wildcard_idx != -1:
- static_base = dir_path[:wildcard_idx]
- # Trim back to the nearest directory separator
- last_sep = static_base.rfind('/')
- if last_sep != -1:
- static_base = static_base[:last_sep]
- else:
- static_base = "" # ROOT
- else:
- static_base = dir_path
-
- static_base = static_base.strip('/')
-
- logger.info(f"Fetching directory: {static_base} (to match wildcard path: {dir_path}) with patterns: {patterns}, excludes: {excludes}")
- files = await self.gogs.get_directory_tree(owner, repo_name, commit_id, static_base)
- for file_info in files:
- file_path = file_info.get("path")
-
- # 1. First verify if the full path matches the wildcard directory path provided
- if '*' in dir_path:
- # e.g dir_path: data/*/test/ -> match: data/*/test/*
- if not fnmatch.fnmatch(file_path, dir_path + "/*") and not fnmatch.fnmatch(file_path, dir_path):
- continue
- else:
- if not file_path.startswith(dir_path + "/"):
- continue
-
- # Calculate name relative to the matched base path segment for pattern matching
- import os
- rel_name = os.path.basename(file_path)
- if self._match_patterns(rel_name, patterns, excludes):
- try:
- changed = await self.storage.process_file_with_sha(
- version, file_path, file_info.get("sha"), owner, repo_name,
- direction=direction, label=label, extract_json_key=extract_json_key,
- directory_depth=directory_depth
- )
- if changed:
- has_changes = True
- except Exception as e:
- logger.error(f"Failed to process file {file_path}: {e}")
- else:
- # Single file: fetch only this file's info
- logger.info(f"Fetching single file: {path_pattern}")
- file_info = await self.gogs.get_file_info(owner, repo_name, commit_id, path_pattern)
- if file_info:
- # Apply pattern matching to the filename for consistency
- import os
- filename = os.path.basename(path_pattern)
- if self._match_patterns(filename, patterns, excludes):
- try:
- changed = await self.storage.process_file_with_sha(
- version, path_pattern, file_info.get("sha"), owner, repo_name,
- direction=direction, label=label, extract_json_key=extract_json_key,
- directory_depth=directory_depth
- )
- if changed:
- has_changes = True
- except Exception as e:
- logger.error(f"Failed to process file {path_pattern}: {e}")
- else:
- logger.warning(f"File not found: {path_pattern}")
- return has_changes
- def _match_patterns(
- self,
- filename: str,
- include_patterns: str | list[str],
- exclude_patterns: str | list[str] | None = None,
- ) -> bool:
- """Helper to match filename against multiple include and exclude glob patterns."""
- # Normalize to lists
- includes = (
- [include_patterns] if isinstance(include_patterns, str) else include_patterns
- )
- excludes = []
- if exclude_patterns:
- excludes = (
- [exclude_patterns] if isinstance(exclude_patterns, str) else exclude_patterns
- )
- # 1. Check if it matches ANY include pattern (OR logic)
- is_included = any(fnmatch.fnmatch(filename, p) for p in includes)
- if not is_included:
- return False
- # 2. Check if it matches ANY exclude pattern (OR logic: any match means reject)
- is_excluded = any(fnmatch.fnmatch(filename, p) for p in excludes)
- return not is_excluded
|