supeng 3 тижнів тому
батько
коміт
83cc577ce5

+ 22 - 6
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/rocketmq/consumer/ContentUnderstandingPipelineTaskConsumer.java

@@ -14,6 +14,7 @@ import com.tzld.piaoquan.content.understanding.model.dto.TreeNode;
 import com.tzld.piaoquan.content.understanding.model.po.*;
 import com.tzld.piaoquan.content.understanding.service.ContentService;
 import com.tzld.piaoquan.content.understanding.service.PipelineService;
+import com.tzld.piaoquan.content.understanding.util.RedisUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
@@ -21,6 +22,7 @@ import org.apache.rocketmq.client.apis.message.MessageId;
 import org.apache.rocketmq.client.apis.message.MessageView;
 import org.apache.rocketmq.client.core.RocketMQListener;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 import java.nio.ByteBuffer;
@@ -53,6 +55,15 @@ public class ContentUnderstandingPipelineTaskConsumer implements RocketMQListene
     private CuPipelineMapper cuPipelineMapper;
     @Autowired
     private CuPipelineStepMapper cuPipelineStepMapper;
+    @Value("${same.video.understanding.skip.seconds:0L}")
+    private Long sameVideoUnderstandingSkipSeconds;
+    @Autowired
+    private RedisUtil redisUtil;
+    /**
+     * 相同视频跳过理解
+     * same:video:understanding:skip:{videoId}
+     */
+    private static final String SAME_VIDEO_SKIP_KEY = "same:video:understanding:skip:%s";
 
     @Override
     public ConsumeResult consume(MessageView messageView) {
@@ -75,13 +86,18 @@ public class ContentUnderstandingPipelineTaskConsumer implements RocketMQListene
                 log.info("数据无效 taskId = {} videoId = {}", taskId, videoId);
                 return ConsumeResult.SUCCESS;
             }
-            CuTask cuTask = list.get(0);
-            if (Objects.equals(cuTask.getTaskStatus(), TaskStatusEnum.SUCCESS.getValue())
-                    || Objects.equals(cuTask.getTaskStatus(), TaskStatusEnum.FAILURE.getValue())) {
-                //已完成的任务 消费掉消息
-                log.info("finish task consume taskId = {} videoId = {}", taskId, videoId);
-                return ConsumeResult.SUCCESS;
+            //相同视频跳过
+            if (Objects.nonNull(sameVideoUnderstandingSkipSeconds) && sameVideoUnderstandingSkipSeconds > 0) {
+                String key = String.format(SAME_VIDEO_SKIP_KEY, videoId);
+                Boolean flag = redisUtil.setNx(key, "1", sameVideoUnderstandingSkipSeconds);
+                if (Objects.isNull(flag) || !flag) {
+                    log.info("相同视频跳过理解 videoId = {} taskId = {} sameVideoUnderstandingSkipSeconds = {}", videoId, taskId, sameVideoUnderstandingSkipSeconds);
+                    String reason = "相同视频跳过理解,时间间隔:" + sameVideoUnderstandingSkipSeconds;
+                    pipelineService.updateTaskResult(taskId, null, reason, videoId, extMap, false);
+                    return ConsumeResult.SUCCESS;
+                }
             }
+            CuTask cuTask = list.get(0);
             Long pipelineId = cuTask.getPipelineId();
             String input = cuTask.getInput();
             if (input == null || input.isEmpty()) {