supeng 1 day ago
parent
commit
367445d382

+ 62 - 0
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/rocketmq/consumer/ContentUnderstandingPipelineTaskConsumer.java

@@ -0,0 +1,62 @@
+package com.tzld.piaoquan.content.understanding.rocketmq.consumer;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.TypeReference;
+import com.tzld.piaoquan.content.understanding.service.ContentService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.message.MessageId;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.core.RocketMQListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * 旧版视频理解 需要延迟处理的视频 消费者
+ *
+ * @author supeng
+ */
+@Slf4j
+@Component
+@RocketMQMessageListener(endpoints = "${rocketmq.consumer.endpoints:rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080}",
+        topic = "${rocketmq.consumer.pipelinetask.topic:topic_content_understanding_pipeline_task}",
+        consumerGroup = "${rocketmq.consumer.pipelinetask.group:group_content_understanding_pipeline_task}", tag = "*")
+public class ContentUnderstandingPipelineTaskConsumer implements RocketMQListener {
+
+    @Autowired
+    private ContentService contentService;
+
+    @Override
+    public ConsumeResult consume(MessageView messageView) {
+        log.info("Receive message {}", messageView);
+//        MessageId messageId = messageView.getMessageId();
+//        try {
+//            ByteBuffer body = messageView.getBody();
+//            Charset charset = StandardCharsets.UTF_8;
+//            String messageStr = charset.decode(body).toString();
+//            Map<String, Object> messageMap = JSON.parseObject(messageStr, new TypeReference<Map<String, Object>>() {
+//            });
+//            Object videoIdObj = messageMap.get("videoId");
+//            if (Objects.isNull(videoIdObj)) {
+//                //错误消息 清理掉
+//                return ConsumeResult.SUCCESS;
+//            }
+//            Long videoId = Long.parseLong(videoIdObj.toString());
+//            boolean flag = contentService.remainVideoUnderstandingHandler(videoId);
+//            if (flag) {
+//                log.info("Message is acknowledged successfully, messageId={}", messageId);
+//                return ConsumeResult.SUCCESS;
+//            }
+//        } catch (Exception e) {
+//            log.error("Message is failed to be acknowledged, messageId={}", messageId, e);
+//        }
+        return ConsumeResult.FAILURE;
+    }
+}

+ 3 - 2
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/rocketmq/consumer/OldVersionRemainVideoUnderstandingConsumer.java

@@ -26,8 +26,9 @@ import java.util.Objects;
  */
  */
 @Slf4j
 @Slf4j
 @Component
 @Component
-@RocketMQMessageListener(endpoints = "${rocketmq.consumer.endpoints:rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080}", topic = "${rocketmq.consumer.oldversion.topic:topic_old_version_remain_video_understanding_test}",
-        consumerGroup = "${rocketmq.consumer.oldversion.group:group_old_version_remain_video_understanding_test}", tag = "*")
+@RocketMQMessageListener(endpoints = "${rocketmq.consumer.endpoints:rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080}",
+        topic = "${rocketmq.consumer.oldversion.topic:topic_old_version_remain_video_understanding}",
+        consumerGroup = "${rocketmq.consumer.oldversion.group:group_old_version_remain_video_understanding}", tag = "*")
 public class OldVersionRemainVideoUnderstandingConsumer implements RocketMQListener {
 public class OldVersionRemainVideoUnderstandingConsumer implements RocketMQListener {
 
 
     @Autowired
     @Autowired

+ 38 - 0
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/rocketmq/producer/ContentUnderstandingPipelineTaskProducer.java

@@ -0,0 +1,38 @@
+package com.tzld.piaoquan.content.understanding.rocketmq.producer;
+
+import com.alibaba.fastjson.JSON;
+import com.tzld.piaoquan.content.understanding.util.RetryUtil;
+import org.apache.rocketmq.client.apis.message.MessageId;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.client.core.RocketMQClientTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * 内容理解任务 任务 生产者
+ *
+ * @author supeng
+ */
+@Component
+public class ContentUnderstandingPipelineTaskProducer {
+    @Autowired
+    private RocketMQClientTemplate rocketMQClientTemplate;
+
+    @Value("${rocketmq.producer.pipelinetask.topic:topic_content_understanding_pipeline_task}")
+    private String pipelineTaskTopic;
+
+    public MessageId syncSendNormalMessage(Map<String, Object> messageMap) {
+        return RetryUtil.executeWithRetry(() -> {
+            SendReceipt sendReceipt = rocketMQClientTemplate.syncSendNormalMessage(pipelineTaskTopic, JSON.toJSONString(messageMap));
+            if (Objects.isNull(sendReceipt) || Objects.isNull(sendReceipt.getMessageId())) {
+                throw new RuntimeException("ContentUnderstandingPipelineTaskProducer syncSendNormalMessage failure messageMap = " + messageMap);
+            }
+            return sendReceipt.getMessageId();
+        }, "ContentUnderstandingPipelineTaskProducer");
+    }
+}

+ 1 - 1
content-understanding-core/src/main/java/com/tzld/piaoquan/content/understanding/rocketmq/producer/OldVersionRemainVideoUnderstandingProducer.java

@@ -33,6 +33,6 @@ public class OldVersionRemainVideoUnderstandingProducer {
                 throw new RuntimeException("OldVersionRemainVideoUnderstandingProducer syncSendDelayMessage failure messageMap = " + messageMap);
                 throw new RuntimeException("OldVersionRemainVideoUnderstandingProducer syncSendDelayMessage failure messageMap = " + messageMap);
             }
             }
             return sendReceipt.getMessageId();
             return sendReceipt.getMessageId();
-        }, "oldVersionRemainVideoSendDelayMessage");
+        }, "OldVersionRemainVideoUnderstandingProducer");
     }
     }
 }
 }

+ 6 - 1
content-understanding-server/src/main/resources/application-dev.yml

@@ -81,7 +81,12 @@ rocketmq:
     oldversion:
     oldversion:
       topic: topic_old_version_remain_video_understanding_test
       topic: topic_old_version_remain_video_understanding_test
       group: group_old_version_remain_video_understanding_test
       group: group_old_version_remain_video_understanding_test
+    pipelinetask:
+      topic: topic_content_understanding_pipeline_task_test
+      group: group_content_understanding_pipeline_task_test
   producer:
   producer:
     endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
     endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
     oldversion:
     oldversion:
-      topic: topic_old_version_remain_video_understanding_test
+      topic: topic_old_version_remain_video_understanding_test
+    pipelinetask:
+      topic: topic_content_understanding_pipeline_task_test

+ 6 - 1
content-understanding-server/src/main/resources/application-pre.yml

@@ -82,7 +82,12 @@ rocketmq:
     oldversion:
     oldversion:
       topic: topic_old_version_remain_video_understanding_pre
       topic: topic_old_version_remain_video_understanding_pre
       group: group_old_version_remain_video_understanding_pre
       group: group_old_version_remain_video_understanding_pre
+    pipelinetask:
+      topic: topic_content_understanding_pipeline_task_pre
+      group: group_content_understanding_pipeline_task_pre
   producer:
   producer:
     endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
     endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
     oldversion:
     oldversion:
-      topic: topic_old_version_remain_video_understanding_pre
+      topic: topic_old_version_remain_video_understanding_pre
+    pipelinetask:
+      topic: topic_content_understanding_pipeline_task_pre

+ 6 - 1
content-understanding-server/src/main/resources/application-prod.yml

@@ -82,7 +82,12 @@ rocketmq:
     oldversion:
     oldversion:
       topic: topic_old_version_remain_video_understanding
       topic: topic_old_version_remain_video_understanding
       group: group_old_version_remain_video_understanding
       group: group_old_version_remain_video_understanding
+    pipelinetask:
+      topic: topic_content_understanding_pipeline_task
+      group: group_content_understanding_pipeline_task
   producer:
   producer:
     endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
     endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
     oldversion:
     oldversion:
-      topic: topic_old_version_remain_video_understanding
+      topic: topic_old_version_remain_video_understanding
+    pipelinetask:
+      topic: topic_content_understanding_pipeline_task

+ 6 - 1
content-understanding-server/src/main/resources/application-stress.yml

@@ -84,7 +84,12 @@ rocketmq:
     oldversion:
     oldversion:
       topic: topic_old_version_remain_video_understanding_test
       topic: topic_old_version_remain_video_understanding_test
       group: group_old_version_remain_video_understanding_test
       group: group_old_version_remain_video_understanding_test
+    pipelinetask:
+      topic: topic_content_understanding_pipeline_task_test
+      group: group_content_understanding_pipeline_task_test
   producer:
   producer:
     endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
     endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
     oldversion:
     oldversion:
-      topic: topic_old_version_remain_video_understanding_test
+      topic: topic_old_version_remain_video_understanding_test
+    pipelinetask:
+      topic: topic_content_understanding_pipeline_task_test

+ 6 - 1
content-understanding-server/src/main/resources/application-test.yml

@@ -84,7 +84,12 @@ rocketmq:
     oldversion:
     oldversion:
       topic: topic_old_version_remain_video_understanding_test
       topic: topic_old_version_remain_video_understanding_test
       group: group_old_version_remain_video_understanding_test
       group: group_old_version_remain_video_understanding_test
+    pipelinetask:
+      topic: topic_content_understanding_pipeline_task_test
+      group: group_content_understanding_pipeline_task_test
   producer:
   producer:
     endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
     endpoints: rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080
     oldversion:
     oldversion:
-      topic: topic_old_version_remain_video_understanding_test
+      topic: topic_old_version_remain_video_understanding_test
+    pipelinetask:
+      topic: topic_content_understanding_pipeline_task_test