often 5 months ago
parent
commit
dd7470ec65

+ 6 - 8
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys/video_dssm_sampler.scala

@@ -17,7 +17,7 @@ object video_dssm_sampler {
 
   // 配置参数
   private val CONFIG = Map(
-    "shuffle.partitions" -> "200",
+    "shuffle.partitions" -> "400",
     "memory.fraction" -> "0.8",
     "default.parallelism" -> "200"
   )
@@ -129,10 +129,10 @@ object video_dssm_sampler {
         transfer = funcPositive,
         numPartition = CONFIG("shuffle.partitions").toInt
       ).sample(false, 0.001) // 随机抽样千分之一的数据
-       .persist(StorageLevel.MEMORY_AND_DISK)
+       .persist(StorageLevel.MEMORY_AND_DISK_SER)
       println("开始执行partiton:" + partition)
 
-      val positivePairs = spark.createDataFrame(rdd, schema).persist(StorageLevel.MEMORY_AND_DISK)
+      val positivePairs = spark.createDataFrame(rdd, schema).persist(StorageLevel.MEMORY_AND_DISK_SER)
       stats.positiveSamplesCount = positivePairs.count()
       logger.info(s"start read vid list for date $dt")
 
@@ -186,7 +186,7 @@ object video_dssm_sampler {
           negativeSamplesDF
             .withColumn("label", lit(0))
             .withColumn("logid", concat(lit("neg_"), monotonically_increasing_id()))
-        ).persist(StorageLevel.MEMORY_AND_DISK)
+        ).persist(StorageLevel.MEMORY_AND_DISK_SER)
 
       // 6. 获取左侧特征
       // 读取L1类别统计特征
@@ -301,7 +301,7 @@ object video_dssm_sampler {
           col("vid_left_cate_l1_feature"),
           col("vid_left_cate_l2_feature")
         )
-        .persist(StorageLevel.MEMORY_AND_DISK)
+        .persist(StorageLevel.MEMORY_AND_DISK_SER)
 
 
 
@@ -365,9 +365,7 @@ object video_dssm_sampler {
           col("vid_right_cate_l1_feature"),
           col("vid_right_cate_l2_feature")
         )
-        .persist(StorageLevel.MEMORY_AND_DISK)
-
-
+        .persist(StorageLevel.MEMORY_AND_DISK_SER)