zhangbo 1 jaar geleden
bovenliggende
commit
7ae5ffd248

+ 7 - 9
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_02_writeredis.scala

@@ -9,10 +9,11 @@ import org.springframework.data.redis.connection.RedisStandaloneConfiguration
 import org.springframework.data.redis.connection.jedis.JedisConnectionFactory
 import org.springframework.data.redis.core.RedisTemplate
 import org.springframework.data.redis.serializer.StringRedisSerializer
-
+import java.util.concurrent.TimeUnit
 import java.util
 import scala.collection.JavaConversions._
 
+
 object makedata_02_writeredis {
   def main(args: Array[String]) {
     val spark = SparkSession
@@ -34,10 +35,9 @@ object makedata_02_writeredis {
     val project = "loghubods"
     val tableItem = "alg_recsys_video_info"
     val tableUser = "alg_recsys_user_info"
-    val partition = "dt=20231218"
+    val partition = "dt=20231220"
 
     //用户测特征处理
-    println("用户测特征处理-存之前打印")
     val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, odpsUrl, tunnelUrl)
     val userData = odpsOps.readTable(project = project, table = tableUser, partition = partition, transfer = handleUser, numPartition = 100)
     val userDataTake = userData.take(10)
@@ -52,14 +52,12 @@ object makedata_02_writeredis {
       row.foreach(r =>{
         val key = r.get(0)
         val value = r.get(1)
-        redisTemplate.opsForValue().set(key, value, 3600*2)
+        redisFormat.put(key, value)
       })
+      redisTemplate.opsForValue.multiSet(redisFormat)
+      redisFormat.keySet.foreach(key => redisTemplate.expire(key, 24*7, TimeUnit.HOURS))
       redisFormat.iterator
     })
-    println("用户测特征处理-存完再打印")
-    userDataTakeRddRun.take(10).foreach(r => {
-     r._1+ "\t" + r._2
-    })
     println("user.action.count="+userDataTakeRddRun.count())
 
     //video测特征处理
@@ -80,10 +78,10 @@ object makedata_02_writeredis {
         redisFormat.put(key, value)
       }
       redisTemplate.opsForValue.multiSet(redisFormat)
+      redisFormat.keySet.foreach(key => redisTemplate.expire(key, 24*7, TimeUnit.HOURS))
       redisFormat.iterator
     })
     println("item.action.count="+itemDataTakeRddRun.count())
-
   }
 
   def handleUser(record: Record, schema: TableSchema): util.ArrayList[String] = {