zhangbo 1 year ago
Parent
Current commit
8207f7dd6e

+ 5 - 1
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_06_originData.scala

@@ -231,7 +231,11 @@ object makedata_06_originData {
     val result = mutable.HashMap[String, String]()
     set.foreach(r =>{
       if (!record.isNull(r)){
-        result.put(r, record.getString(r))
+        try {
+          result.put(r, record.getString(r))
+        } catch {
+          case _: Exception => result.put(r, String.valueOf(record.getBigint(r)))
+        }
       }
     })
     result
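
Note on the fallback above: getString throws when the underlying column is not a STRING, and the new catch falls back to the BIGINT accessor. If more column types can appear, a type-agnostic variant avoids using exceptions for control flow. A minimal sketch, assuming the ODPS SDK's Record.get(columnName): Object accessor; the helper name is illustrative, not part of this commit:

    import com.aliyun.odps.data.Record

    // Sketch only: read any non-null column as a string.
    // Assumes Record exposes get(columnName): Object, as in the ODPS SDK.
    def getFieldAsString(record: Record, column: String): Option[String] =
      if (record.isNull(column)) None
      else Option(record.get(column)).map(v => String.valueOf(v))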

+ 113 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_08_item2redis.scala

@@ -0,0 +1,113 @@
+package com.aliyun.odps.spark.examples.makedata
+
+import com.alibaba.fastjson.JSONObject
+import com.aliyun.odps.TableSchema
+import com.aliyun.odps.data.Record
+import com.aliyun.odps.spark.examples.makedata.makedata_06_originData.getFeatureFromSet
+import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils, env}
+import com.google.gson.GsonBuilder
+import examples.dataloader.RequestContextOffline
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+
+import java.util
+import java.util.concurrent.TimeUnit
+import scala.collection.JavaConversions._
+
+
+object makedata_08_item2redis {
+  def main(args: Array[String]) {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 1 Read parameters
+    val param = ParamUtils.parseArgs(args)
+    val partitionPrefix = param.getOrElse("partitionPrefix", "dt=")
+    val tablePart = param.getOrElse("tablePart", "64").toInt
+    val ifUser = param.getOrDefault("ifUser", "False").toBoolean
+    val ifVideo = param.getOrDefault("ifVideo", "False").toBoolean
+    val date = param.getOrDefault("date", "20231220")
+    val expireDay = param.getOrDefault("expireDay", "2").toInt
+    val ifDebug = param.getOrDefault("ifDebug", "False").toBoolean
+    val ifDeleteRedisUser = param.getOrDefault("ifDeleteRedisUser", "False").toBoolean
+    val ifWriteRedisUser = param.getOrDefault("ifWriteRedisUser", "False").toBoolean
+    val ifWriteRedis = param.getOrDefault("ifWriteRedis", "True").toBoolean
+    val partition = partitionPrefix + date
+    val savePathUser = param.getOrDefault("savePathUser", "")
+    val savePathVideo = param.getOrDefault("savePathVideo", "")
+    val userSampleIDs = param.getOrDefault("userSampleIDs", "")
+    val sampleRate = param.getOrDefault("sampleRate", "1.0").toDouble
+
+
+    // 2 Read from the ODPS database
+    val odpsOps = env.getODPS(sc)
+    val project = "loghubods"
+    val tableItem = "alg_recsys_video_info"
+    val videoRedisKeyPrefix = "video_info_"
+
+    // 4 Video-side feature processing
+    if (ifVideo) {
+      println("processing video features")
+      val itemData = odpsOps.readTable(project = project, table = tableItem, partition = partition, transfer = func, numPartition = tablePart)
+
+      val itemDataTakeRddRun = itemData.map(record =>{
+        val originFeatureName = Set(
+          "title", "tags", "total_time", "play_count_total",
+          "i_1day_exp_cnt", "i_1day_click_cnt", "i_1day_share_cnt", "i_1day_return_cnt",
+          "i_3day_exp_cnt", "i_3day_click_cnt", "i_3day_share_cnt", "i_3day_return_cnt",
+          "i_7day_exp_cnt", "i_7day_click_cnt", "i_7day_share_cnt", "i_7day_return_cnt",
+          "i_3month_exp_cnt", "i_3month_click_cnt", "i_3month_share_cnt", "i_3month_return_cnt"
+        )
+        val originFeatureMap = getFeatureFromSet(originFeatureName, record)
+        val videoid = record.getBigint("videoid").toString
+        val resultNew = new JSONObject
+        originFeatureName.foreach(r => {
+          if (originFeatureMap.contains(r)) {
+            resultNew.put(r, originFeatureMap(r))
+          }
+        })
+        (videoid, resultNew.toString())
+      }).mapPartitions(row => {
+          val redisFormat = new util.HashMap[String, String]
+          val redisFormatSave = new util.HashMap[String, String]
+          val redisTemplate = env.getRedisTemplate()
+          var i = 1
+          row.foreach {
+            case (key, value) =>
+              if (key.nonEmpty && value != null && value.nonEmpty) {
+                redisFormat.put(videoRedisKeyPrefix + key, value)
+                redisFormatSave.put(videoRedisKeyPrefix + key, value)
+              }
+              if (i % 1000 == 0 && ifWriteRedis) {
+                redisTemplate.opsForValue.multiSet(redisFormat)
+                redisFormat.keySet.foreach(r => redisTemplate.expire(r, 24 * expireDay, TimeUnit.HOURS))
+                redisFormat.clear()
+              }
+              i = i + 1
+          }
+          if (ifWriteRedis){
+            redisTemplate.opsForValue.multiSet(redisFormat)
+            redisFormat.keySet.foreach(key => redisTemplate.expire(key, 24 * expireDay, TimeUnit.HOURS))
+            redisFormat.clear()
+          }
+          redisFormatSave.iterator
+      }).persist() // cache: saveAsTextFile and count() below each trigger the job; avoids writing Redis twice
+      if (savePathVideo.nonEmpty && savePathVideo.startsWith("/dw/recommend/model/")){
+        val savePathPart = savePathVideo + "/" + partition
+        MyHdfsUtils.delete_hdfs_path(savePathPart)
+        itemDataTakeRddRun.map(r => r._1 + "\t" + r._2).saveAsTextFile(savePathPart, classOf[GzipCodec])
+      }
+      println("item.action.count=" + itemDataTakeRddRun.count())
+    } else {
+      println("skipping video")
+    }
+  }
+
+  def func(record: Record, schema: TableSchema): Record = {
+    record
+  }
+
+}
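
The mapPartitions body above buffers key/value pairs, flushes them to Redis with multiSet every 1000 rows, and attaches a TTL to each written key. The same batching pattern in isolation, as a minimal sketch; that env.getRedisTemplate() yields a Spring RedisTemplate[String, String] is an assumption, and batchSize/expireHours are illustrative parameters:

    import java.util
    import java.util.concurrent.TimeUnit
    import org.springframework.data.redis.core.RedisTemplate
    import scala.collection.JavaConversions._

    // Sketch: flush a buffer to Redis every batchSize entries, then
    // set a TTL per key so stale items expire on their own.
    def writeBatched(rows: Iterator[(String, String)],
                     redisTemplate: RedisTemplate[String, String],
                     batchSize: Int = 1000,
                     expireHours: Int = 48): Unit = {
      val buffer = new util.HashMap[String, String]()
      def flush(): Unit = if (!buffer.isEmpty) {
        redisTemplate.opsForValue.multiSet(buffer) // one network round trip per batch
        buffer.keySet.foreach(k => redisTemplate.expire(k, expireHours, TimeUnit.HOURS))
        buffer.clear()
      }
      rows.foreach { case (k, v) =>
        buffer.put(k, v)
        if (buffer.size >= batchSize) flush()
      }
      flush() // remaining tail
    }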

+ 3 - 1
zhangbo/01_train.sh

@@ -18,11 +18,13 @@ $HADOOP fs -text ${train_path}/dt=$day/* | /root/sunmingze/alphaFM/bin/fm_train
 # str model path: /dw/recommend/model/share_ratio_samples_v2
 # ros model path: /dw/recommend/model/ros_sample/
 
-
+# str
 # nohup sh 01_train.sh 20240114 /dw/recommend/model/01_str_data model_str_big >p1.log 2>&1 &
 # nohup sh 01_train.sh 20240114 /dw/recommend/model/01_str_data model_str_big1 0.5 1.0 >p1_train.log 2>&1 &
 
 # nohup sh 01_train.sh 20240114 /dw/recommend/model/02_str_data model_str_small >p2.log 2>&1 &
+# nohup sh 01_train.sh 20240114 /dw/recommend/model/02_str_data model_str_small1 0.1 5.0 >p2_train.log 2>&1 &
 # nohup sh 01_train.sh 20240114 /dw/recommend/model/03_str_data model_str_mid >p3.log 2>&1 &
 # nohup sh 01_train.sh 20240114 /dw/recommend/model/04_str_data model_str_mid2 >p4_train.log 2>&1 &
+# nohup sh 01_train.sh 20240114 /dw/recommend/model/04_str_data model_str_mid4 0.1 5.0 >p4_train.log 2>&1 &
 # nohup sh 01_train.sh 20240114 /dw/recommend/model/05_str_data model_str_mid3 >p5_train.log 2>&1 &

+ 3 - 0
zhangbo/03_predict.sh

@@ -12,9 +12,12 @@ cat predict/${output_file}_$day.txt | /root/sunmingze/AUC/AUC
 
 # str:
 # nohup sh 03_predict.sh 20240115 /dw/recommend/model/01_str_data/ model_str_big_20240114.txt model_str_big >p1_pred.log 2>&1 &
+# nohup sh 03_predict.sh 20240115 /dw/recommend/model/01_str_data/ model_str_big1_20240114.txt model_str_big1 >p1_pred.log 2>&1 &
 # nohup sh 03_predict.sh 20240115 /dw/recommend/model/02_str_data/ model_str_small_20240114.txt model_str_small >p2_pred.log 2>&1 &
+# nohup sh 03_predict.sh 20240115 /dw/recommend/model/02_str_data/ model_str_small1_20240114.txt model_str_small1 >p2_pred.log 2>&1 &
 # nohup sh 03_predict.sh 20240115 /dw/recommend/model/03_str_data/ model_str_mid_20240114.txt model_str_mid >p3_pred.log 2>&1 &
 # nohup sh 03_predict.sh 20240115 /dw/recommend/model/04_str_data/ model_str_mid2_20240114.txt model_str_mid2 >p4_pred.log 2>&1 &
+# nohup sh 03_predict.sh 20240115 /dw/recommend/model/04_str_data/ model_str_mid4_20240114.txt model_str_mid4 >p4_pred.log 2>&1 &
 # nohup sh 03_predict.sh 20240115 /dw/recommend/model/05_str_data/ model_str_mid3_20240114.txt model_str_mid3 >p5_pred.log 2>&1 &
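
For reference, the predict script pipes the prediction file into the AUC binary. Below is a minimal rank-sum (Mann-Whitney) AUC sketch of what that step presumably computes; that each line holds a 0/1 label and a score separated by a tab is an assumption about the input format of /root/sunmingze/AUC/AUC:

    import scala.io.Source

    // Sketch: AUC via the rank-sum identity. Ties get arbitrary rather
    // than averaged ranks in this simplified version.
    object AucSketch {
      def auc(pairs: Seq[(Int, Double)]): Double = {
        val nPos = pairs.count(_._1 == 1).toDouble
        val nNeg = pairs.size - nPos
        // 1-based ranks of the positives after sorting by score ascending
        val posRankSum = pairs.sortBy(_._2).zipWithIndex
          .collect { case ((1, _), idx) => idx + 1.0 }.sum
        (posRankSum - nPos * (nPos + 1) / 2) / (nPos * nNeg)
      }

      def main(args: Array[String]): Unit = {
        val pairs = Source.fromFile(args(0)).getLines()
          .map(_.trim.split("\t"))
          .collect { case Array(label, score) => (label.toInt, score.toDouble) }
          .toSeq
        println(f"auc=${auc(pairs)}%.6f")
      }
    }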