
Merge branch 'dev-xym-add-features' of algorithm/recommend-emr-dataprocess into feature/20250104-zt-update

xueyiming · 1 week ago
Parent · Current commit c9c9275de3

+ 44 - 7
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_33_bucketDataFromOriginToHive_20250228.scala

@@ -8,6 +8,9 @@ import examples.extractor.{ExtractorUtils, RankExtractorFeature_20240530}
 import examples.utils.{AdUtil, DateTimeUtil}
 import org.apache.spark.sql.SparkSession
 import org.xm.Similarity
+import java.time.Instant
+import java.time.ZoneId
+import java.time.ZonedDateTime
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
@@ -68,7 +71,9 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
       "region", "city", "brand",
       "vid", "cate1", "cate2",
       "user_cid_click_list", "user_cid_conver_list",
-      "user_vid_return_tags_2h", "user_vid_return_tags_1d", "user_vid_return_tags_3d", "user_vid_return_tags_7d", "user_vid_return_tags_14d")
+      "user_vid_return_tags_2h", "user_vid_return_tags_1d", "user_vid_return_tags_3d", "user_vid_return_tags_7d",
+      "user_vid_return_tags_14d", "apptype", "ts", "mid", "pqtid", "hour", "hour_quarter", "root_source_scene",
+      "root_source_channel", "is_first_layer", "title_split", "profession")
 
 
    // 2 Read ODPS + table info
@@ -110,14 +115,31 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
               label > 0 || Random.nextDouble() < negSampleRate
             })
             .map(record => {
+              val featureMap = new JSONObject()
               val ts = record.getString("ts").toInt
+              val instant = Instant.ofEpochSecond(ts)
+              // Set the time zone to China (Asia/Shanghai)
+              val chinaZone = ZoneId.of("Asia/Shanghai")
+              // Convert the Instant to a ZonedDateTime in the China time zone
+              val zonedDateTime = ZonedDateTime.ofInstant(instant, chinaZone)
+              // Get the hour of day
+              val tsHour = zonedDateTime.getHour()
+              // Quarter-hour index within the day (15-minute intervals, 0~95)
+              val tsHourQuarter = zonedDateTime.getMinute() / 15 + zonedDateTime.getHour() * 4
               val cid = record.getString("cid")
+              val mid = record.getString("mid")
+              val pqtid = record.getString("pqtid")
               val apptype = record.getString("apptype")
-              val extend: JSONObject = if (record.isNull("extend")) new JSONObject() else
-                JSON.parseObject(record.getString("extend"))
 
-              val featureMap = new JSONObject()
+              featureMap.put("apptype", apptype)
+              featureMap.put("ts", ts)
+              featureMap.put("mid", mid)
+              featureMap.put("pqtid", pqtid)
+              featureMap.put("hour", tsHour)
+              featureMap.put("hour_quarter", tsHourQuarter)
 
+              val extend: JSONObject = if (record.isNull("extend")) new JSONObject() else
+                JSON.parseObject(record.getString("extend"))
               val b1: JSONObject = if (record.isNull("b1_feature")) new JSONObject() else
                 JSON.parseObject(record.getString("b1_feature"))
               val b2: JSONObject = if (record.isNull("b2_feature")) new JSONObject() else
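The added block above derives two time features from the epoch-second ts: the hour of day and a quarter-hour index (0~95), both in the Asia/Shanghai time zone. A minimal standalone sketch of that conversion, with an illustrative object name and sample timestamp that are not part of the job:

import java.time.{Instant, ZoneId, ZonedDateTime}

object HourQuarterSketch {
  // Epoch seconds -> (hour of day, quarter-hour index 0..95) in Asia/Shanghai
  def hourAndQuarter(epochSeconds: Long): (Int, Int) = {
    val zdt = ZonedDateTime.ofInstant(Instant.ofEpochSecond(epochSeconds), ZoneId.of("Asia/Shanghai"))
    val hour = zdt.getHour                      // 0..23
    val quarter = hour * 4 + zdt.getMinute / 15 // one index per 15-minute slot, 0..95
    (hour, quarter)
  }

  def main(args: Array[String]): Unit = {
    // 2025-02-28 14:37 Asia/Shanghai -> hour = 14, quarter = 14 * 4 + 37 / 15 = 58
    println(hourAndQuarter(1740724620L)) // prints (14,58)
  }
}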
@@ -157,6 +179,9 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
               if (b1.containsKey("adverid") && b1.getString("adverid").nonEmpty) {
                 featureMap.put("adverid", b1.getBigInteger("adverid"))
               }
+              if (b1.containsKey("profession") && b1.getString("profession").nonEmpty) {
+                featureMap.put("profession", b1.getString("profession"))
+              }
 
               val hour = DateTimeUtil.getHourByTimestamp(ts)
               featureMap.put("hour_" + hour, idDefaultValue)
@@ -178,6 +203,18 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
                 featureMap.put("city", extend.getString("city"))
               }
 
+              if (extend.containsKey("is_first_layer") && extend.getString("is_first_layer").nonEmpty) {
+                featureMap.put("is_first_layer", extend.getString("is_first_layer"))
+              }
+
+              if (extend.containsKey("root_source_scene") && extend.getString("root_source_scene").nonEmpty) {
+                featureMap.put("root_source_scene", extend.getString("root_source_scene"))
+              }
+
+              if (extend.containsKey("root_source_channel") && extend.getString("root_source_channel").nonEmpty) {
+                featureMap.put("root_source_channel", extend.getString("root_source_channel"))
+              }
+
 
               if (b1.containsKey("cpa")) {
                 featureMap.put("cpa", b1.getString("cpa").toDouble)
@@ -443,7 +480,8 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
                 val score = Similarity.conceptSimilarity(title, vTitle)
                 featureMap.put("ctitle_vtitle_similarity", score);
                 featureMap.put("cate1", d3.getOrDefault("merge_first_level_cate", ""))
-                featureMap.put("cate2", d3.getOrDefault("merge_second_level_cate", ""));
+                featureMap.put("cate2", d3.getOrDefault("merge_second_level_cate", ""))
+                featureMap.put("title_split", d3.getOrDefault("title_split", ""))
               }
               val machineinfo: JSONObject = if (record.isNull("machineinfo")) new JSONObject() else
                 JSON.parseObject(record.getString("machineinfo"))
@@ -498,7 +536,6 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
                 }
               }
              //5 Build the log key header.
-              val mid = record.getString("mid")
               val headvideoid = record.getString("headvideoid")
               val logKey = (apptype, mid, cid, ts, headvideoid).productIterator.mkString(",")
               val labelKey = labels.toString()
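For reference, the log key above is just the identifying fields joined by commas via the tuple's productIterator; a tiny sketch with made-up values:

              // hypothetical values, for illustration only
              val logKeyExample = ("4", "mid123", "1001", 1740724620, "555").productIterator.mkString(",")
              // logKeyExample == "4,mid123,1001,1740724620,555"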
@@ -506,7 +543,7 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
             })
           odpsData
         }).reduce(_ union _)
-        .map{ case (logKey, labelKey, jsons) =>
+        .map { case (logKey, labelKey, jsons) =>
           val denseFeatures = scala.collection.mutable.Map[String, Double]()
           val sparseFeatures = scala.collection.mutable.Map[String, String]()
           denseFeatureNames.foreach(r => {