zhangbo 1 ano atrás
pai
commit
62beba72c4

+ 10 - 9
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_02_writeredis.scala

@@ -37,7 +37,7 @@ object makedata_02_writeredis {
     val partition = "dt=20231218"
 
     //用户测特征处理
-    println("用户测特征处理")
+    println("用户测特征处理-存之前打印")
     val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, odpsUrl, tunnelUrl)
     val userData = odpsOps.readTable(project = project, table = tableUser, partition = partition, transfer = handleUser, numPartition = 100)
     val userDataTake = userData.take(10)
@@ -47,16 +47,18 @@ object makedata_02_writeredis {
     val userDataTakeRdd = sc.parallelize(userDataTake, 50)
 
     userDataTakeRdd.mapPartitions(row=>{
-      val result = new util.ArrayList[String]()
       val redisTemplate = this.getRedisTemplate()
       val redisFormat = new util.HashMap[String, String]
-      for (r <- row){
+      row.foreach(r =>{
         val key = r.get(0)
         val value = r.get(1)
-        redisFormat.put(key, value)
-      }
-      redisTemplate.opsForValue.multiSet(redisFormat)
-      result.iterator()
+        redisTemplate.opsForValue().set(key, value, 3600*2)
+      })
+      redisFormat.iterator
+    })
+    println("用户测特征处理-存完再打印")
+    userDataTakeRdd.foreach(r => {
+      println(r.get(0) + "\t" + r.get(1))
     })
     println("user.action.count="+userDataTakeRdd.count())
 
@@ -70,7 +72,6 @@ object makedata_02_writeredis {
     val itemDataTakeRdd = sc.parallelize(itemDataTake, 50)
 
     itemDataTakeRdd.mapPartitions(row => {
-      val result = new util.ArrayList[String]()
       val redisTemplate = this.getRedisTemplate()
       val redisFormat = new util.HashMap[String, String]
       for (r <- row) {
@@ -79,7 +80,7 @@ object makedata_02_writeredis {
         redisFormat.put(key, value)
       }
       redisTemplate.opsForValue.multiSet(redisFormat)
-      result.iterator()
+      redisFormat.iterator
     })
     println("item.action.count="+itemDataTakeRdd.count())