|
@@ -0,0 +1,62 @@
|
|
|
+package com.tzld.piaoquan.content.understanding.rocketmq.consumer;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.alibaba.fastjson.TypeReference;
|
|
|
+import com.tzld.piaoquan.content.understanding.service.ContentService;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
|
|
|
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
|
|
|
+import org.apache.rocketmq.client.apis.message.MessageId;
|
|
|
+import org.apache.rocketmq.client.apis.message.MessageView;
|
|
|
+import org.apache.rocketmq.client.core.RocketMQClientTemplate;
|
|
|
+import org.apache.rocketmq.client.core.RocketMQListener;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.boot.CommandLineRunner;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.nio.ByteBuffer;
|
|
|
+import java.nio.charset.Charset;
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
+import java.time.Duration;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Objects;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+public class OldVersionRemainVideoUnderstandingConsumerV2 implements CommandLineRunner {
|
|
|
+ @Autowired
|
|
|
+ private RocketMQClientTemplate rocketMQClientTemplate;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private ContentService contentService;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run(String... args) throws Exception {
|
|
|
+ while (true) {
|
|
|
+ final List<MessageView> messages = rocketMQClientTemplate.receive(10, Duration.ofSeconds(15));
|
|
|
+ log.info("Received {} message(s)", messages.size());
|
|
|
+ for (MessageView message : messages) {
|
|
|
+ log.info("receive message, topic:" + message.getTopic() + " messageId:" + message.getMessageId());
|
|
|
+ final MessageId messageId = message.getMessageId();
|
|
|
+ try {
|
|
|
+ ByteBuffer body = message.getBody();
|
|
|
+ Charset charset = StandardCharsets.UTF_8;
|
|
|
+ String messageStr = charset.decode(body).toString();
|
|
|
+ Map<String, Object> messageMap = JSONObject.parseObject(messageStr, new TypeReference<Map<String, Object>>() {});
|
|
|
+ Object videoIdObj = messageMap.get("videoId");
|
|
|
+ if (Objects.nonNull(videoIdObj)) {
|
|
|
+ Long videoId = Long.parseLong(videoIdObj.toString());
|
|
|
+ boolean flag = contentService.remainVideoUnderstandingHandler(videoId);
|
|
|
+ if (flag) {
|
|
|
+ rocketMQClientTemplate.ack(message);
|
|
|
+ log.info("Message is acknowledged successfully, messageId={}", messageId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Throwable t) {
|
|
|
+ log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|