zhangbo 1 年間 前
コミット
6a8b9a01dd

+ 2 - 1
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_08_item2redis.scala

@@ -66,7 +66,8 @@ object makedata_08_item2redis {
         val resultNew = new JSONObject
         originFeatureName.foreach(r => {
           if (originFeatureMap.containsKey(r)) {
-            resultNew.put(r, originFeatureMap.get(r))
+            val v = originFeatureMap.get(r).get
+            resultNew.put(r, v)
           }
         })
         (videoid, resultNew.toString())

+ 176 - 248
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_09_user2redis.scala

@@ -1,248 +1,176 @@
-//package com.aliyun.odps.spark.examples.makedata
-//
-//import com.aliyun.odps.TableSchema
-//import com.aliyun.odps.data.Record
-//import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils, env}
-//import com.google.gson.GsonBuilder
-//import examples.dataloader.RequestContextOffline
-//import org.apache.hadoop.io.compress.GzipCodec
-//import org.apache.spark.sql.SparkSession
-//
-//import java.util
-//import java.util.concurrent.TimeUnit
-//import scala.collection.JavaConversions._
-//
-//
-//object makedata_09_user2redis {
-//  def main(args: Array[String]) {
-//    val spark = SparkSession
-//      .builder()
-//      .appName(this.getClass.getName)
-//      .getOrCreate()
-//    val sc = spark.sparkContext
-//
-//    // 1 读取参数
-//    val param = ParamUtils.parseArgs(args)
-//    val partitionPrefix = param.getOrElse("partitionPrefix", "dt=")
-//    val tablePart = param.getOrElse("tablePart", "64").toInt
-//    val ifUser = param.getOrDefault("ifUser", "False").toBoolean
-//    val ifVideo = param.getOrDefault("ifVideo", "False").toBoolean
-//    val date = param.getOrDefault("date", "20231220")
-//    val expireDay = param.getOrDefault("expireDay", "2").toInt
-//    val ifDebug = param.getOrDefault("ifDebug", "False").toBoolean
-//    val ifDeleteRedisUser = param.getOrDefault("ifDeleteRedisUser", "False").toBoolean
-//    val ifWriteRedisUser = param.getOrDefault("ifWriteRedisUser", "False").toBoolean
-//    val ifWriteRedis = param.getOrDefault("ifWriteRedis", "True").toBoolean
-//    val partition = partitionPrefix + date
-//    val savePathUser = param.getOrDefault("savePathUser", "")
-//    val savePathVideo = param.getOrDefault("savePathVideo", "")
-//    val userSampleIDs = param.getOrDefault("userSampleIDs", "")
-//    val sampleRate = param.getOrDefault("sampleRate", "1.0").toDouble
-////    val userSampleIDsPathFix = param.getOrDefault("userSampleIDsPathFix", "")
-//    //  /dw/recommend/model/feature/
-//
-//
-//    // 2 读取数据库odps
-//    val odpsOps = env.getODPS(sc)
-//    val project = "loghubods"
-//    val tableUser = "alg_recsys_user_info"
-//    val tableItem = "alg_recsys_video_info"
-//    val userRedisKeyPrefix = "user_info_4video_"
-//    val videoRedisKeyPrefix = "video_info_"
-//
-//
-//
-//    // 3 用户测特征处理
-//    if (ifUser){
-//      println("user特征处理")
-//      var userData = odpsOps.readTable(project = project, table = tableUser, partition = partition, transfer = handleUser, numPartition = tablePart)
-//        .filter {
-//          case (mid, fea, feaSize) =>
-//            mid.nonEmpty && fea.nonEmpty && feaSize > 0
-//        }
-//      if (userSampleIDs.nonEmpty){
-//        val IDs = userSampleIDs.split(",").filter(_.nonEmpty).map(_.toInt).toList
-//        userData = userData.filter(r => IDs.contains(r._1.hashCode % 10))
-//      }
-//      if (ifDebug){
-//        println("user特征处理-debug开启-只保留5条数据-特征数量大于1")
-//        val userDataTake = userData.take(5)
-//        userDataTake.foreach(r=> println(r._1 + "\t" + r._2 + "\t" + r._3))
-//        userData = sc.parallelize(userDataTake)
-//      }
-//      if (savePathUser.nonEmpty && savePathUser.startsWith("/dw/recommend/model/")) {
-//        var savePathPart = savePathUser + "/" + partition
-//        if (userSampleIDs.nonEmpty) {
-//          savePathPart = savePathPart + "_" + userSampleIDs
-//        }
-//        MyHdfsUtils.delete_hdfs_path(savePathPart)
-//        userData.map(r => r._1 + "\t" + r._2).saveAsTextFile(savePathPart, classOf[GzipCodec])
-//      }
-//      println("user.action.count=" + userData.count())
-//    } else {
-//      println("不处理user")
-//    }
-//
-//    if (ifDeleteRedisUser){
-//      println("user redis 删除")
-//      var savePathPart = savePathUser + "/" + partition
-//      if (userSampleIDs.nonEmpty) {
-//        savePathPart = savePathPart + "_" + userSampleIDs
-//      }
-//      println("读取数据路径:" + savePathPart)
-//      val userDataRead = sc.textFile(savePathPart)
-//      val userDataRead2 = userDataRead.filter(_.split("\t").length >= 2).map(r => {
-//        val rList = r.split("\t")
-//        (rList(0), rList(1))
-//      })
-//      println("预计删除数据量:" + userDataRead2.count())
-//      val userDataTakeRddRun = userDataRead2.mapPartitions(row => {
-//        val redisFormat = new util.HashMap[String, String]
-//        val redisTemplate = env.getRedisTemplate()
-//        var i = 1
-//        row.foreach {
-//          case (key, value) =>
-//            if (key.nonEmpty) {
-//              redisFormat.put(userRedisKeyPrefix + key, value)
-//            }
-//            if (i % 1000 == 0) {
-//              redisTemplate.delete(redisFormat.map(_._1))
-//              redisFormat.clear()
-//            }
-//            i = i + 1
-//        }
-//        redisTemplate.delete(redisFormat.map(_._1))
-//        redisFormat.clear()
-//        redisFormat.iterator
-//      })
-//      println("delete redis.count=" + userDataTakeRddRun.count())
-//    } else {
-//      println("不处理user的redis删除")
-//    }
-//
-//    if (ifWriteRedisUser){
-//      println("user redis 写入")
-//      var savePathPart = savePathUser + "/" + partition
-//      if (userSampleIDs.nonEmpty) {
-//        savePathPart = savePathPart + "_" + userSampleIDs
-//      }
-//      val userDataRead = sc.textFile(savePathPart).filter(_.split("\t").length >= 2)
-//        .sample(false, sampleRate)
-//        .map(r => {
-//        val rList = r.split("\t")
-//        (rList(0), rList(1))
-//      })
-//      val userDataTakeRddRun = userDataRead.mapPartitions(row => {
-//        val redisFormat = new util.HashMap[String, String]
-//        val redisTemplate = env.getRedisTemplate()
-//        var i = 1
-//        row.foreach {
-//          case (key, value) =>
-//            if (key.nonEmpty) {
-//              redisFormat.put(userRedisKeyPrefix + key, value)
-//            }
-//            if (i % 1000 == 0) {
-//              redisTemplate.opsForValue.multiSet(redisFormat)
-//              redisFormat.keySet.foreach(r => redisTemplate.expire(r, 24 * expireDay, TimeUnit.HOURS))
-//              redisFormat.clear()
-//            }
-//            i = i + 1
-//        }
-//        redisTemplate.opsForValue.multiSet(redisFormat)
-//        redisFormat.keySet.foreach(r => redisTemplate.expire(r, 24 * expireDay, TimeUnit.HOURS))
-//        redisFormat.clear()
-//        redisFormat.iterator
-//      })
-//      println("put in redis.count=" + userDataTakeRddRun.count())
-//    } else {
-//      println("不处理user的redis写入")
-//    }
-//
-//
-//
-//
-//    // 4 video测特征处理
-//    if (ifVideo){
-//      println("video特征处理")
-//      val handleItemFunction: (Record, TableSchema) => Tuple3[String, String, Int] = handleItem(_, _, date)
-//      var itemData = odpsOps.readTable(project = project, table = tableItem, partition = partition, transfer = handleItemFunction, numPartition = tablePart)
-//      if (ifDebug) {
-//        println("video特征处理-debug开启-只保留5条数据-特征数量大于1")
-//        val itemDataTake = itemData.filter(_._3 > 1).take(5)
-//        itemDataTake.foreach(r => println(r._1 + "\t" + r._2 + "\t" + r._3))
-//        itemData = sc.parallelize(itemDataTake)
-//      }
-//      val itemDataTakeRddRun = itemData.mapPartitions(row => {
-//        val redisFormat = new util.HashMap[String, String]
-//        val redisTemplate = env.getRedisTemplate()
-//        row.foreach {
-//          case (key, value, _) =>
-//            if (key.nonEmpty && value != null && value.nonEmpty) {
-//              redisFormat.put(videoRedisKeyPrefix + key, value)
-//              if (ifWriteRedis) {
-//                redisTemplate.opsForValue.set(videoRedisKeyPrefix + key, value, 24 * expireDay, TimeUnit.HOURS)
-//              }
-//            }
-//        }
-////        if (ifWriteRedis){
-////          redisTemplate.opsForValue.multiSet(redisFormat)
-////          redisFormat.keySet.foreach(key => redisTemplate.expire(key, 24 * expireDay, TimeUnit.HOURS))
-////        }
-//        redisFormat.iterator
-//      })
-//      if (savePathVideo.nonEmpty && savePathVideo.startsWith("/dw/recommend/model/")){
-//        val savePathPart = savePathVideo + "/" + partition
-//        MyHdfsUtils.delete_hdfs_path(savePathPart)
-//        itemDataTakeRddRun.map(r => r._1 + "\t" + r._2).saveAsTextFile(savePathPart, classOf[GzipCodec])
-//      }
-//      println("item.action.count=" + itemDataTakeRddRun.count())
-//    }else{
-//      println("不处理video")
-//    }
-//  }
-//
-//  def handleUser(record: Record, schema: TableSchema): Tuple3[String, String, Int] = {
-//    val userKey = "mids"
-//    val mid = record.getString(userKey)
-//    val reqContext: RequestContextOffline = new RequestContextOffline()
-//    reqContext.putUserFeature(record)
-//    // reqContext.featureMap.put("mid", mid)
-//    val gson = (new GsonBuilder).serializeSpecialFloatingPointValues.create
-//    val value = gson.toJson(reqContext.featureMap)
-//    (mid, value, reqContext.featureMap.size())
-//  }
-//
-//  def handleItem(record: Record, schema: TableSchema, date:String): Tuple3[String, String, Int] = {
-//    val videoKey = "videoid"
-//    val videoid = record.getBigint(videoKey).toString
-//    val reqContext: RequestContextOffline = new RequestContextOffline()
-//
-//    //---------todo 有特征不在表里 临时修复---------
-////    val i_title_len =  if (record.getString("title") != null) record.getString("title").length.toString else ""
-////    val i_days_since_upload = if (record.getDatetime("gmt_create") != null){
-////      val format = new SimpleDateFormat("yyyyMMdd")
-////      val dateOld = format.format(record.getDatetime("gmt_create"))
-////      val dayDiff = MyDateUtils.calculateDateDifference(dateOld, date)
-////      dayDiff.toString
-////    }else{
-////      ""
-////    }
-////    if (i_title_len.nonEmpty){
-////      val d = reqContext.bucketRatioFeature(i_title_len.toDouble)
-////      reqContext.featureMap.put("i_title_len", d.toString)
-////    }
-////    if (i_days_since_upload.nonEmpty) {
-////      val d = reqContext.bucketRatioFeature(i_days_since_upload.toDouble)
-////      reqContext.featureMap.put("i_days_since_upload", d.toString)
-////    }
-//    //------修复完成---------
-//
-//    reqContext.putItemFeature(record)
-//     reqContext.featureMap.put("videoid", videoid)
-//
-//    val gson = (new GsonBuilder).serializeSpecialFloatingPointValues.create
-//    val value = gson.toJson(reqContext.featureMap)
-//    (videoid, value, reqContext.featureMap.size())
-//  }
-//
-//}
+package com.aliyun.odps.spark.examples.makedata
+
+import com.aliyun.odps.TableSchema
+import com.aliyun.odps.data.Record
+import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils, env}
+import com.google.gson.GsonBuilder
+import examples.dataloader.RequestContextOffline
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+
+import java.util
+import java.util.concurrent.TimeUnit
+import scala.collection.JavaConversions._
+
+
+object makedata_09_user2redis {
+  def main(args: Array[String]) {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 1 读取参数
+    val param = ParamUtils.parseArgs(args)
+    val partitionPrefix = param.getOrElse("partitionPrefix", "dt=")
+    val tablePart = param.getOrElse("tablePart", "64").toInt
+    val ifUser = param.getOrDefault("ifUser", "False").toBoolean
+    val ifVideo = param.getOrDefault("ifVideo", "False").toBoolean
+    val date = param.getOrDefault("date", "20231220")
+    val expireDay = param.getOrDefault("expireDay", "2").toInt
+    val ifDebug = param.getOrDefault("ifDebug", "False").toBoolean
+    val ifDeleteRedisUser = param.getOrDefault("ifDeleteRedisUser", "False").toBoolean
+    val ifWriteRedisUser = param.getOrDefault("ifWriteRedisUser", "False").toBoolean
+    val ifWriteRedis = param.getOrDefault("ifWriteRedis", "True").toBoolean
+    val partition = partitionPrefix + date
+    val savePathUser = param.getOrDefault("savePathUser", "")
+    val savePathVideo = param.getOrDefault("savePathVideo", "")
+    val userSampleIDs = param.getOrDefault("userSampleIDs", "")
+    val sampleRate = param.getOrDefault("sampleRate", "1.0").toDouble
+
+
+    // 2 读取数据库odps
+    val odpsOps = env.getODPS(sc)
+    val project = "loghubods"
+    val tableUser = "alg_recsys_user_info"
+    val userRedisKeyPrefix = "user_info_4video_"
+
+
+
+    // 3 用户测特征处理
+    if (ifUser){
+      println("user特征处理")
+      var userData = odpsOps.readTable(project = project, table = tableUser, partition = partition, transfer = handleUser, numPartition = tablePart)
+        .filter {
+          case (mid, fea, feaSize) =>
+            mid.nonEmpty && fea.nonEmpty && feaSize > 0
+        }
+      if (userSampleIDs.nonEmpty){
+        val IDs = userSampleIDs.split(",").filter(_.nonEmpty).map(_.toInt).toList
+        userData = userData.filter(r => IDs.contains(r._1.hashCode % 10))
+      }
+      if (ifDebug){
+        println("user特征处理-debug开启-只保留5条数据-特征数量大于1")
+        val userDataTake = userData.take(5)
+        userDataTake.foreach(r=> println(r._1 + "\t" + r._2 + "\t" + r._3))
+        userData = sc.parallelize(userDataTake)
+      }
+      if (savePathUser.nonEmpty && savePathUser.startsWith("/dw/recommend/model/")) {
+        var savePathPart = savePathUser + "/" + partition
+        if (userSampleIDs.nonEmpty) {
+          savePathPart = savePathPart + "_" + userSampleIDs
+        }
+        MyHdfsUtils.delete_hdfs_path(savePathPart)
+        userData.map(r => r._1 + "\t" + r._2).saveAsTextFile(savePathPart, classOf[GzipCodec])
+      }
+      println("user.action.count=" + userData.count())
+    } else {
+      println("不处理user")
+    }
+
+    if (ifDeleteRedisUser){
+      println("user redis 删除")
+      var savePathPart = savePathUser + "/" + partition
+      if (userSampleIDs.nonEmpty) {
+        savePathPart = savePathPart + "_" + userSampleIDs
+      }
+      println("读取数据路径:" + savePathPart)
+      val userDataRead = sc.textFile(savePathPart)
+      val userDataRead2 = userDataRead.filter(_.split("\t").length >= 2).map(r => {
+        val rList = r.split("\t")
+        (rList(0), rList(1))
+      })
+      println("预计删除数据量:" + userDataRead2.count())
+      val userDataTakeRddRun = userDataRead2.mapPartitions(row => {
+        val redisFormat = new util.HashMap[String, String]
+        val redisTemplate = env.getRedisTemplate()
+        var i = 1
+        row.foreach {
+          case (key, value) =>
+            if (key.nonEmpty) {
+              redisFormat.put(userRedisKeyPrefix + key, value)
+            }
+            if (i % 1000 == 0) {
+              redisTemplate.delete(redisFormat.map(_._1))
+              redisFormat.clear()
+            }
+            i = i + 1
+        }
+        redisTemplate.delete(redisFormat.map(_._1))
+        redisFormat.clear()
+        redisFormat.iterator
+      })
+      println("delete redis.count=" + userDataTakeRddRun.count())
+    } else {
+      println("不处理user的redis删除")
+    }
+
+    if (ifWriteRedisUser){
+      println("user redis 写入")
+      var savePathPart = savePathUser + "/" + partition
+      if (userSampleIDs.nonEmpty) {
+        savePathPart = savePathPart + "_" + userSampleIDs
+      }
+      val userDataRead = sc.textFile(savePathPart).filter(_.split("\t").length >= 2)
+        .sample(false, sampleRate)
+        .map(r => {
+        val rList = r.split("\t")
+        (rList(0), rList(1))
+      })
+      val userDataTakeRddRun = userDataRead.mapPartitions(row => {
+        val redisFormat = new util.HashMap[String, String]
+        val redisTemplate = env.getRedisTemplate()
+        var i = 1
+        row.foreach {
+          case (key, value) =>
+            if (key.nonEmpty) {
+              redisFormat.put(userRedisKeyPrefix + key, value)
+            }
+            if (i % 1000 == 0) {
+              redisTemplate.opsForValue.multiSet(redisFormat)
+              redisFormat.keySet.foreach(r => redisTemplate.expire(r, 24 * expireDay, TimeUnit.HOURS))
+              redisFormat.clear()
+            }
+            i = i + 1
+        }
+        redisTemplate.opsForValue.multiSet(redisFormat)
+        redisFormat.keySet.foreach(r => redisTemplate.expire(r, 24 * expireDay, TimeUnit.HOURS))
+        redisFormat.clear()
+        redisFormat.iterator
+      })
+      println("put in redis.count=" + userDataTakeRddRun.count())
+    } else {
+      println("不处理user的redis写入")
+    }
+
+
+
+  }
+
+  def handleUser(record: Record, schema: TableSchema): Tuple3[String, String, Int] = {
+    val userKey = "mids"
+    val mid = record.getString(userKey)
+    val reqContext: RequestContextOffline = new RequestContextOffline()
+    reqContext.putUserFeature(record)
+    // reqContext.featureMap.put("mid", mid)
+    val gson = (new GsonBuilder).serializeSpecialFloatingPointValues.create
+    val value = gson.toJson(reqContext.featureMap)
+    (mid, value, reqContext.featureMap.size())
+  }
+
+  def func(record: Record, schema: TableSchema): Record = {
+    record
+  }
+
+}