
Recommendation sample production: feature-value concatenation

zhangbo 1 year ago
parent
commit
17d208da09

+ 18 - 0
src/main/resources/20240608_feature_name.txt

@@ -0,0 +1,18 @@
+total_time
+bit_rate
+b123_7d_STR
+b167_7d_log(share)
+b8910_7d_ROV
+b111213_7d_log(return)
+b171819_7d_ROV*log(return)
+playcnt_7d
+share_pv_7d
+return_uv_7d
+c3_feature_tags_7d_maxscore
+c4_feature_tags_7d_maxscore
+c5_feature_tags_7d_maxscore
+c6_feature_tags_7d_maxscore
+c7_feature_tags_7d_maxscore
+c8_feature_share_score
+c9_feature_return_score
+d1_rovn
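
The file lists one feature name per line; the line order defines the column order of the value vector that the job below emits, and features missing from a sample default to 0.0. As a minimal sketch (standalone, with a hypothetical input JSON; not part of the commit), this is how one sample's feature JSON is projected onto the ordered name list:

    import com.alibaba.fastjson.JSON

    object FeatureProjectionSketch {
      def main(args: Array[String]): Unit = {
        // Ordered names, abbreviated from 20240608_feature_name.txt
        val featureNames = List("total_time", "bit_rate", "playcnt_7d")
        // Hypothetical feature JSON for a single sample
        val featureJson = JSON.parseObject("""{"total_time":35.0,"playcnt_7d":12.0}""")
        // Keys absent from the sample default to 0.0, so every row has the same width
        val values = featureNames.map(k =>
          if (featureJson.containsKey(k)) featureJson.getDoubleValue(k) else 0.0)
        println("1" + "\t" + values.mkString(","))  // "1\t35.0,0.0,12.0"
      }
    }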

+ 87 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_14_valueData_20240608.scala

@@ -0,0 +1,87 @@
+package com.aliyun.odps.spark.examples.makedata
+
+import com.alibaba.fastjson.JSON
+import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils}
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable.ArrayBuffer
+import scala.io.Source
+/*
+  Sample production: join each sample's feature values by the ordered names in 20240608_feature_name.txt and emit "label \t v1,v2,..." rows.
+ */
+
+object makedata_14_valueData_20240608 {
+  def main(args: Array[String]): Unit = {
+
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    val loader = getClass.getClassLoader
+    val resourceUrl = loader.getResource("20240608_feature_name.txt")
+    val content =
+      if (resourceUrl != null) {
+        val source = Source.fromURL(resourceUrl)
+        try source.getLines().mkString("\n") finally source.close()
+      } else {
+        ""
+      }
+    println(content)
+    val contentList = content.split("\n")
+      .map(r => r.replace(" ", ""))
+      .filter(r => r.nonEmpty).toList
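+    // Broadcast the ordered feature-name list once so every executor task reuses it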
+    val contentList_bc = sc.broadcast(contentList)
+
+    // 1. Read job parameters
+    val param = ParamUtils.parseArgs(args)
+    val partitionPrefix = param.getOrElse("partitionPrefix", "dt=20240607,hh=00")
+    val date = param.getOrElse("date", "20240607")
+    val readPath = param.getOrElse("readPath", "/dw/recommend/model/13_sample_data/")
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/14_feature_data/")
+    val repartition = param.getOrElse("repartition", "200").toInt
+
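+    // 2. Read samples ("logKey \t labelKey \t featureKey") and 3. look up feature values by name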
+    val data = sc.textFile(readPath + partitionPrefix + "*")
+    val data1 = data.map(r => {
+      val rList = r.split("\t")
+      val logKey = rList(0)
+      val labelKey = rList(1)
+      val featureKey = rList(2)
+      (logKey, labelKey, featureKey)
+    }).filter(r =>
+      r._1.split(",")(6).equals("0") // keep only samples whose 7th comma-separated logKey field is "0"
+    ).mapPartitions(row => {
+      val result = new ArrayBuffer[String]()
+      val contentList = contentList_bc.value
+      row.foreach{
+        case (logKey, labelKey, featureKey) =>
+          val label = JSON.parseObject(labelKey).getOrDefault("is_return", "0").toString
+          val featureJson = JSON.parseObject(featureKey)
+
+          val featureValues = contentList.map(key => {
+            if (featureJson.containsKey(key)) {
+              featureJson.getDoubleValue(key) // primitive double; getDouble can return null for JSON null values
+            } else {
+              0.0
+            }
+          })
+          result += label + "\t" + featureValues.mkString(",")
+      }
+      result.iterator
+    })
+
+    // 4. Save results to HDFS
+    val hdfsPath = savePath + "/" + date
+    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+      println("删除路径并开始数据写入:" + hdfsPath)
+      MyHdfsUtils.delete_hdfs_path(hdfsPath)
+      data1.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+    } else {
+      println("路径不合法,无法写入:" + hdfsPath)
+    }
+  }
+}
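
For reference, a minimal sketch (hypothetical date, and an existing SparkContext sc assumed; not part of the commit) of reading the job's output back. Each line is "<label>\t<v1,...,v18>", with columns ordered as in 20240608_feature_name.txt:

    val rows = sc.textFile("/dw/recommend/model/14_feature_data/20240607")
    val labeled = rows.map { line =>
      // split once: the label, then the comma-separated feature values
      val Array(label, values) = line.split("\t", 2)
      (label.toInt, values.split(",").map(_.toDouble))
    }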