zhangbo 1 рік тому
батько
коміт
373a91e4e2

+ 13 - 5
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_02_writeredis.scala

@@ -56,18 +56,22 @@ object makedata_02_writeredis {
     if (ifUser){
       println("user特征处理")
       var userData = odpsOps.readTable(project = project, table = tableUser, partition = partition, transfer = handleUser, numPartition = tablePart)
+        .filter {
+          case (mid, fea, feaSize) =>
+            mid.nonEmpty && fea.nonEmpty && feaSize > 0
+        }
       if (userSampleIDs.nonEmpty){
         val IDs = userSampleIDs.split(",").filter(_.nonEmpty).map(_.toInt).toList
         userData = userData.filter(r => IDs.contains(r._1.hashCode % 10))
       }
       if (ifDebug){
         println("user特征处理-debug开启-只保留5条数据-特征数量大于1")
-        val userDataTake = userData.filter(_._3 > 1).take(5)
+        val userDataTake = userData.take(5)
         userDataTake.foreach(r=> println(r._1 + "\t" + r._2 + "\t" + r._3))
         userData = sc.parallelize(userDataTake)
       }
       if (savePathUser.nonEmpty && savePathUser.startsWith("/dw/recommend/model/")) {
-        var savePathPart = savePathUser + "/user/" + partition
+        var savePathPart = savePathUser + "/" + partition
         if (userSampleIDs.nonEmpty) {
           savePathPart = savePathPart + "_" + userSampleIDs
         }
@@ -80,7 +84,8 @@ object makedata_02_writeredis {
     }
 
     if (ifDeleteRedisUser){
-      var savePathPart = savePathUser + "/user/" + partition
+      println("user redis 删除")
+      var savePathPart = savePathUser + "/" + partition
       if (userSampleIDs.nonEmpty) {
         savePathPart = savePathPart + "_" + userSampleIDs
       }
@@ -108,10 +113,13 @@ object makedata_02_writeredis {
         redisFormat.iterator
       })
       println("delete redis.count=" + userDataTakeRddRun.count())
+    } else {
+      println("不处理user的redis删除")
     }
 
     if (ifWriteRedisUser){
-      var savePathPart = savePathUser + "/user/" + partition
+      println("user redis 写入")
+      var savePathPart = savePathUser + "/" + partition
       if (userSampleIDs.nonEmpty) {
         savePathPart = savePathPart + "_" + userSampleIDs
       }
@@ -180,7 +188,7 @@ object makedata_02_writeredis {
         redisFormat.iterator
       })
       if (savePathVideo.nonEmpty && savePathVideo.startsWith("/dw/recommend/model/")){
-        val savePathPart = savePathVideo + "/video/" + partition
+        val savePathPart = savePathVideo + "/" + partition
         MyHdfsUtils.delete_hdfs_path(savePathPart)
         itemDataTakeRddRun.map(r => r._1 + "\t" + r._2).saveAsTextFile(savePathPart, classOf[GzipCodec])
       }