zhangbo 1 年之前
父节点
当前提交
7ebf612004

+ 0 - 1
src/main/java/examples/sparksql/SparkVideoFeaToRedisLoader.java

@@ -93,7 +93,6 @@ public class SparkVideoFeaToRedisLoader {
         String tableItemInfo = "alg_recsys_video_info";
         String tableItemInfo = "alg_recsys_video_info";
         String tableUserInfo = "alg_recsys_user_info";
         String tableUserInfo = "alg_recsys_user_info";
 
 
-
         SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
         SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         JavaSparkContext jsc = new JavaSparkContext(sparkConf);
         OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
         OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);

+ 120 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_02_writeredis.scala

@@ -0,0 +1,120 @@
+package com.aliyun.odps.spark.examples.makedata
+
+import com.aliyun.odps.TableSchema
+import com.aliyun.odps.data.Record
+import examples.dataloader.RecommRedisFeatureConstructor
+import org.apache.spark.aliyun.odps.OdpsOps
+import org.apache.spark.sql.SparkSession
+import org.springframework.data.redis.connection.RedisStandaloneConfiguration
+import org.springframework.data.redis.connection.jedis.JedisConnectionFactory
+import org.springframework.data.redis.core.RedisTemplate
+import org.springframework.data.redis.serializer.StringRedisSerializer
+
+import java.util
+import scala.collection.JavaConversions._
+
+object makedata_02_writeredis {
+  def main(args: Array[String]) {
+    val spark = SparkSession
+      .builder()
+      .appName("WordCount")
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // redis的公共模版
+
+
+
+    // 读取数据库odps
+    val accessKeyId = "LTAIWYUujJAm7CbH"
+    val accessKeySecret = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
+    val odpsUrl = "http://service.odps.aliyun.com/api"
+    val tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
+
+    val project = "loghubods"
+    val tableItem = "alg_recsys_video_info"
+    val tableUser = "alg_recsys_user_info"
+    val partition = "dt=20231218"
+
+    //用户测特征处理
+    println("用户测特征处理")
+    val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, odpsUrl, tunnelUrl)
+    val userData = odpsOps.readTable(project = project, table = tableUser, partition = partition, transfer = handleUser, numPartition = 100)
+    val userDataTake = userData.take(1000)
+    userDataTake.foreach(r=>{
+      println(r.get(0) + "\t" + r.get(1))
+    })
+    val userDataTakeRdd = sc.parallelize(userDataTake, 50)
+
+    userDataTakeRdd.mapPartitions(row=>{
+      val result = new util.ArrayList[String]()
+      val redisTemplate = this.getRedisTemplate()
+      val redisFormat = new util.HashMap[String, String]
+      for (r <- row){
+        val key = r.get(0)
+        val value = r.get(1)
+        redisFormat.put(key, value)
+      }
+      redisTemplate.opsForValue.multiSet(redisFormat)
+      result.iterator()
+    })
+
+    //video测特征处理
+    println("video测特征处理")
+    val itemData = odpsOps.readTable(project = project, table = tableItem, partition = partition, transfer = handleItem, numPartition = 100)
+    val itemDataTake = itemData.take(1000)
+    itemDataTake.foreach(r => {
+      println(r.get(0) + "\t" + r.get(1))
+    })
+    val itemDataTakeRdd = sc.parallelize(itemDataTake, 50)
+
+    itemDataTakeRdd.mapPartitions(row => {
+      val result = new util.ArrayList[String]()
+      val redisTemplate = this.getRedisTemplate()
+      val redisFormat = new util.HashMap[String, String]
+      for (r <- row) {
+        val key = r.get(0)
+        val value = r.get(1)
+        redisFormat.put(key, value)
+      }
+      redisTemplate.opsForValue.multiSet(redisFormat)
+      result.iterator()
+    })
+
+  }
+
+  def handleUser(record: Record, schema: TableSchema): util.ArrayList[String] = {
+    val feature = RecommRedisFeatureConstructor.constructUserFeature(record)
+    val key = String.format("user_info_%s", feature.getKey)
+    val value = feature.getValue
+    val kv = new util.ArrayList[String](2)
+    kv.add(key)
+    kv.add(value)
+    kv
+  }
+
+  def handleItem(record: Record, schema: TableSchema): util.ArrayList[String] = {
+    val feature = RecommRedisFeatureConstructor.constructItemFeature(record)
+    val key = String.format("video_info_%s", feature.getKey)
+    val value = feature.getValue
+    val kv = new util.ArrayList[String](2)
+    kv.add(key)
+    kv.add(value)
+    kv
+  }
+
+  def getRedisTemplate(): RedisTemplate[String, String] = {
+    val redisSC = new RedisStandaloneConfiguration
+    redisSC.setPort(6379)
+    redisSC.setPassword("Wqsd@2019")
+    redisSC.setHostName("r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com")
+    val jedisCF = new JedisConnectionFactory(redisSC)
+    jedisCF.afterPropertiesSet()
+    val redisTemplate = new RedisTemplate[String, String]
+    redisTemplate.setDefaultSerializer(new StringRedisSerializer)
+    redisTemplate.setConnectionFactory(jedisCF)
+    redisTemplate.afterPropertiesSet()
+    redisTemplate
+  }
+
+}