# webhook_service.py
  1. import yaml
  2. import logging
  3. import fnmatch
  4. from sqlalchemy.orm import Session
  5. from app.models import Project, DataVersion
  6. from app.services.gogs_client import GogsClient
  7. from app.services.storage_service import StorageService
  8. logger = logging.getLogger(__name__)
  9. def normalize_path(path: str) -> str:
  10. """Normalize path by removing ./ prefix."""
  11. path = path.strip()
  12. if path.startswith("./"):
  13. path = path[2:]
  14. return path
  15. def is_directory_pattern(path: str) -> bool:
  16. """Check if the path pattern represents a directory."""
  17. return path.endswith("/")
  18. class WebhookService:
  19. def __init__(self, db: Session):
  20. self.db = db
  21. self.gogs = GogsClient()
  22. self.storage = StorageService(db, self.gogs)
  23. async def process_webhook(self, payload: dict):
  24. # 1. Parse payload
  25. ref = payload.get("ref")
  26. if not ref:
  27. logger.warning("No ref in payload")
  28. return
  29. after_sha = payload.get("after")
  30. repo = payload.get("repository", {})
  31. repo_name = repo.get("name")
  32. owner = repo.get("owner", {}).get("username")
  33. pusher = payload.get("pusher", {})
  34. author_name = pusher.get("username", "unknown")
  35. if not after_sha or not repo_name or not owner:
  36. logger.error("Invalid payload: missing essential fields")
  37. return
  38. logger.info(f"Processing push for {owner}/{repo_name} commit {after_sha}")
  39. # 2. Get manifest
  40. manifest_content = await self.gogs.get_manifest(owner, repo_name, after_sha)
  41. if not manifest_content:
  42. logger.info("No manifest.yaml found. Skipping.")
  43. return
  44. try:
  45. manifest = yaml.safe_load(manifest_content)
  46. except yaml.YAMLError as e:
  47. logger.error(f"Failed to parse manifest: {e}")
  48. return
  49. # 3. Validation
  50. project_name = manifest.get("project_name")
  51. if not project_name:
  52. logger.error("Manifest missing project_name")
  53. return
  54. # 4. Get or create project
  55. project = self.storage.get_or_create_project(project_name)
  56. # 5. Process stages (support both old and new format)
  57. stages = manifest.get("stages", [])
  58. # Backward compatibility: old single-stage format
  59. if not stages and manifest.get("stage"):
  60. stages = [{
  61. "name": manifest.get("stage"),
  62. "outputs": manifest.get("outputs", [])
  63. }]
  64. if not stages:
  65. logger.error("Manifest missing stages configuration")
  66. return
  67. for stage_config in stages:
  68. stage_name = stage_config.get("name")
  69. outputs = stage_config.get("outputs", [])
  70. if not stage_name:
  71. logger.warning("Stage missing name, skipping")
  72. continue
  73. # Check if this stage+commit already processed (idempotency)
  74. existing_version = self.db.query(DataVersion).filter(
  75. DataVersion.project_id == project.id,
  76. DataVersion.stage == stage_name,
  77. DataVersion.commit_id == after_sha
  78. ).first()
  79. if existing_version:
  80. logger.info(f"Stage '{stage_name}' already processed. Skipping.")
  81. continue
  82. # Create version for this stage
  83. version = self.storage.create_version(
  84. project.id, stage_name, after_sha, author_name, manifest_content
  85. )
  86. logger.info(f"Processing stage '{stage_name}' with {len(outputs)} output rules")
  87. # Process outputs for this stage (fetch only needed files)
  88. await self._process_outputs(version, outputs, owner, repo_name, after_sha)
  89. async def _process_outputs(
  90. self, version, outputs: list, owner: str, repo_name: str, commit_id: str
  91. ):
  92. """Process output rules and download matching files (on-demand fetching)."""
  93. for output in outputs:
  94. raw_path_pattern = output.get("path", "")
  95. match_pattern = output.get("pattern", "*")
  96. path_pattern = normalize_path(raw_path_pattern)
  97. is_dir = is_directory_pattern(raw_path_pattern)
  98. if is_dir:
  99. # Directory pattern: fetch only this directory's files
  100. dir_path = path_pattern.rstrip("/")
  101. logger.info(f"Fetching directory: {dir_path} with pattern: {match_pattern}")
  102. files = await self.gogs.get_directory_tree(owner, repo_name, commit_id, dir_path)
  103. for file_info in files:
  104. file_path = file_info.get("path")
  105. # Apply pattern matching
  106. rel_name = file_path[len(dir_path) + 1:] if file_path.startswith(dir_path + "/") else file_path
  107. if fnmatch.fnmatch(rel_name, match_pattern):
  108. try:
  109. await self.storage.process_file_with_sha(
  110. version, file_path, file_info.get("sha"), owner, repo_name
  111. )
  112. except Exception as e:
  113. logger.error(f"Failed to process file {file_path}: {e}")
  114. else:
  115. # Single file: fetch only this file's info
  116. logger.info(f"Fetching single file: {path_pattern}")
  117. file_info = await self.gogs.get_file_info(owner, repo_name, commit_id, path_pattern)
  118. if file_info:
  119. try:
  120. await self.storage.process_file_with_sha(
  121. version, path_pattern, file_info.get("sha"), owner, repo_name
  122. )
  123. except Exception as e:
  124. logger.error(f"Failed to process file {path_pattern}: {e}")
  125. else:
  126. logger.warning(f"File not found: {path_pattern}")