
fixed task loss when priority queue is full

ehlxr 1 year ago
parent
commit
da3af3a91a

+ 18 - 9
etl-core/src/main/java/com/tzld/crawler/etl/mq/EtlMQConsumer.java

@@ -32,6 +32,8 @@ import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.tzld.crawler.etl.model.vo.CrawlerEtlParam;
 import com.tzld.crawler.etl.service.EtlService;
+import com.tzld.crawler.etl.util.CommonUtils;
+import org.apache.commons.lang3.RandomUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
@@ -73,8 +75,6 @@ public class EtlMQConsumer {
     private MQConsumer consumer;
     private ThreadPoolExecutor priorityPool;
 
-    private BlockingQueue<String> ackReceiptHandleQueue;
-
     public EtlMQConsumer(EtlService etlService) {
         this.etlService = etlService;
     }
@@ -88,7 +88,7 @@ public class EtlMQConsumer {
                 new ThreadFactoryBuilder().setNameFormat("priority-etl-pool-%d").build());
         consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
 
-        ackReceiptHandleQueue = new LinkedBlockingDeque<>();
+        BlockingQueue<String> ackReceiptHandleQueue = new LinkedBlockingDeque<>();
         new Thread(() -> doConsume(ackReceiptHandleQueue)).start();
         new Thread(() -> ackMessage(ackReceiptHandleQueue)).start();
     }
@@ -120,10 +120,12 @@ public class EtlMQConsumer {
                         log.info("Receive message: {} from topic: {}, group: {}", message, topic, groupId);
                         CrawlerEtlParam param = JSONObject.parseObject(message.getMessageBodyString(), CrawlerEtlParam.class);
                         param.setMessageId(message.getMessageId());
+                        param.setReceiptHandle(message.getReceiptHandle());
 
                         priorityPool.execute(new RunnablePriority(param, ackReceiptHandleQueue));
                     } else {
-                        log.warn("Priority pool queue size > {}! waiting for execute task.", priorityPool.getQueue().size());
+                        log.warn("Priority pool queue size > {}! messageId {} will be retried. firstConsumeTime {}, ConsumedTimes {}",
+                                priorityPool.getQueue().size(), message.getMessageId(), CommonUtils.milliToDateStr(message.getFirstConsumeTime()), message.getConsumedTimes());
                     }
                 });
             } catch (Throwable e) {
@@ -133,9 +135,9 @@ public class EtlMQConsumer {
     }
 
     private void logPoolInfo() {
-        log.info("Thread pool info TaskCount {}, CompletedTaskCount {}, ActiveCount {}, Queue size {}, Queue remainingCapacity {}",
+        log.info("Thread pool info TaskCount {}, CompletedTaskCount {}, ActiveCount {}, Queue size {}",
                 priorityPool.getTaskCount(), priorityPool.getCompletedTaskCount(), priorityPool.getActiveCount(),
-                priorityPool.getQueue().size(), priorityPool.getQueue().remainingCapacity());
+                priorityPool.getQueue().size());
     }
 
     class RunnablePriority implements Runnable, Comparable<RunnablePriority> {
@@ -168,14 +170,21 @@ public class EtlMQConsumer {
         public void run() {
             try {
                 logPoolInfo();
-                log.info("deal: {} priority: {}", param, getPriority());
+                log.info("begin deal {} priority {} ", param, getPriority());
+                // TODO: Simulate data processing
+                TimeUnit.SECONDS.sleep(RandomUtils.nextInt(2, 20));
                 etlService.deal(param);
-                log.info("deal: {} deal done", param);
-                ackReceiptHandleQueue.put(param.getReceiptHandle());
+                log.info("deal done of {}", param);
                 logPoolInfo();
             } catch (Exception e) {
                 log.error("deal {} error.", param, e);
             }
+
+            try {
+                ackReceiptHandleQueue.put(param.getReceiptHandle());
+            } catch (Exception e) {
+                log.error("put {} to ackReceiptHandleQueue error.", param.getReceiptHandle(), e);
+            }
         }
 
         @Override

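Note: a minimal, self-contained sketch of the ack hand-off pattern used above, assuming the usual shape of such a loop (the class name, batch size, and the println are placeholders; the real ackMessage thread calls the MQ consumer's ack API and is not shown in this diff). As in the second try block added above, the receipt handle is enqueued even when deal() throws, while a message rejected because the priority pool is full is not acked at all and, per the new warn log, will be redelivered.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class AckHandoffSketch {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> ackReceiptHandleQueue = new LinkedBlockingQueue<>();

        // Worker side: the hand-off sits outside the processing try block, so a
        // failed deal() no longer prevents the receipt handle from reaching the ack thread.
        Runnable worker = () -> {
            String receiptHandle = "receipt-handle-demo"; // placeholder value
            try {
                // etlService.deal(param) would run here
            } finally {
                try {
                    ackReceiptHandleQueue.put(receiptHandle);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };

        // Ack side: a dedicated thread drains handles in small batches; the real
        // implementation would pass each batch to the MQ consumer's ack call.
        Thread ackThread = new Thread(() -> {
            List<String> batch = new ArrayList<>();
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String first = ackReceiptHandleQueue.poll(1, TimeUnit.SECONDS);
                    if (first == null) {
                        continue;
                    }
                    batch.add(first);
                    ackReceiptHandleQueue.drainTo(batch, 15); // up to 16 handles per round (illustrative)
                    System.out.println("ack " + batch);
                    batch.clear();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        ackThread.setDaemon(true);
        ackThread.start();

        new Thread(worker).start();
        TimeUnit.SECONDS.sleep(2); // let the demo threads run briefly
    }
}
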
+ 14 - 0
etl-core/src/main/java/com/tzld/crawler/etl/util/CommonUtils.java

@@ -24,11 +24,20 @@
 
 package com.tzld.crawler.etl.util;
 
+import com.tzld.crawler.etl.common.base.Constant;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+
 /**
  * @author ehlxr
  * @since 2023-07-20 10:52.
  */
 public class CommonUtils {
+    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern(Constant.STANDARD_FORMAT);
+
     /**
      * 递归获取根异常
      *
@@ -41,4 +50,9 @@ public class CommonUtils {
         }
         return rootCause;
     }
+
+    public static String milliToDateStr(long timestamp) {
+        LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault());
+        return dateTime.format(FORMATTER);
+    }
 }
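
Note: a small usage sketch of the new helper. Constant.STANDARD_FORMAT is not part of this diff, so the pattern below ("yyyy-MM-dd HH:mm:ss") is an assumed stand-in.

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class MilliToDateStrDemo {
    // Assumed stand-in for Constant.STANDARD_FORMAT, which is not shown in the diff.
    private static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static String milliToDateStr(long timestamp) {
        LocalDateTime dateTime =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault());
        return dateTime.format(FORMATTER);
    }

    public static void main(String[] args) {
        // Same conversion the consumer now uses to render message.getFirstConsumeTime() in the warn log.
        System.out.println(milliToDateStr(System.currentTimeMillis()));
    }
}
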

+ 3 - 3
etl-server/src/main/java/com/tzld/crawler/etl/controller/IndexController.java

@@ -59,14 +59,14 @@ public class IndexController {
         return "ok";
     }
 
-    @PostMapping("/sendMsg")
-    public String sendMsg() {
+    @RequestMapping("/sendMsg")
+    public String sendMsg(@RequestParam(value = "size", required = false, defaultValue = "10000") int size) {
         MQClient mqClient = new MQClient(httpEndpoint, accessKey, secretKey);
         MQProducer producer = mqClient.getProducer(instanceId, topic);
 
         String msg = "{\"userId\":6282865,\"outUserId\":\"51101170\",\"platform\":\"douyin\",\"strategy\":\"search\",\"outVideoId\":\"ppp212122fdf22\",\"videoTitle\":\"早上起床前念四句话,身体有可能变得更健康\",\"coverUrl\":\"https://cdn-xphoto2.xiaoniangao.cn/5111511988@690w_385h_0e_1pr_0r.jpg\",\"videoUrl\":\"http://v9-xg-web-pc.ixigua.com/c8347a501c9f589ff862d459249aa1eb/635fff\",\"audioUrl\":\"http://cdn-xalbum-baishan.xiaoniangao.cn/5111511797?Expires=1704038400\"}";
 
-        IntStream.range(0, 10000).parallel().forEach((i) -> {
+        IntStream.range(0, size).parallel().forEach((i) -> {
             TopicMessage pubMsg = new TopicMessage(msg.getBytes());
             pubMsg.setMessageKey("test_" + i);
             TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
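
Note: with @RequestMapping the endpoint now accepts GET as well as POST, and "size" falls back to 10000 when omitted. A hedged caller sketch follows; the host and port are assumptions, not part of the commit.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SendMsgDemo {
    public static void main(String[] args) throws Exception {
        // Publish 500 test messages instead of the default 10000; localhost:8080 is assumed.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/sendMsg?size=500"))
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}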