@@ -103,6 +103,9 @@ public class ODPSToRedis {
fieldValues.repartition(partitionNum).foreachPartition(iterator -> {
redisService.mSetV2(iterator, config);
});
+
+ redisService.setMonitor(config, argMap);
jsc.stop();
}
@@ -57,6 +57,14 @@ public class RedisService implements Serializable {
jedis.close();
+ public void setMonitor(DTSConfig config, Map<String, String> argMap) {
+ Jedis jedis = new Jedis(hostName, port);
+ jedis.auth(password);
+ long expire = config.getRedis().getExpire();
+ String redisKey = "ttl:" + argMap.get("table");
+ jedis.setex(redisKey, expire, "");
+ }
private void mSet(Jedis jedis, Map<String, String> batch, long expire, TimeUnit timeUnit) {
long expireSeconds = timeUnit.toSeconds(expire);