zhangbo hace 1 año
padre
commit
4c2bdde6c0

+ 39 - 38
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_02_writeredis.scala

@@ -31,6 +31,7 @@ object makedata_02_writeredis {
     val date = param.getOrDefault("date", "20231220")
     val expireDay = param.getOrDefault("expireDay", "2").toInt
     val ifDebug = param.getOrDefault("ifDebug", "False").toBoolean
+    val ifWriteRedisUser = param.getOrDefault("ifWriteRedisUser", "False").toBoolean
     val ifWriteRedis = param.getOrDefault("ifWriteRedis", "True").toBoolean
     val partition = partitionPrefix + date
     val savePathUser = param.getOrDefault("savePathUser", "")
@@ -64,7 +65,6 @@ object makedata_02_writeredis {
         userDataTake.foreach(r=> println(r._1 + "\t" + r._2 + "\t" + r._3))
         userData = sc.parallelize(userDataTake)
       }
-
       if (savePathUser.nonEmpty && savePathUser.startsWith("/dw/recommend/model/")) {
         var savePathPart = savePathUser + "/user/" + partition
         if (userSampleIDs.nonEmpty) {
@@ -74,48 +74,49 @@ object makedata_02_writeredis {
         userData.map(r => r._1 + "\t" + r._2).saveAsTextFile(savePathPart, classOf[GzipCodec])
       }
       println("user.action.count=" + userData.count())
-
-      if (ifWriteRedis){
-        var savePathPart = savePathUser + "/user/" + partition
-        if (userSampleIDs.nonEmpty) {
-          savePathPart = savePathPart + "_" + userSampleIDs
+    } else {
+      println("不处理user")
+    }
+    if (ifWriteRedisUser){
+      var savePathPart = savePathUser + "/user/" + partition
+      if (userSampleIDs.nonEmpty) {
+        savePathPart = savePathPart + "_" + userSampleIDs
+      }
+      val userDataRead = sc.textFile(savePathPart).map(r => {
+        val rList = r.split("\t")
+        (rList(0), rList(1))
+      })
+      val userDataTakeRddRun = userDataRead.mapPartitions(row => {
+        val redisFormat = new util.HashMap[String, String]
+        val redisTemplate = env.getRedisTemplate()
+        var i = 1
+        row.foreach {
+          case (key, value) =>
+            if (key.nonEmpty) {
+              redisFormat.put(userRedisKeyPrefix + key, value)
+            }
+            if (ifWriteRedis && i % 1000 == 0) {
+              redisTemplate.opsForValue.multiSet(redisFormat)
+              redisFormat.keySet.foreach(r => redisTemplate.expire(r, 24 * expireDay, TimeUnit.HOURS))
+              redisFormat.clear()
+            }
+            i = i + 1
         }
-        val userDataRead = sc.textFile(savePathPart).map(r => {
-          val rList = r.split("\t")
-          (rList(0), rList(1))
-        })
-        val userDataTakeRddRun = userDataRead.mapPartitions(row => {
-          val redisFormat = new util.HashMap[String, String]
-          val redisTemplate = env.getRedisTemplate()
-          var i = 1
-          row.foreach {
-            case (key, value) =>
-              if (key.nonEmpty) {
-                redisFormat.put(userRedisKeyPrefix + key, value)
-              }
-              if (ifWriteRedis && i % 1000 == 0) {
-                redisTemplate.opsForValue.multiSet(redisFormat)
-                redisFormat.keySet.foreach(r => redisTemplate.expire(r, 24 * expireDay, TimeUnit.HOURS))
-                redisFormat.clear()
-              }
-              i = i + 1
-          }
-          if (ifWriteRedis) {
-            redisTemplate.opsForValue.multiSet(redisFormat)
-            redisFormat.keySet.foreach(r => redisTemplate.expire(r, 24 * expireDay, TimeUnit.HOURS))
-            redisFormat.clear()
-          }
-          redisFormat.iterator
-        })
-        println("put in redis.count=" + userDataTakeRddRun.count())
+        if (ifWriteRedis) {
+          redisTemplate.opsForValue.multiSet(redisFormat)
+          redisFormat.keySet.foreach(r => redisTemplate.expire(r, 24 * expireDay, TimeUnit.HOURS))
+          redisFormat.clear()
+        }
+        redisFormat.iterator
+      })
+      println("put in redis.count=" + userDataTakeRddRun.count())
+    } else {
+      println("不处理user的redis写入")
+    }
 
-      }
 
 
 
-    }else{
-      println("不处理user")
-    }
     // 4 video侧特征处理
     if (ifVideo){
       println("video特征处理")