|
@@ -47,10 +47,12 @@ public class ODPSToRedis {
|
|
List<Map<String, String>> fieldValues = odpsService.read(config, argMap);
|
|
List<Map<String, String>> fieldValues = odpsService.read(config, argMap);
|
|
log.info("odps count {}", fieldValues.size());
|
|
log.info("odps count {}", fieldValues.size());
|
|
if (CollectionUtils.isEmpty(fieldValues)) {
|
|
if (CollectionUtils.isEmpty(fieldValues)) {
|
|
|
|
+ log.info("odps empty");
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
// RDD
|
|
// RDD
|
|
|
|
+ log.info("sync redis");
|
|
RedisService redisService = new RedisService(env);
|
|
RedisService redisService = new RedisService(env);
|
|
SparkConf sparkConf = new SparkConf()
|
|
SparkConf sparkConf = new SparkConf()
|
|
.setAppName("odps sync to redis");
|
|
.setAppName("odps sync to redis");
|
|
@@ -60,9 +62,7 @@ public class ODPSToRedis {
|
|
JavaRDD<Pair<String, String>> json = readData.map(
|
|
JavaRDD<Pair<String, String>> json = readData.map(
|
|
f -> Pair.of(redisService.redisKey(f, config), JSONUtils.toJson(f))
|
|
f -> Pair.of(redisService.redisKey(f, config), JSONUtils.toJson(f))
|
|
);
|
|
);
|
|
-
|
|
|
|
int partitionNum = fieldValues.size() / 1000 + 1;
|
|
int partitionNum = fieldValues.size() / 1000 + 1;
|
|
- log.info("sync redis");
|
|
|
|
json.repartition(partitionNum).foreachPartition(iterator -> {
|
|
json.repartition(partitionNum).foreachPartition(iterator -> {
|
|
redisService.mSet(iterator, config);
|
|
redisService.mSet(iterator, config);
|
|
});
|
|
});
|