|
@@ -0,0 +1,248 @@
|
|
|
+//package com.aliyun.odps.spark.examples.makedata
|
|
|
+//
|
|
|
+//import com.aliyun.odps.TableSchema
|
|
|
+//import com.aliyun.odps.data.Record
|
|
|
+//import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils, env}
|
|
|
+//import com.google.gson.GsonBuilder
|
|
|
+//import examples.dataloader.RequestContextOffline
|
|
|
+//import org.apache.hadoop.io.compress.GzipCodec
|
|
|
+//import org.apache.spark.sql.SparkSession
|
|
|
+//
|
|
|
+//import java.util
|
|
|
+//import java.util.concurrent.TimeUnit
|
|
|
+//import scala.collection.JavaConversions._
|
|
|
+//
|
|
|
+//
|
|
|
+//object makedata_09_user2redis {
|
|
|
+// def main(args: Array[String]) {
|
|
|
+// val spark = SparkSession
|
|
|
+// .builder()
|
|
|
+// .appName(this.getClass.getName)
|
|
|
+// .getOrCreate()
|
|
|
+// val sc = spark.sparkContext
|
|
|
+//
|
|
|
+// // 1 读取参数
|
|
|
+// val param = ParamUtils.parseArgs(args)
|
|
|
+// val partitionPrefix = param.getOrElse("partitionPrefix", "dt=")
|
|
|
+// val tablePart = param.getOrElse("tablePart", "64").toInt
|
|
|
+// val ifUser = param.getOrDefault("ifUser", "False").toBoolean
|
|
|
+// val ifVideo = param.getOrDefault("ifVideo", "False").toBoolean
|
|
|
+// val date = param.getOrDefault("date", "20231220")
|
|
|
+// val expireDay = param.getOrDefault("expireDay", "2").toInt
|
|
|
+// val ifDebug = param.getOrDefault("ifDebug", "False").toBoolean
|
|
|
+// val ifDeleteRedisUser = param.getOrDefault("ifDeleteRedisUser", "False").toBoolean
|
|
|
+// val ifWriteRedisUser = param.getOrDefault("ifWriteRedisUser", "False").toBoolean
|
|
|
+// val ifWriteRedis = param.getOrDefault("ifWriteRedis", "True").toBoolean
|
|
|
+// val partition = partitionPrefix + date
|
|
|
+// val savePathUser = param.getOrDefault("savePathUser", "")
|
|
|
+// val savePathVideo = param.getOrDefault("savePathVideo", "")
|
|
|
+// val userSampleIDs = param.getOrDefault("userSampleIDs", "")
|
|
|
+// val sampleRate = param.getOrDefault("sampleRate", "1.0").toDouble
|
|
|
+//// val userSampleIDsPathFix = param.getOrDefault("userSampleIDsPathFix", "")
|
|
|
+// // /dw/recommend/model/feature/
|
|
|
+//
|
|
|
+//
|
|
|
+// // 2 读取数据库odps
|
|
|
+// val odpsOps = env.getODPS(sc)
|
|
|
+// val project = "loghubods"
|
|
|
+// val tableUser = "alg_recsys_user_info"
|
|
|
+// val tableItem = "alg_recsys_video_info"
|
|
|
+// val userRedisKeyPrefix = "user_info_4video_"
|
|
|
+// val videoRedisKeyPrefix = "video_info_"
|
|
|
+//
|
|
|
+//
|
|
|
+//
|
|
|
+// // 3 用户测特征处理
|
|
|
+// if (ifUser){
|
|
|
+// println("user特征处理")
|
|
|
+// var userData = odpsOps.readTable(project = project, table = tableUser, partition = partition, transfer = handleUser, numPartition = tablePart)
|
|
|
+// .filter {
|
|
|
+// case (mid, fea, feaSize) =>
|
|
|
+// mid.nonEmpty && fea.nonEmpty && feaSize > 0
|
|
|
+// }
|
|
|
+// if (userSampleIDs.nonEmpty){
|
|
|
+// val IDs = userSampleIDs.split(",").filter(_.nonEmpty).map(_.toInt).toList
|
|
|
+// userData = userData.filter(r => IDs.contains(r._1.hashCode % 10))
|
|
|
+// }
|
|
|
+// if (ifDebug){
|
|
|
+// println("user特征处理-debug开启-只保留5条数据-特征数量大于1")
|
|
|
+// val userDataTake = userData.take(5)
|
|
|
+// userDataTake.foreach(r=> println(r._1 + "\t" + r._2 + "\t" + r._3))
|
|
|
+// userData = sc.parallelize(userDataTake)
|
|
|
+// }
|
|
|
+// if (savePathUser.nonEmpty && savePathUser.startsWith("/dw/recommend/model/")) {
|
|
|
+// var savePathPart = savePathUser + "/" + partition
|
|
|
+// if (userSampleIDs.nonEmpty) {
|
|
|
+// savePathPart = savePathPart + "_" + userSampleIDs
|
|
|
+// }
|
|
|
+// MyHdfsUtils.delete_hdfs_path(savePathPart)
|
|
|
+// userData.map(r => r._1 + "\t" + r._2).saveAsTextFile(savePathPart, classOf[GzipCodec])
|
|
|
+// }
|
|
|
+// println("user.action.count=" + userData.count())
|
|
|
+// } else {
|
|
|
+// println("不处理user")
|
|
|
+// }
|
|
|
+//
|
|
|
+// if (ifDeleteRedisUser){
|
|
|
+// println("user redis 删除")
|
|
|
+// var savePathPart = savePathUser + "/" + partition
|
|
|
+// if (userSampleIDs.nonEmpty) {
|
|
|
+// savePathPart = savePathPart + "_" + userSampleIDs
|
|
|
+// }
|
|
|
+// println("读取数据路径:" + savePathPart)
|
|
|
+// val userDataRead = sc.textFile(savePathPart)
|
|
|
+// val userDataRead2 = userDataRead.filter(_.split("\t").length >= 2).map(r => {
|
|
|
+// val rList = r.split("\t")
|
|
|
+// (rList(0), rList(1))
|
|
|
+// })
|
|
|
+// println("预计删除数据量:" + userDataRead2.count())
|
|
|
+// val userDataTakeRddRun = userDataRead2.mapPartitions(row => {
|
|
|
+// val redisFormat = new util.HashMap[String, String]
|
|
|
+// val redisTemplate = env.getRedisTemplate()
|
|
|
+// var i = 1
|
|
|
+// row.foreach {
|
|
|
+// case (key, value) =>
|
|
|
+// if (key.nonEmpty) {
|
|
|
+// redisFormat.put(userRedisKeyPrefix + key, value)
|
|
|
+// }
|
|
|
+// if (i % 1000 == 0) {
|
|
|
+// redisTemplate.delete(redisFormat.map(_._1))
|
|
|
+// redisFormat.clear()
|
|
|
+// }
|
|
|
+// i = i + 1
|
|
|
+// }
|
|
|
+// redisTemplate.delete(redisFormat.map(_._1))
|
|
|
+// redisFormat.clear()
|
|
|
+// redisFormat.iterator
|
|
|
+// })
|
|
|
+// println("delete redis.count=" + userDataTakeRddRun.count())
|
|
|
+// } else {
|
|
|
+// println("不处理user的redis删除")
|
|
|
+// }
|
|
|
+//
|
|
|
+// if (ifWriteRedisUser){
|
|
|
+// println("user redis 写入")
|
|
|
+// var savePathPart = savePathUser + "/" + partition
|
|
|
+// if (userSampleIDs.nonEmpty) {
|
|
|
+// savePathPart = savePathPart + "_" + userSampleIDs
|
|
|
+// }
|
|
|
+// val userDataRead = sc.textFile(savePathPart).filter(_.split("\t").length >= 2)
|
|
|
+// .sample(false, sampleRate)
|
|
|
+// .map(r => {
|
|
|
+// val rList = r.split("\t")
|
|
|
+// (rList(0), rList(1))
|
|
|
+// })
|
|
|
+// val userDataTakeRddRun = userDataRead.mapPartitions(row => {
|
|
|
+// val redisFormat = new util.HashMap[String, String]
|
|
|
+// val redisTemplate = env.getRedisTemplate()
|
|
|
+// var i = 1
|
|
|
+// row.foreach {
|
|
|
+// case (key, value) =>
|
|
|
+// if (key.nonEmpty) {
|
|
|
+// redisFormat.put(userRedisKeyPrefix + key, value)
|
|
|
+// }
|
|
|
+// if (i % 1000 == 0) {
|
|
|
+// redisTemplate.opsForValue.multiSet(redisFormat)
|
|
|
+// redisFormat.keySet.foreach(r => redisTemplate.expire(r, 24 * expireDay, TimeUnit.HOURS))
|
|
|
+// redisFormat.clear()
|
|
|
+// }
|
|
|
+// i = i + 1
|
|
|
+// }
|
|
|
+// redisTemplate.opsForValue.multiSet(redisFormat)
|
|
|
+// redisFormat.keySet.foreach(r => redisTemplate.expire(r, 24 * expireDay, TimeUnit.HOURS))
|
|
|
+// redisFormat.clear()
|
|
|
+// redisFormat.iterator
|
|
|
+// })
|
|
|
+// println("put in redis.count=" + userDataTakeRddRun.count())
|
|
|
+// } else {
|
|
|
+// println("不处理user的redis写入")
|
|
|
+// }
|
|
|
+//
|
|
|
+//
|
|
|
+//
|
|
|
+//
|
|
|
+// // 4 video测特征处理
|
|
|
+// if (ifVideo){
|
|
|
+// println("video特征处理")
|
|
|
+// val handleItemFunction: (Record, TableSchema) => Tuple3[String, String, Int] = handleItem(_, _, date)
|
|
|
+// var itemData = odpsOps.readTable(project = project, table = tableItem, partition = partition, transfer = handleItemFunction, numPartition = tablePart)
|
|
|
+// if (ifDebug) {
|
|
|
+// println("video特征处理-debug开启-只保留5条数据-特征数量大于1")
|
|
|
+// val itemDataTake = itemData.filter(_._3 > 1).take(5)
|
|
|
+// itemDataTake.foreach(r => println(r._1 + "\t" + r._2 + "\t" + r._3))
|
|
|
+// itemData = sc.parallelize(itemDataTake)
|
|
|
+// }
|
|
|
+// val itemDataTakeRddRun = itemData.mapPartitions(row => {
|
|
|
+// val redisFormat = new util.HashMap[String, String]
|
|
|
+// val redisTemplate = env.getRedisTemplate()
|
|
|
+// row.foreach {
|
|
|
+// case (key, value, _) =>
|
|
|
+// if (key.nonEmpty && value != null && value.nonEmpty) {
|
|
|
+// redisFormat.put(videoRedisKeyPrefix + key, value)
|
|
|
+// if (ifWriteRedis) {
|
|
|
+// redisTemplate.opsForValue.set(videoRedisKeyPrefix + key, value, 24 * expireDay, TimeUnit.HOURS)
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+//// if (ifWriteRedis){
|
|
|
+//// redisTemplate.opsForValue.multiSet(redisFormat)
|
|
|
+//// redisFormat.keySet.foreach(key => redisTemplate.expire(key, 24 * expireDay, TimeUnit.HOURS))
|
|
|
+//// }
|
|
|
+// redisFormat.iterator
|
|
|
+// })
|
|
|
+// if (savePathVideo.nonEmpty && savePathVideo.startsWith("/dw/recommend/model/")){
|
|
|
+// val savePathPart = savePathVideo + "/" + partition
|
|
|
+// MyHdfsUtils.delete_hdfs_path(savePathPart)
|
|
|
+// itemDataTakeRddRun.map(r => r._1 + "\t" + r._2).saveAsTextFile(savePathPart, classOf[GzipCodec])
|
|
|
+// }
|
|
|
+// println("item.action.count=" + itemDataTakeRddRun.count())
|
|
|
+// }else{
|
|
|
+// println("不处理video")
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+// def handleUser(record: Record, schema: TableSchema): Tuple3[String, String, Int] = {
|
|
|
+// val userKey = "mids"
|
|
|
+// val mid = record.getString(userKey)
|
|
|
+// val reqContext: RequestContextOffline = new RequestContextOffline()
|
|
|
+// reqContext.putUserFeature(record)
|
|
|
+// // reqContext.featureMap.put("mid", mid)
|
|
|
+// val gson = (new GsonBuilder).serializeSpecialFloatingPointValues.create
|
|
|
+// val value = gson.toJson(reqContext.featureMap)
|
|
|
+// (mid, value, reqContext.featureMap.size())
|
|
|
+// }
|
|
|
+//
|
|
|
+// def handleItem(record: Record, schema: TableSchema, date:String): Tuple3[String, String, Int] = {
|
|
|
+// val videoKey = "videoid"
|
|
|
+// val videoid = record.getBigint(videoKey).toString
|
|
|
+// val reqContext: RequestContextOffline = new RequestContextOffline()
|
|
|
+//
|
|
|
+// //---------todo 有特征不在表里 临时修复---------
|
|
|
+//// val i_title_len = if (record.getString("title") != null) record.getString("title").length.toString else ""
|
|
|
+//// val i_days_since_upload = if (record.getDatetime("gmt_create") != null){
|
|
|
+//// val format = new SimpleDateFormat("yyyyMMdd")
|
|
|
+//// val dateOld = format.format(record.getDatetime("gmt_create"))
|
|
|
+//// val dayDiff = MyDateUtils.calculateDateDifference(dateOld, date)
|
|
|
+//// dayDiff.toString
|
|
|
+//// }else{
|
|
|
+//// ""
|
|
|
+//// }
|
|
|
+//// if (i_title_len.nonEmpty){
|
|
|
+//// val d = reqContext.bucketRatioFeature(i_title_len.toDouble)
|
|
|
+//// reqContext.featureMap.put("i_title_len", d.toString)
|
|
|
+//// }
|
|
|
+//// if (i_days_since_upload.nonEmpty) {
|
|
|
+//// val d = reqContext.bucketRatioFeature(i_days_since_upload.toDouble)
|
|
|
+//// reqContext.featureMap.put("i_days_since_upload", d.toString)
|
|
|
+//// }
|
|
|
+// //------修复完成---------
|
|
|
+//
|
|
|
+// reqContext.putItemFeature(record)
|
|
|
+// reqContext.featureMap.put("videoid", videoid)
|
|
|
+//
|
|
|
+// val gson = (new GsonBuilder).serializeSpecialFloatingPointValues.create
|
|
|
+// val value = gson.toJson(reqContext.featureMap)
|
|
|
+// (videoid, value, reqContext.featureMap.size())
|
|
|
+// }
|
|
|
+//
|
|
|
+//}
|