
feat: makedata_ad directory structure reorganization

zhaohaipeng 8 months ago
parent
commit
8d2c272926
17 changed files with 1 addition and 3076 deletions
  1. +1 -1
      src/main/java/examples/utils/AdUtil.java
  2. +0 -18
      src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240620/makedata_ad_31_originDataCheck_20240620.scala
  3. +0 -401
      src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240620/makedata_ad_31_originData_20240620.scala
  4. +0 -103
      src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240620/makedata_ad_32_bucket_20240622.scala
  5. +0 -396
      src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240620/makedata_ad_33_bucketDataPrint_20240628.scala
  6. +0 -128
      src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240620/makedata_ad_33_bucketData_20240622.scala
  7. +0 -128
      src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240620/makedata_ad_33_bucketData_20240717.scala
  8. +0 -417
      src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240718/makedata_ad_31_originData_20240718.scala
  9. +0 -105
      src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240718/makedata_ad_32_bucket_20240718.scala
  10. +0 -429
      src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240718/makedata_ad_33_bucketDataPrint_20240718.scala
  11. +0 -128
      src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240718/makedata_ad_33_bucketData_20240718.scala
  12. +0 -158
      src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240718/makedata_ad_33_bucketData_20240726.scala
  13. +0 -152
      src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240718/makedata_ad_33_bucketData_20240729.scala
  14. +0 -181
      src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240718/makedata_ad_33_bucketData_20240729_copy_zheng.scala
  15. +0 -130
      src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240718/makedata_ad_33_bucketData_20240729_reduce_feature.scala
  16. +0 -140
      src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240718/makedata_ad_33_bucketData_default_value_20240718.scala
  17. +0 -61
      src/main/scala/com/aliyun/odps/spark/zhp/临时记录的脚本-广告

+ 1 - 1
src/main/scala/com/aliyun/odps/spark/zhp/utils/AdUtil.java → src/main/java/examples/utils/AdUtil.java

@@ -1,4 +1,4 @@
-package com.aliyun.odps.spark.zhp.utils;
+package examples.utils;
 
 import com.alibaba.fastjson.JSONObject;
 

File diff not shown because the file is too large
+ 0 - 18
src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240620/makedata_ad_31_originDataCheck_20240620.scala


+ 0 - 401
src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240620/makedata_ad_31_originData_20240620.scala

@@ -1,401 +0,0 @@
-package com.aliyun.odps.spark.zhp.makedata_ad
-
-import com.alibaba.fastjson.{JSON, JSONObject}
-import com.aliyun.odps.TableSchema
-import com.aliyun.odps.data.Record
-import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
-import examples.extractor.RankExtractorFeature_20240530
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.sql.SparkSession
-import org.xm.Similarity
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-/*
-   20240608 feature extraction
- */
-
-object makedata_ad_31_originData_20240620 {
-  def main(args: Array[String]): Unit = {
-    val spark = SparkSession
-      .builder()
-      .appName(this.getClass.getName)
-      .getOrCreate()
-    val sc = spark.sparkContext
-
-    // 1 Read parameters
-    val param = ParamUtils.parseArgs(args)
-    val tablePart = param.getOrElse("tablePart", "64").toInt
-    val beginStr = param.getOrElse("beginStr", "2024062008")
-    val endStr = param.getOrElse("endStr", "2024062023")
-    val savePath = param.getOrElse("savePath", "/dw/recommend/model/31_ad_sample_data/")
-    val project = param.getOrElse("project", "loghubods")
-    val table = param.getOrElse("table", "alg_recsys_ad_sample_all")
-    val repartition = param.getOrElse("repartition", "100").toInt
-    val filterHours = param.getOrElse("filterHours", "00,01,02,03,04,05,06,07").split(",").toSet
-    val idDefaultValue = param.getOrElse("idDefaultValue", "1.0").toDouble
-    // 2 Read ODPS table info
-    val odpsOps = env.getODPS(sc)
-
-    // 3 Loop over time partitions and produce data
-    val timeRange = MyDateUtils.getDateHourRange(beginStr, endStr)
-    for (dt_hh <- timeRange) {
-      val dt = dt_hh.substring(0, 8)
-      val hh = dt_hh.substring(8, 10)
-      val partition = s"dt=$dt,hh=$hh"
-      if (filterHours.nonEmpty && filterHours.contains(hh)){
-        println("不执行partiton:" + partition)
-      }else{
-        println("开始执行partiton:" + partition)
-        val odpsData = odpsOps.readTable(project = project,
-            table = table,
-            partition = partition,
-            transfer = func,
-            numPartition = tablePart)
-          .map(record => {
-
-
-            val ts = record.getString("ts").toInt
-            val cid = record.getString("cid")
-
-
-            val featureMap = new JSONObject()
-
-            val b1: JSONObject = if (record.isNull("b1_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("b1_feature"))
-            val b2: JSONObject = if (record.isNull("b2_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("b2_feature"))
-            val b3: JSONObject = if (record.isNull("b3_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("b3_feature"))
-            val b4: JSONObject = if (record.isNull("b4_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("b4_feature"))
-            val b5: JSONObject = if (record.isNull("b5_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("b5_feature"))
-            val b6: JSONObject = if (record.isNull("b6_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("b6_feature"))
-            val b7: JSONObject = if (record.isNull("b7_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("b7_feature"))
-            val b8: JSONObject = if (record.isNull("b8_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("b8_feature"))
-            val b9: JSONObject = if (record.isNull("b9_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("b9_feature"))
-
-
-            featureMap.put("cid_" + cid, idDefaultValue)
-            if (b1.containsKey("adid") && b1.getString("adid").nonEmpty) {
-              featureMap.put("adid_" + b1.getString("adid"), idDefaultValue)
-            }
-            if (b1.containsKey("adverid") && b1.getString("adverid").nonEmpty) {
-              featureMap.put("adverid_" + b1.getString("adverid"), idDefaultValue)
-            }
-            if (b1.containsKey("targeting_conversion") && b1.getString("targeting_conversion").nonEmpty) {
-              featureMap.put("targeting_conversion_" + b1.getString("targeting_conversion"), idDefaultValue)
-            }
-
-
-            if (b1.containsKey("cpa")) {
-              featureMap.put("cpa", b1.getString("cpa").toDouble)
-            }
-
-            for ((bn, prefix1) <- List(
-              (b2, "b2"), (b3, "b3"), (b4, "b4"), (b5, "b5"), (b8, "b8")
-            )) {
-              for (prefix2 <- List(
-                "3h", "6h", "12h", "1d", "3d", "7d"
-              )) {
-                val view = if (bn.isEmpty) 0D else bn.getIntValue("ad_view_" + prefix2).toDouble
-                val click = if (bn.isEmpty) 0D else bn.getIntValue("ad_click_" + prefix2).toDouble
-                val conver = if (bn.isEmpty) 0D else bn.getIntValue("ad_conversion_" + prefix2).toDouble
-                val income = if (bn.isEmpty) 0D else bn.getIntValue("ad_income_" + prefix2).toDouble
-                val f1 = RankExtractorFeature_20240530.calDiv(click, view)
-                val f2 = RankExtractorFeature_20240530.calDiv(conver, view)
-                val f3 = RankExtractorFeature_20240530.calDiv(conver, click)
-                val f4 = conver
-                val f5 = RankExtractorFeature_20240530.calDiv(income * 1000, view)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "ctr", f1)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "ctcvr", f2)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "cvr", f3)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "conver", f4)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "ecpm", f5)
-
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "click", click)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "conver*log(view)", conver * RankExtractorFeature_20240530.calLog(view))
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "conver*ctcvr", conver * f2)
-              }
-            }
-
-            for ((bn, prefix1) <- List(
-              (b6, "b6"), (b7, "b7")
-            )) {
-              for (prefix2 <- List(
-                "7d", "14d"
-              )) {
-                val view = if (bn.isEmpty) 0D else bn.getIntValue("ad_view_" + prefix2).toDouble
-                val click = if (bn.isEmpty) 0D else bn.getIntValue("ad_click_" + prefix2).toDouble
-                val conver = if (bn.isEmpty) 0D else bn.getIntValue("ad_conversion_" + prefix2).toDouble
-                val income = if (bn.isEmpty) 0D else bn.getIntValue("ad_income_" + prefix2).toDouble
-                val f1 = RankExtractorFeature_20240530.calDiv(click, view)
-                val f2 = RankExtractorFeature_20240530.calDiv(conver, view)
-                val f3 = RankExtractorFeature_20240530.calDiv(conver, click)
-                val f4 = conver
-                val f5 = RankExtractorFeature_20240530.calDiv(income * 1000, view)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "ctr", f1)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "ctcvr", f2)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "cvr", f3)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "conver", f4)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "ecpm", f5)
-
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "click", click)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "conver*log(view)", conver * RankExtractorFeature_20240530.calLog(view))
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "conver*ctcvr", conver * f2)
-              }
-            }
-
-            val c1: JSONObject = if (record.isNull("c1_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("c1_feature"))
-
-            val midActionList = if (c1.containsKey("action") && c1.getString("action").nonEmpty) {
-              c1.getString("action").split(",").map(r => {
-                val rList = r.split(":")
-                (rList(0), (rList(1).toInt, rList(2).toInt, rList(3).toInt, rList(4).toInt, rList(5)))
-              }).sortBy(-_._2._1).toList
-            } else {
-              new ArrayBuffer[(String, (Int, Int, Int, Int, String))]().toList
-            }
-            // user (u) features
-            val viewAll = midActionList.size.toDouble
-            val clickAll = midActionList.map(_._2._2).sum.toDouble
-            val converAll = midActionList.map(_._2._3).sum.toDouble
-            val incomeAll = midActionList.map(_._2._4).sum.toDouble
-            featureMap.put("viewAll", viewAll)
-            featureMap.put("clickAll", clickAll)
-            featureMap.put("converAll", converAll)
-            featureMap.put("incomeAll", incomeAll)
-            featureMap.put("ctr_all", RankExtractorFeature_20240530.calDiv(clickAll, viewAll))
-            featureMap.put("ctcvr_all", RankExtractorFeature_20240530.calDiv(converAll, viewAll))
-            featureMap.put("cvr_all", RankExtractorFeature_20240530.calDiv(clickAll, converAll))
-            featureMap.put("ecpm_all", RankExtractorFeature_20240530.calDiv(incomeAll * 1000, viewAll))
-
-            // user-to-cid (ui) features
-            val midTimeDiff = scala.collection.mutable.Map[String, Double]()
-            midActionList.foreach {
-              case (cid, (ts_history, click, conver, income, title)) =>
-                if (!midTimeDiff.contains("timediff_view_" + cid)) {
-                  midTimeDiff.put("timediff_view_" + cid, 1.0 / ((ts - ts_history).toDouble / 3600.0 / 24.0))
-                }
-                if (!midTimeDiff.contains("timediff_click_" + cid) && click > 0) {
-                  midTimeDiff.put("timediff_click_" + cid, 1.0 / ((ts - ts_history).toDouble / 3600.0 / 24.0))
-                }
-                if (!midTimeDiff.contains("timediff_conver_" + cid) && conver > 0) {
-                  midTimeDiff.put("timediff_conver_" + cid, 1.0 / ((ts - ts_history).toDouble / 3600.0 / 24.0))
-                }
-            }
-
-            val midActionStatic = scala.collection.mutable.Map[String, Double]()
-            midActionList.foreach {
-              case (cid, (ts_history, click, conver, income, title)) =>
-                midActionStatic.put("actionstatic_view_" + cid, 1.0 + midActionStatic.getOrDefault("actionstatic_view_" + cid, 0.0))
-                midActionStatic.put("actionstatic_click_" + cid, click + midActionStatic.getOrDefault("actionstatic_click_" + cid, 0.0))
-                midActionStatic.put("actionstatic_conver_" + cid, conver + midActionStatic.getOrDefault("actionstatic_conver_" + cid, 0.0))
-                midActionStatic.put("actionstatic_income_" + cid, income + midActionStatic.getOrDefault("actionstatic_income_" + cid, 0.0))
-            }
-
-            if (midTimeDiff.contains("timediff_view_" + cid)) {
-              featureMap.put("timediff_view", midTimeDiff.getOrDefault("timediff_view_" + cid, 0.0))
-            }
-            if (midTimeDiff.contains("timediff_click_" + cid)) {
-              featureMap.put("timediff_click", midTimeDiff.getOrDefault("timediff_click_" + cid, 0.0))
-            }
-            if (midTimeDiff.contains("timediff_conver_" + cid)) {
-              featureMap.put("timediff_conver", midTimeDiff.getOrDefault("timediff_conver_" + cid, 0.0))
-            }
-            if (midActionStatic.contains("actionstatic_view_" + cid)) {
-              featureMap.put("actionstatic_view", midActionStatic.getOrDefault("actionstatic_view_" + cid, 0.0))
-            }
-            if (midActionStatic.contains("actionstatic_click_" + cid)) {
-              featureMap.put("actionstatic_click", midActionStatic.getOrDefault("actionstatic_click_" + cid, 0.0))
-            }
-            if (midActionStatic.contains("actionstatic_conver_" + cid)) {
-              featureMap.put("actionstatic_conver", midActionStatic.getOrDefault("actionstatic_conver_" + cid, 0.0))
-            }
-            if (midActionStatic.contains("actionstatic_income_" + cid)) {
-              featureMap.put("actionstatic_income", midActionStatic.getOrDefault("actionstatic_income_" + cid, 0.0))
-            }
-            if (midActionStatic.contains("actionstatic_view_" + cid) && midActionStatic.contains("actionstatic_click_" + cid)) {
-              featureMap.put("actionstatic_ctr", RankExtractorFeature_20240530.calDiv(
-                midActionStatic.getOrDefault("actionstatic_click_" + cid, 0.0),
-                midActionStatic.getOrDefault("actionstatic_view_" + cid, 0.0)
-              ))
-            }
-            if (midActionStatic.contains("actionstatic_view_" + cid) && midActionStatic.contains("actionstatic_conver_" + cid)) {
-              featureMap.put("actionstatic_ctcvr", RankExtractorFeature_20240530.calDiv(
-                midActionStatic.getOrDefault("actionstatic_conver_" + cid, 0.0),
-                midActionStatic.getOrDefault("actionstatic_view_" + cid, 0.0)
-              ))
-            }
-            if (midActionStatic.contains("actionstatic_conver_" + cid) && midActionStatic.contains("actionstatic_click_" + cid)) {
-              featureMap.put("actionstatic_cvr", RankExtractorFeature_20240530.calDiv(
-                midActionStatic.getOrDefault("actionstatic_click_" + cid, 0.0),
-                midActionStatic.getOrDefault("actionstatic_conver_" + cid, 0.0)
-              ))
-            }
-
-            val e1: JSONObject = if (record.isNull("e1_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("e1_feature"))
-            val e2: JSONObject = if (record.isNull("e2_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("e2_feature"))
-            val title = b1.getOrDefault("cidtitle", "").toString
-            if (title.nonEmpty) {
-              for ((en, prefix1) <- List((e1, "e1"), (e2, "e2"))) {
-                for (prefix2 <- List("tags_3d", "tags_7d", "tags_14d")) {
-                  if (en.nonEmpty && en.containsKey(prefix2) && en.getString(prefix2).nonEmpty) {
-                    val (f1, f2, f3, f4) = funcC34567ForTags(en.getString(prefix2), title)
-                    featureMap.put(prefix1 + "_" + prefix2 + "_matchnum", f1)
-                    featureMap.put(prefix1 + "_" + prefix2 + "_maxscore", f3)
-                    featureMap.put(prefix1 + "_" + prefix2 + "_avgscore", f4)
-
-                  }
-                }
-              }
-            }
-
-            val d1: JSONObject = if (record.isNull("d1_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("d1_feature"))
-            val d2: JSONObject = if (record.isNull("d2_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("d2_feature"))
-
-            if (d1.nonEmpty) {
-              for (prefix <- List("3h", "6h", "12h", "1d", "3d", "7d")) {
-                val view = if (!d1.containsKey("ad_view_" + prefix)) 0D else d1.getIntValue("ad_view_" + prefix).toDouble
-                val click = if (!d1.containsKey("ad_click_" + prefix)) 0D else d1.getIntValue("ad_click_" + prefix).toDouble
-                val conver = if (!d1.containsKey("ad_conversion_" + prefix)) 0D else d1.getIntValue("ad_conversion_" + prefix).toDouble
-                val income = if (!d1.containsKey("ad_income_" + prefix)) 0D else d1.getIntValue("ad_income_" + prefix).toDouble
-                val f1 = RankExtractorFeature_20240530.calDiv(click, view)
-                val f2 = RankExtractorFeature_20240530.calDiv(conver, view)
-                val f3 = RankExtractorFeature_20240530.calDiv(conver, click)
-                val f4 = conver
-                val f5 = RankExtractorFeature_20240530.calDiv(income * 1000, view)
-                featureMap.put("d1_feature" + "_" + prefix + "_" + "ctr", f1)
-                featureMap.put("d1_feature" + "_" + prefix + "_" + "ctcvr", f2)
-                featureMap.put("d1_feature" + "_" + prefix + "_" + "cvr", f3)
-                featureMap.put("d1_feature" + "_" + prefix + "_" + "conver", f4)
-                featureMap.put("d1_feature" + "_" + prefix + "_" + "ecpm", f5)
-              }
-            }
-
-            val vidRankMaps = scala.collection.mutable.Map[String, scala.collection.immutable.Map[String, Double]]()
-            if (d2.nonEmpty) {
-              d2.foreach(r => {
-                val key = r._1
-                val value = d2.getString(key).split(",").map(r => {
-                  val rList = r.split(":")
-                  (rList(0), rList(2).toDouble)
-                }).toMap
-                vidRankMaps.put(key, value)
-              })
-            }
-            for (prefix1 <- List("ctr", "ctcvr", "ecpm")) {
-              for (prefix2 <- List("1d", "3d", "7d", "14d")) {
-                if (vidRankMaps.contains(prefix1 + "_" + prefix2)) {
-                  val rank = vidRankMaps(prefix1 + "_" + prefix2).getOrDefault(cid, 0.0)
-                  if (rank >= 1.0) {
-                    featureMap.put("vid_rank_" + prefix1 + "_" + prefix2, 1.0 / rank)
-                  }
-                }
-              }
-            }
-
-
-            /*
-            Ads
-              sparse: cid adid adverid targeting_conversion
-
-              cpa --> 1 feature
-              per adverid: 3h 6h 12h 1d 3d 7d x ctr ctcvr cvr conver ecpm --> 30 features
-              per cid: 3h 6h 12h 1d 3d 7d x ctr ctcvr cvr ecpm conver --> 30 features
-              geo // per cid: 3h 6h 12h 1d 3d 7d x ctr ctcvr cvr ecpm conver --> 30 features
-              app // per cid: 3h 6h 12h 1d 3d 7d x ctr ctcvr cvr ecpm conver --> 30 features
-              phone brand // per cid: 3h 6h 12h 1d 3d 7d x ctr ctcvr cvr ecpm conver --> 30 features
-              OS: no data
-              week // per cid: 7d 14d x ctr ctcvr cvr ecpm conver --> 10 features
-              hour // per cid: 7d 14d x ctr ctcvr cvr ecpm conver --> 10 features
-
-            Users
-              user history of clicked/converted title tags; 3d 7d 14d; cid title; count / max score / avg score --> 18 features
-              user history over 14d: views / clicks / conversions / income; ctr cvr ctcvr ecpm --> 8 features
-
-              user-to-cid (ui) features --> 10 features
-                1 / time since the user last viewed this cid
-                1 / time since the user last clicked this cid
-                1 / time since the user last converted on this cid
-                how many times the user viewed this cid
-                how many times the user clicked this cid
-                how many times the user converted on this cid
-                how much the user spent on this cid
-                the user's ctr ctcvr cvr for this cid
-
-            Videos
-              sim-score-1/-2 between title and cid: no data
-              vid // per cid: 3h 6h 12h 1d 3d 7d x ctr ctcvr cvr ecpm conver --> 30 features
-              vid // per cid: reciprocal rank of ctr ctcvr ecpm over 1d 3d 7d 14d --> 12 features
-
-             */
-
-
-            // 4 Build the label info.
-            val labels = new JSONObject
-            for (labelKey <- List("ad_is_click", "ad_is_conversion")) {
-              if (!record.isNull(labelKey)) {
-                labels.put(labelKey, record.getString(labelKey))
-              }
-            }
-            // 5 Build the log key header.
-            val apptype = record.getString("apptype")
-            val mid = record.getString("mid")
-            val headvideoid = record.getString("headvideoid")
-            val logKey = (apptype, mid, cid, ts, headvideoid).productIterator.mkString(",")
-            val labelKey = labels.toString()
-            val featureKey = featureMap.toString()
-            // 6 Concatenate the record and save.
-            logKey + "\t" + labelKey + "\t" + featureKey
-          })
-
-        // 4 Save data to HDFS
-        val savePartition = dt + hh
-        val hdfsPath = savePath + "/" + savePartition
-        if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
-          println("删除路径并开始数据写入:" + hdfsPath)
-          MyHdfsUtils.delete_hdfs_path(hdfsPath)
-          odpsData.coalesce(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
-        } else {
-          println("路径不合法,无法写入:" + hdfsPath)
-        }
-      }
-
-    }
-  }
-
-  def func(record: Record, schema: TableSchema): Record = {
-    record
-  }
-
-  def funcC34567ForTags(tags: String, title: String): Tuple4[Double, String, Double, Double] = {
-    // match count, matched words, max semantic similarity score, avg semantic similarity score
-    val tagsList = tags.split(",")
-    var d1 = 0.0
-    val d2 = new ArrayBuffer[String]()
-    var d3 = 0.0
-    var d4 = 0.0
-    for (tag <- tagsList) {
-      if (title.contains(tag)) {
-        d1 = d1 + 1.0
-        d2.add(tag)
-      }
-      val score = Similarity.conceptSimilarity(tag, title)
-      d3 = if (score > d3) score else d3
-      d4 = d4 + score
-    }
-    d4 = if (tagsList.nonEmpty) d4 / tagsList.size else d4
-    (d1, d2.mkString(","), d3, d4)
-  }
-}
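All of the ctr/ctcvr/cvr/ecpm ratio features in this job go through RankExtractorFeature_20240530.calDiv, whose implementation is not part of this commit. A minimal sketch of the ratio computation, assuming calDiv is simply a zero-guarded division (the stand-in below is hypothetical):

object RatioFeatureSketch {
  // Hypothetical stand-in for RankExtractorFeature_20240530.calDiv: zero-guarded division.
  def calDiv(numerator: Double, denominator: Double): Double =
    if (denominator == 0.0) 0.0 else numerator / denominator

  def main(args: Array[String]): Unit = {
    val (view, click, conver, income) = (1200.0, 36.0, 4.0, 2.5)
    val ctr   = calDiv(click, view)         // 36 / 1200 = 0.03
    val ctcvr = calDiv(conver, view)        // 4 / 1200 ≈ 0.0033
    val cvr   = calDiv(conver, click)       // 4 / 36 ≈ 0.111
    val ecpm  = calDiv(income * 1000, view) // income is scaled by 1000 before dividing by views
    println(s"ctr=$ctr ctcvr=$ctcvr cvr=$cvr ecpm=$ecpm")
  }
}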

+ 0 - 103
src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240620/makedata_ad_32_bucket_20240622.scala

@@ -1,103 +0,0 @@
-package com.aliyun.odps.spark.zhp.makedata_ad
-
-import com.alibaba.fastjson.JSON
-import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils}
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.sql.SparkSession
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.io.Source
-/*
-
- */
-
-object makedata_ad_32_bucket_20240622 {
-  def main(args: Array[String]): Unit = {
-
-    val spark = SparkSession
-      .builder()
-      .appName(this.getClass.getName)
-      .getOrCreate()
-    val sc = spark.sparkContext
-
-    val loader = getClass.getClassLoader
-    val resourceUrl = loader.getResource("20240622_ad_feature_name.txt")
-    val content =
-      if (resourceUrl != null) {
-        val content = Source.fromURL(resourceUrl).getLines().mkString("\n")
-        Source.fromURL(resourceUrl).close()
-        content
-      } else {
-        ""
-      }
-    println(content)
-    val contentList = content.split("\n")
-      .map(r=> r.replace(" ", "").replaceAll("\n", ""))
-      .filter(r=> r.nonEmpty).toList
-
-
-
-    // 1 Read parameters
-    val param = ParamUtils.parseArgs(args)
-    val readPath = param.getOrElse("readPath", "/dw/recommend/model/31_ad_sample_data/20240620*")
-    val savePath = param.getOrElse("savePath", "/dw/recommend/model/32_bucket_data/")
-    val fileName = param.getOrElse("fileName", "20240620_100")
-    val sampleRate = param.getOrElse("sampleRate", "1.0").toDouble
-    val bucketNum = param.getOrElse("bucketNum", "100").toInt
-
-    val data = sc.textFile(readPath)
-    println("问题数据数量:" + data.filter(r=>r.split("\t").length != 3).count())
-    val data1 = data.map(r => {
-      val rList = r.split("\t")
-      val jsons = JSON.parseObject(rList(2))
-      val doubles = scala.collection.mutable.Map[String, Double]()
-      jsons.foreach(r =>{
-        doubles.put(r._1, jsons.getDoubleValue(r._1))
-      })
-      doubles
-    }).sample(false, sampleRate ).repartition(20)
-
-    val result = new ArrayBuffer[String]()
-
-    for (i <- contentList.indices){
-      println("特征:" + contentList(i))
-      val data2 = data1.map(r => r.getOrDefault(contentList(i), 0D)).filter(_ > 1E-8).collect().sorted
-      val len = data2.length
-      if (len == 0){
-        result.add(contentList(i) + "\t" + bucketNum.toString + "\t" + "0")
-      }else{
-        val oneBucketNum = (len - 1) / (bucketNum - 1) + 1 // ensure each bucket has at least one element
-        val buffers = new ArrayBuffer[Double]()
-
-        var lastBucketValue = data2(0) // record the previous bucket's split point
-        for (j <- 0 until len by oneBucketNum) {
-          val d = data2(j)
-          if (j > 0 && d != lastBucketValue) {
-            // keep the current split point only if it differs from the previous one
-            buffers += d
-          }
-          lastBucketValue = d // update the previous split point
-        }
-
-        // the last bucket should end at the array's last element
-        if (!buffers.contains(data2.last)) {
-          buffers += data2.last
-        }
-        result.add(contentList(i) + "\t" + bucketNum.toString + "\t" + buffers.mkString(","))
-      }
-    }
-    val data3 = sc.parallelize(result)
-
-
-    // 4 Save data to HDFS
-    val hdfsPath = savePath + "/" + fileName
-    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
-      println("删除路径并开始数据写入:" + hdfsPath)
-      MyHdfsUtils.delete_hdfs_path(hdfsPath)
-      data3.repartition(1).saveAsTextFile(hdfsPath, classOf[GzipCodec])
-    } else {
-      println("路径不合法,无法写入:" + hdfsPath)
-    }
-  }
-}
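The split-point loop above samples every oneBucketNum-th element of the per-feature sorted values and only keeps a boundary when the value changes, so duplicate-heavy features end up with fewer than bucketNum boundaries. A self-contained sketch of the same selection logic, without the Spark/HDFS plumbing:

import scala.collection.mutable.ArrayBuffer

object BucketBoundarySketch {
  // Same quantile-style selection as in makedata_ad_32_bucket_*: sample every
  // oneBucketNum-th sorted value, keep distinct boundaries, and make sure the
  // maximum value closes the last bucket.
  def splitPoints(sortedValues: Array[Double], bucketNum: Int): Seq[Double] = {
    val len = sortedValues.length
    if (len == 0) return Seq.empty
    val oneBucketNum = (len - 1) / (bucketNum - 1) + 1 // each bucket gets at least one element
    val buffers = new ArrayBuffer[Double]()
    var lastBucketValue = sortedValues(0)
    for (j <- 0 until len by oneBucketNum) {
      val d = sortedValues(j)
      if (j > 0 && d != lastBucketValue) buffers += d
      lastBucketValue = d
    }
    if (!buffers.contains(sortedValues.last)) buffers += sortedValues.last
    buffers.toSeq
  }

  def main(args: Array[String]): Unit = {
    val values = Array(0.1, 0.1, 0.2, 0.4, 0.4, 0.5, 0.9, 1.3, 2.0, 5.0)
    println(splitPoints(values, 4).mkString(",")) // prints 0.4,2.0,5.0
  }
}

The job then writes one line per feature in the format featureName, bucketNum, and the comma-separated split points, tab-separated, which is what the downstream bucketData jobs parse.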

File diff not shown because the file is too large
+ 0 - 396
src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240620/makedata_ad_33_bucketDataPrint_20240628.scala


+ 0 - 128
src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240620/makedata_ad_33_bucketData_20240622.scala

@@ -1,128 +0,0 @@
-package com.aliyun.odps.spark.zhp.makedata_ad
-
-import com.alibaba.fastjson.JSON
-import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils}
-import examples.extractor.ExtractorUtils
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.sql.SparkSession
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.io.Source
-/*
-
- */
-
-object makedata_ad_33_bucketData_20240622 {
-  def main(args: Array[String]): Unit = {
-
-    val spark = SparkSession
-      .builder()
-      .appName(this.getClass.getName)
-      .getOrCreate()
-    val sc = spark.sparkContext
-
-    val loader = getClass.getClassLoader
-
-    val resourceUrlBucket = loader.getResource("20240704_ad_bucket_351.txt")
-    val buckets =
-      if (resourceUrlBucket != null) {
-        val buckets = Source.fromURL(resourceUrlBucket).getLines().mkString("\n")
-        Source.fromURL(resourceUrlBucket).close()
-        buckets
-      } else {
-        ""
-      }
-    println(buckets)
-    val bucketsMap = buckets.split("\n")
-      .map(r => r.replace(" ", "").replaceAll("\n", ""))
-      .filter(r => r.nonEmpty)
-      .map(r =>{
-        val rList = r.split("\t")
-        (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
-      }).toMap
-    val bucketsMap_br = sc.broadcast(bucketsMap)
-
-
-    // 1 Read parameters
-    val param = ParamUtils.parseArgs(args)
-    val readPath = param.getOrElse("readPath", "/dw/recommend/model/31_ad_sample_data/")
-    val savePath = param.getOrElse("savePath", "/dw/recommend/model/33_ad_train_data/")
-    val beginStr = param.getOrElse("beginStr", "20240620")
-    val endStr = param.getOrElse("endStr", "20240620")
-    val repartition = param.getOrElse("repartition", "100").toInt
-    val filterNames = param.getOrElse("filterNames", "").split(",").toSet
-    val whatLabel = param.getOrElse("whatLabel", "ad_is_conversion")
-
-    val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
-    for (date <- dateRange) {
-      println("开始执行:" + date)
-      val data = sc.textFile(readPath + "/" + date + "*").map(r=>{
-          val rList = r.split("\t")
-          val logKey = rList(0)
-          val labelKey = rList(1)
-          val jsons = JSON.parseObject(rList(2))
-          val features = scala.collection.mutable.Map[String, Double]()
-          jsons.foreach(r => {
-            features.put(r._1, jsons.getDoubleValue(r._1))
-          })
-          (logKey, labelKey, features)
-        })
-        .filter{
-          case (logKey, labelKey, features) =>
-            val logKeyList = logKey.split(",")
-            val apptype = logKeyList(0)
-            !Set("12", "13").contains(apptype)
-        }
-        .map{
-          case (logKey, labelKey, features) =>
-            val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
-            (label, features)
-        }
-        .mapPartitions(row => {
-          val result = new ArrayBuffer[String]()
-          val bucketsMap = bucketsMap_br.value
-          row.foreach{
-            case (label, features) =>
-              val featuresBucket = features.map{
-                case (name, score) =>
-                  var ifFilter = false
-                  if (filterNames.nonEmpty){
-                    filterNames.foreach(r=> if (!ifFilter && name.startsWith(r)) {ifFilter = true} )
-                  }
-                  if (ifFilter){
-                    ""
-                  }else{
-                    if (score > 1E-8) {
-                      if (bucketsMap.contains(name)) {
-                        val (bucketsNum, buckets) = bucketsMap(name)
-                        val scoreNew = 1.0 / bucketsNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
-                        name + ":" + scoreNew.toString
-                      } else {
-                        name + ":" + score.toString
-                      }
-                    } else {
-                      ""
-                    }
-                  }
-              }.filter(_.nonEmpty)
-              result.add(label + "\t" + featuresBucket.mkString("\t"))
-          }
-          result.iterator
-        })
-
-      // 4 Save data to HDFS
-      val hdfsPath = savePath + "/" + date
-      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
-        println("删除路径并开始数据写入:" + hdfsPath)
-        MyHdfsUtils.delete_hdfs_path(hdfsPath)
-        data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
-      } else {
-        println("路径不合法,无法写入:" + hdfsPath)
-      }
-    }
-
-
-
-  }
-}
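At bucketing time this job maps each raw feature value onto its bucket via ExtractorUtils.findInsertPosition, which is not included in this commit. A sketch of that per-feature mapping, assuming findInsertPosition returns the insertion position of the score within the ascending split-point array (the stand-in below is hypothetical):

object BucketScoreSketch {
  // Hypothetical stand-in for ExtractorUtils.findInsertPosition: the number of split
  // points strictly below the score, i.e. its insertion position in the ascending array.
  def findInsertPosition(splitPoints: Array[Double], score: Double): Int =
    splitPoints.count(_ < score)

  // Mirrors the per-feature logic above: bucketed features become name:k/bucketNum,
  // unknown features keep their raw score, near-zero scores are dropped.
  def bucketScore(name: String, score: Double,
                  bucketsMap: Map[String, (Double, Array[Double])]): String =
    if (score <= 1e-8) ""
    else bucketsMap.get(name) match {
      case Some((bucketsNum, splitPoints)) =>
        val scoreNew = 1.0 / bucketsNum * (findInsertPosition(splitPoints, score).toDouble + 1.0)
        name + ":" + scoreNew.toString
      case None => name + ":" + score.toString
    }

  def main(args: Array[String]): Unit = {
    val bucketsMap = Map("b2_3h_ctr" -> (4.0, Array(0.01, 0.03, 0.08, 0.2)))
    println(bucketScore("b2_3h_ctr", 0.05, bucketsMap)) // b2_3h_ctr:0.75
    println(bucketScore("viewAll", 12.0, bucketsMap))   // viewAll:12.0
  }
}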

+ 0 - 128
src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240620/makedata_ad_33_bucketData_20240717.scala

@@ -1,128 +0,0 @@
-package com.aliyun.odps.spark.zhp.makedata_ad
-
-import com.alibaba.fastjson.JSON
-import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils}
-import examples.extractor.ExtractorUtils
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.sql.SparkSession
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.io.Source
-/*
-
- */
-
-object makedata_ad_33_bucketData_20240717 {
-  def main(args: Array[String]): Unit = {
-
-    val spark = SparkSession
-      .builder()
-      .appName(this.getClass.getName)
-      .getOrCreate()
-    val sc = spark.sparkContext
-
-    val loader = getClass.getClassLoader
-
-    val resourceUrlBucket = loader.getResource("20240704_ad_bucket_351.txt")
-    val buckets =
-      if (resourceUrlBucket != null) {
-        val buckets = Source.fromURL(resourceUrlBucket).getLines().mkString("\n")
-        Source.fromURL(resourceUrlBucket).close()
-        buckets
-      } else {
-        ""
-      }
-    println(buckets)
-    val bucketsMap = buckets.split("\n")
-      .map(r => r.replace(" ", "").replaceAll("\n", ""))
-      .filter(r => r.nonEmpty)
-      .map(r =>{
-        val rList = r.split("\t")
-        (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
-      }).toMap
-    val bucketsMap_br = sc.broadcast(bucketsMap)
-
-
-    // 1 Read parameters
-    val param = ParamUtils.parseArgs(args)
-    val readPath = param.getOrElse("readPath", "/dw/recommend/model/31_ad_sample_data/")
-    val savePath = param.getOrElse("savePath", "/dw/recommend/model/33_ad_train_data/")
-    val beginStr = param.getOrElse("beginStr", "2024062000")
-    val endStr = param.getOrElse("endStr", "2024062023")
-    val repartition = param.getOrElse("repartition", "100").toInt
-    val filterNames = param.getOrElse("filterNames", "").split(",").toSet
-    val whatLabel = param.getOrElse("whatLabel", "ad_is_conversion")
-
-    val dateRange = MyDateUtils.getDateHourRange(beginStr, endStr)
-    for (date <- dateRange) {
-      println("开始执行:" + date)
-      val data = sc.textFile(readPath + "/" + date + "*").map(r=>{
-          val rList = r.split("\t")
-          val logKey = rList(0)
-          val labelKey = rList(1)
-          val jsons = JSON.parseObject(rList(2))
-          val features = scala.collection.mutable.Map[String, Double]()
-          jsons.foreach(r => {
-            features.put(r._1, jsons.getDoubleValue(r._1))
-          })
-          (logKey, labelKey, features)
-        })
-        .filter{
-          case (logKey, labelKey, features) =>
-            val logKeyList = logKey.split(",")
-            val apptype = logKeyList(0)
-            !Set("12", "13").contains(apptype)
-        }
-        .map{
-          case (logKey, labelKey, features) =>
-            val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
-            (label, features)
-        }
-        .mapPartitions(row => {
-          val result = new ArrayBuffer[String]()
-          val bucketsMap = bucketsMap_br.value
-          row.foreach{
-            case (label, features) =>
-              val featuresBucket = features.map{
-                case (name, score) =>
-                  var ifFilter = false
-                  if (filterNames.nonEmpty){
-                    filterNames.foreach(r=> if (!ifFilter && name.startsWith(r)) {ifFilter = true} )
-                  }
-                  if (ifFilter){
-                    ""
-                  }else{
-                    if (score > 1E-8) {
-                      if (bucketsMap.contains(name)) {
-                        val (bucketsNum, buckets) = bucketsMap(name)
-                        val scoreNew = 1.0 / bucketsNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
-                        name + ":" + scoreNew.toString
-                      } else {
-                        name + ":" + score.toString
-                      }
-                    } else {
-                      ""
-                    }
-                  }
-              }.filter(_.nonEmpty)
-              result.add(label + "\t" + featuresBucket.mkString("\t"))
-          }
-          result.iterator
-        })
-
-      // 4 Save data to HDFS
-      val hdfsPath = savePath + "/" + date
-      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
-        println("删除路径并开始数据写入:" + hdfsPath)
-        MyHdfsUtils.delete_hdfs_path(hdfsPath)
-        data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
-      } else {
-        println("路径不合法,无法写入:" + hdfsPath)
-      }
-    }
-
-
-
-  }
-}

+ 0 - 417
src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240718/makedata_ad_31_originData_20240718.scala

@@ -1,417 +0,0 @@
-package com.aliyun.odps.spark.zhp.makedata_ad.v20240718
-
-import com.alibaba.fastjson.{JSON, JSONObject}
-import com.aliyun.odps.TableSchema
-import com.aliyun.odps.data.Record
-import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
-import examples.extractor.RankExtractorFeature_20240530
-import examples.utils.DateTimeUtil
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.sql.SparkSession
-import org.xm.Similarity
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-/*
-   20240608 feature extraction
- */
-
-object makedata_ad_31_originData_20240718 {
-  def main(args: Array[String]): Unit = {
-    val spark = SparkSession
-      .builder()
-      .appName(this.getClass.getName)
-      .getOrCreate()
-    val sc = spark.sparkContext
-
-    // 1 Read parameters
-    val param = ParamUtils.parseArgs(args)
-    val tablePart = param.getOrElse("tablePart", "64").toInt
-    val beginStr = param.getOrElse("beginStr", "2024062008")
-    val endStr = param.getOrElse("endStr", "2024062023")
-    val savePath = param.getOrElse("savePath", "/dw/recommend/model/31_ad_sample_data/")
-    val project = param.getOrElse("project", "loghubods")
-    val table = param.getOrElse("table", "alg_recsys_ad_sample_all")
-    val repartition = param.getOrElse("repartition", "100").toInt
-    val filterHours = param.getOrElse("filterHours", "00,01,02,03,04,05,06,07").split(",").toSet
-    val idDefaultValue = param.getOrElse("idDefaultValue", "1.0").toDouble
-    // 2 Read ODPS table info
-    val odpsOps = env.getODPS(sc)
-
-    // 3 Loop over time partitions and produce data
-    val timeRange = MyDateUtils.getDateHourRange(beginStr, endStr)
-    for (dt_hh <- timeRange) {
-      val dt = dt_hh.substring(0, 8)
-      val hh = dt_hh.substring(8, 10)
-      val partition = s"dt=$dt,hh=$hh"
-      if (filterHours.nonEmpty && filterHours.contains(hh)){
-        println("不执行partiton:" + partition)
-      }else{
-        println("开始执行partiton:" + partition)
-        val odpsData = odpsOps.readTable(project = project,
-            table = table,
-            partition = partition,
-            transfer = func,
-            numPartition = tablePart)
-          .map(record => {
-
-
-            val ts = record.getString("ts").toInt
-            val cid = record.getString("cid")
-
-
-            val featureMap = new JSONObject()
-
-            val b1: JSONObject = if (record.isNull("b1_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("b1_feature"))
-            val b2: JSONObject = if (record.isNull("b2_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("b2_feature"))
-            val b3: JSONObject = if (record.isNull("b3_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("b3_feature"))
-            val b4: JSONObject = if (record.isNull("b4_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("b4_feature"))
-            val b5: JSONObject = if (record.isNull("b5_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("b5_feature"))
-            val b6: JSONObject = if (record.isNull("b6_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("b6_feature"))
-            val b7: JSONObject = if (record.isNull("b7_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("b7_feature"))
-            val b8: JSONObject = if (record.isNull("b8_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("b8_feature"))
-            val b9: JSONObject = if (record.isNull("b9_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("b9_feature"))
-
-
-            featureMap.put("cid_" + cid, idDefaultValue)
-            if (b1.containsKey("adid") && b1.getString("adid").nonEmpty) {
-              featureMap.put("adid_" + b1.getString("adid"), idDefaultValue)
-            }
-            if (b1.containsKey("adverid") && b1.getString("adverid").nonEmpty) {
-              featureMap.put("adverid_" + b1.getString("adverid"), idDefaultValue)
-            }
-            if (b1.containsKey("targeting_conversion") && b1.getString("targeting_conversion").nonEmpty) {
-              featureMap.put("targeting_conversion_" + b1.getString("targeting_conversion"), idDefaultValue)
-            }
-
-            val hour = DateTimeUtil.getHourByTimestamp(ts)
-            featureMap.put("hour_" + hour, 0.1)
-
-            val dayOfWeek = DateTimeUtil.getDayOrWeekByTimestamp(ts)
-            featureMap.put("dayofweek_" + dayOfWeek, 0.1);
-
-            if (b1.containsKey("cpa")) {
-              featureMap.put("cpa", b1.getString("cpa").toDouble)
-            }
-            if (b1.containsKey("weight") && b1.getString("weight").nonEmpty){
-              featureMap.put("weight", b1.getString("weight").toDouble)
-            }
-
-            for ((bn, prefix1) <- List(
-              (b2, "b2"), (b3, "b3"), (b4, "b4"), (b5, "b5"), (b8, "b8"), (b9, "b9")
-            )) {
-              for (prefix2 <- List(
-                "1h", "2h", "3h", "4h", "5h", "6h", "12h", "1d", "3d", "7d", "today", "yesterday"
-              )) {
-                val view = if (bn.isEmpty) 0D else bn.getIntValue("ad_view_" + prefix2).toDouble
-                val click = if (bn.isEmpty) 0D else bn.getIntValue("ad_click_" + prefix2).toDouble
-                val conver = if (bn.isEmpty) 0D else bn.getIntValue("ad_conversion_" + prefix2).toDouble
-                val income = if (bn.isEmpty) 0D else bn.getIntValue("ad_income_" + prefix2).toDouble
-                val f1 = RankExtractorFeature_20240530.calDiv(click, view)
-                val f2 = RankExtractorFeature_20240530.calDiv(conver, view)
-                val f3 = RankExtractorFeature_20240530.calDiv(conver, click)
-                val f4 = conver
-                val f5 = RankExtractorFeature_20240530.calDiv(income * 1000, view)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "ctr", f1)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "ctcvr", f2)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "cvr", f3)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "conver", f4)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "ecpm", f5)
-
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "click", click)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "conver*log(view)", conver * RankExtractorFeature_20240530.calLog(view))
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "conver*ctcvr", conver * f2)
-              }
-            }
-
-            for ((bn, prefix1) <- List(
-              (b6, "b6"), (b7, "b7")
-            )) {
-              for (prefix2 <- List(
-                "7d", "14d"
-              )) {
-                val view = if (bn.isEmpty) 0D else bn.getIntValue("ad_view_" + prefix2).toDouble
-                val click = if (bn.isEmpty) 0D else bn.getIntValue("ad_click_" + prefix2).toDouble
-                val conver = if (bn.isEmpty) 0D else bn.getIntValue("ad_conversion_" + prefix2).toDouble
-                val income = if (bn.isEmpty) 0D else bn.getIntValue("ad_income_" + prefix2).toDouble
-                val f1 = RankExtractorFeature_20240530.calDiv(click, view)
-                val f2 = RankExtractorFeature_20240530.calDiv(conver, view)
-                val f3 = RankExtractorFeature_20240530.calDiv(conver, click)
-                val f4 = conver
-                val f5 = RankExtractorFeature_20240530.calDiv(income * 1000, view)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "ctr", f1)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "ctcvr", f2)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "cvr", f3)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "conver", f4)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "ecpm", f5)
-
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "click", click)
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "conver*log(view)", conver * RankExtractorFeature_20240530.calLog(view))
-                featureMap.put(prefix1 + "_" + prefix2 + "_" + "conver*ctcvr", conver * f2)
-              }
-            }
-
-            val c1: JSONObject = if (record.isNull("c1_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("c1_feature"))
-
-            val midActionList = if (c1.containsKey("action") && c1.getString("action").nonEmpty) {
-              c1.getString("action").split(",").map(r => {
-                val rList = r.split(":")
-                (rList(0), (rList(1).toInt, rList(2).toInt, rList(3).toInt, rList(4).toInt, rList(5)))
-              }).sortBy(-_._2._1).toList
-            } else {
-              new ArrayBuffer[(String, (Int, Int, Int, Int, String))]().toList
-            }
-            // user (u) features
-            val viewAll = midActionList.size.toDouble
-            val clickAll = midActionList.map(_._2._2).sum.toDouble
-            val converAll = midActionList.map(_._2._3).sum.toDouble
-            val incomeAll = midActionList.map(_._2._4).sum.toDouble
-            featureMap.put("viewAll", viewAll)
-            featureMap.put("clickAll", clickAll)
-            featureMap.put("converAll", converAll)
-            featureMap.put("incomeAll", incomeAll)
-            featureMap.put("ctr_all", RankExtractorFeature_20240530.calDiv(clickAll, viewAll))
-            featureMap.put("ctcvr_all", RankExtractorFeature_20240530.calDiv(converAll, viewAll))
-            featureMap.put("cvr_all", RankExtractorFeature_20240530.calDiv(clickAll, converAll))
-            featureMap.put("ecpm_all", RankExtractorFeature_20240530.calDiv(incomeAll * 1000, viewAll))
-
-            // user-to-cid (ui) features
-            val midTimeDiff = scala.collection.mutable.Map[String, Double]()
-            midActionList.foreach {
-              case (cid, (ts_history, click, conver, income, title)) =>
-                if (!midTimeDiff.contains("timediff_view_" + cid)) {
-                  midTimeDiff.put("timediff_view_" + cid, 1.0 / ((ts - ts_history).toDouble / 3600.0 / 24.0))
-                }
-                if (!midTimeDiff.contains("timediff_click_" + cid) && click > 0) {
-                  midTimeDiff.put("timediff_click_" + cid, 1.0 / ((ts - ts_history).toDouble / 3600.0 / 24.0))
-                }
-                if (!midTimeDiff.contains("timediff_conver_" + cid) && conver > 0) {
-                  midTimeDiff.put("timediff_conver_" + cid, 1.0 / ((ts - ts_history).toDouble / 3600.0 / 24.0))
-                }
-            }
-
-            val midActionStatic = scala.collection.mutable.Map[String, Double]()
-            midActionList.foreach {
-              case (cid, (ts_history, click, conver, income, title)) =>
-                midActionStatic.put("actionstatic_view_" + cid, 1.0 + midActionStatic.getOrDefault("actionstatic_view_" + cid, 0.0))
-                midActionStatic.put("actionstatic_click_" + cid, click + midActionStatic.getOrDefault("actionstatic_click_" + cid, 0.0))
-                midActionStatic.put("actionstatic_conver_" + cid, conver + midActionStatic.getOrDefault("actionstatic_conver_" + cid, 0.0))
-                midActionStatic.put("actionstatic_income_" + cid, income + midActionStatic.getOrDefault("actionstatic_income_" + cid, 0.0))
-            }
-
-            if (midTimeDiff.contains("timediff_view_" + cid)) {
-              featureMap.put("timediff_view", midTimeDiff.getOrDefault("timediff_view_" + cid, 0.0))
-            }
-            if (midTimeDiff.contains("timediff_click_" + cid)) {
-              featureMap.put("timediff_click", midTimeDiff.getOrDefault("timediff_click_" + cid, 0.0))
-            }
-            if (midTimeDiff.contains("timediff_conver_" + cid)) {
-              featureMap.put("timediff_conver", midTimeDiff.getOrDefault("timediff_conver_" + cid, 0.0))
-            }
-            if (midActionStatic.contains("actionstatic_view_" + cid)) {
-              featureMap.put("actionstatic_view", midActionStatic.getOrDefault("actionstatic_view_" + cid, 0.0))
-            }
-            if (midActionStatic.contains("actionstatic_click_" + cid)) {
-              featureMap.put("actionstatic_click", midActionStatic.getOrDefault("actionstatic_click_" + cid, 0.0))
-            }
-            if (midActionStatic.contains("actionstatic_conver_" + cid)) {
-              featureMap.put("actionstatic_conver", midActionStatic.getOrDefault("actionstatic_conver_" + cid, 0.0))
-            }
-            if (midActionStatic.contains("actionstatic_income_" + cid)) {
-              featureMap.put("actionstatic_income", midActionStatic.getOrDefault("actionstatic_income_" + cid, 0.0))
-            }
-            if (midActionStatic.contains("actionstatic_view_" + cid) && midActionStatic.contains("actionstatic_click_" + cid)) {
-              featureMap.put("actionstatic_ctr", RankExtractorFeature_20240530.calDiv(
-                midActionStatic.getOrDefault("actionstatic_click_" + cid, 0.0),
-                midActionStatic.getOrDefault("actionstatic_view_" + cid, 0.0)
-              ))
-            }
-            if (midActionStatic.contains("actionstatic_view_" + cid) && midActionStatic.contains("actionstatic_conver_" + cid)) {
-              featureMap.put("actionstatic_ctcvr", RankExtractorFeature_20240530.calDiv(
-                midActionStatic.getOrDefault("actionstatic_conver_" + cid, 0.0),
-                midActionStatic.getOrDefault("actionstatic_view_" + cid, 0.0)
-              ))
-            }
-            if (midActionStatic.contains("actionstatic_conver_" + cid) && midActionStatic.contains("actionstatic_click_" + cid)) {
-              featureMap.put("actionstatic_cvr", RankExtractorFeature_20240530.calDiv(
-                midActionStatic.getOrDefault("actionstatic_conver_" + cid, 0.0),
-                midActionStatic.getOrDefault("actionstatic_click_" + cid, 0.0)
-              ))
-            }
-
-            val e1: JSONObject = if (record.isNull("e1_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("e1_feature"))
-            val e2: JSONObject = if (record.isNull("e2_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("e2_feature"))
-            val title = b1.getOrDefault("cidtitle", "").toString
-            if (title.nonEmpty) {
-              for ((en, prefix1) <- List((e1, "e1"), (e2, "e2"))) {
-                for (prefix2 <- List("tags_3d", "tags_7d", "tags_14d")) {
-                  if (en.nonEmpty && en.containsKey(prefix2) && en.getString(prefix2).nonEmpty) {
-                    val (f1, f2, f3, f4) = funcC34567ForTags(en.getString(prefix2), title)
-                    featureMap.put(prefix1 + "_" + prefix2 + "_matchnum", f1)
-                    featureMap.put(prefix1 + "_" + prefix2 + "_maxscore", f3)
-                    featureMap.put(prefix1 + "_" + prefix2 + "_avgscore", f4)
-
-                  }
-                }
-              }
-            }
-
-            val d1: JSONObject = if (record.isNull("d1_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("d1_feature"))
-            val d2: JSONObject = if (record.isNull("d2_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("d2_feature"))
-            val d3: JSONObject = if (record.isNull("d3_feature")) new JSONObject() else
-              JSON.parseObject(record.getString("d3_feature"))
-
-            if (d1.nonEmpty) {
-              for (prefix <- List("3h", "6h", "12h", "1d", "3d", "7d")) {
-                val view = if (!d1.containsKey("ad_view_" + prefix)) 0D else d1.getIntValue("ad_view_" + prefix).toDouble
-                val click = if (!d1.containsKey("ad_click_" + prefix)) 0D else d1.getIntValue("ad_click_" + prefix).toDouble
-                val conver = if (!d1.containsKey("ad_conversion_" + prefix)) 0D else d1.getIntValue("ad_conversion_" + prefix).toDouble
-                val income = if (!d1.containsKey("ad_income_" + prefix)) 0D else d1.getIntValue("ad_income_" + prefix).toDouble
-                val f1 = RankExtractorFeature_20240530.calDiv(click, view)
-                val f2 = RankExtractorFeature_20240530.calDiv(conver, view)
-                val f3 = RankExtractorFeature_20240530.calDiv(conver, click)
-                val f4 = conver
-                val f5 = RankExtractorFeature_20240530.calDiv(income * 1000, view)
-                featureMap.put("d1_feature" + "_" + prefix + "_" + "ctr", f1)
-                featureMap.put("d1_feature" + "_" + prefix + "_" + "ctcvr", f2)
-                featureMap.put("d1_feature" + "_" + prefix + "_" + "cvr", f3)
-                featureMap.put("d1_feature" + "_" + prefix + "_" + "conver", f4)
-                featureMap.put("d1_feature" + "_" + prefix + "_" + "ecpm", f5)
-              }
-            }
-
-            val vidRankMaps = scala.collection.mutable.Map[String, scala.collection.immutable.Map[String, Double]]()
-            if (d2.nonEmpty) {
-              d2.foreach(r => {
-                val key = r._1
-                val value = d2.getString(key).split(",").map(r => {
-                  val rList = r.split(":")
-                  (rList(0), rList(2).toDouble)
-                }).toMap
-                vidRankMaps.put(key, value)
-              })
-            }
-            for (prefix1 <- List("ctr", "ctcvr", "ecpm")) {
-              for (prefix2 <- List("1d", "3d", "7d", "14d")) {
-                if (vidRankMaps.contains(prefix1 + "_" + prefix2)) {
-                  val rank = vidRankMaps(prefix1 + "_" + prefix2).getOrDefault(cid, 0.0)
-                  if (rank >= 1.0) {
-                    featureMap.put("vid_rank_" + prefix1 + "_" + prefix2, 1.0 / rank)
-                  }
-                }
-              }
-            }
-
-            if (d3.nonEmpty){
-              val vTitle= d3.getString("title")
-              val score = Similarity.conceptSimilarity(title, vTitle)
-              featureMap.put("ctitle_vtitle_similarity", score);
-            }
-
-            /*
-            Ads
-              sparse: cid adid adverid targeting_conversion
-
-              cpa --> 1 feature
-              per adverid: 3h 6h 12h 1d 3d 7d x ctr ctcvr cvr conver ecpm --> 30 features
-              per cid: 3h 6h 12h 1d 3d 7d x ctr ctcvr cvr ecpm conver --> 30 features
-              geo // per cid: 3h 6h 12h 1d 3d 7d x ctr ctcvr cvr ecpm conver --> 30 features
-              app // per cid: 3h 6h 12h 1d 3d 7d x ctr ctcvr cvr ecpm conver --> 30 features
-              phone brand // per cid: 3h 6h 12h 1d 3d 7d x ctr ctcvr cvr ecpm conver --> 30 features
-              OS: no data
-              week // per cid: 7d 14d x ctr ctcvr cvr ecpm conver --> 10 features
-              hour // per cid: 7d 14d x ctr ctcvr cvr ecpm conver --> 10 features
-
-            Users
-              user history of clicked/converted title tags; 3d 7d 14d; cid title; count / max score / avg score --> 18 features
-              user history over 14d: views / clicks / conversions / income; ctr cvr ctcvr ecpm --> 8 features
-
-              user-to-cid (ui) features --> 10 features
-                1 / time since the user last viewed this cid
-                1 / time since the user last clicked this cid
-                1 / time since the user last converted on this cid
-                how many times the user viewed this cid
-                how many times the user clicked this cid
-                how many times the user converted on this cid
-                how much the user spent on this cid
-                the user's ctr ctcvr cvr for this cid
-
-            Videos
-              sim-score-1/-2 between title and cid: no data
-              vid // per cid: 3h 6h 12h 1d 3d 7d x ctr ctcvr cvr ecpm conver --> 30 features
-              vid // per cid: reciprocal rank of ctr ctcvr ecpm over 1d 3d 7d 14d --> 12 features
-
-             */
-
-
-            // 4 Build the label info.
-            val labels = new JSONObject
-            for (labelKey <- List("ad_is_click", "ad_is_conversion")) {
-              if (!record.isNull(labelKey)) {
-                labels.put(labelKey, record.getString(labelKey))
-              }
-            }
-            // 5 Build the log key header.
-            val apptype = record.getString("apptype")
-            val mid = record.getString("mid")
-            val headvideoid = record.getString("headvideoid")
-            val logKey = (apptype, mid, cid, ts, headvideoid).productIterator.mkString(",")
-            val labelKey = labels.toString()
-            val featureKey = featureMap.toString()
-            //6 拼接数据,保存。
-            logKey + "\t" + labelKey + "\t" + featureKey
-          })
-
-        // 4 保存数据到hdfs
-        val savePartition = dt + hh
-        val hdfsPath = savePath + "/" + savePartition
-        if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
-          println("删除路径并开始数据写入:" + hdfsPath)
-          MyHdfsUtils.delete_hdfs_path(hdfsPath)
-          odpsData.coalesce(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
-        } else {
-          println("路径不合法,无法写入:" + hdfsPath)
-        }
-      }
-
-    }
-  }
-
-  def func(record: Record, schema: TableSchema): Record = {
-    record
-  }
-
-  def funcC34567ForTags(tags: String, title: String): Tuple4[Double, String, Double, Double] = {
-    // 匹配数量 匹配词 语义最高相似度分 语义平均相似度分
-    val tagsList = tags.split(",")
-    var d1 = 0.0
-    val d2 = new ArrayBuffer[String]()
-    var d3 = 0.0
-    var d4 = 0.0
-    for (tag <- tagsList) {
-      if (title.contains(tag)) {
-        d1 = d1 + 1.0
-        d2.add(tag)
-      }
-      val score = Similarity.conceptSimilarity(tag, title)
-      d3 = if (score > d3) score else d3
-      d4 = d4 + score
-    }
-    d4 = if (tagsList.nonEmpty) d4 / tagsList.size else d4
-    (d1, d2.mkString(","), d3, d4)
-  }
-}
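
The originData job removed above derives each time window's ctr, ctcvr, cvr, conver and ecpm from raw view/click/conversion/income counters through RankExtractorFeature_20240530.calDiv. That helper is not part of this diff, so the sketch below substitutes a local safeDiv and assumes the conventional definitions ctr = click/view and ctcvr = conver/view for the two ratios computed just above this hunk; it is a minimal reconstruction, not the project's extractor.

object RatioFeatureSketch {
  // Guarded division: empty denominators yield 0.0 instead of NaN or Infinity.
  def safeDiv(a: Double, b: Double): Double = if (b > 0) a / b else 0.0

  // The five ratio features emitted per time window (prefix is e.g. "3h", "1d", "7d").
  def windowFeatures(prefix: String,
                     view: Double, click: Double,
                     conver: Double, income: Double): Map[String, Double] = Map(
    s"d1_feature_${prefix}_ctr"    -> safeDiv(click, view),        // assumed: click-through rate
    s"d1_feature_${prefix}_ctcvr"  -> safeDiv(conver, view),       // assumed: view-to-conversion rate
    s"d1_feature_${prefix}_cvr"    -> safeDiv(conver, click),      // click-to-conversion rate
    s"d1_feature_${prefix}_conver" -> conver,                      // raw conversion count
    s"d1_feature_${prefix}_ecpm"   -> safeDiv(income * 1000, view) // revenue per 1000 views
  )

  def main(args: Array[String]): Unit =
    println(windowFeatures("3h", view = 1200, click = 36, conver = 4, income = 2.4))
}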

+ 0 - 105
src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240718/makedata_ad_32_bucket_20240718.scala

@@ -1,105 +0,0 @@
-package com.aliyun.odps.spark.zhp.makedata_ad.v20240718
-
-import com.alibaba.fastjson.JSON
-import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils}
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.sql.SparkSession
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.io.Source
-/*
-
- */
-
-object makedata_ad_32_bucket_20240718 {
-  def main(args: Array[String]): Unit = {
-
-    val spark = SparkSession
-      .builder()
-      .appName(this.getClass.getName)
-      .getOrCreate()
-    val sc = spark.sparkContext
-
-    // 1 读取参数
-    val param = ParamUtils.parseArgs(args)
-    val readPath = param.getOrElse("readPath", "/dw/recommend/model/31_ad_sample_data/20240620*")
-    val savePath = param.getOrElse("savePath", "/dw/recommend/model/32_bucket_data/")
-    val fileName = param.getOrElse("fileName", "20240620_100")
-    val sampleRate = param.getOrElse("sampleRate", "1.0").toDouble
-    val bucketNum = param.getOrElse("bucketNum", "100").toInt
-    val featureNameFile = param.getOrElse("featureNameFile", "20240718_ad_feature_name.txt");
-
-
-    val loader = getClass.getClassLoader
-    val resourceUrl = loader.getResource(featureNameFile)
-    val content =
-      if (resourceUrl != null) {
-        val content = Source.fromURL(resourceUrl).getLines().mkString("\n")
-        Source.fromURL(resourceUrl).close()
-        content
-      } else {
-        ""
-      }
-    println(content)
-    val contentList = content.split("\n")
-      .map(r=> r.replace(" ", "").replaceAll("\n", ""))
-      .filter(r=> r.nonEmpty).toList
-
-
-
-    val data = sc.textFile(readPath)
-    println("问题数据数量:" + data.filter(r=>r.split("\t").length != 3).count())
-    val data1 = data.map(r => {
-      val rList = r.split("\t")
-      val jsons = JSON.parseObject(rList(2))
-      val doubles = scala.collection.mutable.Map[String, Double]()
-      jsons.foreach(r =>{
-        doubles.put(r._1, jsons.getDoubleValue(r._1))
-      })
-      doubles
-    }).sample(false, sampleRate ).repartition(20)
-
-    val result = new ArrayBuffer[String]()
-
-    for (i <- contentList.indices){
-      println("特征:" + contentList(i))
-      val data2 = data1.map(r => r.getOrDefault(contentList(i), 0D)).filter(_ > 1E-8).collect().sorted
-      val len = data2.length
-      if (len == 0){
-        result.add(contentList(i) + "\t" + bucketNum.toString + "\t" + "0")
-      }else{
-        val oneBucketNum = (len - 1) / (bucketNum - 1) + 1 // 确保每个桶至少有一个元素
-        val buffers = new ArrayBuffer[Double]()
-
-        var lastBucketValue = data2(0) // 记录上一个桶的切分点
-        for (j <- 0 until len by oneBucketNum) {
-          val d = data2(j)
-          if (j > 0 && d != lastBucketValue) {
-            // 如果当前切分点不同于上一个切分点,则保存当前切分点
-            buffers += d
-          }
-          lastBucketValue = d // 更新上一个桶的切分点
-        }
-
-        // 最后一个桶的结束点应该是数组的最后一个元素
-        if (!buffers.contains(data2.last)) {
-          buffers += data2.last
-        }
-        result.add(contentList(i) + "\t" + bucketNum.toString + "\t" + buffers.mkString(","))
-      }
-    }
-    val data3 = sc.parallelize(result)
-
-
-    // 4 保存数据到hdfs
-    val hdfsPath = savePath + "/" + fileName
-    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
-      println("删除路径并开始数据写入:" + hdfsPath)
-      MyHdfsUtils.delete_hdfs_path(hdfsPath)
-      data3.repartition(1).saveAsTextFile(hdfsPath, classOf[GzipCodec])
-    } else {
-      println("路径不合法,无法写入:" + hdfsPath)
-    }
-  }
-}
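
The deleted makedata_ad_32_bucket_20240718 job builds per-feature quantile split points by sorting the non-zero values, stepping through them at a fixed stride so every bucket holds at least one element, and keeping a value as a boundary only when it differs from the previous one. A self-contained sketch of that boundary derivation, a hedged reconstruction rather than the project utility:

import scala.collection.mutable.ArrayBuffer

object QuantileBucketSketch {
  // Derive up to bucketNum split points from one feature's non-zero values: sort them,
  // step through at a fixed stride so every bucket holds at least one element, and keep
  // a value as a boundary only when it differs from the previous boundary.
  def splitPoints(values: Array[Double], bucketNum: Int): Array[Double] = {
    val sorted = values.filter(_ > 1e-8).sorted
    if (sorted.isEmpty) return Array.empty[Double]
    val stride = (sorted.length - 1) / (bucketNum - 1) + 1
    val buffers = new ArrayBuffer[Double]()
    var last = sorted(0)
    for (j <- 0 until sorted.length by stride) {
      val d = sorted(j)
      if (j > 0 && d != last) buffers += d
      last = d
    }
    if (!buffers.contains(sorted.last)) buffers += sorted.last // close the final bucket
    buffers.toArray
  }

  def main(args: Array[String]): Unit = {
    val skewed = Array.tabulate(1000)(i => (i % 50).toDouble) // many repeated values
    println(splitPoints(skewed, bucketNum = 10).mkString(","))
  }
}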

File diff not shown because the file is too large
+ 0 - 429
src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240718/makedata_ad_33_bucketDataPrint_20240718.scala


+ 0 - 128
src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240718/makedata_ad_33_bucketData_20240718.scala

@@ -1,128 +0,0 @@
-package com.aliyun.odps.spark.examples.makedata_ad
-
-import com.alibaba.fastjson.JSON
-import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils}
-import examples.extractor.ExtractorUtils
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.sql.SparkSession
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.io.Source
-/*
-
- */
-
-object makedata_ad_33_bucketData_20240718 {
-  def main(args: Array[String]): Unit = {
-
-    val spark = SparkSession
-      .builder()
-      .appName(this.getClass.getName)
-      .getOrCreate()
-    val sc = spark.sparkContext
-
-    val loader = getClass.getClassLoader
-
-    val resourceUrlBucket = loader.getResource("20240718_ad_bucket_688.txt")
-    val buckets =
-      if (resourceUrlBucket != null) {
-        val buckets = Source.fromURL(resourceUrlBucket).getLines().mkString("\n")
-        Source.fromURL(resourceUrlBucket).close()
-        buckets
-      } else {
-        ""
-      }
-    println(buckets)
-    val bucketsMap = buckets.split("\n")
-      .map(r => r.replace(" ", "").replaceAll("\n", ""))
-      .filter(r => r.nonEmpty)
-      .map(r =>{
-        val rList = r.split("\t")
-        (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
-      }).toMap
-    val bucketsMap_br = sc.broadcast(bucketsMap)
-
-
-    // 1 读取参数
-    val param = ParamUtils.parseArgs(args)
-    val readPath = param.getOrElse("readPath", "/dw/recommend/model/31_ad_sample_data/")
-    val savePath = param.getOrElse("savePath", "/dw/recommend/model/33_ad_train_data/")
-    val beginStr = param.getOrElse("beginStr", "20240620")
-    val endStr = param.getOrElse("endStr", "20240620")
-    val repartition = param.getOrElse("repartition", "100").toInt
-    val filterNames = param.getOrElse("filterNames", "").split(",").toSet
-    val whatLabel = param.getOrElse("whatLabel", "ad_is_conversion")
-
-    val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
-    for (date <- dateRange) {
-      println("开始执行:" + date)
-      val data = sc.textFile(readPath + "/" + date + "*").map(r=>{
-        val rList = r.split("\t")
-        val logKey = rList(0)
-        val labelKey = rList(1)
-        val jsons = JSON.parseObject(rList(2))
-        val features = scala.collection.mutable.Map[String, Double]()
-        jsons.foreach(r => {
-          features.put(r._1, jsons.getDoubleValue(r._1))
-        })
-        (logKey, labelKey, features)
-      })
-        .filter{
-          case (logKey, labelKey, features) =>
-            val logKeyList = logKey.split(",")
-            val apptype = logKeyList(0)
-            !Set("12", "13").contains(apptype)
-        }
-        .map{
-          case (logKey, labelKey, features) =>
-            val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
-            (label, features)
-        }
-        .mapPartitions(row => {
-          val result = new ArrayBuffer[String]()
-          val bucketsMap = bucketsMap_br.value
-          row.foreach{
-            case (label, features) =>
-              val featuresBucket = features.map{
-                case (name, score) =>
-                  var ifFilter = false
-                  if (filterNames.nonEmpty){
-                    filterNames.foreach(r=> if (!ifFilter && name.contains(r)) {ifFilter = true} )
-                  }
-                  if (ifFilter){
-                    ""
-                  }else{
-                    if (score > 1E-8) {
-                      if (bucketsMap.contains(name)) {
-                        val (bucketsNum, buckets) = bucketsMap(name)
-                        val scoreNew = 1.0 / bucketsNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
-                        name + ":" + scoreNew.toString
-                      } else {
-                        name + ":" + score.toString
-                      }
-                    } else {
-                      ""
-                    }
-                  }
-              }.filter(_.nonEmpty)
-              result.add(label + "\t" + featuresBucket.mkString("\t"))
-          }
-          result.iterator
-      })
-
-      // 4 保存数据到hdfs
-      val hdfsPath = savePath + "/" + date
-      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
-        println("删除路径并开始数据写入:" + hdfsPath)
-        MyHdfsUtils.delete_hdfs_path(hdfsPath)
-        data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
-      } else {
-        println("路径不合法,无法写入:" + hdfsPath)
-      }
-    }
-
-
-
-  }
-}
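
The bucketData jobs above and below this point share one transformation: a raw feature score is replaced by (insert position + 1) / bucketNum against that feature's split-point array. ExtractorUtils.findInsertPosition is not shown in this diff, so the sketch reimplements it as a plain binary search for the insertion index; the original's tie-breaking may differ.

object BucketTransformSketch {
  // Hypothetical stand-in for ExtractorUtils.findInsertPosition: the index at which
  // score would be inserted into the ascending buckets array (binary search).
  def findInsertPosition(buckets: Array[Double], score: Double): Int = {
    var lo = 0
    var hi = buckets.length
    while (lo < hi) {
      val mid = (lo + hi) >>> 1
      if (buckets(mid) < score) lo = mid + 1 else hi = mid
    }
    lo
  }

  // Map a raw score to a normalized bucket value in (0, 1], as the deleted jobs do;
  // zero or near-zero scores are dropped upstream before this step.
  def bucketize(name: String, score: Double,
                bucketsMap: Map[String, (Double, Array[Double])]): String =
    bucketsMap.get(name) match {
      case Some((bucketNum, buckets)) =>
        val scoreNew = 1.0 / bucketNum * (findInsertPosition(buckets, score) + 1.0)
        s"$name:$scoreNew"
      case None => s"$name:$score" // unbucketed features pass through unchanged
    }

  def main(args: Array[String]): Unit = {
    val bucketsMap = Map("ctr_3h" -> (10.0, Array(0.01, 0.02, 0.05, 0.1, 0.2)))
    println(bucketize("ctr_3h", 0.07, bucketsMap)) // falls in the 4th bucket -> 0.4
  }
}

The 20240726 variant that follows tweaks only the defaults: features missing from a sample are padded with 0 from the feature-name file, bucketed scores get a 0.01 offset, and zero scores emit name:0.01 instead of being dropped, so no feature is silently absent from a training line.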

+ 0 - 158
src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240718/makedata_ad_33_bucketData_20240726.scala

@@ -1,158 +0,0 @@
-package com.aliyun.odps.spark.examples.makedata_ad
-
-import com.alibaba.fastjson.JSON
-import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils}
-import examples.extractor.ExtractorUtils
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.sql.SparkSession
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.io.Source
-
-/*
-
- */
-
-object makedata_ad_33_bucketData_20240726 {
-  def main(args: Array[String]): Unit = {
-
-    val spark = SparkSession
-      .builder()
-      .appName(this.getClass.getName)
-      .getOrCreate()
-    val sc = spark.sparkContext
-
-
-    // 1 读取参数
-    val param = ParamUtils.parseArgs(args)
-    val readPath = param.getOrElse("readPath", "/dw/recommend/model/31_ad_sample_data/")
-    val savePath = param.getOrElse("savePath", "/dw/recommend/model/33_ad_train_data/")
-    val beginStr = param.getOrElse("beginStr", "20240620")
-    val endStr = param.getOrElse("endStr", "20240620")
-    val repartition = param.getOrElse("repartition", "100").toInt
-    val filterNames = param.getOrElse("filterNames", "").split(",").toSet
-    val whatLabel = param.getOrElse("whatLabel", "ad_is_conversion")
-    val featureNameFile = param.getOrElse("featureNameFile", "20240718_ad_feature_name_517.txt");
-
-
-    val loader = getClass.getClassLoader
-
-    val resourceUrlBucket = loader.getResource("20240718_ad_bucket_517.txt")
-    val buckets =
-      if (resourceUrlBucket != null) {
-        val buckets = Source.fromURL(resourceUrlBucket).getLines().mkString("\n")
-        Source.fromURL(resourceUrlBucket).close()
-        buckets
-      } else {
-        ""
-      }
-    println(buckets)
-    val bucketsMap = buckets.split("\n")
-      .map(r => r.replace(" ", "").replaceAll("\n", ""))
-      .filter(r => r.nonEmpty)
-      .map(r => {
-        val rList = r.split("\t")
-        (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
-      }).toMap
-    val bucketsMap_br = sc.broadcast(bucketsMap)
-
-    val resourceUrl = loader.getResource(featureNameFile)
-    val content =
-      if (resourceUrl != null) {
-        val content = Source.fromURL(resourceUrl).getLines().mkString("\n")
-        Source.fromURL(resourceUrl).close()
-        content
-      } else {
-        ""
-      }
-
-    println()
-    println()
-    println()
-    println(content)
-    val contentList = content.split("\n")
-      .map(r => r.replace(" ", "").replaceAll("\n", ""))
-      .filter(r => r.nonEmpty).toList
-
-    val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
-    for (date <- dateRange) {
-      println("开始执行:" + date)
-      val data = sc.textFile(readPath + "/" + date + "*").map(r => {
-          val rList = r.split("\t")
-          val logKey = rList(0)
-          val labelKey = rList(1)
-          val jsons = JSON.parseObject(rList(2))
-          val features = scala.collection.mutable.Map[String, Double]()
-          jsons.foreach(r => {
-            features.put(r._1, jsons.getDoubleValue(r._1))
-          })
-
-          for (name <- contentList) {
-            if (!features.contains(name)) {
-              features.put(name, 0)
-            }
-          }
-
-          (logKey, labelKey, features)
-        })
-        .filter {
-          case (logKey, labelKey, features) =>
-            val logKeyList = logKey.split(",")
-            val apptype = logKeyList(0)
-            !Set("12", "13").contains(apptype)
-        }
-        .map {
-          case (logKey, labelKey, features) =>
-            val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
-
-            (label, features)
-        }
-        .mapPartitions(row => {
-          val result = new ArrayBuffer[String]()
-          val bucketsMap = bucketsMap_br.value
-          row.foreach {
-            case (label, features) =>
-              val featuresBucket = features.map {
-                case (name, score) =>
-                  var ifFilter = false
-                  if (filterNames.nonEmpty) {
-                    filterNames.foreach(r => if (!ifFilter && name.contains(r)) {
-                      ifFilter = true
-                    })
-                  }
-                  if (ifFilter) {
-                    ""
-                  } else {
-                    if (score > 1E-8) {
-                      if (bucketsMap.contains(name)) {
-                        val (bucketsNum, buckets) = bucketsMap(name)
-                        val scoreNew = 0.01 + (1.0 / bucketsNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0))
-                        name + ":" + scoreNew.toString
-                      } else {
-                        name + ":" + score.toString
-                      }
-                    } else {
-                      name + ":" + "0.01"
-                    }
-                  }
-              }.filter(_.nonEmpty)
-              result.add(label + "\t" + featuresBucket.mkString("\t"))
-          }
-          result.iterator
-        })
-
-      // 4 保存数据到hdfs
-      val hdfsPath = savePath + "/" + date
-      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
-        println("删除路径并开始数据写入:" + hdfsPath)
-        MyHdfsUtils.delete_hdfs_path(hdfsPath)
-        data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
-      } else {
-        println("路径不合法,无法写入:" + hdfsPath)
-      }
-    }
-
-
-  }
-}

+ 0 - 152
src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240718/makedata_ad_33_bucketData_20240729.scala

@@ -1,152 +0,0 @@
-package com.aliyun.odps.spark.examples.makedata_ad
-
-import com.alibaba.fastjson.JSON
-import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils}
-import examples.extractor.ExtractorUtils
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.sql.SparkSession
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.io.Source
-import scala.util.Random
-
-/*
-
- */
-
-object makedata_ad_33_bucketData_20240729 {
-  def main(args: Array[String]): Unit = {
-
-    val spark = SparkSession
-      .builder()
-      .appName(this.getClass.getName)
-      .getOrCreate()
-    val sc = spark.sparkContext
-
-    // 1 读取参数
-    val param = ParamUtils.parseArgs(args)
-    val readPath = param.getOrElse("readPath", "/dw/recommend/model/31_ad_sample_data/")
-    val savePath = param.getOrElse("savePath", "/dw/recommend/model/33_ad_train_data/")
-    val beginStr = param.getOrElse("beginStr", "20240620")
-    val endStr = param.getOrElse("endStr", "20240620")
-    val repartition = param.getOrElse("repartition", "100").toInt
-    val filterNames = param.getOrElse("filterNames", "").split(",").toSet
-    val whatLabel = param.getOrElse("whatLabel", "ad_is_conversion")
-
-    val loader = getClass.getClassLoader
-
-    val resourceUrlBucket = loader.getResource("20240718_ad_bucket_688.txt")
-    val buckets =
-      if (resourceUrlBucket != null) {
-        val buckets = Source.fromURL(resourceUrlBucket).getLines().mkString("\n")
-        Source.fromURL(resourceUrlBucket).close()
-        buckets
-      } else {
-        ""
-      }
-    println(buckets)
-    val bucketsMap = buckets.split("\n")
-      .map(r => r.replace(" ", "").replaceAll("\n", ""))
-      .filter(r => r.nonEmpty)
-      .map(r => {
-        val rList = r.split("\t")
-        (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
-      }).toMap
-    val bucketsMap_br = sc.broadcast(bucketsMap)
-
-    val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
-    val cidCountMap = scala.collection.mutable.Map[String, Int]()
-    for (date <- dateRange) {
-      println("开始执行:" + date)
-      val data = sc.textFile(readPath + "/" + date + "*").map(r => {
-          val rList = r.split("\t")
-          val logKey = rList(0)
-          val labelKey = rList(1)
-          val jsons = JSON.parseObject(rList(2))
-          val features = scala.collection.mutable.Map[String, Double]()
-          jsons.foreach(r => {
-            features.put(r._1, jsons.getDoubleValue(r._1))
-          })
-          (logKey, labelKey, features)
-        })
-        .filter {
-          case (logKey, labelKey, features) =>
-            val logKeyList = logKey.split(",")
-            val apptype = logKeyList(0)
-            !Set("12", "13").contains(apptype)
-        }.filter {
-          case (logKey, labelKey, features) =>
-            var key = ""
-            for (elem <- features) {
-              if (elem._1.contains("cid_")) {
-                key = elem._1
-              }
-            }
-
-            if (key.equals("cid_3319")) {
-              true
-            } else if (key.equals("cid_3024")) {
-              // 创建一个Random实例
-              val rand = new Random()
-
-              // 生成一个0到1之间的随机浮点数
-              val randomDouble = rand.nextDouble()
-
-              randomDouble < 0.01
-            } else {
-              false
-            }
-        }.map {
-          case (logKey, labelKey, features) =>
-            val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
-            (label, features)
-        }
-        .mapPartitions(row => {
-          val result = new ArrayBuffer[String]()
-          val bucketsMap = bucketsMap_br.value
-          row.foreach {
-            case (label, features) =>
-              val featuresBucket = features.map {
-                case (name, score) =>
-                  var ifFilter = false
-                  if (filterNames.nonEmpty) {
-                    filterNames.foreach(r => if (!ifFilter && name.contains(r)) {
-                      ifFilter = true
-                    })
-                  }
-                  if (ifFilter) {
-                    ""
-                  } else {
-                    if (score > 1E-8) {
-                      if (bucketsMap.contains(name)) {
-                        val (bucketsNum, buckets) = bucketsMap(name)
-                        val scoreNew = 1.0 / bucketsNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
-                        name + ":" + scoreNew.toString
-                      } else {
-                        name + ":" + score.toString
-                      }
-                    } else {
-                      ""
-                    }
-                  }
-              }.filter(_.nonEmpty)
-              result.add(label + "\t" + featuresBucket.mkString("\t"))
-          }
-          result.iterator
-        })
-
-      // 4 保存数据到hdfs
-      val hdfsPath = savePath + "/" + date
-      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
-        println("删除路径并开始数据写入:" + hdfsPath)
-        MyHdfsUtils.delete_hdfs_path(hdfsPath)
-        data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
-      } else {
-        println("路径不合法,无法写入:" + hdfsPath)
-      }
-    }
-
-
-  }
-}
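
makedata_ad_33_bucketData_20240729 narrows the training set to specific creatives: it keeps every cid_3319 sample and roughly 1% of cid_3024, dropping everything else. A hedged sketch of that predicate; the find over the keys stands in for the original loop that scans for the last cid_ entry, and one shared Random instance replaces the per-record construction in the deleted code.

import scala.util.Random

object CidSampleFilterSketch {
  // Decide whether to keep a sample from the one-hot cid_* key in its feature map:
  // keep all of cid_3319, keep ~1% of cid_3024, drop the rest.
  def keepSample(features: Map[String, Double], rand: Random): Boolean = {
    val cidKey = features.keys.find(_.contains("cid_")).getOrElse("")
    cidKey match {
      case "cid_3319" => true
      case "cid_3024" => rand.nextDouble() < 0.01 // uniform 1% downsample
      case _          => false
    }
  }

  def main(args: Array[String]): Unit = {
    val rand = new Random()
    val sample = Map("cid_3024" -> 1.0, "b1_1_ctr" -> 0.12)
    val kept = (1 to 10000).count(_ => keepSample(sample, rand))
    println(s"kept $kept of 10000 cid_3024 samples (~1% expected)")
  }
}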

+ 0 - 181
src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240718/makedata_ad_33_bucketData_20240729_copy_zheng.scala

@@ -1,181 +0,0 @@
-package com.aliyun.odps.spark.examples.makedata_ad
-
-import com.alibaba.fastjson.JSON
-import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils}
-import examples.extractor.ExtractorUtils
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.sql.SparkSession
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.io.Source
-import scala.util.Random
-
-/*
-
- */
-
-object makedata_ad_33_bucketData_20240729_copy_zheng {
-  def main(args: Array[String]): Unit = {
-
-    val spark = SparkSession
-      .builder()
-      .appName(this.getClass.getName)
-      .getOrCreate()
-    val sc = spark.sparkContext
-
-    // 1 读取参数
-    val param = ParamUtils.parseArgs(args)
-    val readPath = param.getOrElse("readPath", "/dw/recommend/model/31_ad_sample_data/")
-    val savePath = param.getOrElse("savePath", "/dw/recommend/model/33_ad_train_data/")
-    val beginStr = param.getOrElse("beginStr", "20240620")
-    val endStr = param.getOrElse("endStr", "20240620")
-    val repartition = param.getOrElse("repartition", "100").toInt
-    val filterNames = param.getOrElse("filterNames", "").split(",").toSet
-    val whatLabel = param.getOrElse("whatLabel", "ad_is_conversion")
-
-    val loader = getClass.getClassLoader
-
-    val resourceUrlBucket = loader.getResource("20240718_ad_bucket_688.txt")
-    val buckets =
-      if (resourceUrlBucket != null) {
-        val buckets = Source.fromURL(resourceUrlBucket).getLines().mkString("\n")
-        Source.fromURL(resourceUrlBucket).close()
-        buckets
-      } else {
-        ""
-      }
-    println(buckets)
-    val bucketsMap = buckets.split("\n")
-      .map(r => r.replace(" ", "").replaceAll("\n", ""))
-      .filter(r => r.nonEmpty)
-      .map(r => {
-        val rList = r.split("\t")
-        (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
-      }).toMap
-    val bucketsMap_br = sc.broadcast(bucketsMap)
-
-    val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
-    val cidCountMap = scala.collection.mutable.Map[String, Int]()
-    for (date <- dateRange) {
-      println("开始执行:" + date)
-      val data = sc.textFile(readPath + "/" + date + "*").map(r => {
-          val rList = r.split("\t")
-          val logKey = rList(0)
-          val labelKey = rList(1)
-          val jsons = JSON.parseObject(rList(2))
-          val features = scala.collection.mutable.Map[String, Double]()
-          jsons.foreach(r => {
-            features.put(r._1, jsons.getDoubleValue(r._1))
-          })
-          (logKey, labelKey, features)
-        })
-        .filter {
-          case (logKey, labelKey, features) =>
-            val logKeyList = logKey.split(",")
-            val apptype = logKeyList(0)
-            !Set("12", "13").contains(apptype)
-        }.filter {
-          case (logKey, labelKey, features) =>
-            var key = ""
-            for (elem <- features) {
-              if (elem._1.contains("cid_")) {
-                key = elem._1
-              }
-            }
-
-            if (key.equals("cid_3319")) {
-              true
-            } else if (key.equals("cid_3024")) {
-              // 创建一个Random实例
-              val rand = new Random()
-
-              // 生成一个0到1之间的随机浮点数
-              val randomDouble = rand.nextDouble()
-
-              randomDouble < 0.01
-            } else {
-              false
-            }
-        }.flatMap {
-          case (logKey, labelKey, features) =>
-            var key = ""
-            for (elem <- features) {
-              if (elem._1.contains("cid_")) {
-                key = elem._1
-              }
-            }
-            if (key.equals("cid_3319")) {
-              val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
-              if (!label.equals("0")) {
-                Seq(
-                  (logKey, labelKey, features),
-                  (logKey, labelKey, features),
-                  (logKey, labelKey, features),
-                  (logKey, labelKey, features),
-                  (logKey, labelKey, features),
-                  (logKey, labelKey, features),
-                  (logKey, labelKey, features),
-                  (logKey, labelKey, features),
-                  (logKey, labelKey, features),
-                  (logKey, labelKey, features)
-                )
-              } else {
-                Seq((logKey, labelKey, features))
-              }
-            } else {
-              Seq((logKey, labelKey, features))
-            }
-        }.map {
-          case (logKey, labelKey, features) =>
-            val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
-            (label, features)
-        }
-        .mapPartitions(row => {
-          val result = new ArrayBuffer[String]()
-          val bucketsMap = bucketsMap_br.value
-          row.foreach {
-            case (label, features) =>
-              val featuresBucket = features.map {
-                case (name, score) =>
-                  var ifFilter = false
-                  if (filterNames.nonEmpty) {
-                    filterNames.foreach(r => if (!ifFilter && name.contains(r)) {
-                      ifFilter = true
-                    })
-                  }
-                  if (ifFilter) {
-                    ""
-                  } else {
-                    if (score > 1E-8) {
-                      if (bucketsMap.contains(name)) {
-                        val (bucketsNum, buckets) = bucketsMap(name)
-                        val scoreNew = 1.0 / bucketsNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
-                        name + ":" + scoreNew.toString
-                      } else {
-                        name + ":" + score.toString
-                      }
-                    } else {
-                      ""
-                    }
-                  }
-              }.filter(_.nonEmpty)
-              result.add(label + "\t" + featuresBucket.mkString("\t"))
-          }
-          result.iterator
-        })
-
-      // 4 保存数据到hdfs
-      val hdfsPath = savePath + "/" + date
-      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
-        println("删除路径并开始数据写入:" + hdfsPath)
-        MyHdfsUtils.delete_hdfs_path(hdfsPath)
-        data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
-      } else {
-        println("路径不合法,无法写入:" + hdfsPath)
-      }
-    }
-
-
-  }
-}
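
The copy_zheng variant adds one step before bucketizing: every cid_3319 record with a non-zero label is emitted ten times from the flatMap, a blunt form of positive-class oversampling. A sketch under that reading:

object PositiveOversampleSketch {
  // Replicate positive samples of the boosted cid ten-fold and leave everything else
  // untouched, mirroring the flatMap in the deleted job (label "0" means no conversion).
  def oversample[T](record: T, label: String, cidKey: String,
                    boostedCid: String = "cid_3319", factor: Int = 10): Seq[T] =
    if (cidKey == boostedCid && label != "0") Seq.fill(factor)(record)
    else Seq(record)

  def main(args: Array[String]): Unit = {
    println(oversample("sample-A", label = "1", cidKey = "cid_3319").size) // 10
    println(oversample("sample-B", label = "0", cidKey = "cid_3319").size) // 1
  }
}

Duplicating rows is equivalent to giving those positives a sample weight of 10; a weight column would avoid inflating the shuffle, but the plain-text training format written by these jobs carries no weights.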

+ 0 - 130
src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240718/makedata_ad_33_bucketData_20240729_reduce_feature.scala

@@ -1,130 +0,0 @@
-package com.aliyun.odps.spark.examples.makedata_ad
-
-import com.alibaba.fastjson.JSON
-import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils}
-import examples.extractor.ExtractorUtils
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.sql.SparkSession
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.io.Source
-import scala.util.Random
-
-/*
-
- */
-
-object makedata_ad_33_bucketData_20240729_reduce_feature {
-  def main(args: Array[String]): Unit = {
-
-    val spark = SparkSession
-      .builder()
-      .appName(this.getClass.getName)
-      .getOrCreate()
-    val sc = spark.sparkContext
-
-    // 1 读取参数
-    val param = ParamUtils.parseArgs(args)
-    val readPath = param.getOrElse("readPath", "/dw/recommend/model/31_ad_sample_data/")
-    val savePath = param.getOrElse("savePath", "/dw/recommend/model/33_ad_train_data/")
-    val beginStr = param.getOrElse("beginStr", "20240620")
-    val endStr = param.getOrElse("endStr", "20240620")
-    val repartition = param.getOrElse("repartition", "100").toInt
-    val retainNames = param.getOrElse("retainNames", "").split(",").toSet
-    val whatLabel = param.getOrElse("whatLabel", "ad_is_conversion")
-
-    val loader = getClass.getClassLoader
-
-    val resourceUrlBucket = loader.getResource("20240718_ad_bucket_688.txt")
-    val buckets =
-      if (resourceUrlBucket != null) {
-        val buckets = Source.fromURL(resourceUrlBucket).getLines().mkString("\n")
-        Source.fromURL(resourceUrlBucket).close()
-        buckets
-      } else {
-        ""
-      }
-    println(buckets)
-    val bucketsMap = buckets.split("\n")
-      .map(r => r.replace(" ", "").replaceAll("\n", ""))
-      .filter(r => r.nonEmpty)
-      .map(r => {
-        val rList = r.split("\t")
-        (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
-      }).toMap
-    val bucketsMap_br = sc.broadcast(bucketsMap)
-
-    val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
-    val cidCountMap = scala.collection.mutable.Map[String, Int]()
-    for (date <- dateRange) {
-      println("开始执行:" + date)
-      val data = sc.textFile(readPath + "/" + date + "*").map(r => {
-          val rList = r.split("\t")
-          val logKey = rList(0)
-          val labelKey = rList(1)
-          val jsons = JSON.parseObject(rList(2))
-          val features = scala.collection.mutable.Map[String, Double]()
-          jsons.foreach(r => {
-            features.put(r._1, jsons.getDoubleValue(r._1))
-          })
-          (logKey, labelKey, features)
-        })
-        .filter {
-          case (logKey, labelKey, features) =>
-            val logKeyList = logKey.split(",")
-            val apptype = logKeyList(0)
-            !Set("12", "13").contains(apptype)
-        }.map {
-          case (logKey, labelKey, features) =>
-            val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
-            (label, features)
-        }
-        .mapPartitions(row => {
-          val result = new ArrayBuffer[String]()
-          val bucketsMap = bucketsMap_br.value
-          row.foreach {
-            case (label, features) =>
-              val featuresBucket = features.map {
-                case (name, score) =>
-                  var isRetain = false
-                  if (retainNames.nonEmpty) {
-                    retainNames.foreach(r => if (!isRetain && name.contains(r)) {
-                      isRetain = true
-                    })
-                  }
-                  if (isRetain) {
-                    if (score > 1E-8) {
-                      if (bucketsMap.contains(name)) {
-                        val (bucketsNum, buckets) = bucketsMap(name)
-                        val scoreNew = 1.0 / bucketsNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
-                        name + ":" + scoreNew.toString
-                      } else {
-                        name + ":" + score.toString
-                      }
-                    } else {
-                      ""
-                    }
-                  } else {
-                    ""
-                  }
-              }.filter(_.nonEmpty)
-              result.add(label + "\t" + featuresBucket.mkString("\t"))
-          }
-          result.iterator
-        })
-
-      // 4 保存数据到hdfs
-      val hdfsPath = savePath + "/" + date
-      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
-        println("删除路径并开始数据写入:" + hdfsPath)
-        MyHdfsUtils.delete_hdfs_path(hdfsPath)
-        data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
-      } else {
-        println("路径不合法,无法写入:" + hdfsPath)
-      }
-    }
-
-
-  }
-}

+ 0 - 140
src/main/scala/com/aliyun/odps/spark/zhp/makedata_ad/v20240718/makedata_ad_33_bucketData_default_value_20240718.scala

@@ -1,140 +0,0 @@
-package com.aliyun.odps.spark.zhp.makedata_ad.v20240718
-
-import com.alibaba.fastjson.JSON
-import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils}
-import examples.extractor.ExtractorUtils
-import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.sql.SparkSession
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.io.Source
-/*
-
- */
-
-object makedata_ad_33_bucketData_default_value_20240718 {
-  def main(args: Array[String]): Unit = {
-
-    val spark = SparkSession
-      .builder()
-      .appName(this.getClass.getName)
-      .getOrCreate()
-    val sc = spark.sparkContext
-
-    val loader = getClass.getClassLoader
-
-    val resourceUrlBucket = loader.getResource("20240718_ad_bucket_688.txt")
-    val buckets =
-      if (resourceUrlBucket != null) {
-        val buckets = Source.fromURL(resourceUrlBucket).getLines().mkString("\n")
-        Source.fromURL(resourceUrlBucket).close()
-        buckets
-      } else {
-        ""
-      }
-    println(buckets)
-    val bucketsMap = buckets.split("\n")
-      .map(r => r.replace(" ", "").replaceAll("\n", ""))
-      .filter(r => r.nonEmpty)
-      .map(r =>{
-        val rList = r.split("\t")
-        (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
-      }).toMap
-    val bucketsMap_br = sc.broadcast(bucketsMap)
-
-
-    // 1 读取参数
-    val param = ParamUtils.parseArgs(args)
-    val readPath = param.getOrElse("readPath", "/dw/recommend/model/31_ad_sample_data/")
-    val savePath = param.getOrElse("savePath", "/dw/recommend/model/33_ad_train_data/")
-    val beginStr = param.getOrElse("beginStr", "20240620")
-    val endStr = param.getOrElse("endStr", "20240620")
-    val repartition = param.getOrElse("repartition", "100").toInt
-    val filterNames = param.getOrElse("filterNames", "").split(",").toSet
-    val whatLabel = param.getOrElse("whatLabel", "ad_is_conversion")
-    val modifyFeatureName= param.getOrElse("modifyName", "").split(",").toSet
-    val defaultValue= param.getOrElse("defaultValue", "0.01")
-
-    val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
-    for (date <- dateRange) {
-      println("开始执行:" + date)
-      val data = sc.textFile(readPath + "/" + date + "*").map(r=>{
-        val rList = r.split("\t")
-        val logKey = rList(0)
-        val labelKey = rList(1)
-        val jsons = JSON.parseObject(rList(2))
-        val features = scala.collection.mutable.Map[String, Double]()
-        jsons.foreach(r => {
-          features.put(r._1, jsons.getDoubleValue(r._1))
-        })
-        (logKey, labelKey, features)
-      })
-        .filter{
-          case (logKey, labelKey, features) =>
-            val logKeyList = logKey.split(",")
-            val apptype = logKeyList(0)
-            !Set("12", "13").contains(apptype)
-        }
-        .map{
-          case (logKey, labelKey, features) =>
-            val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
-            (label, features)
-        }
-        .mapPartitions(row => {
-          val result = new ArrayBuffer[String]()
-          val bucketsMap = bucketsMap_br.value
-          row.foreach{
-            case (label, features) =>
-              val featuresBucket = features.map{
-                case (name, score) =>
-                  var ifFilter = false
-                  if (filterNames.nonEmpty){
-                    filterNames.foreach(r=> if (!ifFilter && name.contains(r)) {ifFilter = true} )
-                  }
-                  if (ifFilter){
-                    ""
-                  }else{
-                    if (score > 1E-8) {
-                      if (bucketsMap.contains(name)) {
-                        val (bucketsNum, buckets) = bucketsMap(name)
-                        val scoreNew = 1.0 / bucketsNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
-                        name + ":" + scoreNew.toString
-                      } else {
-                        var isModify = false
-                        if (modifyFeatureName.nonEmpty) {
-                          modifyFeatureName.foreach(r => if (!isModify && name.startsWith(r)) {
-                            isModify = true
-                          })
-                        }
-                        if (isModify) {
-                          name + ":" + defaultValue
-                        } else {
-                          name + ":" + score.toString
-                        }
-                      }
-                    } else {
-                      ""
-                    }
-                  }
-              }.filter(_.nonEmpty)
-              result.add(label + "\t" + featuresBucket.mkString("\t"))
-          }
-          result.iterator
-      })
-
-      // 4 保存数据到hdfs
-      val hdfsPath = savePath + "/" + date
-      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
-        println("删除路径并开始数据写入:" + hdfsPath)
-        MyHdfsUtils.delete_hdfs_path(hdfsPath)
-        data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
-      } else {
-        println("路径不合法,无法写入:" + hdfsPath)
-      }
-    }
-
-
-
-  }
-}

+ 0 - 61
src/main/scala/com/aliyun/odps/spark/zhp/临时记录的脚本-广告

@@ -1,61 +0,0 @@
-
-
-// 模型特征生产
-nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
---class com.aliyun.odps.spark.zhp.makedata_ad.makedata_ad_31_originData_20240620 \
---master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 16 \
-./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-tablePart:64 repartition:16 \
-beginStr:2024071200 endStr:2024071223 \
-savePath:/dw/recommend/model/31_ad_sample_data_v4 \
-table:alg_recsys_ad_sample_all filterHours:00,01,02,03,04,05,06,07 \
-idDefaultValue:0.01
-> logs/p31_2024062008.log 2>&1 &
-
-
-
-// 特征分桶--生成分桶文件
-nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
---class com.aliyun.odps.spark.examples.makedata_ad.makedata_ad_32_bucket_20240622 \
---master yarn --driver-memory 16G --executor-memory 1G --executor-cores 1 --num-executors 16 \
---conf spark.driver.maxResultSize=16G \
-./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-fileName:20240620_100_fix \
-savePath:/dw/recommend/model/32_bucket_data/ \
-> p32_data.log 2>&1 &
-
-
-// 特征分桶--过滤固定前缀的特征
-nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
---class com.aliyun.odps.spark.examples.makedata_ad.makedata_ad_33_bucketData_20240622 \
---master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
-./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-beginStr:20240623 endStr:20240623 repartition:400 \
-filterNames:XXXXXXX \
-> p33_data.log 2>&1 &
-
-filterNames:cid_,adid_,adverid_,targeting_conversion_ \
-savePath:/dw/recommend/model/33_ad_train_data_nosparse/ \
-
-
-/dw/recommend/model/31_ad_sample_data/
-/dw/recommend/model/33_ad_train_data/
-
-/dw/recommend/model/31_ad_sample_data_fix/
-
-
-
-
-
-nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
---class com.aliyun.odps.spark.examples.makedata_ad.makedata_ad_33_bucketDataPrint_20240628 \
---master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
-./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-beginStr:2024062717 endStr:2024062723 \
-readDate:20240627 \
-table:alg_recsys_ad_sample_all_new \
-savePath:/dw/recommend/model/33_for_check/ \
-> p33_data_check.log 2>&1 &
-
-
-/dw/recommend/model/33_for_check_v1/
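
The launch commands in the removed script pass job parameters as bare key:value tokens after the jar (tablePart:64, beginStr:2024071200, savePath:..., and so on), which ParamUtils.parseArgs turns into a map read through getOrElse defaults. ParamUtils is not shown in this diff, so the sketch below is only one plausible reading of that contract:

object ParamParseSketch {
  // Parse "key:value" style arguments into a Map, splitting on the first ':' only
  // so a value that itself contains ':' stays intact; malformed tokens are ignored.
  def parseArgs(args: Array[String]): Map[String, String] =
    args.flatMap { arg =>
      arg.split(":", 2) match {
        case Array(k, v) => Some(k -> v)
        case _           => None
      }
    }.toMap

  def main(args: Array[String]): Unit = {
    val param = parseArgs(Array("tablePart:64", "savePath:/dw/recommend/model/31_ad_sample_data_v4"))
    println(param.getOrElse("repartition", "100")) // default used when the key is absent
  }
}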

Some files are not shown because too many files were changed in this diff