Ver código fonte

fixed task loss when priority queue is full

ehlxr 1 ano atrás
pai
commit
05c45a3198

+ 16 - 37
etl-core/src/main/java/com/tzld/crawler/etl/mq/EtlMQConsumer.java

@@ -39,10 +39,11 @@ import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.*;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author ehlxr
@@ -88,31 +89,8 @@ public class EtlMQConsumer {
     }
 
     private void doConsume() {
-        // ack message
-        List<Future<String>> futures = new ArrayList<>();
-        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
-        service.scheduleAtFixedRate(() -> {
-            List<String> handles = new ArrayList<>();
-
-            // Iterate over the futures list using a traditional loop to prevent ConcurrentModificationExceptions
-            for (Iterator<Future<String>> iterator = futures.iterator(); iterator.hasNext(); ) {
-                Future<String> future = iterator.next();
-                if (future.isDone()) {
-                    try {
-                        handles.add(future.get(10, TimeUnit.MICROSECONDS));
-                        iterator.remove();
-                    } catch (Exception e) {
-                        log.warn("Exception occurred during future.get(): {}", e.getMessage());
-                    }
-                }
-            }
-
-            if (!handles.isEmpty()) {
-                consumer.ackMessage(handles);
-            }
-        }, 0, 1, TimeUnit.SECONDS);
-
         do {
+            List<String> handles = new ArrayList<>();
             try {
                 List<Message> messages = consumer.consumeMessage(1, 10);
                 if (messages == null || messages.isEmpty()) {
@@ -122,20 +100,23 @@ public class EtlMQConsumer {
 
                 messages.forEach(message -> {
                     logPoolInfo();
-                    if (priorityPool.getQueue().size() > 10) {
-                        // if (priorityPool.getQueue().remainingCapacity() > 0) {
+                    if (priorityPool.getQueue().size() < 10) {
                         log.info("Receive message: {} from topic: {}, group: {}", message, topic, groupId);
                         CrawlerEtlParam param = JSONObject.parseObject(message.getMessageBodyString(), CrawlerEtlParam.class);
                         param.setMessageId(message.getMessageId());
 
-                        futures.add(priorityPool.submit(new CallablePriority(param)));
+                        priorityPool.execute(new RunnablePriority(param));
+                        handles.add(message.getReceiptHandle());
                     } else {
-                        log.warn("Priority pool queue have no remainingCapacity! waiting for execute task size {}", priorityPool.getQueue().size());
+                        log.warn("Priority pool queue size > {}! waiting for execute task.", priorityPool.getQueue().size());
                     }
                 });
             } catch (Throwable e) {
                 log.error("Consume message from topic: {}, group: {} error", topic, groupId, e);
-                continue;
+            }
+
+            if (!handles.isEmpty()) {
+                consumer.ackMessage(handles);
             }
         } while (true);
     }
@@ -146,10 +127,10 @@ public class EtlMQConsumer {
                 priorityPool.getQueue().size(), priorityPool.getQueue().remainingCapacity());
     }
 
-    class CallablePriority implements Callable<String>, Comparable<CallablePriority> {
+    class RunnablePriority implements Runnable, Comparable<RunnablePriority> {
         private final CrawlerEtlParam param;
 
-        public CallablePriority(CrawlerEtlParam param) {
+        public RunnablePriority(CrawlerEtlParam param) {
             this.param = param;
         }
 
@@ -161,7 +142,7 @@ public class EtlMQConsumer {
          * 优先级比较
          */
         @Override
-        public int compareTo(CallablePriority o) {
+        public int compareTo(RunnablePriority o) {
             if (this.getPriority() < o.getPriority()) {
                 return 1;
             } else if (this.getPriority() > o.getPriority()) {
@@ -171,16 +152,14 @@ public class EtlMQConsumer {
         }
 
         @Override
-        public String call() {
+        public void run() {
             try {
                 logPoolInfo();
                 log.info("deal: {} priority: {}", param, getPriority());
                 etlService.deal(param);
                 logPoolInfo();
-                return param.getReceiptHandle();
             } catch (Exception e) {
                 log.error("deal {} error.", param, e);
-                return null;
             }
         }