Ver código fonte

add mq dedup

supeng 9 meses atrás
pai
commit
e8ce9ade09

+ 73 - 0
etl-core/src/main/java/com/tzld/crawler/etl/common/base/CacheConstant.java

@@ -0,0 +1,73 @@
+package com.tzld.crawler.etl.common.base;
+
+/**
+ * Duration constants used for cache timeouts, expressed either in seconds or in milliseconds.
+ *
+ * @author supeng
+ */
+public class CacheConstant {
+    /**
+     * One minute, in milliseconds.
+     */
+    public static final long ONE_MINUTE_MILLS = 60 * 1000;
+    /**
+     * One minute, in seconds.
+     */
+    public static final long ONE_MINUTE_SECOND = 60;
+    /**
+     * Five minutes, in milliseconds.
+     */
+    public static final long FIVE_MINUTE_MILLS = 5 * ONE_MINUTE_MILLS;
+    /**
+     * Five minutes, in seconds.
+     */
+    public static final long FIVE_MINUTE = 5 * 60;
+    /**
+     * Thirty minutes, in milliseconds.
+     */
+    public static final long HALF_HOUR_MILLS = 30 * ONE_MINUTE_MILLS;
+    /**
+     * One hour, in milliseconds.
+     */
+    public static final long ONE_HOUR_MILLS = 60 * ONE_MINUTE_MILLS;
+    /**
+     * One hour, in seconds.
+     */
+    public static final long ONE_HOUR_SECOND = 60 * ONE_MINUTE_SECOND;
+    /**
+     * One day, in seconds.
+     */
+    public static final long ONE_DAY = 24 * 60 * 60;
+    /**
+     * One day, in milliseconds.
+     */
+    public static final long ONE_DAY_MILLS = 24 * ONE_HOUR_MILLS;
+    /**
+     * Seven days, in seconds.
+     */
+    public static final long SEVEN_DAY = 7 * ONE_DAY;
+    /**
+     * Seven days, in milliseconds.
+     */
+    public static final long SEVEN_DAY_MILLS = 7 * ONE_DAY_MILLS;
+    /**
+     * Ten days, in milliseconds.
+     */
+    public static final long TEN_DAY_MILLS = 10 * ONE_DAY_MILLS;
+    /**
+     * Ten days, in seconds.
+     */
+    public static final long TEN_DAY = 10 * ONE_DAY;
+    /**
+     * One year (365 days), in milliseconds.
+     */
+    public static final long ONE_YEAR_MILLS = 365 * ONE_DAY_MILLS;
+    /**
+     * Thirty days, in seconds.
+     */
+    public static final long THIRTY_DAY = 30 * ONE_DAY;
+    /**
+     * Half a year (180 days), in seconds.
+     */
+    public static final long HALF_YEAR = 180 * ONE_DAY;
+}

+ 64 - 0
etl-core/src/main/java/com/tzld/crawler/etl/config/RedisTemplateConfig.java

@@ -0,0 +1,64 @@
+package com.tzld.crawler.etl.config;
+
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
+import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
+import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
+import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+
+/**
+ * Spring configuration that wires a pooled Lettuce connection factory and a String-serialized
+ * {@link RedisTemplate}.
+ * <ul>
+ *   <li>{@code redisPool}: pool settings bound from the {@code spring.redis.lettuce.pool} prefix.</li>
+ *   <li>{@code redisConfig}: standalone connection settings bound from the {@code spring.redis} prefix.</li>
+ *   <li>{@code factory}: primary {@link LettuceConnectionFactory} built from the two beans above.</li>
+ *   <li>{@code redisTemplate}: template created by {@link #buildRedisTemplateByString} on top of {@code factory}.</li>
+ * </ul>
+ *
+ * @author supeng
+ * @date 2020/11/10
+ */
+@Configuration
+public class RedisTemplateConfig {
+
+    @Bean
+    @ConfigurationProperties(prefix = "spring.redis.lettuce.pool")
+    public GenericObjectPoolConfig<LettucePoolingClientConfiguration> redisPool() {
+        return new GenericObjectPoolConfig<>();
+    }
+
+    @Bean
+    @ConfigurationProperties(prefix = "spring.redis")
+    public RedisStandaloneConfiguration redisConfig() {
+        return new RedisStandaloneConfiguration();
+    }
+
+    @Bean("factory")
+    @Primary
+    public LettuceConnectionFactory factory(GenericObjectPoolConfig<LettucePoolingClientConfiguration> config, RedisStandaloneConfiguration redisConfig) {
+        LettuceClientConfiguration lettuceClientConfiguration = LettucePoolingClientConfiguration.builder().poolConfig(config).build();
+        return new LettuceConnectionFactory(redisConfig, lettuceClientConfiguration);
+    }
+
+    @Bean(name = "redisTemplate")
+    public RedisTemplate<String, String> getRedisTemplate(@Qualifier("factory") RedisConnectionFactory factory) {
+        return buildRedisTemplateByString(factory);
+    }
+
+    /**
+     * Builds a {@link RedisTemplate} whose keys, values, hash keys and hash values are all
+     * serialized with {@link StringRedisSerializer}.
+     *
+     * @param factory connection factory the template will use
+     * @return a new template configured with String serializers; this method does not call
+     *         {@code afterPropertiesSet()} on it
+     */
+    public RedisTemplate<String, String> buildRedisTemplateByString(RedisConnectionFactory factory) {
+        RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
+        redisTemplate.setConnectionFactory(factory);
+        // Key serializer: String serialization keeps keys human-readable in Redis.
+        redisTemplate.setKeySerializer(new StringRedisSerializer());
+        redisTemplate.setValueSerializer(new StringRedisSerializer());
+        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
+        redisTemplate.setHashValueSerializer(new StringRedisSerializer());
+        return redisTemplate;
+    }
+}

+ 17 - 0
etl-core/src/main/java/com/tzld/crawler/etl/mq/EtlMQConsumer.java

@@ -29,10 +29,15 @@ import com.aliyun.mq.http.MQClient;
 import com.aliyun.mq.http.MQConsumer;
 import com.aliyun.mq.http.common.AckMessageException;
 import com.aliyun.mq.http.model.Message;
+import com.tzld.crawler.etl.common.base.CacheConstant;
+import com.tzld.crawler.etl.common.base.Constant;
 import com.tzld.crawler.etl.model.vo.CrawlerEtlParam;
 import com.tzld.crawler.etl.service.EtlService;
+import com.tzld.crawler.etl.util.RedisUtil;
+import okhttp3.Cache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
@@ -65,6 +70,13 @@ public class EtlMQConsumer {
     private String groupId;
     @Value("${consumer.thread.size:32}")
     private Integer threadSize;
+    /**
+     * crawler:etl:dedup:{messageId}
+     */
+    private static final String DEDUP_KEY = "crawler:etl:dedup:%s";
+
+    @Autowired
+    RedisUtil redisUtil;
 
     private MQConsumer consumer;
 
@@ -99,6 +111,11 @@ public class EtlMQConsumer {
             for (Message message : messages) {
                 try {
                     log.info("Receive message: {} from topic: {}, group: {}", message, topic, groupId);
+                    String messageId = message.getMessageId();
+                    String key = String.format(DEDUP_KEY, messageId);
+                    if (!redisUtil.setNx(key, "1", CacheConstant.ONE_DAY)) {
+
+                    }
                     CrawlerEtlParam param = JSONObject.parseObject(message.getMessageBodyString(), CrawlerEtlParam.class);
                     param.setMessageId(message.getMessageId());
                     etlService.deal(param);

+ 934 - 0
etl-core/src/main/java/com/tzld/crawler/etl/util/RedisUtil.java

@@ -0,0 +1,934 @@
+package com.tzld.crawler.etl.util;
+
+import org.apache.ibatis.cache.CacheException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.core.Cursor;
+import org.springframework.data.redis.core.RedisCallback;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.ScanOptions;
+import org.springframework.data.redis.core.ZSetOperations.TypedTuple;
+import org.springframework.data.redis.core.script.DefaultRedisScript;
+import org.springframework.scripting.support.StaticScriptSource;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * redis 操作
+ *
+ * @author supeng
+ */
+@Component
+public class RedisUtil {
+    /**
+     * 锁前缀
+     */
+    private static final String LOCK_PREFIX = "LOCK_";
+    /**
+     * 默认重试次数
+     */
+    private static final Integer DEFAULT_RETRIES = 1;
+    /**
+     * 默认 10毫秒
+     */
+    private static final Long DEFAULT_INTERVAL = 10L;
+
+    /**
+     * 加锁lua脚本
+     */
+    private static final String LOCK = "if (redis.call('exists', KEYS[1]) == 0) then " +
+            "redis.call('hset', KEYS[1], ARGV[2], 1); " +
+            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
+            "return 1; " +
+            "end; " +
+            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
+            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
+            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
+            "return 1; " +
+            "end; " +
+            "return 0;";
+
+    /**
+     * 解锁 lua 脚本
+     */
+    public static final String UNLOCK = "if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then " +
+            "return nil; " +
+            "end; " +
+            "local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1); " +
+            "if (counter > 0) then " +
+            "return 0; " +
+            "else " +
+            "redis.call('del', KEYS[1]); " +
+            "return 1; " +
+            "end; " +
+            "return nil;";
+    @Qualifier("redisTemplate")
+    @Autowired
+    RedisTemplate<String, String> redisTemplate;
+
+    /** ================Key相关操作================ */
+
+    /**
+     * 是否存在key
+     *
+     * @param key
+     * @return
+     */
+    public Boolean hasKey(String key) {
+        return redisTemplate.hasKey(key);
+    }
+
+    /**
+     * scan
+     *
+     * @param matchKey
+     * @return
+     */
+    public Set<String> scan(String matchKey) {
+        Set<String> keys = redisTemplate.execute((RedisCallback<Set<String>>) connection -> {
+            Set<String> keysTmp = new HashSet<>();
+            Cursor<byte[]> cursor = connection.scan(new ScanOptions.ScanOptionsBuilder().match("*" + matchKey + "*").count(1000).build());
+            while (cursor.hasNext()) {
+                keysTmp.add(new String(cursor.next()));
+            }
+            return keysTmp;
+        });
+        return keys;
+    }
+
+    /**
+     * 删除 key
+     *
+     * @param key
+     * @return
+     */
+    public Boolean deleteKey(String key) {
+        return redisTemplate.delete(key);
+    }
+
+    /**
+     * 删除多个key
+     *
+     * @param keys
+     * @return
+     */
+    public Long deleteKeys(Collection<String> keys) {
+        return redisTemplate.delete(keys);
+    }
+
+    /**
+     * 设置key的过期时间
+     *
+     * @param key
+     * @param timeout
+     * @return
+     */
+    public Boolean expire(String key, long timeout) {
+        if (timeout < 0) {
+            return false;
+        }
+        return redisTemplate.expire(key, timeout, TimeUnit.SECONDS);
+    }
+
+    /**
+     * 获取key剩余的过期时间,秒
+     *
+     * @param key
+     * @return key不存在,返回-2,key存在并且没有设置过期时间(永久有效),返回 -1
+     */
+    public Long getExpire(String key) {
+        return redisTemplate.getExpire(key, TimeUnit.SECONDS);
+    }
+
+    /** ================String相关操作================ */
+
+    /**
+     * 获取 值
+     *
+     * @param key
+     * @return
+     */
+    public String get(String key) {
+        return redisTemplate.opsForValue().get(key);
+    }
+
+    /**
+     * 获取多个值
+     *
+     * @param keys
+     * @return
+     */
+    public List<String> multiGet(Collection<String> keys) {
+        if (keys != null && keys.size() > 200) {
+            throw new CacheException("too many keys, max 200");
+        }
+        return redisTemplate.opsForValue().multiGet(keys);
+    }
+
+    /**
+     * set 值
+     *
+     * @param key
+     * @param value
+     */
+    public void set(String key, String value) {
+        redisTemplate.opsForValue().set(key, value);
+    }
+
+    /**
+     * set 值并指定过期时间
+     *
+     * @param key
+     * @param value
+     * @param timeout
+     */
+    public void set(String key, String value, long timeout) {
+        redisTemplate.opsForValue().set(key, value, timeout, TimeUnit.SECONDS);
+    }
+
+    /**
+     * 设置String缓存值,只有key不存在时才能设置成功
+     *
+     * @param key
+     * @param value
+     * @return
+     */
+    public Boolean setNx(String key, String value) {
+        return redisTemplate.opsForValue().setIfAbsent(key, value);
+    }
+
+    /**
+     * 设置String缓存值并指定过期时间,只有key不存在时才能设置成功
+     *
+     * @param key
+     * @param value
+     * @param timeout
+     * @return
+     */
+    public Boolean setNx(String key, String value, long timeout) {
+        return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.SECONDS);
+    }
+
+    /**
+     * getAndSet 值
+     *
+     * @param key
+     * @param value
+     * @return
+     */
+    public String getAndSet(String key, String value) {
+        return redisTemplate.opsForValue().getAndSet(key, value);
+    }
+
+    /**
+     * key的值 +1
+     *
+     * @param key
+     * @return
+     * @version 1.0
+     */
+    public Long incr(String key) {
+        return redisTemplate.opsForValue().increment(key);
+    }
+
+    /**
+     * key的值 +delta
+     *
+     * @param key
+     * @param delta
+     * @return
+     * @version 1.0
+     */
+    public Long incrBy(String key, long delta) {
+        return redisTemplate.opsForValue().increment(key, delta);
+    }
+
+    public Long incrBy(String key, long delta, long timeout) {
+        try {
+            Long result = redisTemplate.opsForValue().increment(key, delta);
+            redisTemplate.expire(key, timeout, TimeUnit.SECONDS);
+            return result;
+        } catch (Exception e) {
+
+        }
+        return null;
+    }
+
+    /**
+     * key的值 -1
+     *
+     * @param key
+     * @return
+     * @version 1.0
+     */
+    public Long decr(String key) {
+        return redisTemplate.opsForValue().decrement(key);
+    }
+
+    /**
+     * key的值 -delta
+     *
+     * @param key
+     * @param delta
+     * @return
+     * @version 1.0
+     */
+    public Long desrBy(String key, long delta) {
+        return redisTemplate.opsForValue().decrement(key, delta);
+    }
+
+    /** ================Hashes相关操作================ */
+
+    /**
+     * 是否存在 hashkey
+     *
+     * @param key
+     * @param hashKey
+     * @return
+     */
+    public boolean hasHashKey(String key, String hashKey) {
+        return redisTemplate.opsForHash().hasKey(key, hashKey);
+    }
+
+    /**
+     * 获取hash某个字段的值
+     *
+     * @param key
+     * @param hashKey
+     * @return
+     */
+    public Object hget(String key, String hashKey) {
+        return redisTemplate.opsForHash().get(key, hashKey);
+    }
+
+    /**
+     * 获取hash所有键值(hashkey超过200个时禁止使用)
+     *
+     * @param key
+     * @return
+     */
+    public Map<Object, Object> hgetAll(String key) {
+        return redisTemplate.opsForHash().entries(key);
+    }
+
+    /**
+     * 获取hash对应多个字段的值
+     *
+     * @param key
+     * @param hashKeys
+     * @return
+     */
+    public List<Object> hmget(String key, List<Object> hashKeys) {
+        return redisTemplate.opsForHash().multiGet(key, hashKeys);
+    }
+
+    /**
+     * 设置hash中某个字段的值,如果不存在将创建
+     *
+     * @param key
+     * @param hashKey
+     * @param value
+     */
+    public void hset(String key, String hashKey, String value) {
+        redisTemplate.opsForHash().put(key, hashKey, value);
+    }
+
+    /**
+     * 设置hash中某个字段的值并指定key的过期时间,如果不存在将创建
+     *
+     * @param key
+     * @param hashKey
+     * @param value
+     * @param timeout 单位:秒
+     */
+    public void hset(String key, String hashKey, String value, long timeout) {
+        redisTemplate.opsForHash().put(key, hashKey, value);
+        expire(key, timeout);
+    }
+
+    /**
+     * 设置hash中某个字段的值,只有当这个字段不存在时才能设置成功
+     *
+     * @param key
+     * @param hashKey
+     * @param value
+     * @return
+     */
+    public Boolean hsetNx(String key, String hashKey, String value) {
+        return redisTemplate.opsForHash().putIfAbsent(key, hashKey, value);
+    }
+
+    /**
+     * 删除hash中的多个字段
+     *
+     * @param key
+     * @param hashKey
+     */
+    public void hdel(String key, Object... hashKey) {
+        redisTemplate.opsForHash().delete(key, hashKey);
+    }
+
+    /**
+     * 将hash中的某个字段值增加delta
+     *
+     * @param key
+     * @param hashKey
+     * @param delta
+     * @return
+     */
+    public Long hincrBy(String key, String hashKey, long delta) {
+        return redisTemplate.opsForHash().increment(key, hashKey, delta);
+    }
+
+    public Long hincrBy(String key, String hashKey, long delta, long timeout) {
+        try {
+            Long result = redisTemplate.opsForHash().increment(key, hashKey, delta);
+            redisTemplate.expire(key, timeout, TimeUnit.SECONDS);
+            return result;
+        } catch (Exception e){
+
+        }
+        return null;
+    }
+
+
+    /** ================Lists相关操作================ */
+
+    /**
+     * 返回list中指定范围的元素,第一个元素为0,最后一个元素为-1,倒数第二个为-2,以此类推。
+     *
+     * @param key
+     * @param start
+     * @param end
+     * @return
+     */
+    public List<String> lrange(String key, long start, long end) {
+        return redisTemplate.opsForList().range(key, start, end);
+    }
+
+    /**
+     * 获取list的长度
+     *
+     * @param key
+     * @return
+     */
+    public Long llen(String key) {
+        return redisTemplate.opsForList().size(key);
+    }
+
+    /**
+     * 获取list中index索引的元素
+     *
+     * @param key
+     * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
+     * @return
+     */
+    public String lindex(String key, long index) {
+        return redisTemplate.opsForList().index(key, index);
+    }
+
+    /**
+     * 向list的尾部插入指定的元素
+     *
+     * @param key
+     * @param value
+     * @return
+     */
+    public Long rpush(String key, String value) {
+        return redisTemplate.opsForList().rightPush(key, value);
+    }
+
+    /**
+     * 向list的尾部插入多个元素
+     *
+     * @param key
+     * @param values
+     * @return
+     */
+    public Long rpushAll(String key, Collection<String> values) {
+        return redisTemplate.opsForList().rightPushAll(key, values);
+    }
+
+    /**
+     * 移除并返回list尾部的元素
+     *
+     * @param key
+     * @return
+     */
+    public String rpop(String key) {
+        return redisTemplate.opsForList().rightPop(key);
+    }
+
+    /**
+     * 向list的头部插入指定的元素
+     *
+     * @param key
+     * @param value
+     * @return
+     */
+    public Long lpush(String key, String value) {
+        return redisTemplate.opsForList().leftPush(key, value);
+    }
+
+    /**
+     * 向list的头部插入多个元素
+     *
+     * @param key
+     * @param values
+     * @return
+     */
+    public Long lpushAll(String key, Collection<String> values) {
+        return redisTemplate.opsForList().leftPushAll(key, values);
+    }
+
+    /**
+     * 移除并返回list头部的元素
+     *
+     * @param key
+     * @return
+     */
+    public String lpop(String key) {
+        return redisTemplate.opsForList().leftPop(key);
+    }
+
+    /**
+     * 修改list中index索引对应元素的值
+     *
+     * @param key
+     * @param index
+     * @param value
+     */
+    public void lset(String key, long index, String value) {
+        redisTemplate.opsForList().set(key, index, value);
+    }
+
+    /**
+     * 从list中移除前count个出现值为value的元素
+     *
+     * @param key
+     * @param count count > 0: 从头往尾移除值为 value 的元素。</br>
+     *              count < 0: 从尾往头移除值为 value 的元素。</br>
+     *              count = 0: 移除所有值为 value 的元素。
+     * @param value
+     * @return
+     */
+    public Long lrem(String key, long count, String value) {
+        return redisTemplate.opsForList().remove(key, count, value);
+    }
+
+    /**
+     * lpop多个数据
+     *
+     * @param key
+     * @param count
+     * @return
+     */
+    public List<Object> lpopMutil(String key, long count) {
+        List<Object> result = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
+            connection.lRange(key.getBytes(), 0, count - 1);
+            connection.lTrim(key.getBytes(), count, -1);
+            return null;
+        });
+        return result;
+    }
+
+    /**
+     * rpop多个数据
+     *
+     * @param key
+     * @param start 起始位置  end=-1
+     * @return
+     */
+    public List<Object> rpopMutil(String key, long start) {
+        List<Object> result = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
+            connection.lRange(key.getBytes(), start, -1);
+            connection.lTrim(key.getBytes(), 0, start - 1);
+            return null;
+        });
+        return result;
+    }
+
+    /**
+     * 从右取多个值
+     *
+     * @param key
+     * @param count 个数
+     * @return
+     */
+    public List<Object> rpopMutil(String key, int count) {
+        List<Object> result = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
+            connection.lRange(key.getBytes(), 0 - count, -1);
+            connection.lTrim(key.getBytes(), 0, -1 - count);
+            return null;
+        });
+        return result;
+    }
+
+    /** ================Sets相关操作================ */
+
+    /**
+     * set集合中是否存在值为value的元素
+     *
+     * @param key
+     * @param value
+     * @return
+     */
+    public boolean sIsMember(String key, String value) {
+        return redisTemplate.opsForSet().isMember(key, value);
+    }
+
+    /**
+     * Returns all members of a set by iterating it with SSCAN rather than fetching it in one
+     * call. The cursor is closed once iteration finishes so its resources are released.
+     *
+     * @param key set key
+     * @return the members read from the cursor; empty when the cursor is {@code null} or has no elements
+     */
+    public Set<String> sscan(String key) {
+        Set<String> result = new HashSet<>();
+        // try-with-resources tolerates a null resource, so the null check below stays valid.
+        try (Cursor<String> cursor = redisTemplate.opsForSet().scan(key, ScanOptions.NONE)) {
+            while (cursor != null && cursor.hasNext()) {
+                result.add(cursor.next());
+            }
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            // Only reachable when closing the cursor raises a checked exception.
+            throw new IllegalStateException("Failed to close redis sscan cursor", e);
+        }
+        return result;
+    }
+
+    /**
+     * 向set集合添加多个元素
+     *
+     * @param key
+     * @param values
+     * @return
+     */
+    public Long sadd(String key, String... values) {
+        return redisTemplate.opsForSet().add(key, values);
+    }
+
+    /**
+     * 返回set集合的元素个数
+     *
+     * @param key
+     * @return
+     */
+    public Long scard(String key) {
+        return redisTemplate.opsForSet().size(key);
+    }
+
+    /**
+     * 从set中移除值为value的元素
+     *
+     * @param key
+     * @param values
+     * @return
+     */
+    public Long srem(String key, Object... values) {
+        return redisTemplate.opsForSet().remove(key, values);
+    }
+
+    /** ================Sorted Sets相关操作================ */
+
+    /**
+     * 向zset集合添加一个元素,或者更新已存在元素的分数
+     *
+     * @param key
+     * @param vaule
+     * @param score
+     */
+    public void zadd(String key, String vaule, double score) {
+        redisTemplate.opsForZSet().add(key, vaule, score);
+    }
+
+    /**
+     * 向zset集合添加一个元素,或者更新已存在元素的分数,并更新key的过期时间
+     *
+     * @param key
+     * @param vaule
+     * @param score
+     */
+    public void zadd(String key, String vaule, double score, long timeout) {
+        redisTemplate.opsForZSet().add(key, vaule, score);
+        expire(key, timeout);
+    }
+
+    /**
+     * 获取某个元素的分数
+     *
+     * @param key
+     * @param value
+     * @return
+     */
+    public Double zscore(String key, String value) {
+        return redisTemplate.opsForZSet().score(key, value);
+    }
+
+    /**
+     * 获取zset集合的元素数量
+     *
+     * @param key
+     * @return
+     */
+    public Long zcard(String key) {
+        return redisTemplate.opsForZSet().zCard(key);
+    }
+
+    /**
+     * zset 范围内元素个数
+     *
+     * @param key
+     * @param min
+     * @param max
+     * @return
+     */
+    public Long zcount(String key, double min, double max) {
+        return redisTemplate.opsForZSet().count(key, min, max);
+    }
+
+    /**
+     * 通过索引区间返回zset集合成指定区间内的元素,分数从小到大排序
+     *
+     * @param key
+     * @param start
+     * @param end
+     * @return
+     */
+    public Set<String> zrange(String key, long start, long end) {
+        return redisTemplate.opsForZSet().range(key, start, end);
+    }
+
+    /**
+     * 通过索引区间返回zset集合成指定区间内的元素,分数从大到小排序
+     *
+     * @param key
+     * @param start
+     * @param end
+     * @return
+     */
+    public Set<String> zreverseRange(String key, long start, long end) {
+        return redisTemplate.opsForZSet().reverseRange(key, start, end);
+    }
+
+    /**
+     * 通过索引区间返回zset集合成指定区间内的元素及分数
+     *
+     * @param key
+     * @param start
+     * @param end
+     * @return
+     */
+    public List<Map<String, Double>> zrangeWithScores(String key, long start, long end) {
+        List<Map<String, Double>> result = new ArrayList<>();
+        Set<TypedTuple<String>> cursor = redisTemplate.opsForZSet().rangeWithScores(key, start, end);
+        for (Iterator<TypedTuple<String>> it = cursor.iterator(); it.hasNext(); ) {
+            TypedTuple<String> item = it.next();
+            Map<String, Double> map = new HashMap<>();
+            map.put(item.getValue(), item.getScore());
+            result.add(map);
+        }
+        return result;
+    }
+
+    /**
+     * 通过索引区间返回zset集合成指定区间内的元素及分数
+     *
+     * @param key
+     * @param start
+     * @param end
+     * @return
+     */
+    public List<Map<String, Object>> zrangeWithScoresToList(String key, long start, long end) {
+        List<Map<String, Object>> result = new ArrayList<>();
+        Set<TypedTuple<String>> cursor = redisTemplate.opsForZSet().rangeWithScores(key, start, end);
+        for (Iterator<TypedTuple<String>> it = cursor.iterator(); it.hasNext(); ) {
+            TypedTuple<String> item = it.next();
+            Map<String, Object> map = new HashMap<>();
+            map.put("value", item.getValue());
+            map.put("score", item.getScore());
+            result.add(map);
+        }
+        return result;
+    }
+
+    /**
+     * 通过索引区间返回zset集合成指定区间内的元素及分数
+     * @param key
+     * @param start
+     * @param end
+     * @return map
+     */
+    public Map<String, Double> zrangeWithScoresToMap(String key, long start, long end) {
+        Map<String, Double> result = new HashMap<>();
+        Set<TypedTuple<String>> cursor = redisTemplate.opsForZSet().rangeWithScores(key, start, end);
+        for (Iterator<TypedTuple<String>> it = cursor.iterator(); it.hasNext(); ) {
+            TypedTuple<String> item = it.next();
+            result.put(item.getValue(), item.getScore());
+        }
+        return result;
+    }
+
+    /**
+     * Iterates a sorted set with ZSCAN and returns only the member values (scores are dropped).
+     * The cursor is closed once iteration finishes so its resources are released.
+     *
+     * @param key sorted-set key
+     * @return the member values read from the cursor
+     */
+    public Set<String> zscanValues(String key) {
+        Set<String> result = new HashSet<>();
+        try (Cursor<TypedTuple<String>> cursor = redisTemplate.opsForZSet().scan(key, ScanOptions.NONE)) {
+            while (cursor.hasNext()) {
+                result.add(cursor.next().getValue());
+            }
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            // Only reachable when closing the cursor raises a checked exception.
+            throw new IllegalStateException("Failed to close redis zscan cursor", e);
+        }
+        return result;
+    }
+
+    /**
+     * Iterates a sorted set with ZSCAN and returns each member together with its score.
+     * The cursor is closed once iteration finishes so its resources are released.
+     *
+     * @param key sorted-set key
+     * @return one single-entry map (member to score) per element, in cursor order
+     */
+    public List<Map<String, Double>> zscan(String key) {
+        List<Map<String, Double>> result = new ArrayList<>();
+        try (Cursor<TypedTuple<String>> cursor = redisTemplate.opsForZSet().scan(key, ScanOptions.NONE)) {
+            while (cursor.hasNext()) {
+                TypedTuple<String> item = cursor.next();
+                Map<String, Double> entry = new HashMap<>();
+                entry.put(item.getValue(), item.getScore());
+                result.add(entry);
+            }
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            // Only reachable when closing the cursor raises a checked exception.
+            throw new IllegalStateException("Failed to close redis zscan cursor", e);
+        }
+        return result;
+    }
+
+    /**
+     * 移除zset集合中的元素
+     *
+     * @param key
+     * @param values
+     */
+    public void zRemove(String key, Object... values) {
+        redisTemplate.opsForZSet().remove(key, values);
+    }
+
+    /**
+     * 移除zset集合指定范围数据
+     *
+     * @param key
+     * @param start
+     * @param end
+     */
+    public void zRemRange(String key, long start, long end) {
+        redisTemplate.opsForZSet().removeRange(key, start, end);
+    }
+
+
+    /**
+     * 删除
+     *
+     * @param key
+     * @param min
+     * @param max
+     */
+    public void zRemRangeByScore(String key, double min, double max) {
+        redisTemplate.opsForZSet().removeRangeByScore(key, min, max);
+    }
+
+    /**
+     * 加锁 不可重入
+     *
+     * @param key
+     * @param timeout 毫秒
+     * @return
+     */
+    public Boolean lock(String key, long timeout) {
+        return redisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + key, "1", timeout, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * 解锁 不可重入
+     *
+     * @param key
+     * @return
+     */
+    public Boolean unlock(String key) {
+        return redisTemplate.delete(LOCK_PREFIX + key);
+    }
+
+    /**
+     * 加锁
+     *
+     * @param key         key
+     * @param reentrantId 重入Id
+     * @param timeout     超时时间 毫秒ms
+     * @return
+     */
+    public Boolean lock(String key, String reentrantId, long timeout) {
+        return lock(key, reentrantId, timeout, DEFAULT_RETRIES);
+    }
+
+    /**
+     * 加锁
+     *
+     * @param key         key
+     * @param reentrantId 重入Id
+     * @param timeout     超时时间 毫秒ms
+     * @param retries     重试次数
+     * @return
+     */
+    public Boolean lock(String key, String reentrantId, long timeout, int retries) {
+        return lock(key, reentrantId, timeout, retries, DEFAULT_INTERVAL);
+    }
+
+    /**
+     * Tries to acquire a reentrant lock by running the {@code LOCK} Lua script, retrying up to
+     * {@code retries} times with {@code interval} milliseconds between attempts.
+     * <p>
+     * The script creates the hash {@code LOCK_PREFIX + key} with field {@code reentrantId} when
+     * the key does not exist, or increments that field when it already exists, and refreshes the
+     * expiry with PEXPIRE in both cases. It returns 0 when the key exists without that field.
+     * <p>
+     * If the calling thread is interrupted while waiting between attempts, the interrupt flag is
+     * restored and {@code false} is returned. This method previously returned {@code null} there,
+     * which threw a NullPointerException in callers that unbox the result.
+     *
+     * @param key         lock name (without prefix)
+     * @param reentrantId identifier of the holder, used as the hash field
+     * @param timeout     lock expiry in milliseconds
+     * @param retries     maximum number of attempts
+     * @param interval    pause between attempts in milliseconds
+     * @return {@code true} if the lock was acquired, {@code false} otherwise
+     */
+    public Boolean lock(String key, String reentrantId, long timeout, int retries, long interval) {
+        String lockKey = LOCK_PREFIX + key;
+        DefaultRedisScript<Long> script = new DefaultRedisScript<>();
+        script.setResultType(Long.class);
+        script.setScriptSource(new StaticScriptSource(LOCK));
+        for (int attempt = 0; attempt < retries; attempt++) {
+            Long result = redisTemplate.execute(script, Collections.singletonList(lockKey), String.valueOf(timeout), reentrantId);
+            if (Objects.equals(1L, result)) {
+                return true;
+            }
+            if (attempt == retries - 1) {
+                // Last attempt failed: sleeping again would only delay the caller.
+                break;
+            }
+            try {
+                TimeUnit.MILLISECONDS.sleep(interval);
+            } catch (InterruptedException e) {
+                // Preserve the interruption for the caller instead of swallowing it.
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * 解锁
+     *
+     * @param key         key
+     * @param reentrantId 重入ID
+     */
+    public Boolean unlock(String key, String reentrantId) {
+        String lockKey = LOCK_PREFIX + key;
+        DefaultRedisScript<Long> script = new DefaultRedisScript<>();
+        script.setResultType(Long.class);
+        script.setScriptSource(new StaticScriptSource(UNLOCK));
+        Object result = redisTemplate.execute(script, Arrays.asList(lockKey), reentrantId);
+        if (Objects.isNull(result)) {
+            return null;
+        }
+        if (Objects.equals(1L, Long.valueOf(result.toString()))) {
+            return true;
+        }
+        return false;
+    }
+
+}

+ 12 - 1
etl-server/src/main/resources/application-dev.yml

@@ -6,7 +6,18 @@ spring:
     url: jdbc:mysql://rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com:3306/piaoquan-crawler?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false
     username: crawler
     password: crawler123456@
-
+  redis:
+#    hostName: r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com
+    hostName: r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com
+    port: 6379
+    password: Qingqu2019
+    timeout: 1000
+    lettuce:
+      pool:
+        max-active: 8
+        max-wait: -1
+        max-idle: 8
+        min-idle: 0
 apollo:
   meta: http://devapolloconfig-internal.piaoquantv.com
 

+ 11 - 0
etl-server/src/main/resources/application-prod.yml

@@ -6,6 +6,17 @@ spring:
     url: jdbc:mysql://rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com:3306/piaoquan-crawler?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false
     username: crawler
     password: crawler123456@
+  redis:
+    hostName: r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com
+    port: 6379
+    password: Wqsd@2019
+    timeout: 1000
+    lettuce:
+      pool:
+        max-active: 8
+        max-wait: -1
+        max-idle: 8
+        min-idle: 0
 
 apollo:
   meta: http://apolloconfig-internal.piaoquantv.com

+ 12 - 1
etl-server/src/main/resources/application-test.yml

@@ -6,7 +6,18 @@ spring:
     url: jdbc:mysql://rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com:3306/piaoquan-crawler?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false
     username: crawler
     password: crawler123456@
-
+  redis:
+    hostName: r-bp154bpw97gptefiqk.redis.rds.aliyuncs.com
+#    hostName: r-bp154bpw97gptefiqkpd.redis.rds.aliyuncs.com
+    port: 6379
+    password: Qingqu2019
+    timeout: 1000
+    lettuce:
+      pool:
+        max-active: 8
+        max-wait: -1
+        max-idle: 8
+        min-idle: 0
 apollo:
   meta: http://testapolloconfig-internal.piaoquantv.com
 

+ 1 - 1
etl-server/src/main/resources/application.yml

@@ -38,7 +38,7 @@ mybatis:
 
 logging:
   file:
-    path: /datalog/weblog/${spring.application.name}/
+    path: /Users/heyu/datalog/weblog/${spring.application.name}/
 
 app:
   id: ${spring.application.name}

+ 15 - 0
etl-server/src/test/java/com/tzld/crawler/etl/EtlServerApplicationTests.java

@@ -7,11 +7,13 @@ import com.google.common.collect.Lists;
 import com.huaban.analysis.jieba.JiebaSegmenter;
 import com.huaban.analysis.jieba.SegToken;
 import com.tzld.crawler.etl.mq.EtlMQConsumer;
+import com.tzld.crawler.etl.util.RedisUtil;
 import net.bramp.ffmpeg.FFprobe;
 import net.bramp.ffmpeg.probe.FFmpegFormat;
 import net.bramp.ffmpeg.probe.FFmpegProbeResult;
 import net.bramp.ffmpeg.probe.FFmpegStream;
 import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.mock.mockito.MockBean;
@@ -19,6 +21,7 @@ import org.springframework.boot.test.mock.mockito.MockBean;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 
 @SpringBootTest
@@ -38,6 +41,8 @@ class EtlServerApplicationTests {
     @MockBean
     private EtlMQConsumer etlMQConsumer;
 
+    @Autowired
+    RedisUtil redisUtil;
 
     @Test
     void produceMsgTest() {
@@ -92,5 +97,15 @@ class EtlServerApplicationTests {
         );
     }
 
+    @Test
+    void testRedis() throws InterruptedException {
+        String key = "test:123:123";
+        System.out.println(redisUtil.setNx(key,"1", 10));
+        System.out.println(redisUtil.get(key));
+        System.out.println(redisUtil.setNx(key,"1", 10));
+        TimeUnit.SECONDS.sleep(11);
+        System.out.println(redisUtil.setNx(key,"1", 10));
+    }
+
 
 }

+ 8 - 1
pom.xml

@@ -163,7 +163,14 @@
             <groupId>org.hibernate.validator</groupId>
             <artifactId>hibernate-validator</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-redis</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-pool2</artifactId>
+        </dependency>
     </dependencies>
 
 </project>