|
@@ -2,14 +2,11 @@ package com.aliyun.odps.spark.examples.makedata
|
|
|
|
|
|
import com.aliyun.odps.TableSchema
|
|
|
import com.aliyun.odps.data.Record
|
|
|
-import com.aliyun.odps.spark.examples.myUtils.ParamUtils
|
|
|
-import examples.dataloader.RecommRedisFeatureConstructor
|
|
|
-import org.apache.spark.aliyun.odps.OdpsOps
|
|
|
+import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils, env}
|
|
|
+import com.google.gson.GsonBuilder
|
|
|
+import examples.dataloader.RequestContextOffline
|
|
|
+import org.apache.hadoop.io.compress.GzipCodec
|
|
|
import org.apache.spark.sql.SparkSession
|
|
|
-import org.springframework.data.redis.connection.RedisStandaloneConfiguration
|
|
|
-import org.springframework.data.redis.connection.jedis.JedisConnectionFactory
|
|
|
-import org.springframework.data.redis.core.RedisTemplate
|
|
|
-import org.springframework.data.redis.serializer.StringRedisSerializer
|
|
|
|
|
|
import java.util.concurrent.TimeUnit
|
|
|
import java.util
|
|
@@ -24,110 +21,120 @@ object makedata_02_writeredis {
|
|
|
.getOrCreate()
|
|
|
val sc = spark.sparkContext
|
|
|
|
|
|
- // 读取参数
|
|
|
+ // 1 读取参数
|
|
|
val param = ParamUtils.parseArgs(args)
|
|
|
- val ifUser = param.getOrDefault("ifUser", "True").toBoolean
|
|
|
+ val partitionPrefix = param.getOrElse("partitionPrefix", "dt=")
|
|
|
+ val tablePart = param.getOrElse("tablePart", "16").toInt
|
|
|
+ val ifUser = param.getOrDefault("ifUser", "False").toBoolean
|
|
|
val ifVideo = param.getOrDefault("ifVideo", "False").toBoolean
|
|
|
- val partition = param.getOrDefault("partition", "dt=20231220")
|
|
|
-
|
|
|
-
|
|
|
- // 读取数据库odps
|
|
|
- val accessKeyId = "LTAIWYUujJAm7CbH"
|
|
|
- val accessKeySecret = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
|
|
|
- val odpsUrl = "http://service.odps.aliyun.com/api"
|
|
|
- val tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"
|
|
|
-
|
|
|
+ val date = param.getOrDefault("date", "20231220")
|
|
|
+ val expireDay = param.getOrDefault("expireDay", "2").toInt
|
|
|
+ val ifDebug = param.getOrDefault("ifDebug", "False").toBoolean
|
|
|
+ val ifWriteRedis = param.getOrDefault("ifWriteRedis", "True").toBoolean
|
|
|
+ val partition = partitionPrefix + date
|
|
|
+ val savePathUser = param.getOrDefault("savePathUser", "")
|
|
|
+ val savePathVideo = param.getOrDefault("savePathVideo", "")
|
|
|
+ // userDataTakeRddRun : /dw/recommend/model/
|
|
|
+
|
|
|
+
|
|
|
+ // 2 读取数据库odps
|
|
|
+ val odpsOps = env.getODPS(sc)
|
|
|
val project = "loghubods"
|
|
|
- val tableItem = "alg_recsys_video_info"
|
|
|
val tableUser = "alg_recsys_user_info"
|
|
|
+ val tableItem = "alg_recsys_video_info"
|
|
|
+ val userRedisKeyPrefix = "user_info_4video_"
|
|
|
+ val videoRedisKeyPrefix = "video_info_"
|
|
|
|
|
|
- val odpsOps = OdpsOps(sc, accessKeyId, accessKeySecret, odpsUrl, tunnelUrl)
|
|
|
-
|
|
|
- //用户测特征处理
|
|
|
+ // 3 用户测特征处理
|
|
|
if (ifUser){
|
|
|
- val userData = odpsOps.readTable(project = project, table = tableUser, partition = partition, transfer = handleUser, numPartition = 100)
|
|
|
- val userDataTake = userData.take(10)
|
|
|
- userDataTake.foreach(r => {
|
|
|
- println(r.get(0) + "\t" + r.get(1))
|
|
|
- })
|
|
|
-
|
|
|
- val userDataTakeRddRun = userData.sample(false, 0.01).mapPartitions(row => {
|
|
|
- val redisTemplate = this.getRedisTemplate()
|
|
|
+ println("user特征处理")
|
|
|
+ var userData = odpsOps.readTable(project = project, table = tableUser, partition = partition, transfer = handleUser, numPartition = tablePart)
|
|
|
+ if (ifDebug){
|
|
|
+ println("user特征处理-debug开启-只保留5条数据-特征数量大于1")
|
|
|
+ val userDataTake = userData.filter(_._3 > 1).take(5)
|
|
|
+ userDataTake.foreach(r=> println(r._1 + "\t" + r._2 + "\t" + r._3))
|
|
|
+ userData = sc.parallelize(userDataTake)
|
|
|
+ }
|
|
|
+ val userDataTakeRddRun = userData.mapPartitions(row => {
|
|
|
+ val redisTemplate = env.getRedisTemplate()
|
|
|
val redisFormat = new util.HashMap[String, String]
|
|
|
- row.foreach(r => {
|
|
|
- val key = r.get(0)
|
|
|
- val value = r.get(1)
|
|
|
- redisFormat.put(key, value)
|
|
|
- })
|
|
|
- redisTemplate.opsForValue.multiSet(redisFormat)
|
|
|
- redisFormat.keySet.foreach(key => redisTemplate.expire(key, 24 * 7, TimeUnit.HOURS))
|
|
|
+ row.foreach{
|
|
|
+ case (key, value, _) =>
|
|
|
+ if (key.nonEmpty){
|
|
|
+ redisFormat.put(userRedisKeyPrefix + key, value)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (ifWriteRedis){
|
|
|
+ redisTemplate.opsForValue.multiSet(redisFormat)
|
|
|
+ redisFormat.keySet.foreach(key => redisTemplate.expire(key, 24 * expireDay, TimeUnit.HOURS))
|
|
|
+ }
|
|
|
redisFormat.iterator
|
|
|
})
|
|
|
println("user.action.count=" + userDataTakeRddRun.count())
|
|
|
+ if (savePathUser.nonEmpty && savePathUser.startsWith("/dw/recommend/model/")) {
|
|
|
+ val savePathPart = savePathUser + "/user/" + partition
|
|
|
+ MyHdfsUtils.delete_hdfs_path(savePathPart)
|
|
|
+ userDataTakeRddRun.saveAsTextFile(savePathPart, classOf[GzipCodec])
|
|
|
+ }
|
|
|
}else{
|
|
|
println("不处理user")
|
|
|
}
|
|
|
+ // 4 video测特征处理
|
|
|
if (ifVideo){
|
|
|
- //video测特征处理
|
|
|
- println("video测特征处理")
|
|
|
- val itemData = odpsOps.readTable(project = project, table = tableItem, partition = partition, transfer = handleItem, numPartition = 100)
|
|
|
- val itemDataTake = itemData.take(10)
|
|
|
- itemDataTake.foreach(r => {
|
|
|
- println(r.get(0) + "\t" + r.get(1))
|
|
|
- })
|
|
|
+ println("video特征处理")
|
|
|
+ var itemData = odpsOps.readTable(project = project, table = tableItem, partition = partition, transfer = handleItem, numPartition = tablePart)
|
|
|
+ if (ifDebug) {
|
|
|
+ println("video特征处理-debug开启-只保留5条数据-特征数量大于1")
|
|
|
+ val itemDataTake = itemData.filter(_._3 > 1).take(5)
|
|
|
+ itemDataTake.foreach(r => println(r._1 + "\t" + r._2 + "\t" + r._3))
|
|
|
+ itemData = sc.parallelize(itemDataTake)
|
|
|
+ }
|
|
|
val itemDataTakeRddRun = itemData.mapPartitions(row => {
|
|
|
- val redisTemplate = this.getRedisTemplate()
|
|
|
+ val redisTemplate = env.getRedisTemplate()
|
|
|
val redisFormat = new util.HashMap[String, String]
|
|
|
- for (r <- row) {
|
|
|
- val key = r.get(0)
|
|
|
- val value = r.get(1)
|
|
|
- redisFormat.put(key, value)
|
|
|
+ row.foreach {
|
|
|
+ case (key, value, _) =>
|
|
|
+ if (key.nonEmpty) {
|
|
|
+ redisFormat.put(videoRedisKeyPrefix + key, value)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (ifWriteRedis){
|
|
|
+ redisTemplate.opsForValue.multiSet(redisFormat)
|
|
|
+ redisFormat.keySet.foreach(key => redisTemplate.expire(key, 24 * expireDay, TimeUnit.HOURS))
|
|
|
}
|
|
|
- redisTemplate.opsForValue.multiSet(redisFormat)
|
|
|
- redisFormat.keySet.foreach(key => redisTemplate.expire(key, 24 * 7, TimeUnit.HOURS))
|
|
|
redisFormat.iterator
|
|
|
})
|
|
|
println("item.action.count=" + itemDataTakeRddRun.count())
|
|
|
+ if (savePathVideo.nonEmpty && savePathVideo.startsWith("/dw/recommend/model/")){
|
|
|
+ val savePathPart = savePathVideo + "/video/" + partition
|
|
|
+ MyHdfsUtils.delete_hdfs_path(savePathPart)
|
|
|
+ itemDataTakeRddRun.saveAsTextFile(savePathPart, classOf[GzipCodec])
|
|
|
+ }
|
|
|
}else{
|
|
|
println("不处理video")
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- def handleUser(record: Record, schema: TableSchema): util.ArrayList[String] = {
|
|
|
- val feature = RecommRedisFeatureConstructor.constructUserFeature(record)
|
|
|
- val key = String.format("user:video:%s", feature.getUid)
|
|
|
- val value = feature.getValue
|
|
|
- val kv = new util.ArrayList[String](2)
|
|
|
- kv.add(key)
|
|
|
- kv.add(value)
|
|
|
- kv
|
|
|
}
|
|
|
|
|
|
- def handleItem(record: Record, schema: TableSchema): util.ArrayList[String] = {
|
|
|
- val feature = RecommRedisFeatureConstructor.constructItemFeature(record)
|
|
|
- val key = String.format("video:%s", feature.getKey)
|
|
|
- val value = feature.getValue
|
|
|
- val kv = new util.ArrayList[String](2)
|
|
|
- kv.add(key)
|
|
|
- kv.add(value)
|
|
|
- kv
|
|
|
+ def handleUser(record: Record, schema: TableSchema): Tuple3[String, String, Int] = {
|
|
|
+ val userKey = "mids"
|
|
|
+ val mid = record.getString(userKey)
|
|
|
+ val reqContext: RequestContextOffline = new RequestContextOffline()
|
|
|
+ reqContext.putUserFeature(record)
|
|
|
+ reqContext.featureMap.put("mid", mid)
|
|
|
+ val gson = (new GsonBuilder).serializeSpecialFloatingPointValues.create
|
|
|
+ val value = gson.toJson(reqContext.featureMap)
|
|
|
+ (mid, value, reqContext.featureMap.size())
|
|
|
}
|
|
|
|
|
|
- def getRedisTemplate(): RedisTemplate[String, String] = {
|
|
|
- // redis的公共模版
|
|
|
- val redisSC = new RedisStandaloneConfiguration
|
|
|
- redisSC.setPort(6379)
|
|
|
- redisSC.setPassword("Wqsd@2019")
|
|
|
- redisSC.setHostName("r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com")
|
|
|
- val jedisCF = new JedisConnectionFactory(redisSC)
|
|
|
- jedisCF.afterPropertiesSet()
|
|
|
- val redisTemplate = new RedisTemplate[String, String]
|
|
|
- redisTemplate.setDefaultSerializer(new StringRedisSerializer)
|
|
|
- redisTemplate.setConnectionFactory(jedisCF)
|
|
|
- redisTemplate.afterPropertiesSet()
|
|
|
- redisTemplate
|
|
|
+ def handleItem(record: Record, schema: TableSchema): Tuple3[String, String, Int] = {
|
|
|
+ val videoKey = "videoid"
|
|
|
+ val videoid = record.getString(videoKey)
|
|
|
+ val reqContext: RequestContextOffline = new RequestContextOffline()
|
|
|
+ reqContext.putItemFeature(record)
|
|
|
+ reqContext.featureMap.put("videoid", videoid)
|
|
|
+ val gson = (new GsonBuilder).serializeSpecialFloatingPointValues.create
|
|
|
+ val value = gson.toJson(reqContext.featureMap)
|
|
|
+ (videoid, value, reqContext.featureMap.size())
|
|
|
}
|
|
|
|
|
|
}
|