zhangbo 9 месяцев назад
Родитель
Сommit
e1f663d81b

+ 314 - 0
src/main/resources/20240709_recsys_feature_name_314.txt

@@ -0,0 +1,314 @@
+b123_1h_STR
+b123_1h_log(share)
+b123_1h_ROV
+b123_1h_log(return)
+b123_1h_ROV*log(return)
+b123_1h_ROS
+b123_2h_STR
+b123_2h_log(share)
+b123_2h_ROV
+b123_2h_log(return)
+b123_2h_ROV*log(return)
+b123_2h_ROS
+b123_3h_STR
+b123_3h_log(share)
+b123_3h_ROV
+b123_3h_log(return)
+b123_3h_ROV*log(return)
+b123_3h_ROS
+b123_4h_STR
+b123_4h_log(share)
+b123_4h_ROV
+b123_4h_log(return)
+b123_4h_ROV*log(return)
+b123_4h_ROS
+b123_12h_STR
+b123_12h_log(share)
+b123_12h_ROV
+b123_12h_log(return)
+b123_12h_ROV*log(return)
+b123_12h_ROS
+b123_1d_STR
+b123_1d_log(share)
+b123_1d_ROV
+b123_1d_log(return)
+b123_1d_ROV*log(return)
+b123_1d_ROS
+b123_3d_STR
+b123_3d_log(share)
+b123_3d_ROV
+b123_3d_log(return)
+b123_3d_ROV*log(return)
+b123_3d_ROS
+b123_7d_STR
+b123_7d_log(share)
+b123_7d_ROV
+b123_7d_log(return)
+b123_7d_ROV*log(return)
+b123_7d_ROS
+b167_1h_STR
+b167_1h_log(share)
+b167_1h_ROV
+b167_1h_log(return)
+b167_1h_ROV*log(return)
+b167_1h_ROS
+b167_2h_STR
+b167_2h_log(share)
+b167_2h_ROV
+b167_2h_log(return)
+b167_2h_ROV*log(return)
+b167_2h_ROS
+b167_3h_STR
+b167_3h_log(share)
+b167_3h_ROV
+b167_3h_log(return)
+b167_3h_ROV*log(return)
+b167_3h_ROS
+b167_4h_STR
+b167_4h_log(share)
+b167_4h_ROV
+b167_4h_log(return)
+b167_4h_ROV*log(return)
+b167_4h_ROS
+b167_12h_STR
+b167_12h_log(share)
+b167_12h_ROV
+b167_12h_log(return)
+b167_12h_ROV*log(return)
+b167_12h_ROS
+b167_1d_STR
+b167_1d_log(share)
+b167_1d_ROV
+b167_1d_log(return)
+b167_1d_ROV*log(return)
+b167_1d_ROS
+b167_3d_STR
+b167_3d_log(share)
+b167_3d_ROV
+b167_3d_log(return)
+b167_3d_ROV*log(return)
+b167_3d_ROS
+b167_7d_STR
+b167_7d_log(share)
+b167_7d_ROV
+b167_7d_log(return)
+b167_7d_ROV*log(return)
+b167_7d_ROS
+b8910_1h_STR
+b8910_1h_log(share)
+b8910_1h_ROV
+b8910_1h_log(return)
+b8910_1h_ROV*log(return)
+b8910_1h_ROS
+b8910_2h_STR
+b8910_2h_log(share)
+b8910_2h_ROV
+b8910_2h_log(return)
+b8910_2h_ROV*log(return)
+b8910_2h_ROS
+b8910_3h_STR
+b8910_3h_log(share)
+b8910_3h_ROV
+b8910_3h_log(return)
+b8910_3h_ROV*log(return)
+b8910_3h_ROS
+b8910_4h_STR
+b8910_4h_log(share)
+b8910_4h_ROV
+b8910_4h_log(return)
+b8910_4h_ROV*log(return)
+b8910_4h_ROS
+b8910_12h_STR
+b8910_12h_log(share)
+b8910_12h_ROV
+b8910_12h_log(return)
+b8910_12h_ROV*log(return)
+b8910_12h_ROS
+b8910_1d_STR
+b8910_1d_log(share)
+b8910_1d_ROV
+b8910_1d_log(return)
+b8910_1d_ROV*log(return)
+b8910_1d_ROS
+b8910_3d_STR
+b8910_3d_log(share)
+b8910_3d_ROV
+b8910_3d_log(return)
+b8910_3d_ROV*log(return)
+b8910_3d_ROS
+b8910_7d_STR
+b8910_7d_log(share)
+b8910_7d_ROV
+b8910_7d_log(return)
+b8910_7d_ROV*log(return)
+b8910_7d_ROS
+b111213_1h_STR
+b111213_1h_log(share)
+b111213_1h_ROV
+b111213_1h_log(return)
+b111213_1h_ROV*log(return)
+b111213_1h_ROS
+b111213_2h_STR
+b111213_2h_log(share)
+b111213_2h_ROV
+b111213_2h_log(return)
+b111213_2h_ROV*log(return)
+b111213_2h_ROS
+b111213_3h_STR
+b111213_3h_log(share)
+b111213_3h_ROV
+b111213_3h_log(return)
+b111213_3h_ROV*log(return)
+b111213_3h_ROS
+b111213_4h_STR
+b111213_4h_log(share)
+b111213_4h_ROV
+b111213_4h_log(return)
+b111213_4h_ROV*log(return)
+b111213_4h_ROS
+b111213_12h_STR
+b111213_12h_log(share)
+b111213_12h_ROV
+b111213_12h_log(return)
+b111213_12h_ROV*log(return)
+b111213_12h_ROS
+b111213_1d_STR
+b111213_1d_log(share)
+b111213_1d_ROV
+b111213_1d_log(return)
+b111213_1d_ROV*log(return)
+b111213_1d_ROS
+b111213_3d_STR
+b111213_3d_log(share)
+b111213_3d_ROV
+b111213_3d_log(return)
+b111213_3d_ROV*log(return)
+b111213_3d_ROS
+b111213_7d_STR
+b111213_7d_log(share)
+b111213_7d_ROV
+b111213_7d_log(return)
+b111213_7d_ROV*log(return)
+b111213_7d_ROS
+b171819_1h_STR
+b171819_1h_log(share)
+b171819_1h_ROV
+b171819_1h_log(return)
+b171819_1h_ROV*log(return)
+b171819_1h_ROS
+b171819_2h_STR
+b171819_2h_log(share)
+b171819_2h_ROV
+b171819_2h_log(return)
+b171819_2h_ROV*log(return)
+b171819_2h_ROS
+b171819_3h_STR
+b171819_3h_log(share)
+b171819_3h_ROV
+b171819_3h_log(return)
+b171819_3h_ROV*log(return)
+b171819_3h_ROS
+b171819_4h_STR
+b171819_4h_log(share)
+b171819_4h_ROV
+b171819_4h_log(return)
+b171819_4h_ROV*log(return)
+b171819_4h_ROS
+b171819_12h_STR
+b171819_12h_log(share)
+b171819_12h_ROV
+b171819_12h_log(return)
+b171819_12h_ROV*log(return)
+b171819_12h_ROS
+b171819_1d_STR
+b171819_1d_log(share)
+b171819_1d_ROV
+b171819_1d_log(return)
+b171819_1d_ROV*log(return)
+b171819_1d_ROS
+b171819_3d_STR
+b171819_3d_log(share)
+b171819_3d_ROV
+b171819_3d_log(return)
+b171819_3d_ROV*log(return)
+b171819_3d_ROS
+b171819_7d_STR
+b171819_7d_log(share)
+b171819_7d_ROV
+b171819_7d_log(return)
+b171819_7d_ROV*log(return)
+b171819_7d_ROS
+total_time
+bit_rate
+playcnt_6h
+playcnt_1d
+playcnt_3d
+playcnt_7d
+share_pv_12h
+share_pv_1d
+share_pv_3d
+share_pv_7d
+return_uv_12h
+return_uv_1d
+return_uv_3d
+return_uv_7d
+c3_feature_tags_1d_matchnum
+c3_feature_tags_1d_maxscore
+c3_feature_tags_1d_avgscore
+c3_feature_tags_3d_matchnum
+c3_feature_tags_3d_maxscore
+c3_feature_tags_3d_avgscore
+c3_feature_tags_7d_matchnum
+c3_feature_tags_7d_maxscore
+c3_feature_tags_7d_avgscore
+c4_feature_tags_1d_matchnum
+c4_feature_tags_1d_maxscore
+c4_feature_tags_1d_avgscore
+c4_feature_tags_3d_matchnum
+c4_feature_tags_3d_maxscore
+c4_feature_tags_3d_avgscore
+c4_feature_tags_7d_matchnum
+c4_feature_tags_7d_maxscore
+c4_feature_tags_7d_avgscore
+c5_feature_tags_1d_matchnum
+c5_feature_tags_1d_maxscore
+c5_feature_tags_1d_avgscore
+c5_feature_tags_3d_matchnum
+c5_feature_tags_3d_maxscore
+c5_feature_tags_3d_avgscore
+c5_feature_tags_7d_matchnum
+c5_feature_tags_7d_maxscore
+c5_feature_tags_7d_avgscore
+c6_feature_tags_1d_matchnum
+c6_feature_tags_1d_maxscore
+c6_feature_tags_1d_avgscore
+c6_feature_tags_3d_matchnum
+c6_feature_tags_3d_maxscore
+c6_feature_tags_3d_avgscore
+c6_feature_tags_7d_matchnum
+c6_feature_tags_7d_maxscore
+c6_feature_tags_7d_avgscore
+c7_feature_tags_1d_matchnum
+c7_feature_tags_1d_maxscore
+c7_feature_tags_1d_avgscore
+c7_feature_tags_3d_matchnum
+c7_feature_tags_3d_maxscore
+c7_feature_tags_3d_avgscore
+c7_feature_tags_7d_matchnum
+c7_feature_tags_7d_maxscore
+c7_feature_tags_7d_avgscore
+c8_feature_share_score
+c8_feature_share_num
+c8_feature_share_rank
+c8_feature_return_score
+c8_feature_return_num
+c8_feature_return_rank
+c9_feature_share_score
+c9_feature_share_num
+c9_feature_share_rank
+c9_feature_return_score
+c9_feature_return_num
+c9_feature_return_rank
+d1_exp
+d1_return_n
+d1_rovn

+ 103 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys/makedata_recsys_42_bucket_20240709.scala

@@ -0,0 +1,103 @@
+package com.aliyun.odps.spark.examples.makedata_recsys
+
+import com.alibaba.fastjson.JSON
+import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils}
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.io.Source
+/*
+
+ */
+
+object makedata_recsys_42_bucket_20240709 {
+  def main(args: Array[String]): Unit = {
+
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    val loader = getClass.getClassLoader
+    val resourceUrl = loader.getResource("20240709_recsys_feature_name_314.txt")
+    val content =
+      if (resourceUrl != null) {
+        val content = Source.fromURL(resourceUrl).getLines().mkString("\n")
+        Source.fromURL(resourceUrl).close()
+        content
+      } else {
+        ""
+      }
+    println(content)
+    val contentList = content.split("\n")
+      .map(r=> r.replace(" ", "").replaceAll("\n", ""))
+      .filter(r=> r.nonEmpty).toList
+
+
+
+    // 1 读取参数
+    val param = ParamUtils.parseArgs(args)
+    val readPath = param.getOrElse("readPath", "/dw/recommend/model/41_recsys_sample_data_v1/20240705*")
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/41_recsys_bucket/")
+    val fileName = param.getOrElse("fileName", "20240705_314_200")
+    val sampleRate = param.getOrElse("sampleRate", "1.0").toDouble
+    val bucketNum = param.getOrElse("bucketNum", "200").toInt
+
+    val data = sc.textFile(readPath)
+    println("问题数据数量:" + data.filter(r=>r.split("\t").length != 3).count())
+    val data1 = data.map(r => {
+      val rList = r.split("\t")
+      val jsons = JSON.parseObject(rList(2))
+      val doubles = scala.collection.mutable.Map[String, Double]()
+      jsons.foreach(r =>{
+        doubles.put(r._1, jsons.getDoubleValue(r._1))
+      })
+      doubles
+    }).sample(false, sampleRate ).repartition(20)
+
+    val result = new ArrayBuffer[String]()
+
+    for (i <- contentList.indices){
+      println("特征:" + contentList(i))
+      val data2 = data1.map(r => r.getOrDefault(contentList(i), 0D)).filter(_ > 1E-8).collect().sorted
+      val len = data2.length
+      if (len == 0){
+        result.add(contentList(i) + "\t" + bucketNum.toString + "\t" + "0")
+      }else{
+        val oneBucketNum = (len - 1) / (bucketNum - 1) + 1 // 确保每个桶至少有一个元素
+        val buffers = new ArrayBuffer[Double]()
+
+        var lastBucketValue = data2(0) // 记录上一个桶的切分点
+        for (j <- 0 until len by oneBucketNum) {
+          val d = data2(j)
+          if (j > 0 && d != lastBucketValue) {
+            // 如果当前切分点不同于上一个切分点,则保存当前切分点
+            buffers += d
+          }
+          lastBucketValue = d // 更新上一个桶的切分点
+        }
+
+        // 最后一个桶的结束点应该是数组的最后一个元素
+        if (!buffers.contains(data2.last)) {
+          buffers += data2.last
+        }
+        result.add(contentList(i) + "\t" + bucketNum.toString + "\t" + buffers.mkString(","))
+      }
+    }
+    val data3 = sc.parallelize(result)
+
+
+    // 4 保存数据到hdfs
+    val hdfsPath = savePath + "/" + fileName
+    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+      println("删除路径并开始数据写入:" + hdfsPath)
+      MyHdfsUtils.delete_hdfs_path(hdfsPath)
+      data3.repartition(1).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+    } else {
+      println("路径不合法,无法写入:" + hdfsPath)
+    }
+  }
+}

+ 127 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys/makedata_recsys_43_bucketData_20240709.scala

@@ -0,0 +1,127 @@
+package com.aliyun.odps.spark.examples.makedata_recsys
+
+import com.alibaba.fastjson.JSON
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils}
+import examples.extractor.ExtractorUtils
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.io.Source
+/*
+
+ */
+
+object makedata_recsys_43_bucketData_20240709 {
+  def main(args: Array[String]): Unit = {
+
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    val loader = getClass.getClassLoader
+
+    val resourceUrlBucket = loader.getResource("20240709_recsys_bucket_314.txt")
+    val buckets =
+      if (resourceUrlBucket != null) {
+        val buckets = Source.fromURL(resourceUrlBucket).getLines().mkString("\n")
+        Source.fromURL(resourceUrlBucket).close()
+        buckets
+      } else {
+        ""
+      }
+    println(buckets)
+    val bucketsMap = buckets.split("\n")
+      .map(r => r.replace(" ", "").replaceAll("\n", ""))
+      .filter(r => r.nonEmpty)
+      .map(r =>{
+        val rList = r.split("\t")
+        (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
+      }).toMap
+    val bucketsMap_br = sc.broadcast(bucketsMap)
+
+
+    // 1 读取参数
+    val param = ParamUtils.parseArgs(args)
+    val readPath = param.getOrElse("readPath", "/dw/recommend/model/41_recsys_sample_data_v1/")
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/43_recsys_train_data_v1/")
+    val beginStr = param.getOrElse("beginStr", "20240703")
+    val endStr = param.getOrElse("endStr", "20240703")
+    val repartition = param.getOrElse("repartition", "100").toInt
+    val filterNames = param.getOrElse("filterNames", "").split(",").toSet
+    val whatLabel = param.getOrElse("whatLabel", "is_return")
+    val whatApps = param.getOrElse("whatApps", "0,4,5,21,3,6").split(",").toSet
+
+    val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
+    for (date <- dateRange) {
+      println("开始执行:" + date)
+      val data = sc.textFile(readPath + "/" + date + "*").map(r=>{
+        val rList = r.split("\t")
+        val logKey = rList(0)
+        val labelKey = rList(1)
+        val jsons = JSON.parseObject(rList(2))
+        val features = scala.collection.mutable.Map[String, Double]()
+        jsons.foreach(r => {
+          features.put(r._1, jsons.getDoubleValue(r._1))
+        })
+        (logKey, labelKey, features)
+      })
+        .filter{
+          case (logKey, labelKey, features) =>
+            val logKeyList = logKey.split(",")
+            val apptype = logKeyList(0)
+            val pagesource = logKeyList(1)
+            whatApps.contains(apptype) && pagesource.endsWith("recommend")
+        }
+        .map{
+          case (logKey, labelKey, features) =>
+            val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
+            (label, features)
+        }
+        .mapPartitions(row => {
+          val result = new ArrayBuffer[String]()
+          val bucketsMap = bucketsMap_br.value
+          row.foreach{
+            case (label, features) =>
+              val featuresBucket = features.map{
+                case (name, score) =>
+                  var ifFilter = false
+                  if (filterNames.nonEmpty){
+                    filterNames.foreach(r=> if (!ifFilter && name.startsWith(r)) {ifFilter = true} )
+                  }
+                  if (ifFilter){
+                    ""
+                  }else{
+                    if (score > 1E-8) {
+                      if (bucketsMap.contains(name)) {
+                        val (bucketsNum, buckets) = bucketsMap(name)
+                        val scoreNew = 1.0 / bucketsNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
+                        name + ":" + scoreNew.toString
+                      } else {
+                        name + ":" + score.toString
+                      }
+                    } else {
+                      ""
+                    }
+                  }
+              }.filter(_.nonEmpty)
+              result.add(label + "\t" + featuresBucket.mkString("\t"))
+          }
+          result.iterator
+      })
+
+      // 4 保存数据到hdfs
+      val hdfsPath = savePath + "/" + date
+      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+        println("删除路径并开始数据写入:" + hdfsPath)
+        MyHdfsUtils.delete_hdfs_path(hdfsPath)
+        data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+      } else {
+        println("路径不合法,无法写入:" + hdfsPath)
+      }
+    }
+  }
+}

+ 13 - 8
src/main/scala/com/aliyun/odps/spark/examples/临时记录的脚本-推荐

@@ -64,8 +64,14 @@ nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.s
 ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
 savePath:/dw/recommend/model/04_str_data/ beginStr:20240311 endStr:20240312 featureVersion:v4 ifRepart:100 \
 > p7.log 2>&1 &
-
----
+---------------------------------------------------------------------------------------------
+---------------------------------------------------------------------------------------------
+--------------------------------下-----------------------------------------------------------
+--------------------------------面-----------------------------------------------------------
+--------------------------------为-----------------------------------------------------------
+--------------------------------准-----------------------------------------------------------
+---------------------------------------------------------------------------------------------
+---------------------------------------------------------------------------------------------
 
 nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
 --class com.aliyun.odps.spark.examples.makedata.makedata_13_originData_20240529 \
@@ -170,9 +176,8 @@ nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.s
 --class com.aliyun.odps.spark.examples.makedata_recsys.makedata_recsys_41_originData_20240709 \
 --master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
 ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-tablePart:64 repartition:16 \
-beginStr:2024070108 endStr:2024070323 \
-savePath:/dw/recommend/model/31_ad_sample_data_v3/ \
-table:alg_recsys_ad_sample_all filterHours:00,01,02,03,04,05,06,07 \
-idDefaultValue:0.01 \
-> p31_2024070108.log 2>&1 &
+tablePart:64 repartition:32 \
+beginStr:2024070500 endStr:2024070508 \
+savePath:/dw/recommend/model/41_recsys_sample_data_v1/ \
+table:alg_recsys_sample_all \
+> p41_2024070500.log 2>&1 &