Explorar el Código

实时化特征

xueyiming hace 1 semana
padre
commit
f008570a28

+ 23 - 14
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_33_bucketDataFromOriginToHive_20250522.scala

@@ -34,7 +34,7 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250522 {
     val beginStr = param.getOrElse("beginStr", "20250216")
     val endStr = param.getOrElse("endStr", "20250216")
     val project = param.getOrElse("project", "loghubods")
-    val inputTable = param.getOrElse("inputTable", "ad_engine_statistics_log_per5min_new")
+    val inputTable = param.getOrElse("inputTable", "alg_recsys_ad_sample_all")
     val outputTable = param.getOrElse("outputTable", "ad_easyrec_train_data_v1_sampled")
     val outputTable2 = param.getOrElse("outputTable2", "")
     val filterHours = param.getOrElse("filterHours", "00,01,02,03,04,05,06,07").split(",").toSet
@@ -83,8 +83,19 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250522 {
     // 3 循环执行数据生产
     val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
     for (dt <- dateRange) {
-      val timeRange = MyDateUtils.getDateMinuteRange(dt + "0800", dt + "2355")
-      val recordRdd = timeRange.map(dt => s"dt=$dt").map(partition => {
+      val timeRange = MyDateUtils.getDateHourRange(dt + "08", dt + "23")
+      val recordRdd = timeRange.map { dt_hh =>
+          val dt = dt_hh.substring(0, 8)
+          val hh = dt_hh.substring(8, 10)
+          val partition = s"dt=$dt,hh=$hh"
+          if (filterHours.nonEmpty && filterHours.contains(hh)) {
+            None
+          } else {
+            Some(partition)
+          }
+        }.collect {
+          case Some(partition) => partition
+        }.map(partition => {
           val odpsData = odpsOps.readTable(project = project,
               table = inputTable,
               partition = partition,
@@ -108,15 +119,6 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250522 {
             .map(record => {
               val featureMap = new JSONObject()
               val ts = record.getString("ts").toInt
-              val instant = Instant.ofEpochSecond(ts)
-              // 设置时区为中国时区
-              val chinaZone = ZoneId.of("Asia/Shanghai")
-              // 将 Instant 对象转换为中国时区的 ZonedDateTime 对象
-              val zonedDateTime = ZonedDateTime.ofInstant(instant, chinaZone)
-              // 获取小时
-              val tsHour = zonedDateTime.getHour()
-              // 当前小时-刻钟(15分钟一个间隔,0~95)
-              val tsHourQuarter = zonedDateTime.getMinute() / 15 + zonedDateTime.getHour() * 4
               val cid = record.getString("cid")
               val mid = record.getString("mid")
               val pqtid = record.getString("pqtid")
@@ -127,14 +129,14 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250522 {
               featureMap.put("ts", ts)
               featureMap.put("mid", mid)
               featureMap.put("pqtid", pqtid)
-              featureMap.put("hour", tsHour)
-              featureMap.put("hour_quarter", tsHourQuarter)
 
               val mateFeature: JSONObject = if (record.isNull("metafeature")) new JSONObject() else
                 JSON.parseObject(record.getString("metafeature"))
 
               val reqFeature: JSONObject = if (!mateFeature.containsKey("reqFeature")) new JSONObject() else
                 mateFeature.getJSONObject("reqFeature")
+              val sceneFeature: JSONObject = if (!mateFeature.containsKey("sceneFeature")) new JSONObject() else
+                mateFeature.getJSONObject("sceneFeature")
               val b1: JSONObject = if (!mateFeature.containsKey("alg_cid_feature_basic_info")) new JSONObject() else
                 mateFeature.getJSONObject("alg_cid_feature_basic_info")
               val b2: JSONObject = if (!mateFeature.containsKey("alg_cid_feature_adver_action")) new JSONObject() else
@@ -189,6 +191,13 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250522 {
                 featureMap.put("creative_action_embedding", b1.getString("creative_action_embedding").split('|').map(_.toDouble).map(_.toFloat).mkString("|"))
               }
 
+              if (sceneFeature.containsKey("hour") && sceneFeature.getString("hour").nonEmpty) {
+                featureMap.put("hour", sceneFeature.getString("hour"))
+              }
+              if (sceneFeature.containsKey("hour_quarter") && sceneFeature.getString("hour_quarter").nonEmpty) {
+                featureMap.put("hour_quarter", sceneFeature.getString("hour_quarter"))
+              }
+
               val hour = DateTimeUtil.getHourByTimestamp(ts)
               featureMap.put("hour_" + hour, idDefaultValue)