|
|
@@ -69,7 +69,13 @@ class WebhookService:
|
|
|
# 4. Get or create project
|
|
|
project = self.storage.get_or_create_project(project_name)
|
|
|
|
|
|
- # 5. Process stages (support both old and new format)
|
|
|
+ # 5. Get all changed files from payload for pre-filtering
|
|
|
+ all_changed_files = self._get_all_changed_files(payload)
|
|
|
+ manifest_changed = "manifest.yaml" in all_changed_files
|
|
|
+
|
|
|
+ logger.info(f"Detected {len(all_changed_files)} changed files. Manifest changed: {manifest_changed}")
|
|
|
+
|
|
|
+ # 6. Process stages
|
|
|
stages = manifest.get("stages", [])
|
|
|
|
|
|
# Backward compatibility: old single-stage format
|
|
|
@@ -91,6 +97,12 @@ class WebhookService:
|
|
|
logger.warning("Stage missing name, skipping")
|
|
|
continue
|
|
|
|
|
|
+ # --- PRE-FILTERING LOGIC ---
|
|
|
+ # Skip if manifest hasn't changed AND no files in this stage's scope have changed
|
|
|
+ if not manifest_changed and not self._is_stage_affected(outputs, all_changed_files):
|
|
|
+ logger.info(f"Stage '{stage_name}': No relevant files changed. Skipping processing.")
|
|
|
+ continue
|
|
|
+
|
|
|
# Check if this stage+commit already processed (idempotency)
|
|
|
existing_version = self.db.query(DataVersion).filter(
|
|
|
DataVersion.project_id == project.id,
|
|
|
@@ -106,11 +118,11 @@ class WebhookService:
|
|
|
version = self.storage.create_version(
|
|
|
project.id, stage_name, after_sha, author_name, manifest=manifest_content
|
|
|
)
|
|
|
-
|
|
|
+
|
|
|
if not version:
|
|
|
logger.info(f"Stage '{stage_name}' (commit {after_sha[:8]}) is already being processed. Skipping.")
|
|
|
continue
|
|
|
-
|
|
|
+
|
|
|
logger.info(f"Processing stage '{stage_name}' with {len(outputs)} output rules")
|
|
|
|
|
|
# Process outputs and check if any file actually changed
|
|
|
@@ -127,6 +139,37 @@ class WebhookService:
|
|
|
f"Version discarded."
|
|
|
)
|
|
|
|
|
|
def _get_all_changed_files(self, payload: dict) -> set[str]:
    """Collect every file path touched by the commits in a webhook payload.

    Reads the ``added``, ``modified`` and ``removed`` lists of each entry
    under ``payload["commits"]`` and passes every path through
    ``normalize_path`` before adding it to the result.

    Args:
        payload: Webhook payload; only its ``commits`` key is read here.

    Returns:
        The set of normalized paths. Empty when the payload carries no
        commits or the commits list no file changes.
    """
    # ``or []`` instead of a ``.get`` default: it also covers an explicit
    # ``"commits": null``, the same way the per-commit lists are guarded.
    commits = payload.get("commits") or []

    files: set[str] = set()
    for commit in commits:
        for key in ("added", "modified", "removed"):
            files.update(normalize_path(path) for path in (commit.get(key) or []))
    return files
|
|
|
+
|
|
|
def _is_stage_affected(self, outputs: list, changed_files: set[str]) -> bool:
    """Tell whether any changed file falls within one of the stage's outputs.

    Each output's ``path`` is normalized with ``normalize_path`` and
    classified with ``is_directory_pattern``:

    * directory output - affected when a changed file equals the directory
      path or lies underneath it (``<dir>/...``);
    * single-file output - affected only on an exact path match.

    Args:
        outputs: Output rule mappings; only the ``path`` key is read.
        changed_files: Paths to test against the outputs. Presumably already
            normalized by the caller - verify against the call site.

    Returns:
        True when at least one output is affected, or when ``changed_files``
        is empty (fallback: nothing is known about what changed, so the
        stage is processed anyway). False otherwise.
    """
    if not changed_files:
        return True  # Fallback: if we can't tell what changed, process it

    for output in outputs:
        raw_path = output.get("path", "")
        path_pattern = normalize_path(raw_path)

        if is_directory_pattern(raw_path):
            # Prefix is built once per output rather than once per file.
            # NOTE(review): if a directory pattern normalizes to an empty
            # string, the prefix becomes "/" and relative paths will not
            # match it - confirm whether such patterns can occur.
            dir_path = path_pattern.rstrip("/")
            dir_prefix = dir_path + "/"
            if dir_path in changed_files or any(
                changed.startswith(dir_prefix) for changed in changed_files
            ):
                return True
        elif path_pattern in changed_files:
            # Single file output: exact match via set membership.
            return True

    return False
|
|
|
+
|
|
|
async def _process_outputs(
|
|
|
self, version, outputs: list, owner: str, repo_name: str, commit_id: str
|
|
|
) -> bool:
|