supeng преди 6 месеца
родител
ревизия
7a2d67f67f

+ 21 - 0
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/model/dto/PipelineTaskMessageDTO.java

@@ -0,0 +1,21 @@
+package com.tzld.piaoquan.content.understanding.model.dto;
+
+import lombok.Data;
+
+import java.util.Map;
+
+/**
+ * @author supeng
+ */
+@Data
+public class PipelineTaskMessageDTO {
+    private Long videoId;
+    /**
+     * 任务ID
+     */
+    private String taskId;
+    /**
+     * 扩展字段
+     */
+    private Map<String, Object> extMap;
+}

+ 0 - 11
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/model/param/SubmitTasksParam.java

@@ -11,16 +11,5 @@ import java.util.List;
  */
 @Data
 public class SubmitTasksParam {
-//    /**
-//     * 内容理解维度
-//     *
-//     * @see com.tzld.piaoquan.content.understanding.common.enums.ContentTypeEnum
-//     */
-//    private List<Integer> contentTypes;
-//    /**
-//     * 内容信息
-//     */
-//    private ContentInfoDTO contentInfo;
-
     private List<ContentDTO> contents;
 }

+ 75 - 21
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/rocketmq/consumer/ContentUnderstandingPipelineTaskConsumer.java

@@ -2,7 +2,18 @@ package com.tzld.piaoquan.content.understanding.rocketmq.consumer;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.TypeReference;
+import com.tzld.piaoquan.content.understanding.common.enums.ExceptionEnum;
+import com.tzld.piaoquan.content.understanding.common.enums.TaskStatusEnum;
+import com.tzld.piaoquan.content.understanding.common.exception.CommonException;
+import com.tzld.piaoquan.content.understanding.dao.mapper.CuPipelineMapper;
+import com.tzld.piaoquan.content.understanding.dao.mapper.CuPipelineStepMapper;
+import com.tzld.piaoquan.content.understanding.dao.mapper.CuTaskMapper;
+import com.tzld.piaoquan.content.understanding.model.dto.ContentAnalyseDTO;
+import com.tzld.piaoquan.content.understanding.model.dto.PipelineTaskMessageDTO;
+import com.tzld.piaoquan.content.understanding.model.dto.TreeNode;
+import com.tzld.piaoquan.content.understanding.model.po.*;
 import com.tzld.piaoquan.content.understanding.service.ContentService;
+import com.tzld.piaoquan.content.understanding.service.PipelineService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
@@ -15,6 +26,7 @@ import org.springframework.stereotype.Component;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -33,30 +45,72 @@ public class ContentUnderstandingPipelineTaskConsumer implements RocketMQListene
     @Autowired
     private ContentService contentService;
 
+    @Autowired
+    private PipelineService pipelineService;
+
+    @Autowired
+    private CuTaskMapper cuTaskMapper;
+
+    @Autowired
+    private CuPipelineMapper cuPipelineMapper;
+    @Autowired
+    private CuPipelineStepMapper cuPipelineStepMapper;
+
     @Override
     public ConsumeResult consume(MessageView messageView) {
         log.info("Receive message {}", messageView);
-//        MessageId messageId = messageView.getMessageId();
-//        try {
-//            ByteBuffer body = messageView.getBody();
-//            Charset charset = StandardCharsets.UTF_8;
-//            String messageStr = charset.decode(body).toString();
-//            Map<String, Object> messageMap = JSON.parseObject(messageStr, new TypeReference<Map<String, Object>>() {
-//            });
-//            Object videoIdObj = messageMap.get("videoId");
-//            if (Objects.isNull(videoIdObj)) {
-//                //错误消息 清理掉
-//                return ConsumeResult.SUCCESS;
-//            }
-//            Long videoId = Long.parseLong(videoIdObj.toString());
-//            boolean flag = contentService.remainVideoUnderstandingHandler(videoId);
-//            if (flag) {
-//                log.info("Message is acknowledged successfully, messageId={}", messageId);
-//                return ConsumeResult.SUCCESS;
-//            }
-//        } catch (Exception e) {
-//            log.error("Message is failed to be acknowledged, messageId={}", messageId, e);
-//        }
+        MessageId messageId = messageView.getMessageId();
+        try {
+            ByteBuffer body = messageView.getBody();
+            Charset charset = StandardCharsets.UTF_8;
+            String messageStr = charset.decode(body).toString();
+            PipelineTaskMessageDTO messageDTO = JSON.parseObject(messageStr, PipelineTaskMessageDTO.class);
+            String taskId = messageDTO.getTaskId();
+            Long videoId = messageDTO.getVideoId();
+            Map<String, Object> extMap = messageDTO.getExtMap();
+            CuTaskExample example = new CuTaskExample();
+            example.createCriteria().andTaskIdEqualTo(taskId).andTaskStatusEqualTo(TaskStatusEnum.SUBMIT.getValue());
+            List<CuTask> list = cuTaskMapper.selectByExample(example);
+            if (Objects.isNull(list) || list.isEmpty()) {
+                //无需处理
+                log.info("task 数据无效 taskId = {}", taskId);
+                return ConsumeResult.SUCCESS;
+            }
+            CuTask cuTask = list.get(0);
+            Long pipelineId = cuTask.getPipelineId();
+            String input = cuTask.getInput();
+            if (input == null || input.isEmpty()) {
+                //没有输入内容
+                return ConsumeResult.SUCCESS;
+            }
+            CuPipeline cuPipeline = cuPipelineMapper.selectByPrimaryKey(pipelineId);
+            if (Objects.isNull(cuPipeline)) {
+                //pipeline 无数据
+                log.info("pipeline 数据无效 pipelineId = {}", pipelineId);
+                return ConsumeResult.SUCCESS;
+            }
+            // 2.获取pipeline配置
+            Integer contentType = cuPipeline.getContentType();
+            CuPipelineStepExample stepExample = new CuPipelineStepExample();
+            stepExample.createCriteria().andPipelineIdEqualTo(pipelineId);
+            List<CuPipelineStep> stepList = cuPipelineStepMapper.selectByExample(stepExample);
+            if (Objects.isNull(stepList) || stepList.isEmpty()) {
+                return ConsumeResult.SUCCESS;
+            }
+            // 3.构建tree结构
+            TreeNode root = pipelineService.buildTree(stepList);
+            root.setType(contentType);
+            root.setInput(input);
+            ContentAnalyseDTO dto1 = new ContentAnalyseDTO();
+            dto1.setTaskId(taskId);
+            dto1.setVideoId(videoId);
+            dto1.setContent(input);
+            dto1.setExtMap(extMap);
+            // 4.按照步骤执行pipeline每一步动作
+            pipelineService.executeTaskTreeNodeBFS(root, dto1);
+        } catch (Exception e) {
+            log.error("Message is failed to be acknowledged, messageId={}", messageId, e);
+        }
         return ConsumeResult.FAILURE;
     }
 }

+ 4 - 3
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/rocketmq/producer/ContentUnderstandingPipelineTaskProducer.java

@@ -1,6 +1,7 @@
 package com.tzld.piaoquan.content.understanding.rocketmq.producer;
 
 import com.alibaba.fastjson.JSON;
+import com.tzld.piaoquan.content.understanding.model.dto.PipelineTaskMessageDTO;
 import com.tzld.piaoquan.content.understanding.util.RetryUtil;
 import org.apache.rocketmq.client.apis.message.MessageId;
 import org.apache.rocketmq.client.apis.producer.SendReceipt;
@@ -26,11 +27,11 @@ public class ContentUnderstandingPipelineTaskProducer {
     @Value("${rocketmq.producer.pipelinetask.topic:topic_content_understanding_pipeline_task}")
     private String pipelineTaskTopic;
 
-    public MessageId syncSendNormalMessage(Map<String, Object> messageMap) {
+    public MessageId syncSendNormalMessage(PipelineTaskMessageDTO messageDTO) {
         return RetryUtil.executeWithRetry(() -> {
-            SendReceipt sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(pipelineTaskTopic, JSON.toJSONString(messageMap));
+            SendReceipt sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(pipelineTaskTopic, JSON.toJSONString(messageDTO));
             if (Objects.isNull(sendReceipt) || Objects.isNull(sendReceipt.getMessageId())) {
-                throw new RuntimeException("ContentUnderstandingPipelineTaskProducer syncSendNormalMessage failure messageMap = " + messageMap);
+                throw new RuntimeException("ContentUnderstandingPipelineTaskProducer syncSendNormalMessage failure messageDTO = " + messageDTO);
             }
             return sendReceipt.getMessageId();
         }, "ContentUnderstandingPipelineTaskProducer");

+ 22 - 0
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/service/PipelineService.java

@@ -1,7 +1,12 @@
 package com.tzld.piaoquan.content.understanding.service;
 
+import com.tzld.piaoquan.content.understanding.model.dto.ContentAnalyseDTO;
 import com.tzld.piaoquan.content.understanding.model.dto.ContentDTO;
+import com.tzld.piaoquan.content.understanding.model.dto.TreeNode;
 import com.tzld.piaoquan.content.understanding.model.param.ContentAnalyseParam;
+import com.tzld.piaoquan.content.understanding.model.po.CuPipelineStep;
+
+import java.util.List;
 
 /**
  * @author supeng
@@ -16,4 +21,21 @@ public interface PipelineService {
      * @return taskId
      */
     String executeTask(ContentDTO contentDTO);
+
+    TreeNode buildTree(List<CuPipelineStep> stepList);
+
+    /**
+     * 遍历执行
+     *
+     * @param root 根节点
+     * @param dto  数据
+     */
+    void executeTaskTreeNodeBFS(TreeNode root, ContentAnalyseDTO dto);
+
+    /**
+     * 提交任务
+     * @param contentDTO
+     * @return
+     */
+    String submitTask(ContentDTO contentDTO);
 }

+ 35 - 1
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/service/impl/PipelineServiceImpl.java

@@ -12,14 +12,17 @@ import com.tzld.piaoquan.content.understanding.dao.mapper.CuPromptMapper;
 import com.tzld.piaoquan.content.understanding.dao.mapper.CuTaskMapper;
 import com.tzld.piaoquan.content.understanding.model.dto.ContentAnalyseDTO;
 import com.tzld.piaoquan.content.understanding.model.dto.ContentDTO;
+import com.tzld.piaoquan.content.understanding.model.dto.PipelineTaskMessageDTO;
 import com.tzld.piaoquan.content.understanding.model.dto.TreeNode;
 import com.tzld.piaoquan.content.understanding.model.param.ActionParam;
 import com.tzld.piaoquan.content.understanding.model.param.ContentAnalyseParam;
 import com.tzld.piaoquan.content.understanding.model.po.*;
+import com.tzld.piaoquan.content.understanding.rocketmq.producer.ContentUnderstandingPipelineTaskProducer;
 import com.tzld.piaoquan.content.understanding.service.Action;
 import com.tzld.piaoquan.content.understanding.service.LoghubService;
 import com.tzld.piaoquan.content.understanding.service.PipelineService;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.apis.message.MessageId;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
@@ -66,6 +69,9 @@ public class PipelineServiceImpl implements PipelineService {
     @Value("${aliyun.log.logstore.result}")
     private String aliyunLogStoreResult;
 
+    @Autowired
+    private ContentUnderstandingPipelineTaskProducer contentUnderstandingPipelineTaskProducer;
+
     /**
      * 线程池队列大小
      */
@@ -126,7 +132,6 @@ public class PipelineServiceImpl implements PipelineService {
         cuTask.setPipelineId(pipelineId);
         cuTask.setInput(content);
         int insertCount = cuTaskMapper.insertSelective(cuTask);
-//        String reason = "";
         if (insertCount <= 0) {
             throw new CommonException(ExceptionEnum.SYSTEM_ERROR, "新增任务失败");
         }
@@ -158,6 +163,34 @@ public class PipelineServiceImpl implements PipelineService {
         return taskId;
     }
 
+    @Override
+    public String submitTask(ContentDTO contentDTO) {
+        // 1.数据准备
+        Long pipelineId = contentDTO.getPipelineId();
+        String content = contentDTO.getContent();
+        // 1.新增任务
+        String taskId = UUID.randomUUID().toString().replace("-", "") + System.currentTimeMillis();
+        CuTask cuTask = new CuTask();
+        cuTask.setTaskId(taskId);
+        cuTask.setPipelineId(pipelineId);
+        cuTask.setInput(content);
+        int insertCount = cuTaskMapper.insertSelective(cuTask);
+        if (insertCount <= 0) {
+            throw new CommonException(ExceptionEnum.SYSTEM_ERROR, "新增任务失败");
+        }
+        PipelineTaskMessageDTO messageDTO = new PipelineTaskMessageDTO();
+        messageDTO.setTaskId(taskId);
+        messageDTO.setExtMap(contentDTO.getExtMap());
+        try {
+            MessageId messageId = contentUnderstandingPipelineTaskProducer.syncSendNormalMessage(messageDTO);
+            log.info("syncSendNormalMessage messageId = {}", messageId);
+        } catch (Exception e) {
+            log.error("syncSendNormalMessage error messageDTO = {}", e, messageDTO);
+        }
+        return taskId;
+    }
+
+    @Override
     public TreeNode buildTree(List<CuPipelineStep> stepList) {
         Map<Long, TreeNode> nodeMap = new HashMap<>();
         TreeNode root = null;
@@ -237,6 +270,7 @@ public class PipelineServiceImpl implements PipelineService {
      * @param root 根节点
      * @param dto  数据
      */
+    @Override
     public void executeTaskTreeNodeBFS(TreeNode root, ContentAnalyseDTO dto) {
         if (Objects.isNull(root) || Objects.isNull(dto) || Objects.isNull(dto.getVideoId())) {
             return;