Просмотр исходного кода

feat:优化匹配逻辑,提升效率

tanjingyu 4 недель назад
Родитель
Сommit
43e1d20f2e
2 измененных файлов с 89 добавлено и 57 удалено
  1. 56 23
      app/services/gogs_client.py
  2. 33 34
      app/services/webhook_service.py

+ 56 - 23
app/services/gogs_client.py

@@ -30,41 +30,74 @@ class GogsClient:
             resp.raise_for_status()
             return resp.json()
 
-    async def get_recursive_tree(self, owner: str, repo: str, commit_id: str) -> dict:
-        """Get recursive tree by manually traversing subdirectories.
+    async def get_file_info(self, owner: str, repo: str, commit_id: str, file_path: str) -> dict | None:
+        """Get single file info including SHA.
 
-        Gogs API doesn't support recursive parameter, so we need to
-        manually traverse each subdirectory.
+        Returns dict with 'sha', 'size', 'path' or None if not found.
         """
-        all_items = []
+        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{file_path}?ref={commit_id}"
+        try:
+            async with httpx.AsyncClient() as client:
+                resp = await client.get(url, headers=self.headers)
+                if resp.status_code == 404:
+                    return None
+                resp.raise_for_status()
+                data = resp.json()
+                # contents API returns file info directly for single file
+                if isinstance(data, dict) and data.get("type") == "file":
+                    return {
+                        "path": file_path,
+                        "sha": data.get("sha"),
+                        "size": data.get("size", 0),
+                        "type": "blob"
+                    }
+                return None
+        except httpx.HTTPStatusError as e:
+            logger.error(f"Failed to get file info for {file_path}: {e}")
+            return None
+
+    async def get_directory_tree(self, owner: str, repo: str, commit_id: str, dir_path: str) -> list:
+        """Get all files under a specific directory (recursive).
 
-        async def fetch_tree(tree_sha: str, prefix: str = ""):
-            """Recursively fetch tree contents."""
-            url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/git/trees/{tree_sha}"
+        Args:
+            dir_path: Directory path without trailing slash (e.g., "data/output")
+
+        Returns:
+            List of file info dicts with 'path', 'sha', 'size', 'type'
+        """
+        all_files = []
+
+        async def fetch_contents(path: str):
+            """Recursively fetch directory contents using contents API."""
+            url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{path}?ref={commit_id}"
             try:
                 async with httpx.AsyncClient() as client:
                     resp = await client.get(url, headers=self.headers)
+                    if resp.status_code == 404:
+                        logger.warning(f"Directory not found: {path}")
+                        return
                     resp.raise_for_status()
                     data = resp.json()
 
-                    for item in data.get("tree", []):
-                        # Build full path
-                        full_path = f"{prefix}{item['path']}" if prefix else item['path']
-                        item_copy = item.copy()
-                        item_copy['path'] = full_path
-                        all_items.append(item_copy)
-
-                        # Recursively fetch subdirectories
-                        if item['type'] == 'tree':
-                            await fetch_tree(item['sha'], f"{full_path}/")
+                    # contents API returns list for directories
+                    if isinstance(data, list):
+                        for item in data:
+                            if item.get("type") == "file":
+                                all_files.append({
+                                    "path": item.get("path"),
+                                    "sha": item.get("sha"),
+                                    "size": item.get("size", 0),
+                                    "type": "blob"
+                                })
+                            elif item.get("type") == "dir":
+                                # Recursively fetch subdirectory
+                                await fetch_contents(item.get("path"))
 
             except httpx.HTTPStatusError as e:
-                logger.error(f"Failed to get tree {tree_sha}: {e}")
-
-        # Start from the commit SHA
-        await fetch_tree(commit_id)
+                logger.error(f"Failed to get contents for {path}: {e}")
 
-        return {"tree": all_items}
+        await fetch_contents(dir_path)
+        return all_files
 
     async def get_file_content(self, owner: str, repo: str, commit_id: str, file_path: str) -> bytes:
         """Download raw file content."""

+ 33 - 34
app/services/webhook_service.py

@@ -69,19 +69,7 @@ class WebhookService:
         # 4. Get or create project
         project = self.storage.get_or_create_project(project_name)
 
-        # 5. Get file tree (once for all stages)
-        try:
-            tree_data = await self.gogs.get_recursive_tree(owner, repo_name, after_sha)
-            tree_files = {
-                item['path']: item
-                for item in tree_data.get('tree', [])
-                if item['type'] == 'blob'
-            }
-        except Exception as e:
-            logger.error(f"Failed to get file tree: {e}", exc_info=True)
-            return
-
-        # 6. Process stages (support both old and new format)
+        # 5. Process stages (support both old and new format)
         stages = manifest.get("stages", [])
 
         # Backward compatibility: old single-stage format
@@ -120,13 +108,13 @@ class WebhookService:
             )
             logger.info(f"Processing stage '{stage_name}' with {len(outputs)} output rules")
 
-            # Process outputs for this stage
-            await self._process_outputs(version, outputs, tree_files, owner, repo_name)
+            # Process outputs for this stage (fetch only needed files)
+            await self._process_outputs(version, outputs, owner, repo_name, after_sha)
 
     async def _process_outputs(
-        self, version, outputs: list, tree_files: dict, owner: str, repo_name: str
+        self, version, outputs: list, owner: str, repo_name: str, commit_id: str
     ):
-        """Process output rules and download matching files."""
+        """Process output rules and download matching files (on-demand fetching)."""
         for output in outputs:
             raw_path_pattern = output.get("path", "")
             match_pattern = output.get("pattern", "*")
@@ -134,24 +122,35 @@ class WebhookService:
             path_pattern = normalize_path(raw_path_pattern)
             is_dir = is_directory_pattern(raw_path_pattern)
 
-            for file_path, file_info in tree_files.items():
-                is_match = False
-
-                if is_dir:
-                    dir_prefix = path_pattern.rstrip("/") + "/"
-                    if file_path.startswith(dir_prefix):
-                        rel_name = file_path[len(dir_prefix):]
-                        if rel_name and fnmatch.fnmatch(rel_name, match_pattern):
-                            is_match = True
-                else:
-                    if file_path == path_pattern:
-                        is_match = True
-
-                if is_match:
+            if is_dir:
+                # Directory pattern: fetch only this directory's files
+                dir_path = path_pattern.rstrip("/")
+                logger.info(f"Fetching directory: {dir_path} with pattern: {match_pattern}")
+
+                files = await self.gogs.get_directory_tree(owner, repo_name, commit_id, dir_path)
+
+                for file_info in files:
+                    file_path = file_info.get("path")
+                    # Apply pattern matching
+                    rel_name = file_path[len(dir_path) + 1:] if file_path.startswith(dir_path + "/") else file_path
+                    if fnmatch.fnmatch(rel_name, match_pattern):
+                        try:
+                            await self.storage.process_file_with_sha(
+                                version, file_path, file_info.get("sha"), owner, repo_name
+                            )
+                        except Exception as e:
+                            logger.error(f"Failed to process file {file_path}: {e}")
+            else:
+                # Single file: fetch only this file's info
+                logger.info(f"Fetching single file: {path_pattern}")
+
+                file_info = await self.gogs.get_file_info(owner, repo_name, commit_id, path_pattern)
+                if file_info:
                     try:
-                        file_sha = file_info.get("sha")
                         await self.storage.process_file_with_sha(
-                            version, file_path, file_sha, owner, repo_name
+                            version, path_pattern, file_info.get("sha"), owner, repo_name
                         )
                     except Exception as e:
-                        logger.error(f"Failed to process file {file_path}: {e}")
+                        logger.error(f"Failed to process file {path_pattern}: {e}")
+                else:
+                    logger.warning(f"File not found: {path_pattern}")