
feat: add VID filter script

zhaohaipeng, 8 months ago
Commit d3e589845f

+ 10 - 17
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys/makedata_recsys_43_bucketData_20240709_vid.scala

@@ -93,7 +93,7 @@ object makedata_recsys_43_bucketData_20240709_vid {
             (label, vid, features)
         }
         .mapPartitions(row => {
-          val result = new ArrayBuffer[(String, String)]()
+          val result = new ArrayBuffer[String]()
           val bucketsMap = bucketsMap_br.value
           row.foreach {
             case (label, vid, features) =>
@@ -121,28 +121,21 @@ object makedata_recsys_43_bucketData_20240709_vid {
                     }
                   }
               }.filter(_.nonEmpty)
-              result.add((vid, label + "\t" + featuresBucket.mkString("\t")))
+              result.add(label + "\t" + vid + "\t" + featuresBucket.mkString("\t"))
           }
           result.iterator
         })
-        .groupBy(_._1) // group by vid
 
       // 4 save data to HDFS
-      data.foreach { case (vid, records) =>
-        val hdfsPath = savePath + "/" + date + "/" + vid
-        if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
-          println("Deleting path, starting data write: " + hdfsPath)
-          MyHdfsUtils.delete_hdfs_path(hdfsPath)
-
-          // create local variables to avoid the closure capturing outer variables
-          val localRecords = records
-          val localRepartition = repartition
-
-          sc.parallelize(localRecords.toSeq).repartition(localRepartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
-        } else {
-          println("Invalid path, cannot write: " + hdfsPath)
-        }
+      val hdfsPath = savePath + "/" + date
+      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+        println("Deleting path, starting data write: " + hdfsPath)
+        MyHdfsUtils.delete_hdfs_path(hdfsPath)
+        data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+      } else {
+        println("Invalid path, cannot write: " + hdfsPath)
       }
     }
   }
 }
+
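
In short, the commit changes each output record from a (vid, line) tuple to a single tab-separated string of label, vid and bucketed features, drops the per-vid groupBy, and replaces the per-vid parallelize/save loop with one repartitioned, gzip-compressed save under a single dated directory. Below is a minimal, self-contained sketch of the new write path, assuming a plain SparkSession; the example records and the savePath/date/repartition values are placeholders, and the repo's MyHdfsUtils.delete_hdfs_path is stood in for by a Hadoop FileSystem delete.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.SparkSession

object BucketDataWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("bucketData_vid_write_sketch").getOrCreate()
    val sc = spark.sparkContext

    // Placeholder values; the real script reads these from its job parameters.
    val savePath = "/dw/recommend/model/43_recsys_bucket"
    val date = "20240709"
    val repartition = 100

    // One record per sample: label, vid and bucketed features joined by tabs,
    // matching the new "label \t vid \t feature..." line format.
    val data = sc.parallelize(Seq(
      "1\tvid_001\tf1:0.2\tf2:0.8",
      "0\tvid_002\tf1:0.1\tf2:0.5"
    ))

    // Write everything under one dated directory instead of one directory per vid.
    val hdfsPath = savePath + "/" + date
    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
      println("Deleting path, starting data write: " + hdfsPath)
      // Stand-in for MyHdfsUtils.delete_hdfs_path(hdfsPath) from the repo.
      val fs = FileSystem.get(sc.hadoopConfiguration)
      fs.delete(new Path(hdfsPath), true)
      data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
    } else {
      println("Invalid path, cannot write: " + hdfsPath)
    }

    spark.stop()
  }
}

Writing one output per run also avoids launching a separate Spark job for every vid, which was the main cost of the previous per-vid foreach loop.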