Преглед на файлове

optimize code and add log

supeng преди 9 месеца
родител
ревизия
e27c0e3f0f
променени са 1 файла, в които са добавени 51 реда и са изтрити 9 реда
  1. 51 9
      etl-core/src/main/java/com/tzld/crawler/etl/mq/EtlMQConsumer.java

+ 51 - 9
etl-core/src/main/java/com/tzld/crawler/etl/mq/EtlMQConsumer.java

@@ -27,6 +27,7 @@ package com.tzld.crawler.etl.mq;
 import com.alibaba.fastjson.JSONObject;
 import com.aliyun.mq.http.MQClient;
 import com.aliyun.mq.http.MQConsumer;
+import com.aliyun.mq.http.common.AckMessageException;
 import com.aliyun.mq.http.model.Message;
 import com.tzld.crawler.etl.model.vo.CrawlerEtlParam;
 import com.tzld.crawler.etl.service.EtlService;
@@ -36,6 +37,7 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -83,24 +85,64 @@ public class EtlMQConsumer {
 
     private void consumeMsg() {
         do {
+            List<Message> messages = null;
             try {
-                List<Message> messages = consumer.consumeMessage(1, 10);
-                if (messages == null || messages.isEmpty()) {
-                    log.info("No new message, continue");
-                    continue;
-                }
-
-                messages.forEach(message -> {
+                messages = consumer.consumeMessage(1, 3);
+            } catch (Exception e) {
+                log.error("consumeMessage error", e);
+            }
+            if (messages == null || messages.isEmpty()) {
+                log.info("No new message, continue");
+                continue;
+            }
+            List<String> handles = new ArrayList<>();
+            for (Message message : messages) {
+                try {
                     log.info("Receive message: {} from topic: {}, group: {}", message, topic, groupId);
                     CrawlerEtlParam param = JSONObject.parseObject(message.getMessageBodyString(), CrawlerEtlParam.class);
                     param.setMessageId(message.getMessageId());
                     etlService.deal(param);
                     log.info("Deal done of message: {} from topic: {}, group: {}", message, topic, groupId);
                     consumer.ackMessage(Collections.singletonList(message.getReceiptHandle()));
-                });
+                } catch (Exception e) {
+                    log.error("Consume message from topic: {}, group: {} message: {} error", topic, groupId, message, e);
+                } finally {
+                    //处理失败暂不重复处理
+                    handles.add(message.getReceiptHandle());
+                }
+            }
+
+            try {
+                consumer.ackMessage(handles);
             } catch (Throwable e) {
-                log.error("Consume message from topic: {}, group: {} error", topic, groupId, e);
+                // 某些消息的句柄可能超时了会导致确认不成功。
+                if (e instanceof AckMessageException) {
+                    AckMessageException errors = (AckMessageException) e;
+                    log.error("Ack message fail, requestId is: {}, fail handles: {} errorMessage:{} ", errors.getRequestId(), handles, errors.getErrorMessages(), e);
+                    continue;
+                }
+                log.error("Ack message fail handles: {}", handles, e);
+
             }
+
+//            try {
+//                List<Message> messages = consumer.consumeMessage(1, 10);
+//                if (messages == null || messages.isEmpty()) {
+//                    log.info("No new message, continue");
+//                    continue;
+//                }
+//
+//                messages.forEach(message -> {
+//                    log.info("Receive message: {} from topic: {}, group: {}", message, topic, groupId);
+//                    CrawlerEtlParam param = JSONObject.parseObject(message.getMessageBodyString(), CrawlerEtlParam.class);
+//                    param.setMessageId(message.getMessageId());
+//                    etlService.deal(param);
+//                    log.info("Deal done of message: {} from topic: {}, group: {}", message, topic, groupId);
+//                    consumer.ackMessage(Collections.singletonList(message.getReceiptHandle()));
+//                });
+//            } catch (Throwable e) {
+//                log.error("Consume message from topic: {}, group: {} error", topic, groupId, e);
+//            }
         } while (true);
     }
 }