supeng преди 1 година
родител
ревизия
ce31d3389d

+ 47 - 0
etl-core/src/main/java/com/tzld/crawler/etl/mq/BatchConsumerClient.java

@@ -0,0 +1,47 @@
+package com.tzld.crawler.etl.mq;
+
+import com.aliyun.openservices.ons.api.PropertyKeyConst;
+import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
+import com.aliyun.openservices.ons.api.bean.BatchConsumerBean;
+import com.aliyun.openservices.ons.api.bean.Subscription;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+@Configuration
+public class BatchConsumerClient {
+
+    @Autowired
+    private MqConfig mqConfig;
+
+    @Autowired
+    private BatchEtlMessageListener batchEtlMessageListener;
+
+    @Bean(initMethod = "start", destroyMethod = "shutdown")
+    public BatchConsumerBean buildBatchConsumer() {
+        BatchConsumerBean batchConsumerBean = new BatchConsumerBean();
+        //配置文件
+        Properties properties = mqConfig.getMqPropertie();
+        properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
+        //将消费者线程数固定为?个 20为默认值  范围[1,1000]
+        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "1000");
+        //批量消费的最大消息数量
+        properties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, "32");
+        batchConsumerBean.setProperties(properties);
+        //订阅关系
+        Map<Subscription, BatchMessageListener> subscriptionTable = new HashMap<>();
+        Subscription subscription = new Subscription();
+        subscription.setTopic(mqConfig.getTopic());
+        subscription.setExpression(mqConfig.getTag());
+        subscriptionTable.put(subscription, batchEtlMessageListener);
+        //订阅多个topic如上面设置
+
+        batchConsumerBean.setSubscriptionTable(subscriptionTable);
+        return batchConsumerBean;
+    }
+
+}

+ 53 - 0
etl-core/src/main/java/com/tzld/crawler/etl/mq/BatchEtlMessageListener.java

@@ -0,0 +1,53 @@
+package com.tzld.crawler.etl.mq;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.aliyun.openservices.ons.api.Action;
+import com.aliyun.openservices.ons.api.ConsumeContext;
+import com.aliyun.openservices.ons.api.Message;
+import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
+import com.tzld.crawler.etl.model.vo.CrawlerEtlParam;
+import com.tzld.crawler.etl.service.EtlService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * 批量消费
+ *
+ * @author supeng
+ */
+@Component
+public class BatchEtlMessageListener implements BatchMessageListener {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BatchEtlMessageListener.class);
+
+    @Autowired
+    EtlService etlService;
+
+    @Override
+    public Action consume(final List<Message> messages, final ConsumeContext context) {
+        LOGGER.info("Receive message size = {}", messages.size());
+        try {
+            for (Message message : messages) {
+                try {
+                    String body = new String(message.getBody(), StandardCharsets.UTF_8);
+                    LOGGER.info("message body = {}", body);
+                    CrawlerEtlParam param = JSONObject.parseObject(body, CrawlerEtlParam.class);
+                    param.setMessageId(message.getMsgID());
+                    etlService.deal(param);
+                } catch (Exception e) {
+                    LOGGER.info("loop message error {}", JSON.toJSONString(message), e);
+                }
+            }
+            return Action.CommitMessage;
+        } catch (Exception e) {
+            //消费失败
+            return Action.ReconsumeLater;
+        }
+    }
+}

+ 76 - 0
etl-core/src/main/java/com/tzld/crawler/etl/mq/MqConfig.java

@@ -0,0 +1,76 @@
+package com.tzld.crawler.etl.mq;
+
+import com.aliyun.openservices.ons.api.PropertyKeyConst;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Properties;
+
+@Configuration
+@ConfigurationProperties(prefix = "rocketmq")
+public class MqConfig {
+
+    private String accessKey;
+    private String secretKey;
+    private String nameSrvAddr;
+    private String topic;
+    private String groupId;
+    private String tag;
+
+    public Properties getMqPropertie() {
+        Properties properties = new Properties();
+        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
+        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
+        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
+        return properties;
+    }
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    public String getNameSrvAddr() {
+        return nameSrvAddr;
+    }
+
+    public void setNameSrvAddr(String nameSrvAddr) {
+        this.nameSrvAddr = nameSrvAddr;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public String getTag() {
+        return tag;
+    }
+
+    public void setTag(String tag) {
+        this.tag = tag;
+    }
+}
+

+ 12 - 4
etl-server/src/main/resources/application-dev.yml

@@ -38,8 +38,16 @@ feign:
         # FULL 记录请求和响应的头、正文和元数据
         logger-level: FULL
 
+#rocketmq:
+#  crawler:
+#    etl:
+#      topic: topic_crawler_etl_dev
+#      groupid: GID_CRAWLER_ETL_DEV
+
 rocketmq:
-  crawler:
-    etl:
-      topic: topic_crawler_etl_dev
-      groupid: GID_CRAWLER_ETL_DEV
+  accessKey: LTAI4G7puhXtLyHzHQpD6H7A
+  secretKey: nEbq3xWNQd1qLpdy2u71qFweHkZjSG
+  nameSrvAddr: http://MQ_INST_1894469520484605_BXhXuzkZ.mq-internet-access.mq-internet.aliyuncs.com:80
+  topic: topic_crawler_etl_dev
+  groupId: GID_CRAWLER_ETL_DEV
+  tag: '*'

+ 12 - 4
etl-server/src/main/resources/application-prod.yml

@@ -21,8 +21,16 @@ longvideo:
   feign:
     url: longvideoapi-internal.piaoquantv.com
 
+#rocketmq:
+#  crawler:
+#    etl:
+#      topic: topic_crawler_etl_prod
+#      groupid: GID_CRAWLER_ETL_PROD
+
 rocketmq:
-  crawler:
-    etl:
-      topic: topic_crawler_etl_prod
-      groupid: GID_CRAWLER_ETL_PROD
+  accessKey: LTAI4G7puhXtLyHzHQpD6H7A
+  secretKey: nEbq3xWNQd1qLpdy2u71qFweHkZjSG
+  nameSrvAddr: http://MQ_INST_1894469520484605_BXhXuzkZ.mq-internet-access.mq-internet.aliyuncs.com:80
+  topic: topic_crawler_etl_prod
+  groupId: GID_CRAWLER_ETL_PROD
+  tag: '*'

+ 12 - 4
etl-server/src/main/resources/application-test.yml

@@ -38,8 +38,16 @@ feign:
         # FULL 记录请求和响应的头、正文和元数据
         logger-level: FULL
 
+#rocketmq:
+#  crawler:
+#    etl:
+#      topic: topic_crawler_etl_test
+#      groupid: GID_CRAWLER_ETL_TEST
+
 rocketmq:
-  crawler:
-    etl:
-      topic: topic_crawler_etl_test
-      groupid: GID_CRAWLER_ETL_TEST
+  accessKey: LTAI4G7puhXtLyHzHQpD6H7A
+  secretKey: nEbq3xWNQd1qLpdy2u71qFweHkZjSG
+  nameSrvAddr: http://MQ_INST_1894469520484605_BXhXuzkZ.mq-internet-access.mq-internet.aliyuncs.com:80
+  topic: topic_crawler_etl_test
+  groupId: GID_CRAWLER_ETL_TEST
+  tag: '*'

+ 6 - 0
pom.xml

@@ -126,6 +126,12 @@
             <version>1.0.3.2</version>
             <classifier>jar-with-dependencies</classifier>
         </dependency>
+        <dependency>
+            <groupId>com.aliyun.openservices</groupId>
+            <artifactId>ons-client</artifactId>
+            <!--建议替换为Java SDK的最新版本号-->
+            <version>2.0.6.Final</version>
+        </dependency>
         <dependency>
             <groupId>commons-collections</groupId>
             <artifactId>commons-collections</artifactId>