|
@@ -0,0 +1,62 @@
|
|
|
+package com.tzld.piaoquan.content.understanding.rocketmq.consumer;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.fastjson.TypeReference;
|
|
|
+import com.tzld.piaoquan.content.understanding.service.ContentService;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
|
|
|
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
|
|
|
+import org.apache.rocketmq.client.apis.message.MessageId;
|
|
|
+import org.apache.rocketmq.client.apis.message.MessageView;
|
|
|
+import org.apache.rocketmq.client.core.RocketMQListener;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import java.nio.ByteBuffer;
|
|
|
+import java.nio.charset.Charset;
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Objects;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 内容理解任务 消费者 延迟处理
|
|
|
+ *
|
|
|
+ * @author supeng
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+@RocketMQMessageListener(endpoints = "${rocketmq.consumer.endpoints:rmq-cn-vym47zv2i03-vpc.cn-hangzhou.rmq.aliyuncs.com:8080}",
|
|
|
+ topic = "${rocketmq.consumer.pipelinetask.delay.topic:topic_content_understanding_pipeline_task_delay}",
|
|
|
+ consumerGroup = "${rocketmq.consumer.pipelinetask.delay.group:group_content_understanding_pipeline_task_delay}", tag = "*", consumptionThreadCount = 2)
|
|
|
+public class ContentUnderstandingPipelingTaskDelayConsumer implements RocketMQListener {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private ContentService contentService;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ConsumeResult consume(MessageView messageView) {
|
|
|
+ log.info("Receive message {}", messageView);
|
|
|
+ MessageId messageId = messageView.getMessageId();
|
|
|
+ try {
|
|
|
+ ByteBuffer body = messageView.getBody();
|
|
|
+ Charset charset = StandardCharsets.UTF_8;
|
|
|
+ String messageStr = charset.decode(body).toString();
|
|
|
+ Map<String, Object> messageMap = JSON.parseObject(messageStr, new TypeReference<Map<String, Object>>() {
|
|
|
+ });
|
|
|
+ Object videoIdObj = messageMap.get("videoId");
|
|
|
+ if (Objects.isNull(videoIdObj)) {
|
|
|
+ //错误消息 清理掉
|
|
|
+ return ConsumeResult.SUCCESS;
|
|
|
+ }
|
|
|
+ Long videoId = Long.parseLong(videoIdObj.toString());
|
|
|
+ boolean flag = contentService.remainVideoUnderstandingHandler(videoId);
|
|
|
+ if (flag) {
|
|
|
+ log.info("Message is acknowledged successfully, messageId={}", messageId);
|
|
|
+ return ConsumeResult.SUCCESS;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Message is failed to be acknowledged, messageId={}", messageId, e);
|
|
|
+ }
|
|
|
+ return ConsumeResult.FAILURE;
|
|
|
+ }
|
|
|
+}
|