Browse Source

optimize consumer

supeng 3 ngày trước cách đây
mục cha
commit
d2c4563f67

+ 11 - 3
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/rocketmq/consumer/ContentUnderstandingPipelineTaskConsumer.java

@@ -26,6 +26,7 @@ import org.springframework.stereotype.Component;
 import java.nio.ByteBuffer;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Objects;
@@ -71,20 +72,24 @@ public class ContentUnderstandingPipelineTaskConsumer implements RocketMQListene
             List<CuTask> list = cuTaskMapper.selectByExample(example);
             List<CuTask> list = cuTaskMapper.selectByExample(example);
             if (Objects.isNull(list) || list.isEmpty()) {
             if (Objects.isNull(list) || list.isEmpty()) {
                 //无需处理
                 //无需处理
-                log.info("task 数据无效 taskId = {}", taskId);
+                log.info("数据无效 taskId = {} videoId = {}", taskId, videoId);
                 return ConsumeResult.SUCCESS;
                 return ConsumeResult.SUCCESS;
             }
             }
             CuTask cuTask = list.get(0);
             CuTask cuTask = list.get(0);
             Long pipelineId = cuTask.getPipelineId();
             Long pipelineId = cuTask.getPipelineId();
             String input = cuTask.getInput();
             String input = cuTask.getInput();
             if (input == null || input.isEmpty()) {
             if (input == null || input.isEmpty()) {
-                //没有输入内容
+                log.info("输入内容为空 taskId = {} videoId = {} pipelineId = {}", taskId, videoId, pipelineId);
+                String reason = "输入内容为空";
+                pipelineService.updateTaskResult(taskId, null, reason, videoId, extMap, false);
                 return ConsumeResult.SUCCESS;
                 return ConsumeResult.SUCCESS;
             }
             }
             CuPipeline cuPipeline = cuPipelineMapper.selectByPrimaryKey(pipelineId);
             CuPipeline cuPipeline = cuPipelineMapper.selectByPrimaryKey(pipelineId);
             if (Objects.isNull(cuPipeline)) {
             if (Objects.isNull(cuPipeline)) {
                 //pipeline 无数据
                 //pipeline 无数据
-                log.info("pipeline 数据无效 pipelineId = {}", pipelineId);
+                log.info("输入内容为空 taskId = {} videoId = {} pipelineId = {}", taskId, videoId, pipelineId);
+                String reason = "pipelineId[" + pipelineId + "]无效";
+                pipelineService.updateTaskResult(taskId, null, reason, videoId, extMap, false);
                 return ConsumeResult.SUCCESS;
                 return ConsumeResult.SUCCESS;
             }
             }
             // 2.获取pipeline配置
             // 2.获取pipeline配置
@@ -93,6 +98,9 @@ public class ContentUnderstandingPipelineTaskConsumer implements RocketMQListene
             stepExample.createCriteria().andPipelineIdEqualTo(pipelineId);
             stepExample.createCriteria().andPipelineIdEqualTo(pipelineId);
             List<CuPipelineStep> stepList = cuPipelineStepMapper.selectByExample(stepExample);
             List<CuPipelineStep> stepList = cuPipelineStepMapper.selectByExample(stepExample);
             if (Objects.isNull(stepList) || stepList.isEmpty()) {
             if (Objects.isNull(stepList) || stepList.isEmpty()) {
+                log.info("输入内容为空 taskId = {} videoId = {} pipelineId = {}", taskId, videoId, pipelineId);
+                String reason = "pipelineId[" + pipelineId + "]无步骤配置";
+                pipelineService.updateTaskResult(taskId, null, reason, videoId, extMap, false);
                 return ConsumeResult.SUCCESS;
                 return ConsumeResult.SUCCESS;
             }
             }
             // 3.构建tree结构
             // 3.构建tree结构

+ 14 - 0
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/service/PipelineService.java

@@ -7,6 +7,7 @@ import com.tzld.piaoquan.content.understanding.model.param.ContentAnalyseParam;
 import com.tzld.piaoquan.content.understanding.model.po.CuPipelineStep;
 import com.tzld.piaoquan.content.understanding.model.po.CuPipelineStep;
 
 
 import java.util.List;
 import java.util.List;
+import java.util.Map;
 
 
 /**
 /**
  * @author supeng
  * @author supeng
@@ -39,6 +40,19 @@ public interface PipelineService {
      */
      */
     void executeTaskTreeNodeBFS(TreeNode root, ContentAnalyseDTO dto);
     void executeTaskTreeNodeBFS(TreeNode root, ContentAnalyseDTO dto);
 
 
+
+    /**
+     * 更新任务
+     *
+     * @param taskId
+     * @param result
+     * @param reason
+     * @param videoId
+     * @param extMap
+     * @param isSuccess
+     */
+    void updateTaskResult(String taskId, String result, String reason, Long videoId, Map<String, Object> extMap, boolean isSuccess);
+
     /**
     /**
      * 提交任务
      * 提交任务
      *
      *

+ 34 - 37
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/service/impl/PipelineServiceImpl.java

@@ -300,47 +300,12 @@ public class PipelineServiceImpl implements PipelineService {
                     //异常不阻塞其他分支执行
                     //异常不阻塞其他分支执行
                     try {
                     try {
                         //保存数据到mysql
                         //保存数据到mysql
-                        CuTaskExample example = new CuTaskExample();
-                        example.createCriteria().andTaskIdEqualTo(taskId);
-                        CuTask cuTask = new CuTask();
-                        cuTask.setOutput(result);
-                        cuTask.setTaskStatus(TaskStatusEnum.SUCCESS.getValue());
-                        int update = cuTaskMapper.updateByExampleSelective(cuTask, example);
-                        if (update <= 0) {
-                            log.error("step execute update error step = {}, result = {} update = {}", JSON.toJSONString(step), result, update);
-                        }
-                        Map<String, Object> logMap = new HashMap<>();
-                        logMap.put("video_id", videoId);
-                        logMap.put("data", result);
-                        logMap.put("status", "success");
-                        logMap.put("taskId", taskId);
-                        if (Objects.nonNull(dto.getExtMap())) {
-                            logMap.putAll(dto.getExtMap());
-                        }
-                        loghubService.asyncSubmitLog(aliyunLogProject, aliyunLogStoreResult, "", logMap);
-
+                        updateTaskResult(taskId, result, reason, videoId, dto.getExtMap(), true);
                     } catch (Exception e) {
                     } catch (Exception e) {
                         log.error("step execute update error step = {}, result = {}", e, JSON.toJSONString(step), result);
                         log.error("step execute update error step = {}, result = {}", e, JSON.toJSONString(step), result);
                     }
                     }
                 } else {
                 } else {
-                    CuTaskExample example = new CuTaskExample();
-                    example.createCriteria().andTaskIdEqualTo(taskId);
-                    CuTask cuTask = new CuTask();
-                    cuTask.setReason(reason);
-                    cuTask.setTaskStatus(TaskStatusEnum.FAILURE.getValue());
-                    int update = cuTaskMapper.updateByExampleSelective(cuTask, example);
-                    if (update <= 0) {
-                        log.error("step execute update error step = {}, result = {} update = {}", JSON.toJSONString(step), result, update);
-                    }
-                    Map<String, Object> logMap = new HashMap<>();
-                    logMap.put("video_id", videoId);
-                    logMap.put("error_msg", reason);
-                    logMap.put("status", "failure");
-                    logMap.put("taskId", taskId);
-                    if (Objects.nonNull(dto.getExtMap())) {
-                        logMap.putAll(dto.getExtMap());
-                    }
-                    loghubService.asyncSubmitLog(aliyunLogProject, aliyunLogStoreResult, "", logMap);
+                    updateTaskResult(taskId, result, reason, videoId, dto.getExtMap(), false);
                 }
                 }
                 continue;
                 continue;
             }
             }
@@ -351,6 +316,38 @@ public class PipelineServiceImpl implements PipelineService {
         }
         }
     }
     }
 
 
+    @Override
+    public void updateTaskResult(String taskId, String result, String reason, Long videoId, Map<String, Object> extMap, boolean isSuccess) {
+        CuTaskExample example = new CuTaskExample();
+        example.createCriteria().andTaskIdEqualTo(taskId);
+        CuTask cuTask = new CuTask();
+        if (isSuccess) {
+            cuTask.setOutput(result);
+            cuTask.setTaskStatus(TaskStatusEnum.SUCCESS.getValue());
+        } else {
+            cuTask.setReason(reason);
+            cuTask.setTaskStatus(TaskStatusEnum.FAILURE.getValue());
+        }
+        int updateCount = cuTaskMapper.updateByExampleSelective(cuTask, example);
+        if (updateCount <= 0) {
+            log.error("updateTaskResult execute update error taskId = {} videoId = {}, result = {} reason = {} updateCount = {}", taskId, videoId, result, reason, updateCount);
+        }
+        Map<String, Object> logMap = new HashMap<>();
+        logMap.put("video_id", videoId);
+        logMap.put("taskId", taskId);
+        if (isSuccess) {
+            logMap.put("data", result);
+            logMap.put("status", "success");
+        } else {
+            logMap.put("error_msg", reason);
+            logMap.put("status", "failure");
+        }
+        if (Objects.nonNull(extMap)) {
+            logMap.putAll(extMap);
+        }
+        loghubService.asyncSubmitLog(aliyunLogProject, aliyunLogStoreResult, "", logMap);
+    }
+
     /**
     /**
      * 执行step 动作
      * 执行step 动作
      *
      *