Przeglądaj źródła

fixed task loss when priority queue is full

ehlxr 2 lat temu
rodzic
commit
cf5b86694d

+ 14 - 0
etl-core/src/main/java/com/tzld/crawler/etl/model/vo/CrawlerEtlParam.java

@@ -127,6 +127,11 @@ public class CrawlerEtlParam {
      */
     private String messageId;
 
+    /**
+     * the handle of message, used to ack
+     */
+    private String receiptHandle;
+
     @Override
     public String toString() {
         return "CrawlerEtlParam{" +
@@ -155,9 +160,18 @@ public class CrawlerEtlParam {
                 ", videoOssPath='" + videoOssPath + '\'' +
                 ", audioUrl='" + audioUrl + '\'' +
                 ", messageId='" + messageId + '\'' +
+                ", receiptHandle='" + receiptHandle + '\'' +
                 '}';
     }
 
+    public String getReceiptHandle() {
+        return receiptHandle;
+    }
+
+    public void setReceiptHandle(String receiptHandle) {
+        this.receiptHandle = receiptHandle;
+    }
+
     public String getMessageId() {
         return messageId;
     }

+ 31 - 16
etl-core/src/main/java/com/tzld/crawler/etl/mq/EtlMQConsumer.java

@@ -38,12 +38,10 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 /**
  * @author ehlxr
@@ -69,9 +67,14 @@ public class EtlMQConsumer {
     @ApolloJsonValue("${crawler.etl.priority:{}}")
     private Map<String, Integer> priorityMap;
 
+    @Value("${pool.queue.maxsize.limit:80}")
+    private Integer queueSizeList;
+
     private MQConsumer consumer;
     private ThreadPoolExecutor priorityPool;
 
+    private BlockingQueue<String> ackReceiptHandleQueue;
+
     public EtlMQConsumer(EtlService etlService) {
         this.etlService = etlService;
     }
@@ -80,17 +83,30 @@ public class EtlMQConsumer {
     public void init() {
         MQClient mqClient = new MQClient(httpEndpoint, accessKey, secretKey);
         priorityPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
-                Runtime.getRuntime().availableProcessors() * 2, 60L,
+                Runtime.getRuntime().availableProcessors(), 60L,
                 TimeUnit.SECONDS, new PriorityBlockingQueue<>(),
                 new ThreadFactoryBuilder().setNameFormat("priority-etl-pool-%d").build());
         consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
 
-        new Thread(this::doConsume).start();
+        ackReceiptHandleQueue = new LinkedBlockingDeque<>();
+        new Thread(() -> doConsume(ackReceiptHandleQueue)).start();
+        new Thread(() -> ackMessage(ackReceiptHandleQueue)).start();
+    }
+
+    private void ackMessage(BlockingQueue<String> ackReceiptHandleQueue) {
+        try {
+            while (true) {
+                String receiptHandle = ackReceiptHandleQueue.take();
+                log.info("ack mq receiptHandle: {}", receiptHandle);
+                consumer.ackMessage(Collections.singletonList(receiptHandle));
+            }
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
     }
 
-    private void doConsume() {
+    private void doConsume(BlockingQueue<String> ackReceiptHandleQueue) {
         do {
-            List<String> handles = new ArrayList<>();
             try {
                 List<Message> messages = consumer.consumeMessage(1, 10);
                 if (messages == null || messages.isEmpty()) {
@@ -100,13 +116,12 @@ public class EtlMQConsumer {
 
                 messages.forEach(message -> {
                     logPoolInfo();
-                    if (priorityPool.getQueue().size() < 10) {
+                    if (priorityPool.getQueue().size() < queueSizeList) {
                         log.info("Receive message: {} from topic: {}, group: {}", message, topic, groupId);
                         CrawlerEtlParam param = JSONObject.parseObject(message.getMessageBodyString(), CrawlerEtlParam.class);
                         param.setMessageId(message.getMessageId());
 
-                        priorityPool.execute(new RunnablePriority(param));
-                        handles.add(message.getReceiptHandle());
+                        priorityPool.execute(new RunnablePriority(param, ackReceiptHandleQueue));
                     } else {
                         log.warn("Priority pool queue size > {}! waiting for execute task.", priorityPool.getQueue().size());
                     }
@@ -114,10 +129,6 @@ public class EtlMQConsumer {
             } catch (Throwable e) {
                 log.error("Consume message from topic: {}, group: {} error", topic, groupId, e);
             }
-
-            if (!handles.isEmpty()) {
-                consumer.ackMessage(handles);
-            }
         } while (true);
     }
 
@@ -129,8 +140,10 @@ public class EtlMQConsumer {
 
     class RunnablePriority implements Runnable, Comparable<RunnablePriority> {
         private final CrawlerEtlParam param;
+        private final BlockingQueue<String> ackReceiptHandleQueue;
 
-        public RunnablePriority(CrawlerEtlParam param) {
+        public RunnablePriority(CrawlerEtlParam param, BlockingQueue<String> ackReceiptHandleQueue) {
+            this.ackReceiptHandleQueue = ackReceiptHandleQueue;
             this.param = param;
         }
 
@@ -157,6 +170,8 @@ public class EtlMQConsumer {
                 logPoolInfo();
                 log.info("deal: {} priority: {}", param, getPriority());
                 etlService.deal(param);
+                log.info("deal: {} deal done", param);
+                ackReceiptHandleQueue.put(param.getReceiptHandle());
                 logPoolInfo();
             } catch (Exception e) {
                 log.error("deal {} error.", param, e);