Prechádzať zdrojové kódy

fix:并发冲突处理

tanjingyu 3 týždňov pred
rodič
commit
01c6a948f7

+ 5 - 1
app/models.py

@@ -1,4 +1,4 @@
-from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, BigInteger
+from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, BigInteger, UniqueConstraint
 from sqlalchemy.orm import relationship
 from sqlalchemy.orm import relationship
 from sqlalchemy.sql import func
 from sqlalchemy.sql import func
 from ulid import ULID
 from ulid import ULID
@@ -32,6 +32,10 @@ class DataVersion(Base):
     manifest_snapshot = Column(Text)
     manifest_snapshot = Column(Text)
     created_at = Column(DateTime(timezone=True), server_default=func.now())
     created_at = Column(DateTime(timezone=True), server_default=func.now())
 
 
+    __table_args__ = (
+        UniqueConstraint('project_id', 'stage', 'commit_id', name='_project_stage_commit_uc'),
+    )
+
     project = relationship("Project", back_populates="versions")
     project = relationship("Project", back_populates="versions")
     files = relationship("DataFile", back_populates="version")
     files = relationship("DataFile", back_populates="version")
 
 

+ 12 - 5
app/services/storage_service.py

@@ -1,5 +1,6 @@
 import os
 import os
 from sqlalchemy.orm import Session
 from sqlalchemy.orm import Session
+from sqlalchemy.exc import IntegrityError
 from app.models import Project, DataVersion, DataFile
 from app.models import Project, DataVersion, DataFile
 from app.config import settings
 from app.config import settings
 from app.services.gogs_client import GogsClient
 from app.services.gogs_client import GogsClient
@@ -23,7 +24,8 @@ class StorageService:
             self.db.refresh(project)
             self.db.refresh(project)
         return project
         return project
 
 
-    def create_version(self, project_id: str, stage: str, commit_id: str, author: str, manifest: str) -> DataVersion:
+    def create_version(self, project_id: str, stage: str, commit_id: str, author: str, manifest: str) -> DataVersion | None:
+        """Create a new data version. Returns None if a duplicate exists (IntegrityError)."""
         version = DataVersion(
         version = DataVersion(
             project_id=project_id,
             project_id=project_id,
             stage=stage,
             stage=stage,
@@ -31,10 +33,15 @@ class StorageService:
             author=author,
             author=author,
             manifest_snapshot=manifest
             manifest_snapshot=manifest
         )
         )
-        self.db.add(version)
-        self.db.commit()
-        self.db.refresh(version)
-        return version
+        try:
+            self.db.add(version)
+            self.db.commit()
+            self.db.refresh(version)
+            return version
+        except IntegrityError:
+            self.db.rollback()
+            logger.info(f"Version already exists for project {project_id}, stage {stage}, commit {commit_id[:8]}.")
+            return None
 
 
     def rollback_version(self, version: DataVersion):
     def rollback_version(self, version: DataVersion):
         """Remove a version and all its associated file records.
         """Remove a version and all its associated file records.

+ 6 - 1
app/services/webhook_service.py

@@ -104,8 +104,13 @@ class WebhookService:
 
 
             # Create version for this stage
             # Create version for this stage
             version = self.storage.create_version(
             version = self.storage.create_version(
-                project.id, stage_name, after_sha, author_name, manifest_content
+                project.id, stage_name, after_sha, author_name, manifest_snapshot=manifest_content
             )
             )
+            
+            if not version:
+                logger.info(f"Stage '{stage_name}' (commit {after_sha[:8]}) is already being processed. Skipping.")
+                continue
+                
             logger.info(f"Processing stage '{stage_name}' with {len(outputs)} output rules")
             logger.info(f"Processing stage '{stage_name}' with {len(outputs)} output rules")
 
 
             # Process outputs and check if any file actually changed
             # Process outputs and check if any file actually changed