zhaohaipeng пре 4 недеља
родитељ
комит
d3bd3c8486

+ 295 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_deconstruction_keywords/makedata_recsys_83_originData_20260306.scala

@@ -0,0 +1,295 @@
+package com.aliyun.odps.spark.examples.makedata_deconstruction_keywords
+
+import com.aliyun.odps.data.Record
+import com.aliyun.odps.spark.examples.myUtils.DataUtils.getStringValue
+import com.aliyun.odps.spark.examples.myUtils._
+import examples.utils.SimilarityUtils
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+/*
+   20250317 feature extraction job
+   NOTE(review): the object/file name says 20260306 but this header says 20250317 — confirm which date is correct.
+ */
+
object makedata_recsys_83_originData_20260306 {
  // Label columns copied verbatim from each sample record into the output label field.
  private val labelNames = List(
    "is_share", "share_cnt",
    "is_return_1", "return_1_uv",
    "is_return_n", "return_n_uv",
    "is_return_noself", "return_1_uv_noself",
    "is_return_n_noself", "return_n_uv_noself"
  )
  // Online->offline column mapping for pages other than 详情页 / 回流页.
  private val o2oMap = Map(
    "creative" -> "creativeInfo"
  )
  // Online->offline column mapping applied when page == "详情页".
  private val detailPageO2OMap = Map(
    "d1_feature" -> "scene_type_vid_cf_feature_20250212",
    "d2_feature" -> "vid_click_cf_feature_20250212",
    "d3_feature" -> "alg_recsys_feature_cf_i2i_v2",
    "v2_feature" -> "head_video",
    "creative" -> "creativeInfo"
  )
  // Online->offline column mapping applied when page == "回流页";
  // the "nullN" source columns look like placeholders — TODO confirm they are intentional.
  private val returnPageO2OMap = Map(
    "d1_feature" -> "null1",
    "d2_feature" -> "null2",
    "d3_feature" -> "null3",
    "v2_feature" -> "null4",
    "creative" -> "creativeInfo"
  )

  /**
   * Parses the raw video table into (vid, featureMap) pairs. When a vid occurs in
   * several records, the record with the larger feature map is kept (assumed to be
   * the most complete one). The vid itself is written back into the map under "vid".
   */
  private def parseVideoRdd(videoRdd: RDD[Record]): RDD[(String, java.util.Map[String, String])] = {
    videoRdd
      .map(record => {
        val vid = record.getString("vid")
        val feature = ConvertUtils.getRecordCol(record, "feature")
        feature.put("vid", vid)
        (vid, feature)
      })
      .reduceByKey((a, b) => if (a.size() > b.size()) a else b)
  }

  /**
   * Left-joins per-vid statistics onto the sample maps. When a stat row matches, its
   * raw string is stored under "b0_feature"; otherwise the sample map is unchanged.
   */
  private def joinVideoStat(logRdd: RDD[java.util.Map[String, String]], videoStat: RDD[(String, String)]): RDD[java.util.Map[String, String]] = {
    logRdd
      .map(raw => {
        val vid = raw.getOrElse("vid", "")
        (vid, raw)
      })
      .leftOuterJoin(videoStat)
      .map(raw => {
        val info = raw._2._1
        if (raw._2._2.isDefined) {
          info.put("b0_feature", raw._2._2.get)
        }
        info
      })
  }

  /**
   * Left-joins channel/layer statistics keyed by "channel:layer:vid"; a match is
   * stored under "b14_feature". An empty user level maps to the "非0层" bucket,
   * a non-empty one to "0层".
   */
  private def joinChannelLayerStat(logRdd: RDD[java.util.Map[String, String]], videoStat: RDD[(String, String)]): RDD[java.util.Map[String, String]] = {
    logRdd
      .map(raw => {
        val extendMap = ConvertUtils.getRecordCol(raw, "extend")
        val channel = FeatureUtils.getUserChannel(raw.get("rootsourceid"), extendMap.get("group_name"))
        val level = FeatureUtils.getUserLevel(extendMap.get("rootsessionid"), raw.get("subsessionid"))
        val vid = raw.getOrElse("vid", "")
        // The former if/else branches differed only in the layer literal; collapsed.
        val layer = if (level.isEmpty) "非0层" else "0层"
        (Seq(channel, layer, vid).mkString(":"), raw)
      })
      .leftOuterJoin(videoStat)
      .map(raw => {
        val info = raw._2._1
        if (raw._2._2.isDefined) {
          info.put("b14_feature", raw._2._2.get)
        }
        info
      })
  }

  /**
   * Left-joins channel/layer head-video statistics keyed by
   * "channel:layer:unionid:vid" (unionid read from the v2_feature column);
   * a match is stored under "b15_feature".
   */
  private def joinChannelLayerHeadStat(logRdd: RDD[java.util.Map[String, String]], videoStat: RDD[(String, String)]): RDD[java.util.Map[String, String]] = {
    logRdd
      .map(raw => {
        val extendMap = ConvertUtils.getRecordCol(raw, "extend")
        val channel = FeatureUtils.getUserChannel(raw.get("rootsourceid"), extendMap.get("group_name"))
        val level = FeatureUtils.getUserLevel(extendMap.get("rootsessionid"), raw.get("subsessionid"))
        val headInfo = ConvertUtils.getRecordCol(raw, "v2_feature")
        val unionid = headInfo.getOrElse("title_time_w_h_unionid", "")
        val vid = raw.getOrElse("vid", "")
        // Same layer-bucket rule as joinChannelLayerStat, with unionid added to the key.
        val layer = if (level.isEmpty) "非0层" else "0层"
        (Seq(channel, layer, unionid, vid).mkString(":"), raw)
      })
      .leftOuterJoin(videoStat)
      .map(raw => {
        val info = raw._2._1
        if (raw._2._2.isDefined) {
          info.put("b15_feature", raw._2._2.get)
        }
        info
      })
  }

  /**
   * Builds (historyVid, mid) pairs from each user's c9_feature behavior string.
   * reduceByKey((a, b) => a) keeps an arbitrary c9 value per mid — whichever
   * Spark delivers first; acceptable only if all rows of a mid share the same c9.
   */
  private def getVidMidRdd(logRdd: RDD[java.util.Map[String, String]]): RDD[(String, String)] = {
    logRdd
      .map(raw => {
        val mid = raw.getOrElse("mid", "")
        val c9 = raw.getOrElse("c9_feature", "")
        (mid, c9)
      })
      .filter(_._1.nonEmpty)
      .reduceByKey((a, b) => a)
      .flatMap(raw => {
        val result = new ArrayBuffer[(String, String)]
        for (hVid <- ConvertUtils.getVidList(raw._2)) {
          result += ((hVid, raw._1)) // (vid, mid)
        }
        result
      })
  }

  /**
   * Resolves each user's history vids against the full video feature maps and
   * groups them per mid: (mid, List of history video feature maps).
   */
  private def getMidSeqRdd(vidMidRdd: RDD[(String, String)], videoRdd: RDD[(String, java.util.Map[String, String])]): RDD[(String, List[java.util.Map[String, String]])] = {
    vidMidRdd
      .join(videoRdd)
      .map(raw => {
        (raw._2._1, raw._2._2)
      })
      .groupByKey()
      .map(raw => {
        (raw._1, raw._2.toList)
      })
  }

  /**
   * Attaches each sample's history-video sequence by mid; samples whose mid has
   * no resolved history get an empty list.
   */
  private def joinMidSeq(logRdd: RDD[java.util.Map[String, String]], midSeqRdd: RDD[(String, List[java.util.Map[String, String]])]): RDD[(java.util.Map[String, String], List[java.util.Map[String, String]])] = {
    logRdd
      .map(raw => {
        val mid = raw.getOrElse("mid", "")
        (mid, raw)
      })
      .leftOuterJoin(midSeqRdd)
      .map(raw => {
        if (raw._2._2.isDefined) {
          (raw._2._1, raw._2._2.get)
        } else {
          (raw._2._1, List())
        }
      })
  }

  /**
   * Converts each (sample, historySeq) pair into a tab-separated output line:
   * logKey \t labels \t scoresMap \t features. Empty feature strings are dropped.
   * SimilarityUtils.init() is called once per partition before mapping.
   */
  private def getFeature(rdd: RDD[(java.util.Map[String, String], List[java.util.Map[String, String]])]): RDD[String] = {
    rdd.mapPartitions(partition => {
      SimilarityUtils.init()
      partition.map(raw => {
        val record = raw._1
        val videoSeq = raw._2
        val logKey = DataUtils.getLogKey(record)
        val labels = DataUtils.getLabels(labelNames, record).toString
        // NOTE(review): features is the toString of ConvertV2.getFeature's result; if
        // that result is a collection, toString is never empty (e.g. "{}") and the
        // nonEmpty filter below never fires — confirm the intended emptiness check.
        val features = ConvertV2.getFeature(record, videoSeq, 6).toString
        val scoresMap = DataUtils.getSubJson(record, "extend_alg", "scoresMap").toString
        if (features.nonEmpty) {
          logKey + "\t" + labels + "\t" + scoresMap + "\t" + features
        } else {
          ""
        }
      })
    })
      .filter(_.nonEmpty)
  }

  /**
   * Entry point: per dt/hh partition, loads video info and stats, filters and
   * resamples the sample table, joins stats and history sequences, extracts
   * features and writes the result to HDFS.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getName)
      .getOrCreate()
    val sc = spark.sparkContext

    // 1. parse job arguments
    val param = ParamUtils.parseArgs(args)
    val project = param.getOrElse("project", "loghubods")
    val table = param.getOrElse("table", "dwd_recsys_alg_sample_all_20250212")
    val tablePart = param.getOrElse("tablePart", "64").toInt
    val beginStr = param.getOrElse("beginStr", "2025031700")
    val endStr = param.getOrElse("endStr", "2025031700")
    val whatPages = param.getOrElse("whatPages", "详情后沉浸页,回流后沉浸页&内页feed,首页feed,详情页,回流页").split(",").toSet
    val whatLabel = param.getOrElse("whatLabel", "is_return_noself")
    val fuSampleRate = param.getOrElse("fuSampleRate", "0.03").toDouble
    val savePath = param.getOrElse("savePath", "/dw/recommend/model/83_origin_data/")
    val repartition = param.getOrElse("repartition", "64").toInt

    // 2. process every hour partition in [beginStr, endStr]
    val timeRange = MyDateUtils.getDateHourRange(beginStr, endStr)
    for (dt_hh <- timeRange) {
      // 2.1 partition spec, e.g. dt=20250317,hh=00
      val dt = dt_hh.substring(0, 8)
      val hh = dt_hh.substring(8, 10)
      val partition = "dt=%s,hh=%s".format(dt, hh)
      println("开始执行partition:" + partition)

      // 2.2 load video info (deduped per vid)
      val originVideo = DataUtils.getODPSData(sc, project, "alg_recsys_feature_user_behavior_video", partition, tablePart)
      val uniqVideo = parseVideoRdd(originVideo)

      // 2.3 load per-vid statistics
      val videoStat = DataUtils.getODPSData(sc, project, "alg_recsys_feature_video_clean_stat", partition, tablePart)
        .map(record => {
          val vid = record.getString("vid")
          val feature = record.getString("feature")
          (vid, feature)
        })

      // 2.4 load channel/layer statistics, keyed "channel:layer:vid"
      val channelLayerStat = DataUtils.getODPSData(sc, project, "alg_recsys_feature_video_recommend_channel_layer", partition, 8)
        .map(record => {
          val channel = record.getString("channel")
          val layer = record.getString("layer")
          val vid = record.getString("vid")
          val feature = record.getString("feature")
          val key = Seq(channel, layer, vid).mkString(":")
          (key, feature)
        })

      // 2.5 load channel/layer head-video statistics, keyed "channel:layer:unionid:vid"
      val channelLayerHeadStat = DataUtils.getODPSData(sc, project, "alg_recsys_feature_video_recommend_channel_layer_head", partition, 8)
        .map(record => {
          val channel = record.getString("channel")
          val layer = record.getString("layer")
          val unionid = record.getString("unionid")
          val vid = record.getString("vid")
          val feature = record.getString("feature")
          val key = Seq(channel, layer, unionid, vid).mkString(":")
          (key, feature)
        })

      // 2.6 load samples, keeping only the configured pages
      val odpsData = DataUtils.getODPSData(sc, project, table, partition, tablePart)
        .filter(record => {
          val page = getStringValue(record, "page")
          whatPages.contains(page)
        })

      // 2.7 resample negatives, then map records with a page-specific o2o mapping
      //     (removed an unused local `filterColumns` that was never referenced)
      val resampleData = DataUtils.resampleWithoutInterception(whatLabel, fuSampleRate, odpsData)
        .map(record => {
          val page = getStringValue(record, "page")
          if (page.equals("详情页")) {
            OnlineLogUtils.log2Map(record, detailPageO2OMap)
          } else if (page.equals("回流页")) {
            OnlineLogUtils.log2Map(record, returnPageO2OMap)
          } else {
            OnlineLogUtils.log2Map(record, o2oMap)
          }
        })

      // 2.8 join video / channel-layer / channel-layer-head stats
      val sampleDataTmp1 = joinVideoStat(resampleData, videoStat)
      val sampleDataTmp2 = joinChannelLayerStat(sampleDataTmp1, channelLayerStat)
      val sampleData = joinChannelLayerHeadStat(sampleDataTmp2, channelLayerHeadStat)

      // 2.9 (history vid, mid) pairs from user behavior
      val vidMidRdd = getVidMidRdd(sampleData)

      // 2.10 per-mid history sequences with full video features
      val midSeqRdd = getMidSeqRdd(vidMidRdd, uniqVideo)

      // 2.11 attach history sequences to samples
      val seqSampleData = joinMidSeq(sampleData, midSeqRdd)

      // 2.12 feature extraction
      val featureData = getFeature(seqSampleData)

      // 2.13 save to HDFS
      val hdfsPath = "%s/%s%s".format(savePath, dt, hh)
      DataUtils.saveData(featureData, hdfsPath, repartition)
    }
  }
}

+ 25 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/FeatureTransformV2.java

@@ -293,6 +293,18 @@ public class FeatureTransformV2 {
         getVideoAttrSRCrossFeature("c9_c2s", rankVideo.getOrDefault("merge_second_level_cate", ""), profile.getC2_s(), featMap);
         getVideoAttrSRCrossFeature("c9_l1s", rankVideo.getOrDefault("festive_label1", ""), profile.getL1_s(), featMap);
         getVideoAttrSRCrossFeature("c9_l2s", rankVideo.getOrDefault("festive_label2", ""), profile.getL2_s(), featMap);
+
+        // 视频解构的关键词
+        if (rankVideo.containsKey("dk_keywords")) {
+            String dkKeywords = rankVideo.get("dk_keywords");
+            if (Objects.isNull(dkKeywords) || dkKeywords.isEmpty()) {
+                return;
+            }
+            for (String kw : dkKeywords.split("[,,、]")) {
+                kw = kw.replaceAll("(\\s+|\\t|:)", "");
+                getVideoAttrSRCrossFeature("c9_dks", kw, profile.getD_k_s(), featMap);
+            }
+        }
     }
 
     private static void getRSCrossFeature(boolean flag, String prefix, long currentMs, int maxN, List<UserSRBO> list, Map<String, String> rankVideo, Map<String, Map<String, String>> hVideoMap, Map<String, Double> featMap) {
@@ -398,6 +410,19 @@ public class FeatureTransformV2 {
                 }
             }
         }
+        // 视频解构的关键词 ID特征
+        if (videoInfo.containsKey("dk_keywords")) {
+            String dkKeywords = videoInfo.get("dk_keywords");
+            if (Objects.nonNull(dkKeywords) && !dkKeywords.isEmpty()) {
+                for (String kw : dkKeywords.split("[,,、]")) {
+                    kw = kw.replaceAll("(\\s+|\\t|:)", "");
+                    if (!kw.isEmpty()) {
+                        String featKey = String.format("%s@dkw@%s", prefix, kw);
+                        featMap.put(featKey, 1.0);
+                    }
+                }
+            }
+        }
     }
 
     private static void getTwoVideoCrossFeature(String prefix, List<String> attrs, Map<String, String> video1, Map<String, String> video2, Map<String, Double> featMap) {

+ 1 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/UserShareReturnProfile.java

@@ -22,6 +22,7 @@ public class UserShareReturnProfile {
     private List<UserSRBO> l_r1_s;   // last_return_seq(最近1层回流序列)
     private Map<String, VideoAttrSRBO> c1_s;   // cate1_seq(merge_first_level_cate序列-回流率)
     private Map<String, VideoAttrSRBO> c2_s;   // cate2_seq(merge_second_level_cate序列-回流率)
+    private Map<String, VideoAttrSRBO> d_k_s;   // dk_keywords_seq (deconstruction-keyword sequence - return rate); previous comment was copy-pasted from c2_s
     private Map<String, VideoAttrSRBO> l1_s;   // label1_seq(festive_label1序列-回流率)
     private Map<String, VideoAttrSRBO> l2_s;   // label2_seq(festive_label2序列-回流率)