zhangbo 1 year ago
parent
commit
d47758baad

+ 33 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_02_writeredis.scala

@@ -31,6 +31,7 @@ object makedata_02_writeredis {
     val date = param.getOrDefault("date", "20231220")
     val date = param.getOrDefault("date", "20231220")
     val expireDay = param.getOrDefault("expireDay", "2").toInt
     val expireDay = param.getOrDefault("expireDay", "2").toInt
     val ifDebug = param.getOrDefault("ifDebug", "False").toBoolean
     val ifDebug = param.getOrDefault("ifDebug", "False").toBoolean
+    val ifDeleteRedisUser = param.getOrDefault("ifDeleteRedisUser", "False").toBoolean
     val ifWriteRedisUser = param.getOrDefault("ifWriteRedisUser", "False").toBoolean
     val ifWriteRedisUser = param.getOrDefault("ifWriteRedisUser", "False").toBoolean
     val ifWriteRedis = param.getOrDefault("ifWriteRedis", "True").toBoolean
     val ifWriteRedis = param.getOrDefault("ifWriteRedis", "True").toBoolean
     val partition = partitionPrefix + date
     val partition = partitionPrefix + date
@@ -77,6 +78,38 @@ object makedata_02_writeredis {
     } else {
     } else {
       println("不处理user")
       println("不处理user")
     }
     }
+
+    if (ifDeleteRedisUser){
+      var savePathPart = savePathUser + "/user/" + partition
+      if (userSampleIDs.nonEmpty) {
+        savePathPart = savePathPart + "_" + userSampleIDs
+      }
+      val userDataRead = sc.textFile(savePathPart).map(r => {
+        val rList = r.split("\t")
+        (rList(0), rList(1))
+      })
+      val userDataTakeRddRun = userDataRead.mapPartitions(row => {
+        val redisFormat = new util.HashMap[String, String]
+        val redisTemplate = env.getRedisTemplate()
+        var i = 1
+        row.foreach {
+          case (key, value) =>
+            if (key.nonEmpty) {
+              redisFormat.put(userRedisKeyPrefix + key, value)
+            }
+            if (i % 1000 == 0) {
+              redisTemplate.delete(redisFormat.map(_._1))
+              redisFormat.clear()
+            }
+            i = i + 1
+        }
+        redisTemplate.delete(redisFormat.map(_._1))
+        redisFormat.clear()
+        redisFormat.iterator
+      })
+      println("delete redis.count=" + userDataTakeRddRun.count())
+    }
+
     if (ifWriteRedisUser){
     if (ifWriteRedisUser){
       var savePathPart = savePathUser + "/user/" + partition
       var savePathPart = savePathUser + "/user/" + partition
       if (userSampleIDs.nonEmpty) {
       if (userSampleIDs.nonEmpty) {