zhangbo 1 yıl önce
ebeveyn
işleme
380dace7e7

+ 31 - 30
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_02_writeredis.scala

@@ -37,50 +37,51 @@ object makedata_02_writeredis {
     val tableUser = "alg_recsys_user_info"
     val partition = "dt=20231220"
 
-    //用户测特征处理
     val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, odpsUrl, tunnelUrl)
-    val userData = odpsOps.readTable(project = project, table = tableUser, partition = partition, transfer = handleUser, numPartition = 100)
-    val userDataTake = userData.take(10)
-    userDataTake.foreach(r=>{
-      println(r.get(0) + "\t" + r.get(1))
-    })
 
-    val userDataTakeRddRun = userData.sample(false, 0.1).mapPartitions(row=>{
-      val redisTemplate = this.getRedisTemplate()
-      val redisFormat = new util.HashMap[String, String]
-      row.foreach(r =>{
-        val key = r.get(0)
-        val value = r.get(1)
-        redisFormat.put(key, value)
-      })
-      redisTemplate.opsForValue.multiSet(redisFormat)
-      redisFormat.keySet.foreach(key => redisTemplate.expire(key, 24*7, TimeUnit.HOURS))
-      redisFormat.iterator
-    })
-    println("user.action.count="+userDataTakeRddRun.count())
-
-    //video测特征处理
-//    println("video测特征处理")
-//    val itemData = odpsOps.readTable(project = project, table = tableItem, partition = partition, transfer = handleItem, numPartition = 100)
-//    val itemDataTake = itemData.take(10)
-//    itemDataTake.foreach(r => {
+    // User-side feature processing
+//    val userData = odpsOps.readTable(project = project, table = tableUser, partition = partition, transfer = handleUser, numPartition = 100)
+//    val userDataTake = userData.take(10)
+//    userDataTake.foreach(r=>{
 //      println(r.get(0) + "\t" + r.get(1))
 //    })
-//    val itemDataTakeRdd = sc.parallelize(itemDataTake, 50)
 //
-//    val itemDataTakeRddRun = itemDataTakeRdd.mapPartitions(row => {
+//    val userDataTakeRddRun = userData.sample(false, 0.1).mapPartitions(row=>{
 //      val redisTemplate = this.getRedisTemplate()
 //      val redisFormat = new util.HashMap[String, String]
-//      for (r <- row) {
+//      row.foreach(r =>{
 //        val key = r.get(0)
 //        val value = r.get(1)
 //        redisFormat.put(key, value)
-//      }
+//      })
 //      redisTemplate.opsForValue.multiSet(redisFormat)
 //      redisFormat.keySet.foreach(key => redisTemplate.expire(key, 24*7, TimeUnit.HOURS))
 //      redisFormat.iterator
 //    })
-//    println("item.action.count="+itemDataTakeRddRun.count())
+//    println("user.action.count="+userDataTakeRddRun.count())
+
+    // Video-side feature processing
+    println("video测特征处理")
+    val itemData = odpsOps.readTable(project = project, table = tableItem, partition = partition, transfer = handleItem, numPartition = 100)
+    val itemDataTake = itemData.take(10)
+    itemDataTake.foreach(r => {
+      println(r.get(0) + "\t" + r.get(1))
+    })
+    val itemDataTakeRdd = sc.parallelize(itemDataTake, 50)
+
+    val itemDataTakeRddRun = itemDataTakeRdd.mapPartitions(row => {
+      val redisTemplate = this.getRedisTemplate()
+      val redisFormat = new util.HashMap[String, String]
+      for (r <- row) {
+        val key = r.get(0)
+        val value = r.get(1)
+        redisFormat.put(key, value)
+      }
+      redisTemplate.opsForValue.multiSet(redisFormat)
+      redisFormat.keySet.foreach(key => redisTemplate.expire(key, 24*7, TimeUnit.HOURS))
+      redisFormat.iterator
+    })
+    println("item.action.count="+itemDataTakeRddRun.count())
   }
 
   def handleUser(record: Record, schema: TableSchema): util.ArrayList[String] = {