Ver código fonte

修改生成训练数据

xueyiming 3 semanas atrás
pai
commit
6c146fa657

+ 42 - 9
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_33_bucketDataFromOriginToHive_20250228.scala

@@ -8,6 +8,9 @@ import examples.extractor.{ExtractorUtils, RankExtractorFeature_20240530}
 import examples.utils.{AdUtil, DateTimeUtil}
 import org.apache.spark.sql.SparkSession
 import org.xm.Similarity
+import java.time.Instant
+import java.time.ZoneId
+import java.time.ZonedDateTime
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
@@ -31,8 +34,8 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
     // 1 读取参数
     val param = ParamUtils.parseArgs(args)
     val tablePart = param.getOrElse("tablePart", "64").toInt
-    val beginStr = param.getOrElse("beginStr", "20250216")
-    val endStr = param.getOrElse("endStr", "20250216")
+    val beginStr = param.getOrElse("beginStr", "20250320")
+    val endStr = param.getOrElse("endStr", "20250320")
     val project = param.getOrElse("project", "loghubods")
     val inputTable = param.getOrElse("inputTable", "alg_recsys_ad_sample_all")
     val outputTable = param.getOrElse("outputTable", "ad_easyrec_train_data_v1_sampled")
@@ -68,7 +71,9 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
       "region", "city", "brand",
       "vid", "cate1", "cate2",
       "user_cid_click_list", "user_cid_conver_list",
-      "user_vid_return_tags_2h", "user_vid_return_tags_1d", "user_vid_return_tags_3d", "user_vid_return_tags_7d", "user_vid_return_tags_14d")
+      "user_vid_return_tags_2h", "user_vid_return_tags_1d", "user_vid_return_tags_3d", "user_vid_return_tags_7d",
+      "user_vid_return_tags_14d", "apptype", "ts", "mid", "pqtid", "hour", "hour_quarter", "root_source_scene",
+      "root_source_channel", "is_first_layer")
 
 
     // 2 读取odps+表信息
@@ -111,13 +116,26 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
             })
             .map(record => {
               val ts = record.getString("ts").toInt
+              val zonedDateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault())
+              val featureMap = new JSONObject()
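+              // Hour of day (0-23). NOTE(review): ts is parsed with toInt (so presumably epoch seconds), but Instant.ofEpochMilli expects milliseconds - confirm the unit.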
+              val tsHour = zonedDateTime.getHour()
+              // Quarter-hour slot of the day: one slot per 15 minutes, range 0~95
+              val tsHourQuarter = zonedDateTime.getMinute() / 15 + zonedDateTime.getHour() * 4
               val cid = record.getString("cid")
+              val mid = record.getString("mid")
+              val pqtid = record.getString("pqtid")
               val apptype = record.getString("apptype")
-              val extend: JSONObject = if (record.isNull("extend")) new JSONObject() else
-                JSON.parseObject(record.getString("extend"))
 
-              val featureMap = new JSONObject()
+              featureMap.put("apptype", apptype)
+              featureMap.put("ts", ts)
+              featureMap.put("mid", mid)
+              featureMap.put("pqtid", pqtid)
+              featureMap.put("hour", tsHour)
+              featureMap.put("hour_quarter", tsHourQuarter)
 
+              val extend: JSONObject = if (record.isNull("extend")) new JSONObject() else
+                JSON.parseObject(record.getString("extend"))
               val b1: JSONObject = if (record.isNull("b1_feature")) new JSONObject() else
                 JSON.parseObject(record.getString("b1_feature"))
               val b2: JSONObject = if (record.isNull("b2_feature")) new JSONObject() else
@@ -157,6 +175,9 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
               if (b1.containsKey("adverid") && b1.getString("adverid").nonEmpty) {
                 featureMap.put("adverid", b1.getBigInteger("adverid"))
               }
+              if (b1.containsKey("profession") && b1.getString("profession").nonEmpty) {
+                featureMap.put("profession", b1.getString("profession"))
+              }
 
               val hour = DateTimeUtil.getHourByTimestamp(ts)
               featureMap.put("hour_" + hour, idDefaultValue)
@@ -178,6 +199,18 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
                 featureMap.put("city", extend.getString("city"))
               }
 
+              if (extend.containsKey("is_first_layer") && extend.getString("is_first_layer").nonEmpty) {
+                featureMap.put("is_first_layer", extend.getString("is_first_layer"))
+              }
+
+              if (extend.containsKey("root_source_scene") && extend.getString("root_source_scene").nonEmpty) {
+                featureMap.put("root_source_scene", extend.getString("root_source_scene"))
+              }
+
+              if (extend.containsKey("root_source_channel") && extend.getString("root_source_channel").nonEmpty) {
+                featureMap.put("root_source_channel", extend.getString("root_source_channel"))
+              }
+
 
               if (b1.containsKey("cpa")) {
                 featureMap.put("cpa", b1.getString("cpa").toDouble)
@@ -443,7 +476,8 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
                 val score = Similarity.conceptSimilarity(title, vTitle)
                 featureMap.put("ctitle_vtitle_similarity", score);
                 featureMap.put("cate1", d3.getOrDefault("merge_first_level_cate", ""))
-                featureMap.put("cate2", d3.getOrDefault("merge_second_level_cate", ""));
+                featureMap.put("cate2", d3.getOrDefault("merge_second_level_cate", ""))
+                featureMap.put("title_split", d3.getOrDefault("title_split", ""))
               }
               val machineinfo: JSONObject = if (record.isNull("machineinfo")) new JSONObject() else
                 JSON.parseObject(record.getString("machineinfo"))
@@ -498,7 +532,6 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
                 }
               }
               //5 处理log key表头。
-              val mid = record.getString("mid")
               val headvideoid = record.getString("headvideoid")
               val logKey = (apptype, mid, cid, ts, headvideoid).productIterator.mkString(",")
               val labelKey = labels.toString()
@@ -506,7 +539,7 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
             })
           odpsData
         }).reduce(_ union _)
-        .map{ case (logKey, labelKey, jsons) =>
+        .map { case (logKey, labelKey, jsons) =>
           val denseFeatures = scala.collection.mutable.Map[String, Double]()
           val sparseFeatures = scala.collection.mutable.Map[String, String]()
           denseFeatureNames.foreach(r => {