Browse Source

fixed task loss when priority queue is full

ehlxr 1 year ago
parent
commit
a36aadcc82

+ 44 - 30
etl-core/src/main/java/com/tzld/crawler/etl/model/vo/CrawlerEtlParam.java

@@ -127,36 +127,10 @@ public class CrawlerEtlParam {
      */
     private String messageId;
 
-    @Override
-    public String toString() {
-        return "CrawlerEtlParam{" +
-                "userId=" + userId +
-                ", outUserId='" + outUserId + '\'' +
-                ", userName='" + userName + '\'' +
-                ", avatarUrl='" + avatarUrl + '\'' +
-                ", platform='" + platform + '\'' +
-                ", strategy='" + strategy + '\'' +
-                ", outVideoId='" + outVideoId + '\'' +
-                ", videoTitle='" + videoTitle + '\'' +
-                ", coverUrl='" + coverUrl + '\'' +
-                ", videoUrl='" + videoUrl + '\'' +
-                ", duration=" + duration +
-                ", publishTime='" + publishTime + '\'' +
-                ", playCnt=" + playCnt +
-                ", likeCnt=" + likeCnt +
-                ", shareCnt=" + shareCnt +
-                ", collectionCnt=" + collectionCnt +
-                ", commentCnt=" + commentCnt +
-                ", crawlerRule='" + crawlerRule + '\'' +
-                ", width=" + width +
-                ", height=" + height +
-                ", strategyType='" + strategyType + '\'' +
-                ", coverOssPath='" + coverOssPath + '\'' +
-                ", videoOssPath='" + videoOssPath + '\'' +
-                ", audioUrl='" + audioUrl + '\'' +
-                ", messageId='" + messageId + '\'' +
-                '}';
-    }
+    /**
+     * the handle of message, used to ack
+     */
+    private String receiptHandle;
 
     public String getMessageId() {
         return messageId;
@@ -357,4 +331,44 @@ public class CrawlerEtlParam {
     public void setHeight(Integer height) {
         this.height = height;
     }
+
+    @Override
+    public String toString() {
+        return "CrawlerEtlParam{" +
+                "userId=" + userId +
+                ", outUserId='" + outUserId + '\'' +
+                ", userName='" + userName + '\'' +
+                ", avatarUrl='" + avatarUrl + '\'' +
+                ", platform='" + platform + '\'' +
+                ", strategy='" + strategy + '\'' +
+                ", outVideoId='" + outVideoId + '\'' +
+                ", videoTitle='" + videoTitle + '\'' +
+                ", coverUrl='" + coverUrl + '\'' +
+                ", videoUrl='" + videoUrl + '\'' +
+                ", duration=" + duration +
+                ", publishTime='" + publishTime + '\'' +
+                ", playCnt=" + playCnt +
+                ", likeCnt=" + likeCnt +
+                ", shareCnt=" + shareCnt +
+                ", collectionCnt=" + collectionCnt +
+                ", commentCnt=" + commentCnt +
+                ", crawlerRule='" + crawlerRule + '\'' +
+                ", width=" + width +
+                ", height=" + height +
+                ", strategyType='" + strategyType + '\'' +
+                ", coverOssPath='" + coverOssPath + '\'' +
+                ", videoOssPath='" + videoOssPath + '\'' +
+                ", audioUrl='" + audioUrl + '\'' +
+                ", messageId='" + messageId + '\'' +
+                ", receiptHandle='" + receiptHandle + '\'' +
+                '}';
+    }
+
+    public String getReceiptHandle() {
+        return receiptHandle;
+    }
+
+    public void setReceiptHandle(String receiptHandle) {
+        this.receiptHandle = receiptHandle;
+    }
 }

+ 59 - 29
etl-core/src/main/java/com/tzld/crawler/etl/mq/EtlMQConsumer.java

@@ -39,11 +39,10 @@ import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 /**
  * @author ehlxr
@@ -69,7 +68,7 @@ public class EtlMQConsumer {
     @ApolloJsonValue("${crawler.etl.priority:{}}")
     private Map<String, Integer> priorityMap;
 
-    private MQClient mqClient;
+    private MQConsumer consumer;
     private ThreadPoolExecutor priorityPool;
 
     public EtlMQConsumer(EtlService etlService) {
@@ -78,45 +77,78 @@ public class EtlMQConsumer {
 
     @PostConstruct
     public void init() {
-        mqClient = new MQClient(httpEndpoint, accessKey, secretKey);
-        priorityPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60L,
+        MQClient mqClient = new MQClient(httpEndpoint, accessKey, secretKey);
+        priorityPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
+                Runtime.getRuntime().availableProcessors() * 2, 60L,
                 TimeUnit.SECONDS, new PriorityBlockingQueue<>(),
                 new ThreadFactoryBuilder().setNameFormat("priority-etl-pool-%d").build());
+        consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
 
-        new Thread(this::consumeMsg).start();
+        new Thread(this::doConsume).start();
     }
 
-    private void consumeMsg() {
-        final MQConsumer consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
-        do {
+    private void doConsume() {
+        // ack message
+        List<Future<String>> futures = new ArrayList<>();
+        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+        service.scheduleAtFixedRate(() -> {
             List<String> handles = new ArrayList<>();
+
+            // Iterate over the futures list using a traditional loop to prevent ConcurrentModificationExceptions
+            for (Iterator<Future<String>> iterator = futures.iterator(); iterator.hasNext(); ) {
+                Future<String> future = iterator.next();
+                if (future.isDone()) {
+                    try {
+                        handles.add(future.get(10, TimeUnit.MICROSECONDS));
+                        iterator.remove();
+                    } catch (Exception e) {
+                        log.warn("Exception occurred during future.get(): {}", e.getMessage());
+                    }
+                }
+            }
+
+            if (!handles.isEmpty()) {
+                consumer.ackMessage(handles);
+            }
+        }, 0, 1, TimeUnit.SECONDS);
+
+        do {
             try {
-                List<Message> messages = consumer.consumeMessage(10, 10);
+                List<Message> messages = consumer.consumeMessage(1, 10);
                 if (messages == null || messages.isEmpty()) {
                     log.info("No new message, continue");
                     continue;
                 }
 
                 messages.forEach(message -> {
-                    log.info("Receive message: {} from topic: {}, group: {}", message, topic, groupId);
-                    handles.add(message.getReceiptHandle());
-                    CrawlerEtlParam param = JSONObject.parseObject(message.getMessageBodyString(), CrawlerEtlParam.class);
-                    param.setMessageId(message.getMessageId());
-                    priorityPool.execute(new RunnablePriority(param));
+                    logPoolInfo();
+                    if (priorityPool.getQueue().remainingCapacity() > 0) {
+                        log.info("Receive message: {} from topic: {}, group: {}", message, topic, groupId);
+                        CrawlerEtlParam param = JSONObject.parseObject(message.getMessageBodyString(), CrawlerEtlParam.class);
+                        param.setMessageId(message.getMessageId());
+
+                        futures.add(priorityPool.submit(new CallablePriority(param)));
+                    } else {
+                        log.warn("Priority pool queue have no remainingCapacity! waiting for execute task size {}", priorityPool.getQueue().size());
+                    }
                 });
             } catch (Throwable e) {
                 log.error("Consume message from topic: {}, group: {} error", topic, groupId, e);
-            }
-            if (!handles.isEmpty()) {
-                consumer.ackMessage(handles);
+                continue;
             }
         } while (true);
     }
 
-    class RunnablePriority implements Runnable, Comparable<RunnablePriority> {
+    private void logPoolInfo() {
+        log.info("Thread pool info TaskCount {}, CompletedTaskCount {}, ActiveCount {}, Queue size {}, Queue remainingCapacity {}",
+                priorityPool.getTaskCount(), priorityPool.getCompletedTaskCount(), priorityPool.getActiveCount(),
+                priorityPool.getQueue().size(), priorityPool.getQueue().remainingCapacity());
+    }
+
+    class CallablePriority implements Callable<String>, Comparable<CallablePriority> {
         private final CrawlerEtlParam param;
 
-        public RunnablePriority(CrawlerEtlParam param) {
+        public CallablePriority(CrawlerEtlParam param) {
             this.param = param;
         }
 
@@ -128,7 +160,7 @@ public class EtlMQConsumer {
          * 优先级比较
          */
         @Override
-        public int compareTo(RunnablePriority o) {
+        public int compareTo(CallablePriority o) {
             if (this.getPriority() < o.getPriority()) {
                 return 1;
             } else if (this.getPriority() > o.getPriority()) {
@@ -138,18 +170,16 @@ public class EtlMQConsumer {
         }
 
         @Override
-        public void run() {
+        public String call() {
             try {
-                log.info("Thread pool info \nTaskCount: {}\nCompletedTaskCount: {}\nActiveCount: {}\nQueue size: {}\nQueue elements: {}",
-                        priorityPool.getTaskCount(), priorityPool.getCompletedTaskCount(), priorityPool.getActiveCount(),
-                        priorityPool.getQueue().size(), priorityPool.getQueue());
+                logPoolInfo();
                 log.info("deal: {} priority: {}", param, getPriority());
                 etlService.deal(param);
-                log.info("Thread pool info \nTaskCount: {}\nCompletedTaskCount: {}\nActiveCount: {}\nQueue size: {}\nQueue elements: {}",
-                        priorityPool.getTaskCount(), priorityPool.getCompletedTaskCount(), priorityPool.getActiveCount(),
-                        priorityPool.getQueue().size(), priorityPool.getQueue());
+                logPoolInfo();
+                return param.getReceiptHandle();
             } catch (Exception e) {
                 log.error("deal {} error.", param, e);
+                return null;
             }
         }