Explorar o código

fix: commit后记录所有需要上云的数据快照

tanjingyu hai 2 semanas
pai
achega
d7bb4b96e7
Modificáronse 2 ficheiros con 88 adicións e 20 borrados
  1. 57 14
      app/services/storage_service.py
  2. 31 6
      app/services/webhook_service.py

+ 57 - 14
app/services/storage_service.py

@@ -8,6 +8,7 @@ import logging
 
 logger = logging.getLogger(__name__)
 
+
 class StorageService:
     def __init__(self, db: Session, gogs_client: GogsClient):
         self.db = db
@@ -35,46 +36,88 @@ class StorageService:
         self.db.refresh(version)
         return version
 
-    async def process_file_with_sha(self, version: DataVersion, relative_path: str, file_sha: str, owner: str, repo: str):
-        """只处理变化的文件,未变化的文件不记录。"""
-        # 查询同一项目 + 同一 stage + 同一文件路径的最新一条记录
+    def rollback_version(self, version: DataVersion):
+        """Remove a version and all its associated file records.
+
+        Used when a push didn't produce any data file changes,
+        meaning it was a code-only commit with no snapshot value.
+        """
+        self.db.query(DataFile).filter(DataFile.version_id == version.id).delete()
+        self.db.delete(version)
+        self.db.commit()
+        logger.info(f"Rolled back empty version {version.id}")
+
+    async def process_file_with_sha(
+        self,
+        version: DataVersion,
+        relative_path: str,
+        file_sha: str,
+        owner: str,
+        repo: str,
+    ) -> bool:
+        """Process a file and create a snapshot record.
+
+        **Snapshot semantics**: a record is ALWAYS created regardless of
+        whether the file changed.  This ensures every version is a
+        self-contained snapshot of all declared output files.
+
+        Returns
+        -------
+        bool
+            ``True`` if the file content actually changed (new upload),
+            ``False`` if unchanged (record reuses previous OSS key).
+        """
+        # Find the latest record for this file in the same project + stage
         last_file = (
             self.db.query(DataFile)
             .join(DataVersion)
             .filter(
                 DataVersion.project_id == version.project_id,
                 DataVersion.stage == version.stage,
-                DataFile.relative_path == relative_path
+                DataFile.relative_path == relative_path,
             )
             .order_by(DataVersion.created_at.desc())
             .first()
         )
 
         if last_file and last_file.file_sha == file_sha:
-            logger.info(f"File {relative_path} (SHA: {file_sha}) unchanged. Skipping.")
-            return
+            # ── Unchanged: reuse previous OSS key, still record a snapshot entry ──
+            new_file = DataFile(
+                version_id=version.id,
+                relative_path=relative_path,
+                storage_path=last_file.storage_path,
+                file_size=last_file.file_size,
+                file_type=last_file.file_type,
+                file_sha=file_sha,
+            )
+            self.db.add(new_file)
+            self.db.commit()
+            logger.info(
+                f"File {relative_path} (SHA: {file_sha[:8]}…) "
+                f"unchanged — snapshot recorded, reusing OSS key"
+            )
+            return False
 
-        # 文件是新的或有变化,下载并上传到 OSS
-        logger.info(f"File {relative_path} (SHA: {file_sha}) changed. Downloading.")
+        # ── Changed or new: download → upload → record ──
+        logger.info(f"File {relative_path} (SHA: {file_sha[:8]}…) changed — downloading")
         content = await self.gogs.get_file_content(owner, repo, version.commit_id, relative_path)
         file_size = len(content)
 
         project_name = version.project.project_name
+        oss_key = oss_client._build_key(
+            project_name, version.stage, version.commit_id, relative_path
+        )
 
-        # 构建 OSS key
-        oss_key = oss_client._build_key(project_name, version.stage, version.commit_id, relative_path)
-
-        # 上传到 OSS
         oss_client.upload(oss_key, content)
 
-        # 创建记录(storage_path 存 OSS key)
         new_file = DataFile(
             version_id=version.id,
             relative_path=relative_path,
             storage_path=oss_key,
             file_size=file_size,
             file_type=os.path.splitext(relative_path)[1],
-            file_sha=file_sha
+            file_sha=file_sha,
         )
         self.db.add(new_file)
         self.db.commit()
+        return True

+ 31 - 6
app/services/webhook_service.py

@@ -108,13 +108,32 @@ class WebhookService:
             )
             logger.info(f"Processing stage '{stage_name}' with {len(outputs)} output rules")
 
-            # Process outputs for this stage (fetch only needed files)
-            await self._process_outputs(version, outputs, owner, repo_name, after_sha)
+            # Process outputs and check if any file actually changed
+            has_changes = await self._process_outputs(
+                version, outputs, owner, repo_name, after_sha
+            )
+
+            if not has_changes:
+                # No data files changed — this was a code-only push, discard the snapshot
+                self.storage.rollback_version(version)
+                logger.info(
+                    f"Stage '{stage_name}': no data file changes detected. "
+                    f"Version discarded (code-only push)."
+                )
 
     async def _process_outputs(
         self, version, outputs: list, owner: str, repo_name: str, commit_id: str
-    ):
-        """Process output rules and download matching files (on-demand fetching)."""
+    ) -> bool:
+        """Process output rules, create snapshot records for ALL matching files.
+
+        Returns
+        -------
+        bool
+            ``True`` if at least one file had actual content changes,
+            ``False`` if every file was unchanged.
+        """
+        has_changes = False
+
         for output in outputs:
             raw_path_pattern = output.get("path", "")
             match_pattern = output.get("pattern", "*")
@@ -135,9 +154,11 @@ class WebhookService:
                     rel_name = file_path[len(dir_path) + 1:] if file_path.startswith(dir_path + "/") else file_path
                     if fnmatch.fnmatch(rel_name, match_pattern):
                         try:
-                            await self.storage.process_file_with_sha(
+                            changed = await self.storage.process_file_with_sha(
                                 version, file_path, file_info.get("sha"), owner, repo_name
                             )
+                            if changed:
+                                has_changes = True
                         except Exception as e:
                             logger.error(f"Failed to process file {file_path}: {e}")
             else:
@@ -147,10 +168,14 @@ class WebhookService:
                 file_info = await self.gogs.get_file_info(owner, repo_name, commit_id, path_pattern)
                 if file_info:
                     try:
-                        await self.storage.process_file_with_sha(
+                        changed = await self.storage.process_file_with_sha(
                             version, path_pattern, file_info.get("sha"), owner, repo_name
                         )
+                        if changed:
+                            has_changes = True
                     except Exception as e:
                         logger.error(f"Failed to process file {path_pattern}: {e}")
                 else:
                     logger.warning(f"File not found: {path_pattern}")
+
+        return has_changes