zhangbo 1 سال پیش
والد
کامیت
f1c1b8d415
1فایلهای تغییر یافته به همراه12 افزوده شده و 8 حذف شده
  1. 12 8
      src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_02_writeredis.scala

+ 12 - 8
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_02_writeredis.scala

@@ -67,20 +67,24 @@ object makedata_02_writeredis {
       val userDataTakeRddRun = userData.mapPartitions(row => {
         val redisFormat = new util.HashMap[String, String]
         val redisTemplate = env.getRedisTemplate()
+        var i = 1
         row.foreach{
           case (key, value, _) =>
             if (key.nonEmpty){
               redisFormat.put(userRedisKeyPrefix + key, value)
-              if (ifWriteRedis) {
-                redisTemplate.opsForValue.set(userRedisKeyPrefix + key, value, 24 * expireDay, TimeUnit.HOURS)
-              }
             }
+            if (ifWriteRedis && i % 1000 == 0){
+              redisTemplate.opsForValue.multiSet(redisFormat)
+              redisFormat.keySet.foreach(r => redisTemplate.expire(r, 24 * expireDay, TimeUnit.HOURS))
+              redisFormat.clear()
+            }
+            i = i + 1
+        }
+        if (ifWriteRedis) {
+          redisTemplate.opsForValue.multiSet(redisFormat)
+          redisFormat.keySet.foreach(r => redisTemplate.expire(r, 24 * expireDay, TimeUnit.HOURS))
+          redisFormat.clear()
         }
-//        if (ifWriteRedis){
-//          val redisTemplate = env.getRedisTemplate()
-//          redisTemplate.opsForValue.multiSet(redisFormat)
-//          redisFormat.keySet.foreach(key => redisTemplate.expire(key, 24 * expireDay, TimeUnit.HOURS))
-//        }
         redisFormat.iterator
       })
       if (savePathUser.nonEmpty && savePathUser.startsWith("/dw/recommend/model/")) {