Bläddra i källkod

feat:添加特征延迟验证脚本

zhaohaipeng 8 månader sedan
förälder
incheckning
1d55cd4c45

+ 31 - 15
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/xgb/makedata_31_bucketDataPrint_20240821.scala

@@ -413,23 +413,24 @@ object makedata_31_bucketDataPrint_20240821 {
             }
             //5 处理log key表头。
             val mid = record.getString("mid")
-            val flag = record.isNull("allfeaturemap")
+            val flag = !record.isNull("allfeaturemap")
             val allfeature = if (record.isNull("allfeaturemap")) new JSONObject() else
               JSON.parseObject(record.getString("allfeaturemap"))
 
             val headvideoid = record.getString("headvideoid")
             // val logKey = (apptype, mid, cid, ts, headvideoid).productIterator.mkString(",")
             val labelKey = labels.toString()
+            val label = record.getString("ad_is_conversion")
             //6 拼接数据,保存。
-            (apptype, mid, cid, ts, headvideoid, labelKey, allfeature, featureMap, flag)
+            (apptype, mid, cid, ts, headvideoid, label, allfeature, featureMap, flag)
           }).filter {
-            case (apptype, mid, cid, ts, headvideoid, labelKey, allfeature, featureMap, flag) =>
+            case (apptype, mid, cid, ts, headvideoid, label, allfeature, featureMap, flag) =>
               flag
           }.mapPartitions(row => {
             val result = new ArrayBuffer[String]()
             val bucketsMap = bucketsMap_br.value
             row.foreach {
-              case (apptype, mid, cid, ts, headvideoid, labelKey, allfeature, featureMap, flag) =>
+              case (apptype, mid, cid, ts, headvideoid, label, allfeature, featureMap, flag) =>
                 val offlineFeatureMap = featureMap.filter(r => bucketsMap.contains(r._1)).map(r => {
                   val score = r._2.toString.toDouble
                   val name = r._1
@@ -446,7 +447,7 @@ object makedata_31_bucketDataPrint_20240821 {
                   }
                 }).filter(_.nonEmpty)
                 result.add(
-                  (apptype, mid, cid, ts, headvideoid, labelKey, allfeature.toString(), offlineFeatureMap.iterator.mkString(",")).productIterator.mkString("\t")
+                  (apptype, mid, cid, ts, headvideoid, label, allfeature.toString(), offlineFeatureMap.iterator.mkString(",")).productIterator.mkString("\t")
                 )
             }
             result.iterator
@@ -467,21 +468,17 @@ object makedata_31_bucketDataPrint_20240821 {
 
 
     val data2 = sc.textFile(savePath + "/" + readDate + "*").mapPartitions(row => {
-      val result = new ArrayBuffer[(String, List[String], List[String], List[String])]()
-      val contentList = contentList_br.value
+      val result = new ArrayBuffer[(String, List[String], List[String], List[String], List[String])]()
       // 680实验,517个特征
       row.foreach(r => {
         val rList = r.split("\t")
         val label = rList(5).toString
         val allFeatureMap = JSON.parseObject(rList(6)).toMap.map(r => (r._1, r._2.toString))
         val offlineFeature = rList(7).split(",").map(r => (r.split(":")(0), r.split(":")(1))).toMap
-        val offlineFeatureList = contentList.map(name => {
-          if (offlineFeature.containsKey(name)) {
-            name + ":" + offlineFeature(name)
-          } else {
-            ""
-          }
-        }).filter(_.nonEmpty)
+        val offlineFeatureList = offlineFeature.map {
+          case (key, value) =>
+            key + ":" + value
+        }.filter(_.nonEmpty).toList
 
         val allFeatureV1 = allFeatureMap.map {
           case (key, value) =>
@@ -498,7 +495,17 @@ object makedata_31_bucketDataPrint_20240821 {
             }
         }.filter(_.nonEmpty).toList
 
-        result.add((label, offlineFeatureList, allFeatureV1, allFeatureV2))
+        val ctcvrFeature = offlineFeature.map {
+          case (key, value) => {
+            if (key.contains("ctcvr") && key.contains("Ctcvr")) {
+              key + ":" + value
+            } else {
+              ""
+            }
+          }
+        }.filter(_.nonEmpty).toList
+
+        result.add((label, offlineFeatureList, allFeatureV1, allFeatureV2, ctcvrFeature))
       })
 
       result.iterator
@@ -531,6 +538,15 @@ object makedata_31_bucketDataPrint_20240821 {
       println("路径不合法,无法写入:" + allFeatureV2)
     }
 
+    val ctcvrFeature = "/dw/recommend/model/33_for_check_ctcvr/" + readDate
+    if (ctcvrFeature.nonEmpty && ctcvrFeature.startsWith("/dw/recommend/model/")) {
+      println("删除路径并开始数据写入:" + ctcvrFeature)
+      MyHdfsUtils.delete_hdfs_path(ctcvrFeature)
+      data2.map(r => r._1 + "\t" + r._5.mkString("\t")).saveAsTextFile(ctcvrFeature, classOf[GzipCodec])
+    } else {
+      println("路径不合法,无法写入:" + ctcvrFeature)
+    }
+
   }
 
   def func(record: Record, schema: TableSchema): Record = {