|
|
@@ -35,16 +35,6 @@ public class ODPSToRedis {
|
|
|
|
|
|
Map<String, String> argMap = cmdService.parse(args);
|
|
|
|
|
|
-// argMap.put("project", "loghubods");
|
|
|
-// argMap.put("table", "alg_video_source_feature_day");
|
|
|
-// argMap.put("dt", "20250610");
|
|
|
-// argMap.put("hh", "13");
|
|
|
-// argMap.put("mi", "00");
|
|
|
-// argMap.put("env", "test");
|
|
|
-// argMap.put("odpsBatchSize", "300000");
|
|
|
-// argMap.put("retry", "1");
|
|
|
-
|
|
|
-
|
|
|
if (MapUtils.isEmpty(argMap)) {
|
|
|
log.error("args is empty");
|
|
|
return;
|
|
|
@@ -126,14 +116,14 @@ public class ODPSToRedis {
|
|
|
|
|
|
|
|
|
log.info("odps count {}", count);
|
|
|
+
|
|
|
long odpsBatchSize = NumberUtils.toLong(argMap.getOrDefault("odpsBatchSize", "200000"));
|
|
|
int calcPartationNum = (int) (count / odpsBatchSize + 1);
|
|
|
|
|
|
- int partitionNum = NumberUtils.toInt(argMap.get("partitionNum"), 10);
|
|
|
- int finalPartationNum = Math.min(partitionNum, calcPartationNum);
|
|
|
-
|
|
|
- log.info("argMap.partitionNum: {} calcPartationNum: {}, finalPartationNum: {}", partitionNum, calcPartationNum, finalPartationNum);
|
|
|
+ int maxPartitionNum = NumberUtils.toInt(argMap.get("maxPartitionNum"), 30);
|
|
|
+ int finalPartationNum = Math.min(maxPartitionNum, calcPartationNum);
|
|
|
|
|
|
+ log.info("argMap.maxPartitionNum: {} calcPartationNum: {}, finalPartationNum: {}", maxPartitionNum, calcPartationNum, finalPartationNum);
|
|
|
fieldValues.repartition(finalPartationNum).foreachPartition(iterator -> {
|
|
|
redisService.mSetV2(iterator, config);
|
|
|
});
|