瀏覽代碼

实时化特征

xueyiming 1 周之前
父節點
當前提交
8e82c7284b

+ 57 - 63
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_33_bucketDataFromOriginToHive_20250522.scala

@@ -34,7 +34,7 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250522 {
     val beginStr = param.getOrElse("beginStr", "20250216")
     val endStr = param.getOrElse("endStr", "20250216")
     val project = param.getOrElse("project", "loghubods")
-    val inputTable = param.getOrElse("inputTable", "alg_recsys_ad_sample_all")
+    val inputTable = param.getOrElse("inputTable", "ad_engine_statistics_log_per5min_new")
     val outputTable = param.getOrElse("outputTable", "ad_easyrec_train_data_v1_sampled")
     val outputTable2 = param.getOrElse("outputTable2", "")
     val filterHours = param.getOrElse("filterHours", "00,01,02,03,04,05,06,07").split(",").toSet
@@ -75,7 +75,7 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250522 {
       "user_vid_return_tags_14d", "apptype", "ts", "mid", "pqtid", "hour", "hour_quarter", "root_source_scene",
       "root_source_channel", "is_first_layer", "title_split", "profession", "user_vid_share_tags_1d", "user_vid_share_tags_14d",
       "user_vid_return_cate1_14d", "user_vid_return_cate2_14d", "user_vid_share_cate1_14d", "user_vid_share_cate2_14d",
-      "creative_type", "creative_hook_embedding", "creative_why_embedding", "creative_action_embedding","user_has_conver_1y")
+      "creative_type", "creative_hook_embedding", "creative_why_embedding", "creative_action_embedding", "user_has_conver_1y")
 
 
     // 2 读取odps+表信息
@@ -84,18 +84,7 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250522 {
     val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
     for (dt <- dateRange) {
       val timeRange = MyDateUtils.getDateMinuteRange(dt + "0800", dt + "2355")
-      val recordRdd = timeRange.map { dt_hh =>
-          val dt = dt_hh.substring(0, 8)
-          val hh = dt_hh.substring(8, 10)
-          val partition = s"dt=$dt,hh=$hh"
-          if (filterHours.nonEmpty && filterHours.contains(hh)) {
-            None
-          } else {
-            Some(partition)
-          }
-        }.collect {
-          case Some(partition) => partition
-        }.map(partition => {
+      val recordRdd = timeRange.map(dt => s"dt=$dt").map(partition => {
           val odpsData = odpsOps.readTable(project = project,
               table = inputTable,
               partition = partition,
@@ -132,6 +121,7 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250522 {
               val mid = record.getString("mid")
               val pqtid = record.getString("pqtid")
               val apptype = record.getString("apptype")
+              val abcode = record.getString("abcode")
 
               featureMap.put("apptype", apptype)
               featureMap.put("ts", ts)
@@ -140,27 +130,29 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250522 {
               featureMap.put("hour", tsHour)
               featureMap.put("hour_quarter", tsHourQuarter)
 
-              val extend: JSONObject = if (record.isNull("extend")) new JSONObject() else
-                JSON.parseObject(record.getString("extend"))
-              val b1: JSONObject = if (record.isNull("b1_feature")) new JSONObject() else
-                JSON.parseObject(record.getString("b1_feature"))
-              val b2: JSONObject = if (record.isNull("b2_feature")) new JSONObject() else
-                JSON.parseObject(record.getString("b2_feature"))
-              val b3: JSONObject = if (record.isNull("b3_feature")) new JSONObject() else
-                JSON.parseObject(record.getString("b3_feature"))
-              val b4: JSONObject = if (record.isNull("b4_feature")) new JSONObject() else
-                JSON.parseObject(record.getString("b4_feature"))
-              val b5: JSONObject = if (record.isNull("b5_feature")) new JSONObject() else
-                JSON.parseObject(record.getString("b5_feature"))
-              val b6: JSONObject = if (record.isNull("b6_feature")) new JSONObject() else
-                JSON.parseObject(record.getString("b6_feature"))
-              val b7: JSONObject = if (record.isNull("b7_feature")) new JSONObject() else
-                JSON.parseObject(record.getString("b7_feature"))
-              val b8: JSONObject = if (record.isNull("b8_feature")) new JSONObject() else
-                JSON.parseObject(record.getString("b8_feature"))
-              val b9: JSONObject = if (record.isNull("b9_feature")) new JSONObject() else
-                JSON.parseObject(record.getString("b9_feature"))
-
+              val mateFeature: JSONObject = if (record.isNull("metafeature")) new JSONObject() else
+                JSON.parseObject(record.getString("metafeature"))
+
+              val reqFeature: JSONObject = if (!mateFeature.containsKey("reqFeature")) new JSONObject() else
+                mateFeature.getJSONObject("reqFeature")
+              val b1: JSONObject = if (!mateFeature.containsKey("alg_cid_feature_basic_info")) new JSONObject() else
+                mateFeature.getJSONObject("alg_cid_feature_basic_info")
+              val b2: JSONObject = if (!mateFeature.containsKey("alg_cid_feature_adver_action")) new JSONObject() else
+                mateFeature.getJSONObject("alg_cid_feature_adver_action")
+              val b3: JSONObject = if (!mateFeature.containsKey("alg_cid_feature_cid_action")) new JSONObject() else
+                mateFeature.getJSONObject("alg_cid_feature_cid_action")
+              val b4: JSONObject = if (!mateFeature.containsKey("alg_cid_feature_region_action")) new JSONObject() else
+                mateFeature.getJSONObject("alg_cid_feature_region_action")
+              val b5: JSONObject = if (!mateFeature.containsKey("alg_cid_feature_app_action")) new JSONObject() else
+                mateFeature.getJSONObject("alg_cid_feature_app_action")
+              val b6: JSONObject = if (!mateFeature.containsKey("alg_cid_feature_week_action")) new JSONObject() else
+                mateFeature.getJSONObject("alg_cid_feature_week_action")
+              val b7: JSONObject = if (!mateFeature.containsKey("alg_cid_feature_hour_action")) new JSONObject() else
+                mateFeature.getJSONObject("alg_cid_feature_hour_action")
+              val b8: JSONObject = if (!mateFeature.containsKey("alg_cid_feature_brand_action")) new JSONObject() else
+                mateFeature.getJSONObject("alg_cid_feature_brand_action")
+              val b9: JSONObject = if (!mateFeature.containsKey("alg_cid_feature_weChatVersion_action")) new JSONObject() else
+                mateFeature.getJSONObject("alg_cid_feature_weChatVersion_action")
 
               featureMap.put("cid_" + cid, idDefaultValue)
               if (b1.containsKey("adid") && b1.getString("adid").nonEmpty) {
@@ -205,28 +197,28 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250522 {
 
               featureMap.put("apptype_" + apptype, idDefaultValue);
 
-              if (extend.containsKey("abcode") && extend.getString("abcode").nonEmpty) {
-                featureMap.put("abcode_" + extend.getString("abcode"), idDefaultValue)
+              if (abcode.nonEmpty) {
+                featureMap.put("abcode_" + abcode, idDefaultValue)
               }
 
-              if (extend.containsKey("region") && extend.getString("region").nonEmpty) {
-                featureMap.put("region", extend.getString("region"))
+              if (reqFeature.containsKey("region") && reqFeature.getString("region").nonEmpty) {
+                featureMap.put("region", reqFeature.getString("region"))
               }
 
-              if (extend.containsKey("city") && extend.getString("city").nonEmpty) {
-                featureMap.put("city", extend.getString("city"))
+              if (reqFeature.containsKey("city") && reqFeature.getString("city").nonEmpty) {
+                featureMap.put("city", reqFeature.getString("city"))
               }
 
-              if (extend.containsKey("is_first_layer") && extend.getString("is_first_layer").nonEmpty) {
-                featureMap.put("is_first_layer", extend.getString("is_first_layer"))
+              if (reqFeature.containsKey("is_first_layer") && reqFeature.getString("is_first_layer").nonEmpty) {
+                featureMap.put("is_first_layer", reqFeature.getString("is_first_layer"))
               }
 
-              if (extend.containsKey("root_source_scene") && extend.getString("root_source_scene").nonEmpty) {
-                featureMap.put("root_source_scene", extend.getString("root_source_scene"))
+              if (reqFeature.containsKey("root_source_scene") && reqFeature.getString("root_source_scene").nonEmpty) {
+                featureMap.put("root_source_scene", reqFeature.getString("root_source_scene"))
               }
 
-              if (extend.containsKey("root_source_channel") && extend.getString("root_source_channel").nonEmpty) {
-                featureMap.put("root_source_channel", extend.getString("root_source_channel"))
+              if (reqFeature.containsKey("root_source_channel") && reqFeature.getString("root_source_channel").nonEmpty) {
+                featureMap.put("root_source_channel", reqFeature.getString("root_source_channel"))
               }
 
 
@@ -298,8 +290,9 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250522 {
                 }
               }
 
-              val c1: JSONObject = if (record.isNull("c1_feature")) new JSONObject() else
-                JSON.parseObject(record.getString("c1_feature"))
+
+              val c1: JSONObject = if (!mateFeature.containsKey("alg_mid_feature_ad_action")) new JSONObject() else
+                mateFeature.getJSONObject("alg_mid_feature_ad_action")
 
               val midActionList = if (c1.containsKey("action") && c1.getString("action").nonEmpty) {
                 c1.getString("action").split(",").map(r => {
@@ -408,10 +401,10 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250522 {
                 ))
               }
 
-              val e1: JSONObject = if (record.isNull("e1_feature")) new JSONObject() else
-                JSON.parseObject(record.getString("e1_feature"))
-              val e2: JSONObject = if (record.isNull("e2_feature")) new JSONObject() else
-                JSON.parseObject(record.getString("e2_feature"))
+              val e1: JSONObject = if (!mateFeature.containsKey("alg_mid_feature_return_tags")) new JSONObject() else
+                mateFeature.getJSONObject("alg_mid_feature_return_tags")
+              val e2: JSONObject = if (!mateFeature.containsKey("alg_mid_feature_share_tags")) new JSONObject() else
+                mateFeature.getJSONObject("alg_mid_feature_share_tags")
               val title = b1.getOrDefault("cidtitle", "").toString
               if (title.nonEmpty) {
                 for ((en, prefix1) <- List((e1, "e1"), (e2, "e2"))) {
@@ -450,10 +443,10 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250522 {
                 featureMap.put("user_vid_share_tags_14d", e2.getString("tags_14d"))
               }
 
-              val g1: JSONObject = if (record.isNull("g1_feature")) new JSONObject() else
-                JSON.parseObject(record.getString("g1_feature"))
-              val g2: JSONObject = if (record.isNull("g2_feature")) new JSONObject() else
-                JSON.parseObject(record.getString("g2_feature"))
+              val g1: JSONObject = if (!mateFeature.containsKey("mid_return_video_cate")) new JSONObject() else
+                mateFeature.getJSONObject("mid_return_video_cate")
+              val g2: JSONObject = if (!mateFeature.containsKey("mid_share_video_cate")) new JSONObject() else
+                mateFeature.getJSONObject("mid_share_video_cate")
               if (g1.containsKey("cate1_14d") && g1.getString("cate1_14d").nonEmpty) {
                 featureMap.put("user_vid_return_cate1_14d", g1.getString("cate1_14d"))
               }
@@ -467,12 +460,13 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250522 {
                 featureMap.put("user_vid_share_cate2_14d", g2.getString("cate2_14d"))
               }
 
-              val d1: JSONObject = if (record.isNull("d1_feature")) new JSONObject() else
-                JSON.parseObject(record.getString("d1_feature"))
-              val d2: JSONObject = if (record.isNull("d2_feature")) new JSONObject() else
-                JSON.parseObject(record.getString("d2_feature"))
-              val d3: JSONObject = if (record.isNull("d3_feature")) new JSONObject() else
-                JSON.parseObject(record.getString("d3_feature"))
+
+              val d1: JSONObject = if (!mateFeature.containsKey("alg_cid_feature_vid_cf")) new JSONObject() else
+                mateFeature.getJSONObject("alg_cid_feature_vid_cf")
+              val d2: JSONObject = if (!mateFeature.containsKey("alg_cid_feature_vid_cf_rank")) new JSONObject() else
+                mateFeature.getJSONObject("alg_cid_feature_vid_cf_rank")
+              val d3: JSONObject = if (!mateFeature.containsKey("alg_vid_feature_basic_info")) new JSONObject() else
+                mateFeature.getJSONObject("alg_vid_feature_basic_info")
 
               if (d1.nonEmpty) {
                 for (prefix <- List("3h", "6h", "12h", "1d", "3d", "7d")) {