|
@@ -28,13 +28,22 @@ public class ODPSToRedis {
|
|
log.info("args {}", JSONUtils.toJson(args));
|
|
log.info("args {}", JSONUtils.toJson(args));
|
|
|
|
|
|
Map<String, String> argMap = cmdService.parse(args);
|
|
Map<String, String> argMap = cmdService.parse(args);
|
|
|
|
+
|
|
|
|
+// argMap.put("project", "loghubods");
|
|
|
|
+// argMap.put("table", "alg_mid_feature_share_and_return");
|
|
|
|
+// argMap.put("dt", "20240905");
|
|
|
|
+// argMap.put("hh", "13");
|
|
|
|
+// argMap.put("mi", "00");
|
|
|
|
+// argMap.put("env", "test");
|
|
|
|
+// argMap.put("odpsBatchSize", "300000");
|
|
|
|
+
|
|
if (MapUtils.isEmpty(argMap)) {
|
|
if (MapUtils.isEmpty(argMap)) {
|
|
log.error("args is empty");
|
|
log.error("args is empty");
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
SparkConf sparkConf = new SparkConf()
|
|
SparkConf sparkConf = new SparkConf()
|
|
- // .setMaster("local")
|
|
|
|
|
|
+ //.setMaster("local")
|
|
.setAppName("odps sync to redis : " + argMap.get("table"));
|
|
.setAppName("odps sync to redis : " + argMap.get("table"));
|
|
for (Map.Entry<String, String> e : argMap.entrySet()) {
|
|
for (Map.Entry<String, String> e : argMap.entrySet()) {
|
|
sparkConf.set(e.getKey(), e.getValue());
|
|
sparkConf.set(e.getKey(), e.getValue());
|
|
@@ -71,7 +80,8 @@ public class ODPSToRedis {
|
|
}
|
|
}
|
|
|
|
|
|
log.info("odps count {}", count);
|
|
log.info("odps count {}", count);
|
|
- argMap.put("partitionNum", String.valueOf(count / 50000 + 1));
|
|
|
|
|
|
+ long odpsBatchSize = NumberUtils.toLong(argMap.getOrDefault("odpsBatchSize", "200000"));
|
|
|
|
+ argMap.put("partitionNum", String.valueOf(count / odpsBatchSize + 1));
|
|
|
|
|
|
JavaRDD<Map<String, String>> fieldValues = odpsService.read(jsc, config, argMap);
|
|
JavaRDD<Map<String, String>> fieldValues = odpsService.read(jsc, config, argMap);
|
|
if (fieldValues == null) {
|
|
if (fieldValues == null) {
|