Browse Source

feat:添加VID过滤脚本

zhaohaipeng 8 months ago
parent
commit
5a820a2a60

+ 12 - 1
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys/makedata_recsys_43_bucketData_20240709_vid.scala

@@ -15,6 +15,12 @@ object makedata_recsys_43_bucketData_20240709_vid {
 
 
     // 1 读取参数
     val param = ParamUtils.parseArgs(args)
+    param.foreach {
+      case (key, value) => {
+        println("Key: " + key + "; Value: " + value)
+      }
+    }
+
     val readPath = param.getOrElse("readPath", "/dw/recommend/model/41_recsys_sample_data_v1/")
     val savePath = param.getOrElse("savePath", "/dw/recommend/model/43_recsys_train_data_v1/")
     val beginStr = param.getOrElse("beginStr", "20240703")
@@ -127,7 +133,12 @@ object makedata_recsys_43_bucketData_20240709_vid {
         if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
           println("删除路径并开始数据写入:" + hdfsPath)
           MyHdfsUtils.delete_hdfs_path(hdfsPath)
-          sc.parallelize(records.map(_._2).toSeq).repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+
+          // 创建局部变量避免闭包引用外部变量
+          val localRecords = records
+          val localRepartition = repartition
+
+          sc.parallelize(localRecords.toSeq).repartition(localRepartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
         } else {
           println("路径不合法,无法写入:" + hdfsPath)
         }