zhangbo 1 ano atrás
pai
commit
57f2300adf

+ 19 - 14
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_09_user2redis.scala

@@ -55,19 +55,6 @@ object makedata_09_user2redis {
     if (ifUser){
       println("user特征处理")
 
-      // 3-2 读取最近2个月有播放行为的mid集合
-      var midRdd = sc.emptyRDD[String]
-      MyDateUtils.getDateRange(MyDateUtils.getNumDaysBefore(date, midDays), date).foreach(d => {
-        println("-----------读取播放信息:"+d)
-        val partitionMid = "dt=" + d
-        val data = odpsOps.readTable(project = "loghubods", table = "play_action_log",
-            partition = partitionMid, transfer = func, numPartition = tablePart)
-          .map(r => {
-            if (r.isNull("machinecode")) "" else r.getString("machinecode")
-          }).filter(_.nonEmpty)
-        midRdd = midRdd.union(data).distinct()
-      })
-      println("------------mid处理完毕:" + midRdd.count() + "------------------")
 
       var userData = odpsOps.readTable(project = project, table = tableUser, partition = partition,
         transfer = func, numPartition = tablePart)
@@ -93,7 +80,7 @@ object makedata_09_user2redis {
           })
           (mid, resultNew.toString())
         })
-      userData = userData.join(midRdd.map(r=> (r, 1))).map(r=> (r._1, r._2._1))
+//      userData = userData.join(midRdd.map(r=> (r, 1))).map(r=> (r._1, r._2._1))
 
       if (userSampleIDs.nonEmpty){
         val IDs = userSampleIDs.split(",").filter(_.nonEmpty).map(_.toInt).toList
@@ -156,6 +143,22 @@ object makedata_09_user2redis {
     }
 
     if (ifWriteRedisUser){
+
+      // 3-2 读取最近2个月有播放行为的mid集合
+      var midRdd = sc.emptyRDD[String]
+      MyDateUtils.getDateRange(MyDateUtils.getNumDaysBefore(date, midDays), date).foreach(d => {
+        println("-----------读取播放信息:" + d)
+        val partitionMid = "dt=" + d
+        val data = odpsOps.readTable(project = "loghubods", table = "play_action_log",
+            partition = partitionMid, transfer = func, numPartition = tablePart)
+          .map(r => {
+            if (r.isNull("machinecode")) "" else r.getString("machinecode")
+          }).filter(_.nonEmpty)
+        midRdd = midRdd.union(data).distinct()
+      })
+      println("------------mid处理完毕:" + midRdd.count() + "------------------")
+
+
       println("user redis 写入")
       var savePathPart = savePathUser + "/" + partition
       if (userSampleIDs.nonEmpty) {
@@ -167,6 +170,8 @@ object makedata_09_user2redis {
         val rList = r.split("\t")
         (rList(0), rList(1))
       })
+        .join(midRdd.map(r=> (r, 1))).map(r=> (r._1, r._2._1))
+
       val userDataTakeRddRun = userDataRead.mapPartitions(row => {
         val redisFormat = new util.HashMap[String, String]
         val redisTemplate = env.getRedisTemplate()