supeng 3 дней назад
Родитель
Сommit
72f7e5aa46

+ 1 - 0
etl-core/src/main/java/com/tzld/crawler/etl/enums/MetricTypeEnum.java

@@ -31,6 +31,7 @@ import com.fasterxml.jackson.annotation.JsonValue;
  * @since 2023-07-18 14:32.
  */
 public enum MetricTypeEnum {
+    KEY_DUPLICATED("keyDuplicated", "messageKey重复"),
     PARAM_INVALID("paramInvalid", "参数无效"),
     SAVE_DB_FAILED("saveDBFailed", "入库失败"),
     STRATEGY_FAILED("strategyFailed", "执行策略失败"),

+ 18 - 2
etl-core/src/main/java/com/tzld/crawler/etl/mq/EtlMQConsumer.java

@@ -31,8 +31,11 @@ import com.aliyun.mq.http.common.AckMessageException;
 import com.aliyun.mq.http.model.Message;
 import com.tzld.crawler.etl.common.base.CacheConstant;
 import com.tzld.crawler.etl.common.base.Constant;
+import com.tzld.crawler.etl.enums.MetricTypeEnum;
+import com.tzld.crawler.etl.model.dto.MetricLogDto;
 import com.tzld.crawler.etl.model.vo.CrawlerEtlParam;
 import com.tzld.crawler.etl.service.EtlService;
+import com.tzld.crawler.etl.service.SlsService;
 import com.tzld.crawler.etl.util.RedisUtil;
 import okhttp3.Cache;
 import org.slf4j.Logger;
@@ -80,6 +83,9 @@ public class EtlMQConsumer {
 
     private MQConsumer consumer;
 
+    @Autowired
+    private SlsService slsService;
+
     public EtlMQConsumer(EtlService etlService) {
         this.etlService = etlService;
     }
@@ -113,13 +119,14 @@ public class EtlMQConsumer {
                     log.info("Receive message: {} from topic: {}, group: {}", message, topic, groupId);
                     String messageId = message.getMessageId();
                     String messageKey = message.getMessageKey();
+                    CrawlerEtlParam param = JSONObject.parseObject(message.getMessageBodyString(), CrawlerEtlParam.class);
+                    param.setMessageId(messageId);
                     String key = String.format(DEDUP_KEY, messageKey);
                     if (!redisUtil.setNx(key, "1", CacheConstant.TWENTY_HOUR_SECOND)) {
+                        metric(MetricTypeEnum.PARAM_INVALID, JSONObject.toJSONString(message), param);
                         log.info("dedup message: {} from topic: {}, group: {} messageId: {} messageKey = {}", message, topic, groupId, messageId, messageKey);
                         continue;
                     }
-                    CrawlerEtlParam param = JSONObject.parseObject(message.getMessageBodyString(), CrawlerEtlParam.class);
-                    param.setMessageId(messageId);
                     etlService.deal(param);
                     log.info("Deal done of message: {} from topic: {}, group: {}", message, topic, groupId);
                 } catch (Exception e) {
@@ -163,4 +170,13 @@ public class EtlMQConsumer {
 //            }
         } while (true);
     }
+
+    private void metric(MetricTypeEnum typeEnum, Object data, CrawlerEtlParam param) {
+        slsService.metric(MetricLogDto.newBuiler()
+                .platform(param.getPlatform())
+                .strategy(param.getStrategy())
+                .type(typeEnum)
+                .data(data)
+                .msg(param.toString()).build());
+    }
 }