Explorar el Código

etl consumer fixed thread

ehlxr hace 1 año
padre
commit
a123a6ea22

+ 2 - 6
etl-core/src/main/java/com/tzld/crawler/etl/mq/EtlMQConsumer.java

@@ -36,7 +36,7 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -83,7 +83,6 @@ public class EtlMQConsumer {
 
     private void consumeMsg() {
         do {
-            List<String> handles = new ArrayList<>();
             try {
                 List<Message> messages = consumer.consumeMessage(1, 10);
                 if (messages == null || messages.isEmpty()) {
@@ -97,14 +96,11 @@ public class EtlMQConsumer {
                     param.setMessageId(message.getMessageId());
                     etlService.deal(param);
                     log.info("Deal done of message: {} from topic: {}, group: {}", message, topic, groupId);
-                    handles.add(message.getReceiptHandle());
+                    consumer.ackMessage(Collections.singletonList(message.getReceiptHandle()));
                 });
             } catch (Throwable e) {
                 log.error("Consume message from topic: {}, group: {} error", topic, groupId, e);
             }
-            if (!handles.isEmpty()) {
-                consumer.ackMessage(handles);
-            }
         } while (true);
     }
 }