jch il y a 2 mois
Parent
commit
6c8fdddc1f

+ 8 - 27
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/makedata_recsys_82_originData_20250221.scala

@@ -40,7 +40,6 @@ object makedata_recsys_82_originData_20250221 {
         (mid, record)
       })
       .filter(_._1.nonEmpty)
-      .reduceByKey((a, b) => a)
       .flatMap(raw => {
         val result = new ArrayBuffer[(String, String)]
         for (hVid <- ConvertUtils.getVidList(raw._2.getString("c9_feature"))) {
@@ -52,6 +51,7 @@ object makedata_recsys_82_originData_20250221 {
 
   private def getMidSeqRdd(vidMidRdd: RDD[(String, String)], videoRdd: RDD[(String, java.util.Map[String, String])]): RDD[(String, List[java.util.Map[String, String]])] = {
     vidMidRdd
+      .reduceByKey((a, b) => a)
       .join(videoRdd)
       .map(raw => {
         (raw._2._1, raw._2._2)
@@ -62,32 +62,13 @@ object makedata_recsys_82_originData_20250221 {
       })
   }
 
-  private def joinVideoMap(logRdd: RDD[Record], videoRdd: RDD[(String, java.util.Map[String, String])]): RDD[(Record, List[java.util.Map[String, String]])] = {
-    val midLogRdd = logRdd
+  private def joinMidSeq(logRdd: RDD[Record], midSeqRdd: RDD[(String, List[java.util.Map[String, String]])]): RDD[(Record, List[java.util.Map[String, String]])] = {
+    logRdd
       .map(record => {
         val mid = record.getString("mid")
         (mid, record)
       })
-
-    val midSeqRdd = midLogRdd
-      .filter(_._1.nonEmpty)
-      .flatMap(raw => {
-        val result = new ArrayBuffer[(String, String)]
-        for (hVid <- ConvertUtils.getVidList(raw._2.getString("c9_feature"))) {
-          result += ((hVid, raw._1)) // (vid, mid)
-        }
-        result
-      })
-      .join(videoRdd) // (vid, (mid, map))
-      .map(raw => {
-        (raw._2._1, raw._2._2)
-      })
-      .groupByKey()
-      .map(raw => {
-        (raw._1, raw._2.toList)
-      })
-
-    midLogRdd.leftOuterJoin(midSeqRdd)
+      .leftOuterJoin(midSeqRdd)
       .map(raw => {
         (raw._2._1, raw._2._2.orNull)
       })
@@ -152,13 +133,13 @@ object makedata_recsys_82_originData_20250221 {
       // f. get mid seq rdd
       val midSeqRdd = getMidSeqRdd(vidMidRdd, uniqVideo)
 
-      // e. 历史行为关联video
-      val seqSampleData = joinVideoMap(resampleData, uniqVideo)
+      // g. 历史行为关联video
+      val seqSampleData = joinMidSeq(resampleData, midSeqRdd)
 
-      // d. 特征转换
+      // h. 特征转换
       val featureData = getFeature(seqSampleData)
 
-      // f. 保存数据
+      // i. 保存数据
       val hdfsPath = "%s/%s%s".format(savePath, dt, hh)
       DataUtils.saveData(featureData, hdfsPath, repartition)
     }