zhangbo il y a 10 mois
Parent
commit
85676c34fd

+ 62 - 8
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_17_bucketDataPrint_20240617.scala

@@ -270,7 +270,8 @@ object makedata_17_bucketDataPrint_20240617 {
 //    }
 
     val data2 = sc.textFile(savePath + "/" + readDate + "*").mapPartitions(row=>{
-      val result = new ArrayBuffer[(String, Map[String, String], Map[String, String], List[String], List[String])]()
+      val result = new ArrayBuffer[(String,
+        Map[String, String], Map[String, String], List[String], List[String], List[String], List[String])]()
       val contentList = contentList_br.value
       row.foreach(r=>{
         val rList = r.split("\t")
@@ -312,8 +313,43 @@ object makedata_17_bucketDataPrint_20240617 {
           }
         }).filter(_.nonEmpty)
 
+        val v5 = contentList.map(name => {
+          val useOfflineNames = Set("d1_exp", "d1_return_n", "d1_rovn")
+          if (useOfflineNames.contains(name)) {
+            if (offlineFeatrueMap.contains(name)) {
+              name + ":" + offlineFeatrueMap(name)
+            } else {
+              ""
+            }
+          } else {
+            if (allfeaturemap.contains(name)) {
+              name + ":" + allfeaturemap(name)
+            } else {
+              ""
+            }
+          }
+        }).filter(_.nonEmpty)
+
+        val v6 = contentList.map(name => {
+          val useOfflineNames = Set("playcnt_6h", "playcnt_1d", "playcnt_3d", "playcnt_7d",
+            "share_pv_12h", "share_pv_1d", "share_pv_3d", "share_pv_7d", "return_uv_12h", "return_uv_1d", "return_uv_3d", "return_uv_7d")
+          if (useOfflineNames.contains(name)) {
+            if (offlineFeatrueMap.contains(name)) {
+              name + ":" + offlineFeatrueMap(name)
+            } else {
+              ""
+            }
+          } else {
+            if (allfeaturemap.contains(name)) {
+              name + ":" + allfeaturemap(name)
+            } else {
+              ""
+            }
+          }
+        }).filter(_.nonEmpty)
+
 
-        result.add((label, offlineFeatrueMap, allfeaturemap, v3, v4))
+        result.add((label, offlineFeatrueMap, allfeaturemap, v3, v4, v5, v6))
 
       })
       result.iterator
@@ -346,13 +382,31 @@ object makedata_17_bucketDataPrint_20240617 {
 //      println("路径不合法,无法写入:" + saveV3)
 //    }
 
-    val saveV4 = "/dw/recommend/model/17_for_check_v4/" + readDate
-    if (saveV4.nonEmpty && saveV4.startsWith("/dw/recommend/model/")) {
-      println("删除路径并开始数据写入:" + saveV4)
-      MyHdfsUtils.delete_hdfs_path(saveV4)
-      data2.map(r => r._1 + "\t" + r._5.mkString("\t")).saveAsTextFile(saveV4, classOf[GzipCodec])
+//    val saveV4 = "/dw/recommend/model/17_for_check_v4/" + readDate
+//    if (saveV4.nonEmpty && saveV4.startsWith("/dw/recommend/model/")) {
+//      println("删除路径并开始数据写入:" + saveV4)
+//      MyHdfsUtils.delete_hdfs_path(saveV4)
+//      data2.map(r => r._1 + "\t" + r._5.mkString("\t")).saveAsTextFile(saveV4, classOf[GzipCodec])
+//    } else {
+//      println("路径不合法,无法写入:" + saveV4)
+//    }
+
+      val saveV5 = "/dw/recommend/model/17_for_check_v5/" + readDate
+      if (saveV5.nonEmpty && saveV5.startsWith("/dw/recommend/model/")) {
+        println("删除路径并开始数据写入:" + saveV5)
+        MyHdfsUtils.delete_hdfs_path(saveV5)
+        data2.map(r => r._1 + "\t" + r._6.mkString("\t")).saveAsTextFile(saveV5, classOf[GzipCodec])
+      } else {
+        println("路径不合法,无法写入:" + saveV5)
+      }
+
+    val saveV6 = "/dw/recommend/model/17_for_check_v6/" + readDate
+    if (saveV6.nonEmpty && saveV6.startsWith("/dw/recommend/model/")) {
+      println("删除路径并开始数据写入:" + saveV6)
+      MyHdfsUtils.delete_hdfs_path(saveV6)
+      data2.map(r => r._1 + "\t" + r._7.mkString("\t")).saveAsTextFile(saveV6, classOf[GzipCodec])
     } else {
-      println("路径不合法,无法写入:" + saveV4)
+      println("路径不合法,无法写入:" + saveV6)
     }