
feat: add VID filtering script

zhaohaipeng 8 months ago
parent
commit
504f611d4c

+ 15 - 16
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys/makedata_recsys_43_bucketData_20240709_vid.scala

@@ -10,10 +10,6 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 
-/*
-
- */
-
 object makedata_recsys_43_bucketData_20240709_vid {
   def main(args: Array[String]): Unit = {
 
@@ -57,7 +53,6 @@ object makedata_recsys_43_bucketData_20240709_vid {
       }).toMap
     val bucketsMap_br = sc.broadcast(bucketsMap)
 
-
     val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
     for (date <- dateRange) {
       println("开始执行:" + date)
@@ -88,13 +83,14 @@ object makedata_recsys_43_bucketData_20240709_vid {
         .map {
           case (logKey, labelKey, features) =>
             val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
-            (label, features)
+            val vid = logKey.split(",")(3) // vid: 4th comma-separated field of logKey
+            (label, vid, features)
         }
         .mapPartitions(row => {
-          val result = new ArrayBuffer[String]()
+          val result = new ArrayBuffer[(String, String)]() // (vid, sample line)
           val bucketsMap = bucketsMap_br.value
           row.foreach {
-            case (label, features) =>
+            case (label, vid, features) =>
               val featuresBucket = features.map {
                 case (name, score) =>
                   var ifFilter = false
@@ -119,19 +115,22 @@ object makedata_recsys_43_bucketData_20240709_vid {
                     }
                   }
               }.filter(_.nonEmpty)
-              result.add(label + "\t" + featuresBucket.mkString("\t"))
+              result.add((vid, label + "\t" + featuresBucket.mkString("\t")))
           }
           result.iterator
         })
+        .groupBy(_._1) // group by vid
 
      // 4. Save data to HDFS
-      val hdfsPath = savePath + "/" + date
-      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
-        println("删除路径并开始数据写入:" + hdfsPath)
-        MyHdfsUtils.delete_hdfs_path(hdfsPath)
-        data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
-      } else {
-        println("路径不合法,无法写入:" + hdfsPath)
+      data.collect().foreach { case (vid, records) => // collect first: sc cannot be used inside an RDD foreach
+        val hdfsPath = savePath + "/" + date + "/" + vid
+        if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+          println("Deleting path and starting data write: " + hdfsPath)
+          MyHdfsUtils.delete_hdfs_path(hdfsPath)
+          sc.parallelize(records.map(_._2).toSeq).repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+        } else {
+          println("Invalid path, cannot write: " + hdfsPath)
+        }
       }
     }
   }
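
The per-vid write in this commit collects the grouped data to the driver and then launches one small Spark job per vid, which can get slow when there are many distinct vids. A minimal alternative sketch, assuming the same (vid, line) pairs that exist just before the .groupBy above, writes all vid directories in a single pass using Hadoop's MultipleTextOutputFormat; VidKeyedOutputFormat and savePerVid are hypothetical names introduced here, not part of this repo:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.rdd.RDD

// Hypothetical output format: routes each record into a subdirectory named
// after its key (the vid) and writes only the value, dropping the key.
class VidKeyedOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString + "/" + name // e.g. <vid>/part-00000
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get() // suppress the key in the output file
}

// pairs: RDD[(vid, sampleLine)], i.e. the stream before the .groupBy(_._1) above
def savePerVid(pairs: RDD[(String, String)], basePath: String): Unit = {
  pairs.saveAsHadoopFile(
    basePath, // e.g. savePath + "/" + date
    classOf[String],
    classOf[String],
    classOf[VidKeyedOutputFormat],
    classOf[GzipCodec])
}

This avoids collecting group contents to the driver and keeps the write to one Spark job; the trade-off is that the number of part files per vid follows the RDD's partitioning rather than the explicit repartition(repartition) used in the commit.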