zhangbo 1 year ago
parent
commit
07e05fd5af

+ 8 - 7
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_09_user2redis_freq.scala

@@ -94,16 +94,17 @@ object makedata_09_user2redis_freq {
 
     //5 用户区分
     val savePathPart = savePathUser + "/all/" + partition
-    val userDataRead = sc.textFile(savePathPart).filter(_.split("\t").length >= 2)
+    val userDataRead = sc.textFile(savePathPart).repartition(99).filter(_.split("\t").length >= 2)
       .map(r => {
         val rList = r.split("\t")
         (rList(0), rList(1))
-      }).leftOuterJoin(midRdd).map {
-        case (mid, (fea, Some(_))) =>
-          (mid, fea, true)
-        case (mid, (fea, None)) =>
-          (mid, fea, false)
-      }
+      }).join(midRdd).map(r => (r._1, r._2._1, true))
+//      .leftOuterJoin(midRdd).map {
+//        case (mid, (fea, Some(_))) =>
+//          (mid, fea, true)
+//        case (mid, (fea, None)) =>
+//          (mid, fea, false)
+//      }
     val userDataReadTrue = userDataRead.filter(_._3).map(r => r._1 + "\t" + r._2)
     // val userDataReadFalse = userDataRead.filter(!_._3).map(r => r._1 + "\t" + r._2)
     if (savePathUser.nonEmpty && savePathUser.startsWith("/dw/recommend/model/")) {

+ 1 - 1
src/main/scala/com/aliyun/odps/spark/examples/临时记录的脚本

@@ -39,5 +39,5 @@ nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.s
 --master yarn --driver-memory 1G --executor-memory 4G --executor-cores 1 --num-executors 32 \
 --conf spark.yarn.executor.memoryoverhead=1024 \
 /root/zhangbo/recommend-emr-dataprocess/target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-date:20240227 tablePart:32 expireDay:3 ifWriteRedisUser:False ifUser:False midDays:7 \
+date:20240227 tablePart:32 expireDay:3 ifWriteRedisUser:False ifUser:False midDays:7 redisLimit:100000000 \
 savePathUser:/dw/recommend/model/09_feature/user/ > p09.log 2>&1 &