zhangbo 1 年之前
父節點
當前提交
b4095ee080
共有 1 個文件被更改,包括 22 次插入24 次删除
  1. 22 24
      src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_02_writeredis.scala

+ 22 - 24
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_02_writeredis.scala

@@ -22,9 +22,6 @@ object makedata_02_writeredis {
       .getOrCreate()
     val sc = spark.sparkContext
 
-    // redis的公共模版
-
-
 
     // 读取数据库odps
     val accessKeyId = "LTAIWYUujJAm7CbH"
@@ -40,25 +37,25 @@ object makedata_02_writeredis {
     val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, odpsUrl, tunnelUrl)
 
     //用户测特征处理
-//    val userData = odpsOps.readTable(project = project, table = tableUser, partition = partition, transfer = handleUser, numPartition = 100)
-//    val userDataTake = userData.take(10)
-//    userDataTake.foreach(r=>{
-//      println(r.get(0) + "\t" + r.get(1))
-//    })
-//
-//    val userDataTakeRddRun = userData.sample(false, 0.1).mapPartitions(row=>{
-//      val redisTemplate = this.getRedisTemplate()
-//      val redisFormat = new util.HashMap[String, String]
-//      row.foreach(r =>{
-//        val key = r.get(0)
-//        val value = r.get(1)
-//        redisFormat.put(key, value)
-//      })
-//      redisTemplate.opsForValue.multiSet(redisFormat)
-//      redisFormat.keySet.foreach(key => redisTemplate.expire(key, 24*7, TimeUnit.HOURS))
-//      redisFormat.iterator
-//    })
-//    println("user.action.count="+userDataTakeRddRun.count())
+    val userData = odpsOps.readTable(project = project, table = tableUser, partition = partition, transfer = handleUser, numPartition = 100)
+    val userDataTake = userData.take(10)
+    userDataTake.foreach(r=>{
+      println(r.get(0) + "\t" + r.get(1))
+    })
+
+    val userDataTakeRddRun = userData.sample(false, 0.1).mapPartitions(row=>{
+      val redisTemplate = this.getRedisTemplate()
+      val redisFormat = new util.HashMap[String, String]
+      row.foreach(r =>{
+        val key = r.get(0)
+        val value = r.get(1)
+        redisFormat.put(key, value)
+      })
+      redisTemplate.opsForValue.multiSet(redisFormat)
+      redisFormat.keySet.foreach(key => redisTemplate.expire(key, 24*7, TimeUnit.HOURS))
+      redisFormat.iterator
+    })
+    println("user.action.count="+userDataTakeRddRun.count())
 
     //video测特征处理
     println("video测特征处理")
@@ -86,7 +83,7 @@ object makedata_02_writeredis {
 
   def handleUser(record: Record, schema: TableSchema): util.ArrayList[String] = {
     val feature = RecommRedisFeatureConstructor.constructUserFeature(record)
-    val key = String.format("user_info_%s", feature.getUid)
+    val key = String.format("user:video:%s", feature.getUid)
     val value = feature.getValue
     val kv = new util.ArrayList[String](2)
     kv.add(key)
@@ -96,7 +93,7 @@ object makedata_02_writeredis {
 
   def handleItem(record: Record, schema: TableSchema): util.ArrayList[String] = {
     val feature = RecommRedisFeatureConstructor.constructItemFeature(record)
-    val key = String.format("video_info_%s", feature.getKey)
+    val key = String.format("video:%s", feature.getKey)
     val value = feature.getValue
     val kv = new util.ArrayList[String](2)
     kv.add(key)
@@ -105,6 +102,7 @@ object makedata_02_writeredis {
   }
 
   def getRedisTemplate(): RedisTemplate[String, String] = {
+    // redis的公共模版
     val redisSC = new RedisStandaloneConfiguration
     redisSC.setPort(6379)
     redisSC.setPassword("Wqsd@2019")