Browse Source

new feature

jch 2 months ago
parent
commit
f76ca2312f

+ 34 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/makedata_recsys_82_originData_20250221.scala

@@ -33,6 +33,34 @@ object makedata_recsys_82_originData_20250221 {
       .reduceByKey((a, b) => if (a.size() > b.size()) a else b)
   }
 
+  private def getVidMidRdd(logRdd: RDD[Record]): RDD[(String, String)] = {
+    logRdd
+      .map(record => {
+        val mid = record.getString("mid")
+        (mid, record)
+      })
+      .filter(_._1.nonEmpty)
+      .flatMap(raw => {
+        val result = new ArrayBuffer[(String, String)]
+        for (hVid <- ConvertUtils.getVidList(raw._2.getString("c9_feature"))) {
+          result += ((hVid, raw._1)) // (vid, mid)
+        }
+        result
+      })
+  }
+
+  private def getMidSeqRdd(vidMidRdd: RDD[(String, String)], videoRdd: RDD[(String, java.util.Map[String, String])]): RDD[(String, List[java.util.Map[String, String]])] = {
+    vidMidRdd
+      .join(videoRdd)
+      .map(raw => {
+        (raw._2._1, raw._2._2)
+      })
+      .groupByKey()
+      .map(raw => {
+        (raw._1, raw._2.toList)
+      })
+  }
+
   private def joinVideoMap(logRdd: RDD[Record], videoRdd: RDD[(String, java.util.Map[String, String])]): RDD[(Record, List[java.util.Map[String, String]])] = {
     val midLogRdd = logRdd
       .map(record => {
@@ -117,6 +145,12 @@ object makedata_recsys_82_originData_20250221 {
       // d. 样本重采样
       val resampleData = DataUtils.resample(whatLabel, fuSampleRate, odpsData)
 
+      // e. get vid mid rdd
+      val vidMidRdd = getVidMidRdd(resampleData)
+
+      // f. get mid seq rdd
+      val midSeqRdd = getMidSeqRdd(vidMidRdd, uniqVideo)
+
       // e. 历史行为关联video
       val seqSampleData = joinVideoMap(resampleData, uniqVideo)