|
@@ -0,0 +1,156 @@
|
|
|
+package com.aliyun.odps.spark.examples.makedata
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONObject
|
|
|
+import com.aliyun.odps.TableSchema
|
|
|
+import com.aliyun.odps.data.Record
|
|
|
+import com.aliyun.odps.spark.examples.makedata.makedata_06_originData.getFeatureFromSet
|
|
|
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
|
|
|
+import com.google.gson.GsonBuilder
|
|
|
+import examples.dataloader.RequestContextOffline
|
|
|
+import org.apache.commons.lang.time.DateUtils
|
|
|
+import org.apache.hadoop.io.compress.GzipCodec
|
|
|
+import org.apache.spark.sql.SparkSession
|
|
|
+
|
|
|
+import java.util
|
|
|
+import java.util.concurrent.TimeUnit
|
|
|
+import scala.collection.JavaConversions._
|
|
|
+
|
|
|
+
|
|
|
/**
 * Spark job that dumps per-user features from ODPS to HDFS, splits the dump into
 * recently-active and inactive users, and optionally loads the recently-active
 * users into Redis.
 *
 * Steps performed by [[main]]:
 *  1. Parse job parameters.
 *  2. Read the user feature table and build one JSON string per `mids` value.
 *  3. Save the full dump under `savePathUser/all/<partition>`.
 *  4. Read the `mid_uid` table and keep mids whose `user_last_action_time` falls
 *     within `midDays` days of `date`.
 *  5. Join the dump with those mids and save `savePathUser/true/<partition>`
 *     (recent) and `savePathUser/false/<partition>` (not recent).
 *  6. When `ifWriteRedisUser` is true, write the `true` split to Redis with a TTL
 *     of `expireDay` days.
 */
object makedata_09_user2redis_freq {

  /** HDFS paths are only deleted/written when `savePathUser` starts with this prefix. */
  private val AllowedSavePrefix = "/dw/recommend/model/"

  /** The Redis load is skipped when the `true` split holds more rows than this. */
  private val MaxRedisRows = 200000000L

  /**
   * Job entry point.
   *
   * Recognised parameters (with defaults): `partitionPrefix` ("dt="), `tablePart` (64),
   * `date` ("20231220"), `expireDay` (3), `ifWriteRedisUser` ("False"),
   * `savePathUser` (""), `midDays` (7).
   *
   * @param args raw command-line arguments, parsed by `ParamUtils.parseArgs`
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getName)
      .getOrCreate()
    val sc = spark.sparkContext

    // 1. Read parameters.
    val param = ParamUtils.parseArgs(args)
    val partitionPrefix = param.getOrElse("partitionPrefix", "dt=")
    val tablePart = param.getOrElse("tablePart", "64").toInt
    val date = param.getOrDefault("date", "20231220")
    val expireDay = param.getOrDefault("expireDay", "3").toInt
    val ifWriteRedisUser = param.getOrDefault("ifWriteRedisUser", "False").toBoolean
    val partition = partitionPrefix + date
    val savePathUser = param.getOrDefault("savePathUser", "")
    val midDays = param.getOrDefault("midDays", "7").toInt

    // 2. ODPS source tables and Redis key layout.
    val odpsOps = env.getODPS(sc)
    val project = "loghubods"
    val tableUser = "alg_recsys_user_info"
    val userRedisKeyPrefix = "user_info_4video_"

    // Output locations; every reader and writer below shares these so they cannot drift apart.
    val canSave = savePathUser.nonEmpty && savePathUser.startsWith(AllowedSavePrefix)
    val allPath = savePathUser + "/all/" + partition
    val truePath = savePathUser + "/true/" + partition
    val falsePath = savePathUser + "/false/" + partition

    // 3. Feature processing.
    println("user特征处理")
    // Built once on the driver instead of once per record.
    val originFeatureName = Set(
      "gender", "machineinfo_brand", "machineinfo_model", "machineinfo_platform", "machineinfo_sdkversion",
      "machineinfo_system", "machineinfo_wechatversion",
      //"gmt_create_user",
      "u_1day_exp_cnt", "u_1day_click_cnt", "u_1day_share_cnt", "u_1day_return_cnt",
      "u_3day_exp_cnt", "u_3day_click_cnt", "u_3day_share_cnt", "u_3day_return_cnt",
      "u_7day_exp_cnt", "u_7day_click_cnt", "u_7day_share_cnt", "u_7day_return_cnt",
      "u_3month_exp_cnt", "u_3month_click_cnt", "u_3month_share_cnt", "u_3month_return_cnt"
    )
    val userData = odpsOps.readTable(project = project, table = tableUser, partition = partition,
        transfer = func, numPartition = tablePart)
      .map(record => {
        val mid = record.getString("mids")
        val originFeatureMap = getFeatureFromSet(originFeatureName, record)
        // Copy only the features that were actually extracted for this record.
        val resultNew = new JSONObject
        originFeatureName.foreach(r => {
          if (originFeatureMap.containsKey(r)) {
            val v = originFeatureMap(r)
            resultNew.put(r, v)
          }
        })
        (mid, resultNew.toString())
      })

    // 3b. Save the full feature dump as "<mid>\t<json>" lines.
    if (canSave) {
      MyHdfsUtils.delete_hdfs_path(allPath)
      userData.map(r => r._1 + "\t" + r._2).saveAsTextFile(allPath, classOf[GzipCodec])
    }

    // 4. Recently-active users: keep mids whose last action is within `midDays` days of `date`.
    val dateEarly = MyDateUtils.getNumDaysBefore(date, 0)
    // Parsed once on the driver; the original re-parsed `date` for every record.
    val dateTsSeconds = DateUtils.parseDate(date, Array[String]("yyyyMMdd")).getTime / 1000
    val windowSeconds = 3600 * 24 * midDays
    val midRdd = odpsOps.readTable(project = project, table = "mid_uid",
        partition = "dt=" + dateEarly, transfer = func, numPartition = tablePart)
      .map(r => {
        val mid = if (r.isNull("mid")) "" else r.getString("mid")
        val actionTs = if (r.isNull("user_last_action_time")) "" else r.getString("user_last_action_time")
        (mid, actionTs)
      })
      .filter(r => r._1.nonEmpty && r._2.nonEmpty)
      // NOTE(review): the division by 1000 presumes user_last_action_time is epoch milliseconds — confirm.
      .filter(r => dateTsSeconds - r._2.toLong / 1000 < windowSeconds)
    println("------------mid处理完毕,近期保留的用户有:" + midRdd.count() + "------------------")

    // 5. Split the dump written in step 3 into recent / non-recent users.
    if (canSave) {
      // leftOuterJoin emits one row per matching right-side row, so reduce the right side
      // to distinct mids; otherwise a mid listed several times would duplicate its line.
      val recentMids = midRdd.map(_._1).distinct().map(mid => (mid, true))
      // Read back the "/all/" dump — the same path step 3 wrote to.
      val userDataRead = sc.textFile(allPath)
        .flatMap(line => parseKeyValue(line).iterator)
        .leftOuterJoin(recentMids)
        .map { case (mid, (fea, recent)) => (mid + "\t" + fea, recent.isDefined) }

      MyHdfsUtils.delete_hdfs_path(truePath)
      userDataRead.filter(_._2).map(_._1).saveAsTextFile(truePath, classOf[GzipCodec])
      MyHdfsUtils.delete_hdfs_path(falsePath)
      userDataRead.filter(!_._2).map(_._1).saveAsTextFile(falsePath, classOf[GzipCodec])
    }

    // 6. Optional Redis load of the recently-active split.
    if (ifWriteRedisUser) {
      println("开始处理redis写入")
      val userDataRead = sc.textFile(truePath).flatMap(line => parseKeyValue(line).iterator)
      val count = userDataRead.count()
      println("待写入数据有:" + count)
      if (count > MaxRedisRows) {
        println("数据量超过2亿,不执行写入。")
      } else {
        val redisBatchSize = 1000
        val expireHours: Long = 24L * expireDay
        // Each partition writes its rows in batches and returns how many entries it wrote,
        // so the total logged below reflects the real number of keys sent to Redis.
        val writtenPerPartition = userDataRead.mapPartitions(rows => {
          val redisTemplate = env.getRedisTemplate()
          val written = rows
            .filter(_._1.nonEmpty)
            .grouped(redisBatchSize)
            .map { batch =>
              val redisFormat = new util.HashMap[String, String]
              batch.foreach { case (key, value) => redisFormat.put(userRedisKeyPrefix + key, value) }
              redisTemplate.opsForValue.multiSet(redisFormat)
              // multiSet carries no TTL, so each key's expiry is set individually.
              redisFormat.keySet.foreach(k => redisTemplate.expire(k, expireHours, TimeUnit.HOURS))
              redisFormat.size.toLong
            }
            .sum
          Iterator.single(written)
        })
        println("user写入成功:put in redis.count=" + writtenPerPartition.fold(0L)(_ + _))
      }
    }
  }

  /**
   * Splits a "<key>\t<value>" line into its first two tab-separated fields.
   *
   * @param line one line of a saved feature file
   * @return the first two fields, or `None` when the line has fewer than two fields
   */
  private def parseKeyValue(line: String): Option[(String, String)] =
    line.split("\t") match {
      case Array(key, value, _*) => Some((key, value))
      case _ => None
    }

  /**
   * Identity transfer function passed to `readTable`: returns the record unchanged.
   *
   * @param record the ODPS record being read
   * @param schema the table schema (unused)
   * @return `record` itself
   */
  def func(record: Record, schema: TableSchema): Record = {
    record
  }

}
|