supeng 9 місяців тому
батько
коміт
2c98c0d54c

+ 106 - 0
etl-core/src/main/java/com/tzld/crawler/etl/config/MqConfig.java

@@ -0,0 +1,106 @@
+package com.tzld.crawler.etl.config;
+
+import com.aliyun.openservices.ons.api.PropertyKeyConst;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Properties;
+
+/**
+ * RocketMQ settings bound from configuration properties under the {@code rocketmq} prefix.
+ *
+ * <p>Holds the connection settings (access key, secret key, name-server address) and
+ * two sets of topic / group id / tag values, all exposed through plain getters and setters.
+ *
+ * @author supeng
+ */
+@Configuration
+@ConfigurationProperties(prefix = "rocketmq")
+public class MqConfig {
+
+    /** Access key; copied into {@link PropertyKeyConst#AccessKey} by {@link #getMqPropertie()}. */
+    private String accessKey;
+    /** Secret key; copied into {@link PropertyKeyConst#SecretKey} by {@link #getMqPropertie()}. */
+    private String secretKey;
+    /** Name-server address; copied into {@link PropertyKeyConst#NAMESRV_ADDR} by {@link #getMqPropertie()}. */
+    private String nameSrvAddr;
+    /** Topic name. */
+    private String topic;
+    /** Consumer group id. */
+    private String groupId;
+    /** Tag expression. */
+    private String tag;
+    // NOTE(review): the three "order*" values are presumably meant for an ordered-message
+    // consumer; nothing in this class uses them, so confirm against the callers.
+    private String orderTopic;
+    private String orderGroupId;
+    private String orderTag;
+
+    /**
+     * Builds a fresh {@link Properties} object carrying the connection settings
+     * (access key, secret key and name-server address).
+     *
+     * <p>A new instance is created on every call, so callers may add further keys to the
+     * returned object without affecting this configuration or each other. The method name
+     * (including its spelling) is kept unchanged for existing callers.
+     *
+     * @return properties containing the three connection settings
+     * @throws NullPointerException if any of the three settings is {@code null};
+     *         {@link Properties} cannot store {@code null} values, so the failure is raised
+     *         here with a message naming the missing setting
+     */
+    public Properties getMqPropertie() {
+        Properties properties = new Properties();
+        properties.setProperty(PropertyKeyConst.AccessKey, requireConfigured("accessKey", this.accessKey));
+        properties.setProperty(PropertyKeyConst.SecretKey, requireConfigured("secretKey", this.secretKey));
+        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, requireConfigured("nameSrvAddr", this.nameSrvAddr));
+        return properties;
+    }
+
+    /**
+     * Returns {@code value} unchanged, or fails with a message that names the missing setting.
+     *
+     * @param name  field name of the setting, used only in the error message
+     * @param value configured value, possibly {@code null}
+     * @return {@code value} when it is not {@code null}
+     */
+    private static String requireConfigured(String name, String value) {
+        return java.util.Objects.requireNonNull(value, "rocketmq." + name + " must be configured");
+    }
+
+    // ---- plain accessors ----
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    public String getNameSrvAddr() {
+        return nameSrvAddr;
+    }
+
+    public void setNameSrvAddr(String nameSrvAddr) {
+        this.nameSrvAddr = nameSrvAddr;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public String getTag() {
+        return tag;
+    }
+
+    public void setTag(String tag) {
+        this.tag = tag;
+    }
+
+    public String getOrderTopic() {
+        return orderTopic;
+    }
+
+    public void setOrderTopic(String orderTopic) {
+        this.orderTopic = orderTopic;
+    }
+
+    public String getOrderGroupId() {
+        return orderGroupId;
+    }
+
+    public void setOrderGroupId(String orderGroupId) {
+        this.orderGroupId = orderGroupId;
+    }
+
+    public String getOrderTag() {
+        return orderTag;
+    }
+
+    public void setOrderTag(String orderTag) {
+        this.orderTag = orderTag;
+    }
+}
+

+ 8 - 0
etl-core/src/main/java/com/tzld/crawler/etl/model/dto/EtlMessageDTO.java

@@ -0,0 +1,8 @@
+package com.tzld.crawler.etl.model.dto;
+
+/**
+ * Data transfer object for an ETL message.
+ *
+ * <p>NOTE(review): the class declares no fields yet, so the message schema still has to
+ * be defined here.
+ *
+ * @author supeng
+ */
+public class EtlMessageDTO {
+
+}

+ 55 - 0
etl-core/src/main/java/com/tzld/crawler/etl/mq/BatchConsumerClient.java

@@ -0,0 +1,55 @@
+package com.tzld.crawler.etl.mq;
+
+import com.aliyun.openservices.ons.api.PropertyKeyConst;
+import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
+import com.aliyun.openservices.ons.api.bean.BatchConsumerBean;
+import com.aliyun.openservices.ons.api.bean.Subscription;
+import com.tzld.crawler.etl.config.MqConfig;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Spring configuration that registers the batch consumer bean.
+ *
+ * @author supeng
+ */
+@Configuration
+public class BatchConsumerClient {
+
+    /**
+     * Number of consumer threads requested from the client.
+     * NOTE(review): an earlier comment here gave 20 as the SDK default and [1, 1000] as the
+     * valid range, and spoke of 1000 threads, while the value actually set is 100 - confirm
+     * which one is intended.
+     */
+    private static final String CONSUME_THREAD_NUMS = "100";
+
+    /** Maximum number of messages handed to the listener in one batch. */
+    private static final String CONSUME_BATCH_MAX_SIZE = "10";
+
+    @Autowired
+    private MqConfig mqConfig;
+
+    @Autowired
+    private EtlBatchMessageListener etlBatchMessageListener;
+
+    /**
+     * Creates the {@link BatchConsumerBean}. The bean definition names {@code start} as its
+     * init method and {@code shutdown} as its destroy method.
+     *
+     * @return the consumer bean with its properties and subscription table set
+     */
+    @Bean(initMethod = "start", destroyMethod = "shutdown")
+    public BatchConsumerBean buildBatchConsumer() {
+        BatchConsumerBean consumer = new BatchConsumerBean();
+        consumer.setProperties(consumerProperties());
+        consumer.setSubscriptionTable(subscriptionTable());
+        return consumer;
+    }
+
+    /** Connection settings from {@link MqConfig} plus group id, thread count and batch size. */
+    private Properties consumerProperties() {
+        Properties consumerProps = mqConfig.getMqPropertie();
+        consumerProps.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
+        consumerProps.setProperty(PropertyKeyConst.ConsumeThreadNums, CONSUME_THREAD_NUMS);
+        consumerProps.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, CONSUME_BATCH_MAX_SIZE);
+        return consumerProps;
+    }
+
+    /**
+     * Subscription table with a single entry: the configured topic and tag expression mapped
+     * to the ETL listener. To subscribe to more topics, put further {@link Subscription}
+     * entries (each with its own listener) into the same map.
+     */
+    private Map<Subscription, BatchMessageListener> subscriptionTable() {
+        Subscription etlSubscription = new Subscription();
+        etlSubscription.setTopic(mqConfig.getTopic());
+        etlSubscription.setExpression(mqConfig.getTag());
+
+        Map<Subscription, BatchMessageListener> listenersBySubscription = new HashMap<>();
+        listenersBySubscription.put(etlSubscription, etlBatchMessageListener);
+        return listenersBySubscription;
+    }
+
+}

+ 47 - 0
etl-core/src/main/java/com/tzld/crawler/etl/mq/EtlBatchMessageListener.java

@@ -0,0 +1,47 @@
+package com.tzld.crawler.etl.mq;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.aliyun.openservices.ons.api.Action;
+import com.aliyun.openservices.ons.api.ConsumeContext;
+import com.aliyun.openservices.ons.api.Message;
+import com.tzld.crawler.etl.model.dto.EtlMessageDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * 新增消息 批量消费
+ *
+ * @author supeng
+ */
+@Component
+public class EtlBatchMessageListener implements com.aliyun.openservices.ons.api.batch.BatchMessageListener {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(EtlBatchMessageListener.class);
+    @Override
+    public Action consume(final List<Message> messages, final ConsumeContext context) {
+        LOGGER.info("Receive message size = {}", messages.size());
+        try {
+            for (Message message : messages) {
+                try {
+                    String body = new String(message.getBody(), StandardCharsets.UTF_8);
+                    LOGGER.info("message body = {}", body);
+                    EtlMessageDTO etlMessageDTO = JSONObject.parseObject(body, EtlMessageDTO.class);
+                    //todo
+
+                } catch (Exception e) {
+                    LOGGER.info("loop message error {}", e, JSON.toJSONString(message));
+                }
+            }
+            //消费成功
+            return Action.CommitMessage;
+        } catch (Exception e) {
+            //消费失败
+            return Action.ReconsumeLater;
+        }
+    }
+}

+ 106 - 106
etl-core/src/main/java/com/tzld/crawler/etl/mq/EtlMQConsumer.java

@@ -1,106 +1,106 @@
-/*
- * The MIT License (MIT)
- *
- * Copyright © 2023 xrv <xrv@live.com>
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- * THE SOFTWARE.
- */
-
-package com.tzld.crawler.etl.mq;
-
-import com.alibaba.fastjson.JSONObject;
-import com.aliyun.mq.http.MQClient;
-import com.aliyun.mq.http.MQConsumer;
-import com.aliyun.mq.http.model.Message;
-import com.tzld.crawler.etl.model.vo.CrawlerEtlParam;
-import com.tzld.crawler.etl.service.EtlService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PostConstruct;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * @author ehlxr
- * @since 2023-06-09 15:24.
- */
-@Component
-public class EtlMQConsumer {
-    private static final Logger log = LoggerFactory.getLogger(EtlMQConsumer.class);
-    private final EtlService etlService;
-    @Value("${rocketmq.accessKey:}")
-    private String accessKey;
-    @Value("${rocketmq.secretKey:}")
-    private String secretKey;
-    @Value("${rocketmq.httpEndpoint:}")
-    private String httpEndpoint;
-    @Value("${rocketmq.instanceId:}")
-    private String instanceId;
-    @Value("${rocketmq.crawler.etl.topic:}")
-    private String topic;
-    @Value("${rocketmq.crawler.etl.groupid:}")
-    private String groupId;
-    @Value("${consumer.thread.size:32}")
-    private Integer threadSize;
-
-    private MQConsumer consumer;
-
-    public EtlMQConsumer(EtlService etlService) {
-        this.etlService = etlService;
-    }
-
-    @PostConstruct
-    public void init() {
-        MQClient mqClient = new MQClient(httpEndpoint, accessKey, secretKey);
-        consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
-
-        ExecutorService service = Executors.newFixedThreadPool(threadSize);
-        for (int i = 0; i < threadSize; i++) {
-            service.submit(this::consumeMsg);
-        }
-    }
-
-    private void consumeMsg() {
-        do {
-            try {
-                List<Message> messages = consumer.consumeMessage(1, 10);
-                if (messages == null || messages.isEmpty()) {
-                    log.info("No new message, continue");
-                    continue;
-                }
-
-                messages.forEach(message -> {
-                    log.info("Receive message: {} from topic: {}, group: {}", message, topic, groupId);
-                    CrawlerEtlParam param = JSONObject.parseObject(message.getMessageBodyString(), CrawlerEtlParam.class);
-                    param.setMessageId(message.getMessageId());
-                    etlService.deal(param);
-                    log.info("Deal done of message: {} from topic: {}, group: {}", message, topic, groupId);
-                    consumer.ackMessage(Collections.singletonList(message.getReceiptHandle()));
-                });
-            } catch (Throwable e) {
-                log.error("Consume message from topic: {}, group: {} error", topic, groupId, e);
-            }
-        } while (true);
-    }
-}
+///*
+// * The MIT License (MIT)
+// *
+// * Copyright © 2023 xrv <xrv@live.com>
+// *
+// * Permission is hereby granted, free of charge, to any person obtaining a copy
+// * of this software and associated documentation files (the "Software"), to deal
+// * in the Software without restriction, including without limitation the rights
+// * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// * copies of the Software, and to permit persons to whom the Software is
+// * furnished to do so, subject to the following conditions:
+// *
+// * The above copyright notice and this permission notice shall be included in
+// * all copies or substantial portions of the Software.
+// *
+// * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// * THE SOFTWARE.
+// */
+//
+//package com.tzld.crawler.etl.mq;
+//
+//import com.alibaba.fastjson.JSONObject;
+//import com.aliyun.mq.http.MQClient;
+//import com.aliyun.mq.http.MQConsumer;
+//import com.aliyun.mq.http.model.Message;
+//import com.tzld.crawler.etl.model.vo.CrawlerEtlParam;
+//import com.tzld.crawler.etl.service.EtlService;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//import org.springframework.beans.factory.annotation.Value;
+//import org.springframework.stereotype.Component;
+//
+//import javax.annotation.PostConstruct;
+//import java.util.Collections;
+//import java.util.List;
+//import java.util.concurrent.ExecutorService;
+//import java.util.concurrent.Executors;
+//
+///**
+// * @author ehlxr
+// * @since 2023-06-09 15:24.
+// */
+//@Component
+//public class EtlMQConsumer {
+//    private static final Logger log = LoggerFactory.getLogger(EtlMQConsumer.class);
+//    private final EtlService etlService;
+//    @Value("${rocketmq.accessKey:}")
+//    private String accessKey;
+//    @Value("${rocketmq.secretKey:}")
+//    private String secretKey;
+//    @Value("${rocketmq.httpEndpoint:}")
+//    private String httpEndpoint;
+//    @Value("${rocketmq.instanceId:}")
+//    private String instanceId;
+//    @Value("${rocketmq.crawler.etl.topic:}")
+//    private String topic;
+//    @Value("${rocketmq.crawler.etl.groupid:}")
+//    private String groupId;
+//    @Value("${consumer.thread.size:32}")
+//    private Integer threadSize;
+//
+//    private MQConsumer consumer;
+//
+//    public EtlMQConsumer(EtlService etlService) {
+//        this.etlService = etlService;
+//    }
+//
+//    @PostConstruct
+//    public void init() {
+//        MQClient mqClient = new MQClient(httpEndpoint, accessKey, secretKey);
+//        consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
+//
+//        ExecutorService service = Executors.newFixedThreadPool(threadSize);
+//        for (int i = 0; i < threadSize; i++) {
+//            service.submit(this::consumeMsg);
+//        }
+//    }
+//
+//    private void consumeMsg() {
+//        do {
+//            try {
+//                List<Message> messages = consumer.consumeMessage(1, 10);
+//                if (messages == null || messages.isEmpty()) {
+//                    log.info("No new message, continue");
+//                    continue;
+//                }
+//
+//                messages.forEach(message -> {
+//                    log.info("Receive message: {} from topic: {}, group: {}", message, topic, groupId);
+//                    CrawlerEtlParam param = JSONObject.parseObject(message.getMessageBodyString(), CrawlerEtlParam.class);
+//                    param.setMessageId(message.getMessageId());
+//                    etlService.deal(param);
+//                    log.info("Deal done of message: {} from topic: {}, group: {}", message, topic, groupId);
+//                    consumer.ackMessage(Collections.singletonList(message.getReceiptHandle()));
+//                });
+//            } catch (Throwable e) {
+//                log.error("Consume message from topic: {}, group: {} error", topic, groupId, e);
+//            }
+//        } while (true);
+//    }
+//}

+ 24 - 24
etl-server/src/test/java/com/tzld/crawler/etl/EtlServerApplicationTests.java

@@ -1,12 +1,12 @@
 package com.tzld.crawler.etl;
 
-import com.aliyun.mq.http.MQClient;
-import com.aliyun.mq.http.MQProducer;
-import com.aliyun.mq.http.model.TopicMessage;
+//import com.aliyun.mq.http.MQClient;
+//import com.aliyun.mq.http.MQProducer;
+//import com.aliyun.mq.http.model.TopicMessage;
 import com.google.common.collect.Lists;
 import com.huaban.analysis.jieba.JiebaSegmenter;
 import com.huaban.analysis.jieba.SegToken;
-import com.tzld.crawler.etl.mq.EtlMQConsumer;
+//import com.tzld.crawler.etl.mq.EtlMQConsumer;
 import net.bramp.ffmpeg.FFprobe;
 import net.bramp.ffmpeg.probe.FFmpegFormat;
 import net.bramp.ffmpeg.probe.FFmpegProbeResult;
@@ -35,30 +35,30 @@ class EtlServerApplicationTests {
     /**
      * 排除 EtlMQConsumer 的加载
      */
-    @MockBean
-    private EtlMQConsumer etlMQConsumer;
+//    @MockBean
+//    private EtlMQConsumer etlMQConsumer;
 
 
     @Test
     void produceMsgTest() {
-        MQClient mqClient = new MQClient(httpEndpoint, accessKey, secretKey);
-        MQProducer producer = mqClient.getProducer(instanceId, "topic_crawler_etl_test");
-        ArrayList<String> platforms = Lists.newArrayList("benshanzhufu", "kuaishou", "douyin", "xigua", "gongzhonghao", "xiaoniangao");
-
-        IntStream.range(0, 20).forEach(x -> {
-            try {
-                String s = "{\"user_id\": 6281907,\"out_user_id\": \"53322270\",\"platform\": \"" + platforms.get(x % 6) + "\",\"strategy\": \"author\"," +
-                        "\"out_video_id\": \"" + System.currentTimeMillis() + "\"," +
-                        "\"video_title\": \"" + x + "世界上最强百米对决\",\"cover_url\": \"https://cdn-xphoto2.xiaoniangao.cn/5200474225?Expires=1704038400&OSSAccessKeyId=LTAI4G2W1FsgwzAWYpPoB3v6&Signature=ncvtSP8FSrwuU8unZMtxdXIuWBE%3D&x-oss-process=image%2Fresize%2Cw_690%2Ch_385%2Climit_0%2Finterlace%2C1%2Fformat%2Cjpg%2Fauto-orient%2C0\"," +
-                        "\"video_url\": \"https://cdn-xalbum2.xiaoniangao.cn/6506ec4500000104bd7c0623?Expires=1704038400&OSSAccessKeyId=LTAI5tB7cRkYiqHcTdkVprwb&Signature=CxJEEcwUR87is9X3li5xP5ZiDvQ%3D\"" +
-                        ",\"duration\": 40,\"publish_time\": \"2023-06-08 23:01:47\",\"play_cnt\": 602,\"like_cnt\": 0,\"share_cnt\": 0,\"collection_cnt\": 0,\"comment_cnt\": 0,\"crawler_rule\": {\"period\": { \"max\": 3, \"min\": 3 },\"duration\": { \"max\": 999999999999999, \"min\": 40 },\"play_cnt\": { \"max\": 999999999999999, \"min\": 500 }},\"width\": 450,\"height\": 254}";
-
-                TopicMessage pubMsg = new TopicMessage(s.getBytes());
-                producer.publishMessage(pubMsg);
-            } catch (Exception e) {
-                System.out.println(e);
-            }
-        });
+//        MQClient mqClient = new MQClient(httpEndpoint, accessKey, secretKey);
+//        MQProducer producer = mqClient.getProducer(instanceId, "topic_crawler_etl_test");
+//        ArrayList<String> platforms = Lists.newArrayList("benshanzhufu", "kuaishou", "douyin", "xigua", "gongzhonghao", "xiaoniangao");
+//
+//        IntStream.range(0, 20).forEach(x -> {
+//            try {
+//                String s = "{\"user_id\": 6281907,\"out_user_id\": \"53322270\",\"platform\": \"" + platforms.get(x % 6) + "\",\"strategy\": \"author\"," +
+//                        "\"out_video_id\": \"" + System.currentTimeMillis() + "\"," +
+//                        "\"video_title\": \"" + x + "世界上最强百米对决\",\"cover_url\": \"https://cdn-xphoto2.xiaoniangao.cn/5200474225?Expires=1704038400&OSSAccessKeyId=LTAI4G2W1FsgwzAWYpPoB3v6&Signature=ncvtSP8FSrwuU8unZMtxdXIuWBE%3D&x-oss-process=image%2Fresize%2Cw_690%2Ch_385%2Climit_0%2Finterlace%2C1%2Fformat%2Cjpg%2Fauto-orient%2C0\"," +
+//                        "\"video_url\": \"https://cdn-xalbum2.xiaoniangao.cn/6506ec4500000104bd7c0623?Expires=1704038400&OSSAccessKeyId=LTAI5tB7cRkYiqHcTdkVprwb&Signature=CxJEEcwUR87is9X3li5xP5ZiDvQ%3D\"" +
+//                        ",\"duration\": 40,\"publish_time\": \"2023-06-08 23:01:47\",\"play_cnt\": 602,\"like_cnt\": 0,\"share_cnt\": 0,\"collection_cnt\": 0,\"comment_cnt\": 0,\"crawler_rule\": {\"period\": { \"max\": 3, \"min\": 3 },\"duration\": { \"max\": 999999999999999, \"min\": 40 },\"play_cnt\": { \"max\": 999999999999999, \"min\": 500 }},\"width\": 450,\"height\": 254}";
+//
+//                TopicMessage pubMsg = new TopicMessage(s.getBytes());
+//                producer.publishMessage(pubMsg);
+//            } catch (Exception e) {
+//                System.out.println(e);
+//            }
+//        });
 
     }
 

+ 22 - 7
pom.xml

@@ -120,12 +120,12 @@
             <artifactId>caffeine</artifactId>
         </dependency>
 
-        <dependency>
-            <groupId>com.aliyun.mq</groupId>
-            <artifactId>mq-http-sdk</artifactId>
-            <version>1.0.3.2</version>
-            <classifier>jar-with-dependencies</classifier>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>com.aliyun.mq</groupId>-->
+<!--            <artifactId>mq-http-sdk</artifactId>-->
+<!--            <version>1.0.3.2</version>-->
+<!--            <classifier>jar-with-dependencies</classifier>-->
+<!--        </dependency>-->
         <dependency>
             <groupId>commons-collections</groupId>
             <artifactId>commons-collections</artifactId>
@@ -163,7 +163,22 @@
             <groupId>org.hibernate.validator</groupId>
             <artifactId>hibernate-validator</artifactId>
         </dependency>
-
+        <!-- aliyun rocketmq 4.0-->
+        <dependency>
+            <groupId>com.aliyun.openservices</groupId>
+            <artifactId>ons-client</artifactId>
+            <version>1.8.8.8.Final</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-configuration-processor</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.32</version>
+        </dependency>
     </dependencies>
 
 </project>