ehlxr 1 year ago
parent
commit
461272f060
1 changed files with 4 additions and 2 deletions
  1. 4 2
      etl-core/src/main/java/com/tzld/crawler/etl/mq/EtlMQConsumer.java

+ 4 - 2
etl-core/src/main/java/com/tzld/crawler/etl/mq/EtlMQConsumer.java

@@ -145,14 +145,16 @@ public class EtlMQConsumer {
 
                 messages.forEach(message -> {
                     log.info("Receive message: {} from topic: {}, group: {}", message, topic, groupId);
+                    handles.add(message.getReceiptHandle());
                     CrawlerVideoVO video = JSONObject.parseObject(message.getMessageBodyString(), CrawlerVideoVO.class);
                     priorityPool.execute(new RunnablePriority(video));
-                    handles.add(message.getReceiptHandle());
                 });
             } catch (Throwable e) {
                 log.error("Consume message from topic: {}, group: {} error", topic, groupId, e);
             }
-            consumer.ackMessage(handles);
+            if (!handles.isEmpty()) {
+                consumer.ackMessage(handles);
+            }
         } while (true);
     }
 }