|
@@ -71,7 +71,7 @@ public class EtlMQConsumer {
|
|
|
@Value("${consumer.thread.size:32}")
|
|
|
private Integer threadSize;
|
|
|
/**
|
|
|
- * crawler:etl:dedup:{messageId}
|
|
|
+ * crawler:etl:dedup:{messageKey}
|
|
|
*/
|
|
|
private static final String DEDUP_KEY = "crawler:etl:dedup:%s";
|
|
|
|
|
@@ -112,13 +112,14 @@ public class EtlMQConsumer {
|
|
|
try {
|
|
|
log.info("Receive message: {} from topic: {}, group: {}", message, topic, groupId);
|
|
|
String messageId = message.getMessageId();
|
|
|
- String key = String.format(DEDUP_KEY, messageId);
|
|
|
+ String messageKey = message.getMessageKey();
|
|
|
+ String key = String.format(DEDUP_KEY, messageKey);
|
|
|
if (!redisUtil.setNx(key, "1", CacheConstant.ONE_DAY)) {
|
|
|
- log.info("dedup message: {} from topic: {}, group: {} messageId: {}", message, topic, groupId);
|
|
|
+ log.info("dedup message: {} from topic: {}, group: {} messageId: {} messageKey = {}", message, topic, groupId, messageId, messageKey);
|
|
|
continue;
|
|
|
}
|
|
|
CrawlerEtlParam param = JSONObject.parseObject(message.getMessageBodyString(), CrawlerEtlParam.class);
|
|
|
- param.setMessageId(message.getMessageId());
|
|
|
+ param.setMessageId(messageId);
|
|
|
etlService.deal(param);
|
|
|
log.info("Deal done of message: {} from topic: {}, group: {}", message, topic, groupId);
|
|
|
} catch (Exception e) {
|