zhangbo 1 year ago
parent
commit
20a82f6659

+ 43 - 26
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_02_writeredis.scala

@@ -64,38 +64,55 @@ object makedata_02_writeredis {
         userDataTake.foreach(r=> println(r._1 + "\t" + r._2 + "\t" + r._3))
         userData = sc.parallelize(userDataTake)
       }
-      val userDataTakeRddRun = userData.mapPartitions(row => {
-        val redisFormat = new util.HashMap[String, String]
-        val redisTemplate = env.getRedisTemplate()
-        var i = 1
-        row.foreach{
-          case (key, value, _) =>
-            if (key.nonEmpty){
-              redisFormat.put(userRedisKeyPrefix + key, value)
-            }
-            if (ifWriteRedis && i % 1000 == 0){
-              redisTemplate.opsForValue.multiSet(redisFormat)
-              redisFormat.keySet.foreach(r => redisTemplate.expire(r, 24 * expireDay, TimeUnit.HOURS))
-              redisFormat.clear()
-            }
-            i = i + 1
-        }
-        if (ifWriteRedis) {
-          redisTemplate.opsForValue.multiSet(redisFormat)
-          redisFormat.keySet.foreach(r => redisTemplate.expire(r, 24 * expireDay, TimeUnit.HOURS))
-          redisFormat.clear()
-        }
-        redisFormat.iterator
-      })
+
       if (savePathUser.nonEmpty && savePathUser.startsWith("/dw/recommend/model/")) {
         var savePathPart = savePathUser + "/user/" + partition
-        if (userSampleIDs.nonEmpty){
+        if (userSampleIDs.nonEmpty) {
           savePathPart = savePathPart + "_" + userSampleIDs
         }
         MyHdfsUtils.delete_hdfs_path(savePathPart)
-        userDataTakeRddRun.map(r => r._1 + "\t" + r._2).saveAsTextFile(savePathPart, classOf[GzipCodec])
+        userData.map(r => r._1 + "\t" + r._2).saveAsTextFile(savePathPart, classOf[GzipCodec])
       }
-      println("user.action.count=" + userDataTakeRddRun.count())
+      println("user.action.count=" + userData.count())
+
+      if (ifWriteRedis){
+        var savePathPart = savePathUser + "/user/" + partition
+        if (userSampleIDs.nonEmpty) {
+          savePathPart = savePathPart + "_" + userSampleIDs
+        }
+        val userDataRead = sc.textFile(savePathPart).map(r => {
+          val rList = r.split("\t")
+          (rList(0), rList(1))
+        })
+        val userDataTakeRddRun = userDataRead.mapPartitions(row => {
+          val redisFormat = new util.HashMap[String, String]
+          val redisTemplate = env.getRedisTemplate()
+          var i = 1
+          row.foreach {
+            case (key, value) =>
+              if (key.nonEmpty) {
+                redisFormat.put(userRedisKeyPrefix + key, value)
+              }
+              if (ifWriteRedis && i % 1000 == 0) {
+                redisTemplate.opsForValue.multiSet(redisFormat)
+                redisFormat.keySet.foreach(r => redisTemplate.expire(r, 24 * expireDay, TimeUnit.HOURS))
+                redisFormat.clear()
+              }
+              i = i + 1
+          }
+          if (ifWriteRedis) {
+            redisTemplate.opsForValue.multiSet(redisFormat)
+            redisFormat.keySet.foreach(r => redisTemplate.expire(r, 24 * expireDay, TimeUnit.HOURS))
+            redisFormat.clear()
+          }
+          redisFormat.iterator
+        })
+        println("put in redis.count=" + userDataTakeRddRun.count())
+
+      }
+
+
+
     }else{
       println("不处理user")
     }