Jelajahi Sumber

feat:修改partationNum

zhaohaipeng 1 bulan lalu
induk
melakukan
f9a9ba57c8

+ 16 - 9
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/ODPSToRedis.java

@@ -1,6 +1,10 @@
 package com.tzld.piaoquan.recommend.feature.produce;
 
 import com.alibaba.fastjson.JSONObject;
+import com.squareup.okhttp.MediaType;
+import com.squareup.okhttp.OkHttpClient;
+import com.squareup.okhttp.Request;
+import com.squareup.okhttp.RequestBody;
 import com.tzld.piaoquan.recommend.feature.produce.model.DTSConfig;
 import com.tzld.piaoquan.recommend.feature.produce.service.CMDService;
 import com.tzld.piaoquan.recommend.feature.produce.service.DTSConfigService;
@@ -8,7 +12,6 @@ import com.tzld.piaoquan.recommend.feature.produce.service.ODPSService;
 import com.tzld.piaoquan.recommend.feature.produce.service.RedisService;
 import com.tzld.piaoquan.recommend.feature.produce.util.JSONUtils;
 import lombok.extern.slf4j.Slf4j;
-import com.squareup.okhttp.*;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.spark.SparkConf;
@@ -110,24 +113,28 @@ public class ODPSToRedis {
             }
         }
 
-        log.info("odps count {}", count);
-        if (!argMap.containsKey("partitionNum")){
-            long odpsBatchSize = NumberUtils.toLong(argMap.getOrDefault("odpsBatchSize", "200000"));
-            argMap.put("partitionNum", String.valueOf(count / odpsBatchSize + 1));
-        }
-
         JavaRDD<Map<String, String>> fieldValues = odpsService.read(jsc, config, argMap);
         if (fieldValues == null) {
             log.info("odps empty");
             return;
         }
-        //log.info("odps count {}", fieldValues.count());
+        // log.info("odps count {}", fieldValues.count());
 
         // RDD
         log.info("sync redis");
         RedisService redisService = new RedisService(env);
+
+
+        log.info("odps count {}", count);
+        long odpsBatchSize = NumberUtils.toLong(argMap.getOrDefault("odpsBatchSize", "200000"));
+        int calcPartationNum = (int) (count / odpsBatchSize + 1);
+
         int partitionNum = NumberUtils.toInt(argMap.get("partitionNum"), 10);
-        fieldValues.repartition(partitionNum).foreachPartition(iterator -> {
+        int finalPartationNum = Math.min(partitionNum, calcPartationNum);
+
+        log.info("argMap.partitionNum: {} calcPartationNum: {}, finalPartationNum: {}", partitionNum, calcPartationNum, finalPartationNum);
+
+        fieldValues.repartition(finalPartationNum).foreachPartition(iterator -> {
             redisService.mSetV2(iterator, config);
         });