|
@@ -1,52 +1,62 @@
|
|
|
package com.tzld.piaoquan.content.understanding.rocketmq.consumer;
|
|
|
|
|
|
-import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.alibaba.fastjson.TypeReference;
|
|
|
import com.tzld.piaoquan.content.understanding.service.ContentService;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
|
|
|
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
|
|
|
+import org.apache.rocketmq.client.apis.message.MessageId;
|
|
|
import org.apache.rocketmq.client.apis.message.MessageView;
|
|
|
+import org.apache.rocketmq.client.core.RocketMQClientTemplate;
|
|
|
import org.apache.rocketmq.client.core.RocketMQListener;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.boot.CommandLineRunner;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.charset.Charset;
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
+import java.time.Duration;
|
|
|
+import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Objects;
|
|
|
|
|
|
@Slf4j
|
|
|
@Service
|
|
|
-@RocketMQMessageListener(endpoints = "${demo.trans.rocketmq.endpoints:}", topic = "${demo.trans.rocketmq.topic:}",
|
|
|
- consumerGroup = "${demo.trans.rocketmq.consumer-group:}", tag = "${demo.trans.rocketmq.tag:}")
|
|
|
-public class OldVersionRemainVideoUnderstandingConsumer implements RocketMQListener {
|
|
|
+public class OldVersionRemainVideoUnderstandingConsumer implements CommandLineRunner {
|
|
|
+ @Autowired
|
|
|
+ private RocketMQClientTemplate rocketMQClientTemplate;
|
|
|
|
|
|
@Autowired
|
|
|
private ContentService contentService;
|
|
|
|
|
|
@Override
|
|
|
- public ConsumeResult consume(MessageView messageView) {
|
|
|
- log.info("consume message = {}", messageView);
|
|
|
- try {
|
|
|
- ByteBuffer body = messageView.getBody();
|
|
|
- Charset charset = StandardCharsets.UTF_8;
|
|
|
- String message = charset.decode(body).toString();
|
|
|
- Map<String, Object> messageMap = JSONObject.parseObject(message, new TypeReference<Map<String, Object>>() {});
|
|
|
- Object videoIdObj = messageMap.get("videoId");
|
|
|
- if (Objects.nonNull(videoIdObj)) {
|
|
|
- Long videoId = Long.parseLong(videoIdObj.toString());
|
|
|
- boolean flag = contentService.remainVideoUnderstandingHandler(videoId);
|
|
|
- if (!flag) {
|
|
|
- return ConsumeResult.FAILURE;
|
|
|
+ public void run(String... args) throws Exception {
|
|
|
+ while (true) {
|
|
|
+ final List<MessageView> messages = rocketMQClientTemplate.receive(10, Duration.ofSeconds(15));
|
|
|
+ log.info("Received {} message(s)", messages.size());
|
|
|
+ for (MessageView message : messages) {
|
|
|
+ log.info("receive message, topic:" + message.getTopic() + " messageId:" + message.getMessageId());
|
|
|
+ final MessageId messageId = message.getMessageId();
|
|
|
+ try {
|
|
|
+ ByteBuffer body = message.getBody();
|
|
|
+ Charset charset = StandardCharsets.UTF_8;
|
|
|
+ String messageStr = charset.decode(body).toString();
|
|
|
+ Map<String, Object> messageMap = JSONObject.parseObject(messageStr, new TypeReference<Map<String, Object>>() {});
|
|
|
+ Object videoIdObj = messageMap.get("videoId");
|
|
|
+ if (Objects.nonNull(videoIdObj)) {
|
|
|
+ Long videoId = Long.parseLong(videoIdObj.toString());
|
|
|
+ boolean flag = contentService.remainVideoUnderstandingHandler(videoId);
|
|
|
+ if (flag) {
|
|
|
+ rocketMQClientTemplate.ack(message);
|
|
|
+ log.info("Message is acknowledged successfully, messageId={}", messageId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Throwable t) {
|
|
|
+ log.error("Message is failed to be acknowledged, messageId={}", messageId, t);
|
|
|
}
|
|
|
}
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("consume error {}", e, messageView);
|
|
|
- return ConsumeResult.FAILURE;
|
|
|
}
|
|
|
- return ConsumeResult.SUCCESS;
|
|
|
}
|
|
|
}
|