Explorar o código

fixed task loss when priority queue is full

ehlxr %!s(int64=2) %!d(string=hai) anos
pai
achega
57a3857cc3

+ 38 - 0
etl-server/src/main/java/com/tzld/crawler/etl/controller/IndexController.java

@@ -1,13 +1,19 @@
 package com.tzld.crawler.etl.controller;
 
 import com.alibaba.fastjson.JSONObject;
+import com.aliyun.mq.http.MQClient;
+import com.aliyun.mq.http.MQProducer;
+import com.aliyun.mq.http.model.TopicMessage;
 import com.tzld.crawler.etl.model.vo.CrawlerEtlParam;
 import com.tzld.crawler.etl.service.EtlService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.web.bind.annotation.*;
 
+import java.util.stream.IntStream;
+
 /**
  * @author ehlxr
  */
@@ -18,6 +24,19 @@ public class IndexController {
     @Autowired
     private EtlService etlService;
 
+    @Value("${rocketmq.accessKey:}")
+    private String accessKey;
+    @Value("${rocketmq.secretKey:}")
+    private String secretKey;
+    @Value("${rocketmq.httpEndpoint:}")
+    private String httpEndpoint;
+    @Value("${rocketmq.instanceId:}")
+    private String instanceId;
+    @Value("${rocketmq.crawler.etl.topic:}")
+    private String topic;
+    @Value("${rocketmq.crawler.etl.groupid:}")
+    private String groupId;
+
     /**
      * 探活
      *
@@ -39,4 +58,23 @@ public class IndexController {
         etlService.deal(param);
         return "ok";
     }
+
+    @PostMapping("/sendMsg")
+    public String sendMsg() {
+        MQClient mqClient = new MQClient(httpEndpoint, accessKey, secretKey);
+        MQProducer producer = mqClient.getProducer(instanceId, topic);
+
+        String msg = "{\"userId\":6282865,\"outUserId\":\"51101170\",\"platform\":\"douyin\",\"strategy\":\"search\",\"outVideoId\":\"ppp212122fdf22\",\"videoTitle\":\"早上起床前念四句话,身体有可能变得更健康\",\"coverUrl\":\"https://cdn-xphoto2.xiaoniangao.cn/5111511988@690w_385h_0e_1pr_0r.jpg\",\"videoUrl\":\"http://v9-xg-web-pc.ixigua.com/c8347a501c9f589ff862d459249aa1eb/635fff\",\"audioUrl\":\"http://cdn-xalbum-baishan.xiaoniangao.cn/5111511797?Expires=1704038400\"}";
+
+        IntStream.range(0, 10000).parallel().forEach((i) -> {
+            TopicMessage pubMsg = new TopicMessage(msg.getBytes());
+            pubMsg.setMessageKey("test_" + i);
+            TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
+            LOGGER.info("Send mq message: id={} success. Topic is: {}, msgId is: {}",
+                    i, topic, pubResultMsg.getMessageId());
+        });
+
+
+        return "ok";
+    }
 }

+ 45 - 0
etl-server/src/main/resources/application-test.yml

@@ -0,0 +1,45 @@
+server:
+  port: 8081
+
+spring:
+  datasource:
+    url: jdbc:mysql://rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com:3306/piaoquan-crawler?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false
+    username: crawler
+    password: crawler123456@
+
+apollo:
+  meta: http://devapolloconfig-internal.piaoquantv.com
+
+aliyun:
+  log:
+    project: crawler-log-dev
+    logstore:
+      crawler: crawler-log-dev
+      metric: etl-metric
+
+longvideo:
+  feign:
+    url: videotest-internal.yishihui.com
+
+logging:
+  level:
+    com:
+      tzld:
+        crawler:
+          etl: info
+
+feign:
+  client:
+    config:
+      default:
+        # NONE 没有日志(默认)
+        # BASIC 只记录请求方法和 URL 以及响应状态码和执行时间
+        # HEADERS 记录基本信息以及请求和响应头
+        # FULL 记录请求和响应的头、正文和元数据
+        logger-level: FULL
+
+rocketmq:
+  crawler:
+    etl:
+      topic: topic_crawler_etl_test
+      groupid: GID_CRAWLER_ETL_TEST