浏览代码

i2i样本制作,第4步。 连续值分桶

zhangbo 5 月之前
父节点
当前提交
d2a6be4954

+ 172 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_dssm/makedata_i2i_04_bucketFile_20241128.scala

@@ -0,0 +1,172 @@
+package com.aliyun.odps.spark.examples.makedata_dssm
+
+import com.alibaba.fastjson.JSON
+import com.aliyun.odps.TableSchema
+import com.aliyun.odps.data.Record
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+object makedata_i2i_04_bucketFile_20241128 {
+  def func(record: Record, schema: TableSchema): Record = {
+    record
+  }
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 1 读取参数
+    val param = ParamUtils.parseArgs(args)
+    val readPath = param.getOrElse("readPath", "/dw/recommend/model/52_dssm_i2i_joinfeature/20241128*")
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/54_dssm_i2i_bucketfile/")
+    val fileName = param.getOrElse("fileName", "XXXXX")
+    val bucketNum = param.getOrElse("bucketNum", "100").toInt
+    // 3 循环执行数据生产
+    val data = sc.textFile(readPath).flatMap(r=>{
+      val rList = r.split("\t")
+      val f1 = rList(6)
+      val f11 = rList(7)
+      val f2 = rList(8)
+      val f22 = rList(9)
+      val f3 = rList(10)
+      val f33 = rList(11)
+      val result = new ArrayBuffer[(String, Double)]()
+      Set(f1, f11).toSeq.foreach(f=> {
+        JSON.parseObject(f).foreach {
+          case (k, v) =>
+            val value = v.toString.toDouble
+            k match {
+              case "str_day1" => result += ("action:str_day1", value)
+              case "rov_day1" => result += ("action:rov_day1", value)
+              case "ros_day1" => result += ("action:ros_day1", value)
+              case "str_day7" => result += ("action:str_day7", value)
+              case "rov_day7" => result += ("action:rov_day7", value)
+              case "ros_day7" => result += ("action:ros_day7", value)
+              case "str_day21" => result += ("action:str_day21", value)
+              case "rov_day21" => result += ("action:rov_day21", value)
+              case "ros_day21" => result += ("action:ros_day21", value)
+              case "str_day336" => result += ("action:str_day336", value)
+              case "rov_day336" => result += ("action:rov_day336", value)
+              case "ros_day336" => result += ("action:ros_day336", value)
+              case "vovd1_day7" => result += ("action:vovd1_day7", value)
+              case "vovd1_day21" => result += ("action:vovd1_day21", value)
+              case "vovd1_day336" => result += ("action:vovd1_day336", value)
+            }
+        }
+      })
+      Set(f2, f22).toSeq.foreach(f => {
+        JSON.parseObject(f).foreach {
+          case (k, v) =>
+            val value = v.toString.toDouble
+            k match {
+              case "str_day1" => result += ("cate1:str_day1", value)
+              case "rov_day1" => result += ("cate1:rov_day1", value)
+              case "ros_day1" => result += ("cate1:ros_day1", value)
+              case "str_day3" => result += ("cate1:str_day3", value)
+              case "rov_day3" => result += ("cate1:rov_day3", value)
+              case "ros_day3" => result += ("cate1:ros_day3", value)
+              case "str_day7" => result += ("cate1:str_day7", value)
+              case "rov_day7" => result += ("cate1:rov_day7", value)
+              case "ros_day7" => result += ("cate1:ros_day7", value)
+              case "str_day30" => result += ("cate1:str_day30", value)
+              case "rov_day30" => result += ("cate1:rov_day30", value)
+              case "ros_day30" => result += ("cate1:ros_day30", value)
+              case "vovd1_day1" => result += ("cate1:vovd1_day1", value)
+              case "vovd1_day3" => result += ("cate1:vovd1_day3", value)
+              case "vovd1_day7" => result += ("cate1:vovd1_day7", value)
+              case "vovd1_day30" => result += ("cate1:vovd1_day30", value)
+            }
+        }
+      })
+      Set(f3, f33).toSeq.foreach(f => {
+        JSON.parseObject(f).foreach {
+          case (k, v) =>
+            val value = v.toString.toDouble
+            k match {
+              case "str_day1" => result += ("cate2:str_day1", value)
+              case "rov_day1" => result += ("cate2:rov_day1", value)
+              case "ros_day1" => result += ("cate2:ros_day1", value)
+              case "str_day3" => result += ("cate2:str_day3", value)
+              case "rov_day3" => result += ("cate2:rov_day3", value)
+              case "ros_day3" => result += ("cate2:ros_day3", value)
+              case "str_day7" => result += ("cate2:str_day7", value)
+              case "rov_day7" => result += ("cate2:rov_day7", value)
+              case "ros_day7" => result += ("cate2:ros_day7", value)
+              case "str_day30" => result += ("cate2:str_day30", value)
+              case "rov_day30" => result += ("cate2:rov_day30", value)
+              case "ros_day30" => result += ("cate2:ros_day30", value)
+              case "vovd1_day1" => result += ("cate2:vovd1_day1", value)
+              case "vovd1_day3" => result += ("cate2:vovd1_day3", value)
+              case "vovd1_day7" => result += ("cate2:vovd1_day7", value)
+              case "vovd1_day30" => result += ("cate2:vovd1_day30", value)
+            }
+        }
+      })
+      result
+    })
+
+    val contentList = List[String](
+      "action:str_day1", "action:rov_day1", "action:ros_day1", "action:str_day7", "action:rov_day7","action:ros_day7",
+      "action:str_day21", "action:rov_day21", "action:ros_day21", "action:str_day336","action:rov_day336", "action:ros_day336",
+      "action:vovd1_day7", "action:vovd1_day21", "action:vovd1_day336",
+
+      "cate1:str_day1", "cate1:rov_day1", "cate1:ros_day1", "cate1:str_day3", "cate1:rov_day3", "cate1:ros_day3",
+      "cate1:str_day7", "cate1:rov_day7", "cate1:ros_day7", "cate1:str_day30", "cate1:rov_day30", "cate1:ros_day30",
+      "cate1:vovd1_day1", "cate1:vovd1_day3", "cate1:vovd1_day7", "cate1:vovd1_day30",
+
+      "cate2:str_day1", "cate2:rov_day1", "cate2:ros_day1", "cate2:str_day3", "cate2:rov_day3", "cate2:ros_day3",
+      "cate2:str_day7", "cate2:rov_day7", "cate2:ros_day7", "cate2:str_day30", "cate2:rov_day30", "cate2:ros_day30",
+      "cate2:vovd1_day1", "cate2:vovd1_day3", "cate2:vovd1_day7", "cate2:vovd1_day30"
+
+
+    )
+    val result = new ArrayBuffer[String]()
+
+    for (i <- contentList.indices) {
+      println("特征:" + contentList(i))
+      val data2 = data.filter(_._1.equals(contentList(i))).map(_._2).filter(_ > 1E-8).collect().sorted
+      val len = data2.length
+      if (len == 0) {
+        result.add(contentList(i) + "\t" + bucketNum.toString + "\t" + "0")
+      } else {
+        val oneBucketNum = (len - 1) / (bucketNum - 1) + 1 // 确保每个桶至少有一个元素
+        val buffers = new ArrayBuffer[Double]()
+
+        var lastBucketValue = data2(0) // 记录上一个桶的切分点
+        for (j <- 0 until len by oneBucketNum) {
+          val d = data2(j)
+          if (j > 0 && d != lastBucketValue) {
+            // 如果当前切分点不同于上一个切分点,则保存当前切分点
+            buffers += d
+          }
+          lastBucketValue = d // 更新上一个桶的切分点
+        }
+
+        // 最后一个桶的结束点应该是数组的最后一个元素
+        if (!buffers.contains(data2.last)) {
+          buffers += data2.last
+        }
+        result.add(contentList(i) + "\t" + bucketNum.toString + "\t" + buffers.mkString(","))
+      }
+    }
+    val data3 = sc.parallelize(result)
+
+    // 4 保存数据到hdfs
+    val hdfsPath = savePath + "/" + fileName
+    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+      println("删除路径并开始数据写入:" + hdfsPath)
+      MyHdfsUtils.delete_hdfs_path(hdfsPath)
+      data3.repartition(1).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+    } else {
+      println("路径不合法,无法写入:" + hdfsPath)
+    }
+
+  }
+}

+ 11 - 0
src/main/scala/com/aliyun/odps/spark/examples/临时记录的脚本-I2I

@@ -23,6 +23,9 @@ tablePart:64 \
 readPath:/dw/recommend/model/53_dssm_i2i_onehot/20241128 \
 savePath:/dw/recommend/model/53_dssm_i2i_onehot/20241128 > p53.log 2>&1 &
 
+
+
+
 数据量:3415597打印各个特征多少枚举值:
 vid       3407301
 video_style     6517
@@ -41,3 +44,11 @@ background_music_type   6
 captions        3
 has_end_credit_guide    2
 
+nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+--class com.aliyun.odps.spark.examples.makedata_dssm.makedata_i2i_04_bucketFile_20241128 \
+--master yarn --driver-memory 16G --executor-memory 2G --executor-cores 1 --num-executors 32 \
+./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+readPath:/dw/recommend/model/52_dssm_i2i_joinfeature/20241128* \
+savePath:/dw/recommend/model/54_dssm_i2i_bucketfile/ \
+readPath:/dw/recommend/model/53_dssm_i2i_onehot/20241128 \
+fileName:47_rate_v1  bucketNum:100 > p53.log 2>&1 &