|
|
@@ -218,14 +218,10 @@ class WebhookService:
|
|
|
|
|
|
async def _fetch_and_process_file(
|
|
|
self, version, file_path: str, output_config: dict,
|
|
|
- owner: str, repo_name: str, commit_id: str
|
|
|
+ owner: str, repo_name: str, commit_id: str,
|
|
|
+ processed_keys: set
|
|
|
) -> bool:
|
|
|
"""Get file SHA from Gogs and process a single changed file, plus its paired input if configured."""
|
|
|
- file_info = await self.gogs.get_file_info(owner, repo_name, commit_id, file_path)
|
|
|
- if not file_info:
|
|
|
- logger.info(f"File {file_path} not found at commit {commit_id[:8]} (removed). Skipping.")
|
|
|
- return False
|
|
|
-
|
|
|
# Calculate group_key here so both paired input and output can share it
|
|
|
directory_depth = output_config.get("directory_depth")
|
|
|
if directory_depth is not None and directory_depth > 0:
|
|
|
@@ -237,6 +233,17 @@ class WebhookService:
|
|
|
else:
|
|
|
group_key = os.path.dirname(file_path)
|
|
|
|
|
|
+ # Deduplicate API calls and DB entries across concurrently running tasks
|
|
|
+ task_key = (file_path, group_key)
|
|
|
+ if task_key in processed_keys:
|
|
|
+ return False
|
|
|
+ processed_keys.add(task_key)
|
|
|
+
|
|
|
+ file_info = await self.gogs.get_file_info(owner, repo_name, commit_id, file_path)
|
|
|
+ if not file_info:
|
|
|
+ logger.info(f"File {file_path} not found at commit {commit_id[:8]} (removed). Skipping.")
|
|
|
+ return False
|
|
|
+
|
|
|
has_change = await self.storage.process_file_with_sha(
|
|
|
version, file_path, file_info.get("sha"), owner, repo_name,
|
|
|
direction=output_config.get("direction"),
|
|
|
@@ -266,6 +273,12 @@ class WebhookService:
|
|
|
paired_path = None
|
|
|
|
|
|
if paired_path:
|
|
|
+ # Deduplicate paired input fetches
|
|
|
+ paired_task_key = (paired_path, group_key)
|
|
|
+ if paired_task_key in processed_keys:
|
|
|
+ continue
|
|
|
+ processed_keys.add(paired_task_key)
|
|
|
+
|
|
|
# Actively fetch paired file info from Gogs
|
|
|
paired_info = await self.gogs.get_file_info(owner, repo_name, commit_id, paired_path)
|
|
|
if paired_info:
|
|
|
@@ -308,8 +321,9 @@ class WebhookService:
|
|
|
logger.info(f"Matched {len(matched_files)} changed file(s) to output rules. Processing in parallel.")
|
|
|
|
|
|
# Step 2: Fetch file info + download/upload in parallel
|
|
|
+ processed_keys = set()
|
|
|
tasks = [
|
|
|
- self._fetch_and_process_file(version, fp, oc, owner, repo_name, commit_id)
|
|
|
+ self._fetch_and_process_file(version, fp, oc, owner, repo_name, commit_id, processed_keys)
|
|
|
for fp, oc in matched_files
|
|
|
]
|
|
|
|