zhangbo 1 year ago
parent
commit
44e157a935

+ 98 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_03_deleteredis.scala

@@ -0,0 +1,98 @@
+package com.aliyun.odps.spark.examples.makedata
+
+import com.aliyun.odps.TableSchema
+import com.aliyun.odps.data.Record
+import examples.dataloader.RecommRedisFeatureConstructor
+import org.apache.spark.aliyun.odps.OdpsOps
+import org.apache.spark.sql.SparkSession
+import org.springframework.data.redis.connection.RedisStandaloneConfiguration
+import org.springframework.data.redis.connection.jedis.JedisConnectionFactory
+import org.springframework.data.redis.core.RedisTemplate
+import org.springframework.data.redis.serializer.StringRedisSerializer
+
+import java.util
+import java.util.concurrent.TimeUnit
+import scala.collection.JavaConversions._
+
+
+/**
+ * Spark job that reads the user feature table from ODPS, derives the Redis key
+ * of every row with [[handleUser]], and deletes those keys from Redis.
+ *
+ * Each Spark partition opens its own Redis connection factory, deletes its keys
+ * in bounded batches and releases the factory afterwards.
+ */
+object makedata_03_deleteredis {
+
+  /** Maximum number of keys sent to Redis in a single delete call. */
+  private final val DeleteBatchSize = 1000
+
+  /**
+   * Job entry point.
+   *
+   * @param args command-line arguments (not used)
+   */
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName("WordCount")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+
+    // Read from the ODPS database.
+    // SECURITY NOTE(review): the access key pair below is committed in plain text;
+    // it should be rotated and supplied through configuration instead.
+    val accessKeyId = "LTAIWYUujJAm7CbH"
+    val accessKeySecret = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
+    val odpsUrl = "http://service.odps.aliyun.com/api"
+    val tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
+
+    val project = "loghubods"
+    val tableUser = "alg_recsys_user_info"
+    val partition = "dt=20231220"
+
+    val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, odpsUrl, tunnelUrl)
+
+    // User-side feature processing: every record becomes a [key, value] list.
+    val userData = odpsOps.readTable(project = project, table = tableUser, partition = partition, transfer = handleUser, numPartition = 100)
+
+    // Print a small sample of what is about to be deleted.
+    userData.take(10).foreach(r => println(s"${r.get(0)}\t${r.get(1)}"))
+
+    // An explicit lambda (rather than a method reference) keeps the closure
+    // shape the same as a plain call on this object from the executors.
+    val deletedKeys = userData.mapPartitions(rows => deletePartitionKeys(rows))
+    println("delete.user.action.count=" + deletedKeys.count())
+
+
+  }
+
+  /**
+   * Deletes from Redis the keys of all rows in one partition.
+   *
+   * @param rows the partition's rows; element 0 of each row is the Redis key
+   * @return the distinct keys of this partition that were submitted for deletion
+   */
+  private def deletePartitionKeys(rows: Iterator[util.ArrayList[String]]): Iterator[String] = {
+    import scala.collection.JavaConverters._
+
+    // Only the keys are needed for a delete, so the values are not buffered.
+    // A set keeps the per-partition de-duplication, so the reported count is
+    // the number of distinct keys per partition.
+    val keys = new util.HashSet[String]
+    rows.foreach(r => keys.add(r.get(0)))
+
+    // Skip empty partitions entirely: no connection is opened for them.
+    if (!keys.isEmpty) {
+      val redisTemplate = getRedisTemplate()
+      try {
+        // Bounded batches avoid one huge delete command holding every key of the partition.
+        keys.asScala.grouped(DeleteBatchSize).foreach { batch =>
+          redisTemplate.delete(batch.asJavaCollection)
+        }
+      } finally {
+        // getRedisTemplate() builds a fresh connection factory on every call;
+        // release it so connections do not leak once per partition.
+        redisTemplate.getConnectionFactory match {
+          case factory: JedisConnectionFactory => factory.destroy()
+          case _ =>
+        }
+      }
+    }
+    keys.asScala.iterator
+  }
+
+  /** Packs a key and its value into the two-element list used as the row format. */
+  private def keyValuePair(key: String, value: String): util.ArrayList[String] = {
+    val kv = new util.ArrayList[String](2)
+    kv.add(key)
+    kv.add(value)
+    kv
+  }
+
+  /**
+   * Converts a user record into a `[key, value]` list.
+   *
+   * @param record the ODPS record to convert
+   * @param schema the table schema (not used)
+   * @return a list holding the key `user:video:<uid>` followed by the feature value
+   */
+  def handleUser(record: Record, schema: TableSchema): util.ArrayList[String] = {
+    val feature = RecommRedisFeatureConstructor.constructUserFeature(record)
+    keyValuePair(String.format("user:video:%s", feature.getUid), feature.getValue)
+  }
+
+  /**
+   * Converts an item record into a `[key, value]` list.
+   *
+   * @param record the ODPS record to convert
+   * @param schema the table schema (not used)
+   * @return a list holding the key `video:<key>` followed by the feature value
+   */
+  def handleItem(record: Record, schema: TableSchema): util.ArrayList[String] = {
+    val feature = RecommRedisFeatureConstructor.constructItemFeature(record)
+    keyValuePair(String.format("video:%s", feature.getKey), feature.getValue)
+  }
+
+  /**
+   * Builds a new String-to-String [[RedisTemplate]] backed by a new Jedis
+   * connection factory. Every call creates a new factory, so the caller should
+   * destroy the factory when it is done with the template.
+   *
+   * @return an initialized template using [[StringRedisSerializer]] by default
+   */
+  def getRedisTemplate(): RedisTemplate[String, String] = {
+    // Shared Redis template settings.
+    // SECURITY NOTE(review): the Redis password below is committed in plain text;
+    // it should be rotated and supplied through configuration instead.
+    val redisSC = new RedisStandaloneConfiguration
+    redisSC.setPort(6379)
+    redisSC.setPassword("Wqsd@2019")
+    redisSC.setHostName("r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com")
+    val jedisCF = new JedisConnectionFactory(redisSC)
+    jedisCF.afterPropertiesSet()
+    val redisTemplate = new RedisTemplate[String, String]
+    redisTemplate.setDefaultSerializer(new StringRedisSerializer)
+    redisTemplate.setConnectionFactory(jedisCF)
+    redisTemplate.afterPropertiesSet()
+    redisTemplate
+  }
+
+}