# webhook_service.py
  1. import yaml
  2. import logging
  3. import fnmatch
  4. from sqlalchemy.orm import Session
  5. from app.models import Project, DataVersion
  6. from app.services.gogs_client import GogsClient
  7. from app.services.storage_service import StorageService
  8. logger = logging.getLogger(__name__)
  9. def normalize_path(path: str) -> str:
  10. """Normalize path by removing ./ prefix."""
  11. path = path.strip()
  12. if path.startswith("./"):
  13. path = path[2:]
  14. return path
  15. def is_directory_pattern(path: str) -> bool:
  16. """Check if the path pattern represents a directory."""
  17. return path.endswith("/")
  18. class WebhookService:
  19. def __init__(self, db: Session):
  20. self.db = db
  21. self.gogs = GogsClient()
  22. self.storage = StorageService(db, self.gogs)
  23. async def process_webhook(self, payload: dict):
  24. # 1. Parse payload
  25. ref = payload.get("ref")
  26. if not ref:
  27. logger.warning("No ref in payload")
  28. return
  29. after_sha = payload.get("after")
  30. repo = payload.get("repository", {})
  31. repo_name = repo.get("name")
  32. owner = repo.get("owner", {}).get("username")
  33. pusher = payload.get("pusher", {})
  34. author_name = pusher.get("username", "unknown")
  35. if not after_sha or not repo_name or not owner:
  36. logger.error("Invalid payload: missing essential fields")
  37. return
  38. logger.info(f"Processing push for {owner}/{repo_name} commit {after_sha}")
  39. # 2. Get manifest
  40. manifest_content = await self.gogs.get_manifest(owner, repo_name, after_sha)
  41. if not manifest_content:
  42. logger.info("No manifest.yaml found. Skipping.")
  43. return
  44. try:
  45. manifest = yaml.safe_load(manifest_content)
  46. except yaml.YAMLError as e:
  47. logger.error(f"Failed to parse manifest: {e}")
  48. return
  49. # 3. Validation
  50. project_name = manifest.get("project_name")
  51. if not project_name:
  52. logger.error("Manifest missing project_name")
  53. return
  54. # 4. Get or create project
  55. project = self.storage.get_or_create_project(project_name)
  56. # 5. Process stages (support both old and new format)
  57. stages = manifest.get("stages", [])
  58. # Backward compatibility: old single-stage format
  59. if not stages and manifest.get("stage"):
  60. stages = [{
  61. "name": manifest.get("stage"),
  62. "outputs": manifest.get("outputs", [])
  63. }]
  64. if not stages:
  65. logger.error("Manifest missing stages configuration")
  66. return
  67. for stage_config in stages:
  68. stage_name = stage_config.get("name")
  69. outputs = stage_config.get("outputs", [])
  70. if not stage_name:
  71. logger.warning("Stage missing name, skipping")
  72. continue
  73. # Check if this stage+commit already processed (idempotency)
  74. existing_version = self.db.query(DataVersion).filter(
  75. DataVersion.project_id == project.id,
  76. DataVersion.stage == stage_name,
  77. DataVersion.commit_id == after_sha
  78. ).first()
  79. if existing_version:
  80. logger.info(f"Stage '{stage_name}' already processed. Skipping.")
  81. continue
  82. # Create version for this stage
  83. version = self.storage.create_version(
  84. project.id, stage_name, after_sha, author_name, manifest_content
  85. )
  86. logger.info(f"Processing stage '{stage_name}' with {len(outputs)} output rules")
  87. # Process outputs and check if any file actually changed
  88. has_changes = await self._process_outputs(
  89. version, outputs, owner, repo_name, after_sha
  90. )
  91. if not has_changes:
  92. # No data files changed — this was a code-only push, discard the snapshot
  93. self.storage.rollback_version(version)
  94. logger.info(
  95. f"Stage '{stage_name}': no data file changes detected. "
  96. f"Version discarded (code-only push)."
  97. )
  98. async def _process_outputs(
  99. self, version, outputs: list, owner: str, repo_name: str, commit_id: str
  100. ) -> bool:
  101. """Process output rules, create snapshot records for ALL matching files.
  102. Returns
  103. -------
  104. bool
  105. ``True`` if at least one file had actual content changes,
  106. ``False`` if every file was unchanged.
  107. """
  108. has_changes = False
  109. for output in outputs:
  110. raw_path_pattern = output.get("path", "")
  111. match_pattern = output.get("pattern", "*")
  112. path_pattern = normalize_path(raw_path_pattern)
  113. is_dir = is_directory_pattern(raw_path_pattern)
  114. if is_dir:
  115. # Directory pattern: fetch only this directory's files
  116. dir_path = path_pattern.rstrip("/")
  117. logger.info(f"Fetching directory: {dir_path} with pattern: {match_pattern}")
  118. files = await self.gogs.get_directory_tree(owner, repo_name, commit_id, dir_path)
  119. for file_info in files:
  120. file_path = file_info.get("path")
  121. # Apply pattern matching
  122. rel_name = file_path[len(dir_path) + 1:] if file_path.startswith(dir_path + "/") else file_path
  123. if fnmatch.fnmatch(rel_name, match_pattern):
  124. try:
  125. changed = await self.storage.process_file_with_sha(
  126. version, file_path, file_info.get("sha"), owner, repo_name
  127. )
  128. if changed:
  129. has_changes = True
  130. except Exception as e:
  131. logger.error(f"Failed to process file {file_path}: {e}")
  132. else:
  133. # Single file: fetch only this file's info
  134. logger.info(f"Fetching single file: {path_pattern}")
  135. file_info = await self.gogs.get_file_info(owner, repo_name, commit_id, path_pattern)
  136. if file_info:
  137. try:
  138. changed = await self.storage.process_file_with_sha(
  139. version, path_pattern, file_info.get("sha"), owner, repo_name
  140. )
  141. if changed:
  142. has_changes = True
  143. except Exception as e:
  144. logger.error(f"Failed to process file {path_pattern}: {e}")
  145. else:
  146. logger.warning(f"File not found: {path_pattern}")
  147. return has_changes