Преглед на файлове

etl consumer fixed thread

ehlxr преди 1 година
родител
ревизия
c3ed78d877
променени са 1 файла, в които са добавени 15 реда и са изтрити 66 реда
  1. 15 66
      etl-core/src/main/java/com/tzld/crawler/etl/mq/EtlMQConsumer.java

+ 15 - 66
etl-core/src/main/java/com/tzld/crawler/etl/mq/EtlMQConsumer.java

@@ -28,8 +28,6 @@ import com.alibaba.fastjson.JSONObject;
 import com.aliyun.mq.http.MQClient;
 import com.aliyun.mq.http.MQClient;
 import com.aliyun.mq.http.MQConsumer;
 import com.aliyun.mq.http.MQConsumer;
 import com.aliyun.mq.http.model.Message;
 import com.aliyun.mq.http.model.Message;
-import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.tzld.crawler.etl.model.vo.CrawlerEtlParam;
 import com.tzld.crawler.etl.model.vo.CrawlerEtlParam;
 import com.tzld.crawler.etl.service.EtlService;
 import com.tzld.crawler.etl.service.EtlService;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
@@ -40,10 +38,8 @@ import org.springframework.stereotype.Component;
 import javax.annotation.PostConstruct;
 import javax.annotation.PostConstruct;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 
 /**
 /**
  * @author ehlxr
  * @author ehlxr
@@ -65,12 +61,10 @@ public class EtlMQConsumer {
     private String topic;
     private String topic;
     @Value("${rocketmq.crawler.etl.groupid:}")
     @Value("${rocketmq.crawler.etl.groupid:}")
     private String groupId;
     private String groupId;
+    @Value("${consumer.thread.size:10}")
+    private Integer threadSize;
 
 
-    @ApolloJsonValue("${crawler.etl.priority:{}}")
-    private Map<String, Integer> priorityMap;
-
-    private MQClient mqClient;
-    private ThreadPoolExecutor priorityPool;
+    private MQConsumer consumer;
 
 
     public EtlMQConsumer(EtlService etlService) {
     public EtlMQConsumer(EtlService etlService) {
         this.etlService = etlService;
         this.etlService = etlService;
@@ -78,20 +72,20 @@ public class EtlMQConsumer {
 
 
     @PostConstruct
     @PostConstruct
     public void init() {
     public void init() {
-        mqClient = new MQClient(httpEndpoint, accessKey, secretKey);
-        priorityPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60L,
-                TimeUnit.SECONDS, new PriorityBlockingQueue<>(),
-                new ThreadFactoryBuilder().setNameFormat("priority-etl-pool-%d").build());
+        MQClient mqClient = new MQClient(httpEndpoint, accessKey, secretKey);
+        consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
 
 
-        new Thread(this::consumeMsg).start();
+        ExecutorService service = Executors.newFixedThreadPool(threadSize);
+        for (int i = 0; i < threadSize; i++) {
+            service.submit(this::consumeMsg);
+        }
     }
     }
 
 
     private void consumeMsg() {
     private void consumeMsg() {
-        final MQConsumer consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
         do {
         do {
             List<String> handles = new ArrayList<>();
             List<String> handles = new ArrayList<>();
             try {
             try {
-                List<Message> messages = consumer.consumeMessage(10, 10);
+                List<Message> messages = consumer.consumeMessage(1, 10);
                 if (messages == null || messages.isEmpty()) {
                 if (messages == null || messages.isEmpty()) {
                     log.info("No new message, continue");
                     log.info("No new message, continue");
                     continue;
                     continue;
@@ -99,10 +93,11 @@ public class EtlMQConsumer {
 
 
                 messages.forEach(message -> {
                 messages.forEach(message -> {
                     log.info("Receive message: {} from topic: {}, group: {}", message, topic, groupId);
                     log.info("Receive message: {} from topic: {}, group: {}", message, topic, groupId);
-                    handles.add(message.getReceiptHandle());
                     CrawlerEtlParam param = JSONObject.parseObject(message.getMessageBodyString(), CrawlerEtlParam.class);
                     CrawlerEtlParam param = JSONObject.parseObject(message.getMessageBodyString(), CrawlerEtlParam.class);
                     param.setMessageId(message.getMessageId());
                     param.setMessageId(message.getMessageId());
-                    priorityPool.execute(new RunnablePriority(param));
+                    etlService.deal(param);
+                    log.info("Deal done of message: {} from topic: {}, group: {}", message, topic, groupId);
+                    handles.add(message.getReceiptHandle());
                 });
                 });
             } catch (Throwable e) {
             } catch (Throwable e) {
                 log.error("Consume message from topic: {}, group: {} error", topic, groupId, e);
                 log.error("Consume message from topic: {}, group: {} error", topic, groupId, e);
@@ -112,50 +107,4 @@ public class EtlMQConsumer {
             }
             }
         } while (true);
         } while (true);
     }
     }
-
-    class RunnablePriority implements Runnable, Comparable<RunnablePriority> {
-        private final CrawlerEtlParam param;
-
-        public RunnablePriority(CrawlerEtlParam param) {
-            this.param = param;
-        }
-
-        public Integer getPriority() {
-            return priorityMap.getOrDefault(param.getPlatform(), -1);
-        }
-
-        /**
-         * 优先级比较
-         */
-        @Override
-        public int compareTo(RunnablePriority o) {
-            if (this.getPriority() < o.getPriority()) {
-                return 1;
-            } else if (this.getPriority() > o.getPriority()) {
-                return -1;
-            }
-            return 0;
-        }
-
-        @Override
-        public void run() {
-            try {
-                log.info("Thread pool info \nTaskCount: {}\nCompletedTaskCount: {}\nActiveCount: {}\nQueue size: {}\nQueue elements: {}",
-                        priorityPool.getTaskCount(), priorityPool.getCompletedTaskCount(), priorityPool.getActiveCount(),
-                        priorityPool.getQueue().size(), priorityPool.getQueue());
-                log.info("deal: {} priority: {}", param, getPriority());
-                etlService.deal(param);
-                log.info("Thread pool info \nTaskCount: {}\nCompletedTaskCount: {}\nActiveCount: {}\nQueue size: {}\nQueue elements: {}",
-                        priorityPool.getTaskCount(), priorityPool.getCompletedTaskCount(), priorityPool.getActiveCount(),
-                        priorityPool.getQueue().size(), priorityPool.getQueue());
-            } catch (Exception e) {
-                log.error("deal {} error.", param, e);
-            }
-        }
-
-        @Override
-        public String toString() {
-            return param.getPlatform() + ":" + param.getOutVideoId();
-        }
-    }
 }
 }