|
@@ -56,32 +56,33 @@ public class RedisService implements Serializable {
|
|
|
|
|
|
public void mSetV2(Iterator<Map<String, String>> dataIte, DTSConfig config) {
|
|
|
Map<String, String> batch = new HashMap<>();
|
|
|
+ Jedis jedis = new Jedis(hostName, port);
|
|
|
+ jedis.auth(password);
|
|
|
while (dataIte.hasNext()) {
|
|
|
Map<String, String> record = dataIte.next();
|
|
|
String redisKey = redisKey(record, config);
|
|
|
String value = JSONUtils.toJson(record);
|
|
|
batch.put(redisKey, value);
|
|
|
if (batch.size() > 2000) {
|
|
|
- mSet(batch, config.getRedis().getExpire(), TimeUnit.SECONDS);
|
|
|
+ mSet(jedis, batch, config.getRedis().getExpire(), TimeUnit.SECONDS);
|
|
|
batch.clear();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
if (MapUtils.isNotEmpty(batch)) {
|
|
|
- mSet(batch, config.getRedis().getExpire(), TimeUnit.SECONDS);
|
|
|
+ mSet(jedis, batch, config.getRedis().getExpire(), TimeUnit.SECONDS);
|
|
|
}
|
|
|
+ jedis.close();
|
|
|
}
|
|
|
|
|
|
- private void mSet(Map<String, String> batch, long expire, TimeUnit timeUnit) {
|
|
|
+ private void mSet(Jedis jedis, Map<String, String> batch, long expire, TimeUnit timeUnit) {
|
|
|
long expireSeconds = timeUnit.toSeconds(expire);
|
|
|
- Jedis jedis = new Jedis(hostName, port);
|
|
|
- jedis.auth(password);
|
|
|
+
|
|
|
Pipeline pipeline = jedis.pipelined();
|
|
|
for (Map.Entry<String, String> e : batch.entrySet()) {
|
|
|
pipeline.setex(e.getKey(), expireSeconds, e.getValue());
|
|
|
}
|
|
|
pipeline.sync();
|
|
|
- jedis.close();
|
|
|
+
|
|
|
}
|
|
|
|
|
|
public String redisKey(Map<String, String> fieldValueMap, DTSConfig config) {
|