webhook_service.py

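"""Webhook service for Gogs push events.

On each push, reads manifest.yaml from the repository at the pushed commit,
matches the payload's changed files against per-stage output rules using
local glob logic (no tree-walking API calls), and records a new data version
for each affected stage. Snapshots with no real data change are discarded.
"""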
import yaml
import logging
import fnmatch
import os
import asyncio
import re

from sqlalchemy.orm import Session

from app.models import Project, DataVersion
from app.services.gogs_client import GogsClient
from app.services.storage_service import StorageService

logger = logging.getLogger(__name__)

def normalize_path(path: str) -> str:
    """Normalize a path by removing any leading "./" prefix."""
    path = path.strip()
    if path.startswith("./"):
        path = path[2:]
    return path


def is_directory_pattern(path: str) -> bool:
    """Check whether the path pattern represents a directory."""
    return path.endswith("/")

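# Illustrative behavior of the helpers above (example values, not from the source):
#   normalize_path("./data/x.csv")     -> "data/x.csv"
#   is_directory_pattern("data/")      -> True
#   is_directory_pattern("data/x.csv") -> False
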
class WebhookService:
    def __init__(self, db: Session):
        self.db = db
        self.gogs = GogsClient()
        self.storage = StorageService(db, self.gogs)

    async def process_webhook(self, payload: dict):
        # 1. Parse the payload
        ref = payload.get("ref")
        if not ref:
            logger.warning("No ref in payload")
            return
        after_sha = payload.get("after")
        repo = payload.get("repository", {})
        repo_name = repo.get("name")
        owner = repo.get("owner", {}).get("username")
        pusher = payload.get("pusher", {})
        author_name = pusher.get("username", "unknown")
        if not after_sha or not repo_name or not owner:
            logger.error("Invalid payload: missing essential fields")
            return
        logger.info(f"Processing push for {owner}/{repo_name} commit {after_sha}")
        # 2. Fetch the manifest
        manifest_content = await self.gogs.get_manifest(owner, repo_name, after_sha)
        if not manifest_content:
            logger.info("No manifest.yaml found. Skipping.")
            return
        try:
            manifest = yaml.safe_load(manifest_content)
        except yaml.YAMLError as e:
            logger.error(f"Failed to parse manifest: {e}")
            return

        # 3. Validate the manifest
        project_name = manifest.get("project_name")
        if not project_name:
            logger.error("Manifest missing project_name")
            return
        # 4. Get or create the project
        project = self.storage.get_or_create_project(project_name)

        # 5. Collect all changed files from the payload for pre-filtering
        all_changed_files = self._get_all_changed_files(payload)
        manifest_changed = "manifest.yaml" in all_changed_files
        logger.info(
            f"Detected {len(all_changed_files)} changed files. "
            f"Manifest changed: {manifest_changed}"
        )

        # 6. Process stages
        stages = manifest.get("stages", [])
        # Backward compatibility: old single-stage format
        if not stages and manifest.get("stage"):
            stages = [{
                "name": manifest.get("stage"),
                "outputs": manifest.get("outputs", []),
            }]
        if not stages:
            logger.error("Manifest missing stages configuration")
            return
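        # The manifest shape this loop expects, shown with illustrative values
        # (the keys are the ones read below; the paths and names are made up):
        #
        #   project_name: demo-project
        #   stages:
        #     - name: train
        #       outputs:
        #         - path: models/              # trailing slash = directory rule
        #           pattern: "*.pt"
        #           exclude: "tmp_*"
        #         - path: metrics/summary.json # no slash = exact-file rule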
        for stage_config in stages:
            stage_name = stage_config.get("name")
            outputs = stage_config.get("outputs", [])
            if not stage_name:
                logger.warning("Stage missing name, skipping")
                continue

            # --- PRE-FILTERING ---
            # Skip if the manifest is unchanged AND no files in this stage's scope changed.
            if not manifest_changed and not self._is_stage_affected(outputs, all_changed_files):
                logger.info(f"Stage '{stage_name}': no relevant files changed. Skipping.")
                continue

            # Idempotency: skip if this stage+commit was already processed.
            existing_version = self.db.query(DataVersion).filter(
                DataVersion.project_id == project.id,
                DataVersion.stage == stage_name,
                DataVersion.commit_id == after_sha,
            ).first()
            if existing_version:
                logger.info(f"Stage '{stage_name}' already processed. Skipping.")
                continue

            # Take the commit message from the payload if available.
            commit_msg = None
            commits = payload.get("commits", [])
            if commits:
                commit_msg = commits[0].get("message")

            # Create a version for this stage.
            version = self.storage.create_version(
                project.id, stage_name, after_sha, author_name,
                manifest=manifest_content, commit_message=commit_msg,
            )
            if not version:
                logger.info(
                    f"Stage '{stage_name}' (commit {after_sha[:8]}) is already being processed. Skipping."
                )
                continue

            logger.info(f"Processing stage '{stage_name}' with {len(outputs)} output rules")

            # Process ONLY changed files that match output rules (no directory-tree fetching).
            has_new_uploads = await self._process_outputs(
                version, outputs, owner, repo_name, after_sha, all_changed_files
            )

            # Keep the version only if it represents a real change (content OR file set).
            if not self.storage.is_snapshot_changed(version, has_new_uploads):
                # No data change detected: this was a code-only push, so discard the snapshot.
                self.storage.rollback_version(version)
                logger.info(
                    f"Stage '{stage_name}': no data changes detected (content and file set unchanged). "
                    f"Version discarded."
                )
            else:
                self.storage.aggregate_version_records(version)
    def _get_all_changed_files(self, payload: dict) -> set[str]:
        """Extract all added, modified, and removed files from every commit in the payload."""
        files = set()
        commits = payload.get("commits", [])
        for commit in commits:
            for key in ["added", "modified", "removed"]:
                for f in (commit.get(key) or []):
                    files.add(normalize_path(f))
        return files
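    # The fields read from a Gogs push payload, with illustrative values
    # (structure as consumed above; the concrete values are made up):
    #
    #   {
    #     "ref": "refs/heads/main",
    #     "after": "9f2c1d4e...",
    #     "repository": {"name": "demo-repo", "owner": {"username": "alice"}},
    #     "pusher": {"username": "alice"},
    #     "commits": [
    #       {"message": "update data",
    #        "added": ["data/new.csv"], "modified": ["manifest.yaml"], "removed": []}
    #     ]
    #   }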
    def _is_stage_affected(self, outputs: list, changed_files: set[str]) -> bool:
        """Check whether any changed file falls under the scope of the stage's outputs."""
        if not changed_files:
            return True  # Fallback: if we can't tell what changed, process the stage.
        for output in outputs:
            raw_path = output.get("path", "")
            path_pattern = normalize_path(raw_path)
            is_dir = is_directory_pattern(raw_path)
            for f in changed_files:
                if is_dir:
                    # For a directory output, any change inside that directory counts.
                    dir_path = path_pattern.rstrip("/")
                    if "*" in dir_path:
                        if fnmatch.fnmatch(f, dir_path + "/*") or fnmatch.fnmatch(f, dir_path):
                            return True
                    else:
                        if f == dir_path or f.startswith(dir_path + "/"):
                            return True
                else:
                    # Single-file output: exact match.
                    if f == path_pattern:
                        return True
        return False
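    # Illustrative scope checks (example values): with outputs
    # [{"path": "data/raw/"}], a change to "data/raw/a.csv" affects the stage
    # while "src/train.py" does not; with [{"path": "metrics.json"}], only a
    # change to exactly "metrics.json" counts.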
    def _find_matching_output(self, file_path: str, outputs: list) -> dict | None:
        """Check whether a file path matches any manifest output rule using LOCAL logic only.

        No Gogs API calls are made; this is pure string/glob matching.
        Returns the matching output config dict, or None.
        """
        for output in outputs:
            raw_path = output.get("path", "")
            path_pattern = normalize_path(raw_path)
            is_dir = is_directory_pattern(raw_path)
            patterns = output.get("pattern", "*")
            excludes = output.get("exclude")
            if is_dir:
                dir_path = path_pattern.rstrip("/")
                if "*" in dir_path:
                    if not fnmatch.fnmatch(file_path, dir_path + "/*") and not fnmatch.fnmatch(file_path, dir_path):
                        continue
                else:
                    if not file_path.startswith(dir_path + "/"):
                        continue
                filename = os.path.basename(file_path)
                if self._match_patterns(filename, patterns, excludes):
                    return output
            else:
                if file_path == path_pattern:
                    return output
        return None
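    # Illustrative match (example rule): {"path": "models/", "pattern": "*.pt",
    # "exclude": "tmp_*"} matches "models/best.pt", rejects "models/tmp_ckpt.pt"
    # (excluded), and ignores "docs/readme.md" (outside the directory).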
    async def _fetch_and_process_file(
        self, version, file_path: str, output_config: dict,
        owner: str, repo_name: str, commit_id: str,
    ) -> bool:
        """Get the file's SHA from Gogs and process a single changed file, plus its paired input if configured."""
        file_info = await self.gogs.get_file_info(owner, repo_name, commit_id, file_path)
        if not file_info:
            logger.info(f"File {file_path} not found at commit {commit_id[:8]} (removed). Skipping.")
            return False

        # Compute group_key here so the paired input and the output can share it.
        directory_depth = output_config.get("directory_depth")
        if directory_depth is not None and directory_depth > 0:
            parts = file_path.split("/")
            if len(parts) > 1:
                group_key = "/".join(parts[:-1][:directory_depth])
            else:
                group_key = ""
        else:
            group_key = os.path.dirname(file_path)

        has_change = await self.storage.process_file_with_sha(
            version, file_path, file_info.get("sha"), owner, repo_name,
            direction=output_config.get("direction"),
            label=output_config.get("label"),
            extract_json_key=output_config.get("extract_json_key"),
            directory_depth=directory_depth,
            group_key=group_key,
        )

        # Actively fetch the paired input, if one is configured.
        paired_input = output_config.get("paired_input")
        if paired_input:
            extract_regex = paired_input.get("extract_regex")
            path_template = paired_input.get("path_template")
            if extract_regex and path_template:
                match = re.search(extract_regex, file_path)
                if match:
                    # Construct the paired file path from the regex's named capture groups.
                    try:
                        paired_path = path_template.format(**match.groupdict())
                    except KeyError as e:
                        logger.error(f"Failed to format paired_path: missing {e} in regex match for {file_path}")
                        paired_path = None
                    if paired_path:
                        paired_info = await self.gogs.get_file_info(owner, repo_name, commit_id, paired_path)
                        if paired_info:
                            paired_changed = await self.storage.process_file_with_sha(
                                version, paired_path, paired_info.get("sha"), owner, repo_name,
                                direction=paired_input.get("direction", "input"),
                                label=paired_input.get("label"),
                                extract_json_key=paired_input.get("extract_json_key"),
                                group_key=group_key,  # Link the pair together.
                            )
                            has_change = has_change or paired_changed
                        else:
                            logger.warning(f"Paired input file not found at commit {commit_id[:8]}: {paired_path}")
        return has_change
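    # Illustrative paired_input config (example values): with
    #   paired_input:
    #     extract_regex: "results/(?P<run>run_\\d+)/metrics\\.json"
    #     path_template: "inputs/{run}/config.yaml"
    # a matched output "results/run_001/metrics.json" pulls in
    # "inputs/run_001/config.yaml" under the same group_key.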
    async def _process_outputs(
        self, version, outputs: list, owner: str, repo_name: str, commit_id: str,
        changed_files: set[str],
    ) -> bool:
        """Process ONLY changed files that match manifest output rules.

        Instead of fetching entire directory trees from the Gogs API (slow),
        we match the webhook payload's changed-file list against manifest
        rules using LOCAL string/glob logic, so matching makes zero API calls.
        Returns True if at least one file had actual content changes.
        """
        # Step 1: Local matching, zero API calls.
        matched_files = []
        for file_path in changed_files:
            matched_output = self._find_matching_output(file_path, outputs)
            if matched_output is not None:
                matched_files.append((file_path, matched_output))
        if not matched_files:
            logger.info("No changed files matched any output rule.")
            return False
        logger.info(f"Matched {len(matched_files)} changed file(s) to output rules. Processing in parallel.")

        # Step 2: Fetch file info and download/upload in parallel.
        tasks = [
            self._fetch_and_process_file(version, fp, oc, owner, repo_name, commit_id)
            for fp, oc in matched_files
        ]
        has_changes = False
        # return_exceptions=True keeps one failing file from aborting the rest.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for i, res in enumerate(results):
            if isinstance(res, Exception):
                logger.error(f"Error processing {matched_files[i][0]}: {res}")
            elif res is True:
                has_changes = True
        return has_changes
    def _match_patterns(
        self,
        filename: str,
        include_patterns: str | list[str],
        exclude_patterns: str | list[str] | None = None,
    ) -> bool:
        """Match a filename against multiple include and exclude glob patterns."""
        # Normalize both arguments to lists.
        includes = (
            [include_patterns] if isinstance(include_patterns, str) else include_patterns
        )
        excludes = []
        if exclude_patterns:
            excludes = (
                [exclude_patterns] if isinstance(exclude_patterns, str) else exclude_patterns
            )
        # 1. The filename must match at least one include pattern (OR logic).
        is_included = any(fnmatch.fnmatch(filename, p) for p in includes)
        if not is_included:
            return False
        # 2. Any exclude match rejects the filename (OR logic).
        is_excluded = any(fnmatch.fnmatch(filename, p) for p in excludes)
        return not is_excluded
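    # Usage examples for _match_patterns (doctest-style, values illustrative):
    #   self._match_patterns("report.csv", "*.csv")            -> True
    #   self._match_patterns("tmp_run.csv", "*.csv", "tmp_*")  -> False (excluded)
    #   self._match_patterns("notes.txt", ["*.csv", "*.json"]) -> False (no include)
    #
    # A minimal wiring sketch for the service as a whole (assumptions: a FastAPI
    # app and a get_db session dependency, neither of which is defined here):
    #
    #   @router.post("/webhooks/gogs")
    #   async def gogs_webhook(request: Request, db: Session = Depends(get_db)):
    #       await WebhookService(db).process_webhook(await request.json())
    #       return {"status": "accepted"}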