浏览代码

扩展特征

jch 1 月之前
父节点
当前提交
259bcdc24c

+ 86 - 9
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/makedata_recsys_83_originData_20250317.scala

@@ -62,6 +62,58 @@ object makedata_recsys_83_originData_20250317 {
       })
   }
 
+  private def joinChannelLayerStat(logRdd: RDD[java.util.Map[String, String]], videoStat: RDD[(String, String)]): RDD[java.util.Map[String, String]] = {
+    logRdd
+      .map(raw => {
+        val extendMap = ConvertUtils.getRecordCol(raw, "extend")
+        val channel = FeatureUtils.getUserChannel(raw.get("rootsourceid"), extendMap.get("group_name"))
+        val level = FeatureUtils.getUserLevel(extendMap.get("rootsessionid"), raw.get("subsessionid"))
+        val vid = raw.getOrElse("vid", "")
+        if (level.isEmpty) {
+          val key = Seq(channel, "非0层", vid).mkString(":")
+          (key, raw)
+        } else {
+          val key = Seq(channel, "0层", vid).mkString(":")
+          (key, raw)
+        }
+      })
+      .leftOuterJoin(videoStat)
+      .map(raw => {
+        val info = raw._2._1
+        if (raw._2._2.isDefined) {
+          info.put("b14_feature", raw._2._2.get)
+        }
+        info
+      })
+  }
+
+  private def joinChannelLayerHeadStat(logRdd: RDD[java.util.Map[String, String]], videoStat: RDD[(String, String)]): RDD[java.util.Map[String, String]] = {
+    logRdd
+      .map(raw => {
+        val extendMap = ConvertUtils.getRecordCol(raw, "extend")
+        val channel = FeatureUtils.getUserChannel(raw.get("rootsourceid"), extendMap.get("group_name"))
+        val level = FeatureUtils.getUserLevel(extendMap.get("rootsessionid"), raw.get("subsessionid"))
+        val headInfo = ConvertUtils.getRecordCol(raw, "v2_feature")
+        val unionid = headInfo.getOrElse("title_time_w_h_unionid", "")
+        val vid = raw.getOrElse("vid", "")
+        if (level.isEmpty) {
+          val key = Seq(channel, "非0层", unionid, vid).mkString(":")
+          (key, raw)
+        } else {
+          val key = Seq(channel, "0层", unionid, vid).mkString(":")
+          (key, raw)
+        }
+      })
+      .leftOuterJoin(videoStat)
+      .map(raw => {
+        val info = raw._2._1
+        if (raw._2._2.isDefined) {
+          info.put("b15_feature", raw._2._2.get)
+        }
+        info
+      })
+  }
+
   private def getVidMidRdd(logRdd: RDD[java.util.Map[String, String]]): RDD[(String, String)] = {
     logRdd
       .map(raw => {
@@ -164,14 +216,37 @@ object makedata_recsys_83_originData_20250317 {
           (vid, feature)
         })
 
-      // 2.4 加载样本数据
+      // 2.4 加载渠道特征
+      val channelLayerStat = DataUtils.getODPSData(sc, project, "alg_recsys_feature_video_recommend_channel_layer", partition, 8)
+        .map(record => {
+          val channel = record.getString("channel")
+          val layer = record.getString("layer")
+          val vid = record.getString("vid")
+          val feature = record.getString("feature")
+          val key = Seq(channel, layer, vid).mkString(":")
+          (key, feature)
+        })
+
+      // 2.5 加载渠道头部特征
+      val channelLayerHeadStat = DataUtils.getODPSData(sc, project, "alg_recsys_feature_video_recommend_channel_layer_head", partition, 8)
+        .map(record => {
+          val channel = record.getString("channel")
+          val layer = record.getString("layer")
+          val unionid = record.getString("unionid")
+          val vid = record.getString("vid")
+          val feature = record.getString("feature")
+          val key = Seq(channel, layer, unionid, vid).mkString(":")
+          (key, feature)
+        })
+
+      // 2.6 加载样本数据
       val odpsData = DataUtils.getODPSData(sc, project, table, partition, tablePart)
         .filter(record => {
           val page = getStringValue(record, "page")
           whatPages.contains(page)
         })
 
-      // 2.5 样本重采样
+      // 2.7 样本重采样
       val filterColumns = Set("allfeaturemap", "metafeaturemap")
       val resampleData = DataUtils.resampleWithoutInterception(whatLabel, fuSampleRate, odpsData)
         .map(record => {
@@ -185,22 +260,24 @@ object makedata_recsys_83_originData_20250317 {
           }
         })
 
-      // 2.6 join video stat
-      val sampleData = joinVideoStat(resampleData, videoStat)
+      // 2.8 join video stat
+      val sampleDataTmp1 = joinVideoStat(resampleData, videoStat)
+      val sampleDataTmp2 = joinChannelLayerStat(sampleDataTmp1, channelLayerStat)
+      val sampleData = joinChannelLayerHeadStat(sampleDataTmp2, channelLayerHeadStat)
 
-      // 2.7 get vid mid rdd
+      // 2.9 get vid mid rdd
       val vidMidRdd = getVidMidRdd(sampleData)
 
-      // 2.8 get mid seq rdd
+      // 2.10 get mid seq rdd
       val midSeqRdd = getMidSeqRdd(vidMidRdd, uniqVideo)
 
-      // 2.9 历史行为关联video
+      // 2.11 历史行为关联video
       val seqSampleData = joinMidSeq(sampleData, midSeqRdd)
 
-      // 2.10 特征转换
+      // 2.12 特征转换
       val featureData = getFeature(seqSampleData)
 
-      // 2.11 保存数据
+      // 2.13 保存数据
       val hdfsPath = "%s/%s%s".format(savePath, dt, hh)
       DataUtils.saveData(featureData, hdfsPath, repartition)
     }