zhangbo 1 éve
szülő
commit
cc0ea1354e

+ 22 - 2
src/main/java/examples/dataloader/RequestContextOffline.java

@@ -27,11 +27,21 @@ public class RequestContextOffline extends RequestContext {
         setKVinMap(record, "u_str_1day", "double", "rate");
         setKVinMap(record, "u_rov_1day", "double", "rate");
         setKVinMap(record, "u_ros_1day", "double", "rate");
+
+        setKVinMap(record, "u_3day_exp_cnt", "double", "cnt");
+        setKVinMap(record, "u_3day_click_cnt", "double", "cnt");
+        setKVinMap(record, "u_3day_share_cnt", "double", "cnt");
+        setKVinMap(record, "u_3day_return_cnt", "double", "cnt");
+
+        setKVinMap(record, "u_ctr_3day", "double", "rate");
+        setKVinMap(record, "u_str_3day", "double", "rate");
+        setKVinMap(record, "u_rov_3day", "double", "rate");
+        setKVinMap(record, "u_ros_3day", "double", "rate");
     }
     public void putItemFeature(Record record){
-        setKVinMap(record, "i_title_len", "double", "cnt");
+        // setKVinMap(record, "i_title_len", "double", "cnt");
         setKVinMap(record, "total_time", "double", "cnt");
-        setKVinMap(record, "i_days_since_upload", "double", "cnt");
+        // setKVinMap(record, "i_days_since_upload", "double", "cnt");
         setKVinMap(record, "play_count_total", "double", "cnt");
 
         setKVinMap(record, "i_1day_exp_cnt", "double", "cnt");
@@ -43,6 +53,16 @@ public class RequestContextOffline extends RequestContext {
         setKVinMap(record, "i_str_1day", "double", "rate");
         setKVinMap(record, "i_rov_1day", "double", "rate");
         setKVinMap(record, "i_ros_1day", "double", "rate");
+
+        setKVinMap(record, "i_3day_exp_cnt", "double", "cnt");
+        setKVinMap(record, "i_3day_click_cnt", "double", "cnt");
+        setKVinMap(record, "i_3day_share_cnt", "double", "cnt");
+        setKVinMap(record, "i_3day_return_cnt", "double", "cnt");
+
+        setKVinMap(record, "i_ctr_3day", "double", "rate");
+        setKVinMap(record, "i_str_3day", "double", "rate");
+        setKVinMap(record, "i_rov_3day", "double", "rate");
+        setKVinMap(record, "i_ros_3day", "double", "rate");
     }
 
     public void putSceneFeature(Record record){

+ 13 - 6
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_01_readtable2hdfs.scala

@@ -21,9 +21,11 @@ object makedata_01_readtable2hdfs {
     // 1 读取参数
     val param = ParamUtils.parseArgs(args)
     val partitionPrefix = param.getOrElse("partitionPrefix", "dt=")
-    val tablePart = param.getOrElse("tablePart", "16").toInt
+    val tablePart = param.getOrElse("tablePart", "64").toInt
     val beginStr = param.getOrElse("beginStr", "20230101")
     val endStr = param.getOrElse("endStr", "20230101")
+    val savePath = param.getOrElse("savePath", "")
+    // /dw/recommend/model/share_ratio_samples/
 
 
     // 2 读取odps+表信息
@@ -41,11 +43,16 @@ object makedata_01_readtable2hdfs {
         partition = partition,
         transfer = func,
         numPartition = tablePart)
-      val hdfsPath = "/dw/recommend/model/share_ratio_samples/" + partition
-      MyHdfsUtils.delete_hdfs_path(hdfsPath)
-      odpsData.saveAsTextFile(hdfsPath, classOf[GzipCodec])
-      println("数据写入完成:" + hdfsPath)
-      println("数据量:" + odpsData.count())
+      val hdfsPath = savePath + "/" + partition
+      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")){
+        MyHdfsUtils.delete_hdfs_path(hdfsPath)
+        odpsData.saveAsTextFile(hdfsPath, classOf[GzipCodec])
+        println("数据写入完成:" + hdfsPath)
+        println("数据量:" + odpsData.count())
+      }else{
+        println("路径不合法, 无法写入:" + hdfsPath)
+      }
+
     }
   }
 

+ 18 - 18
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_02_writeredis.scala

@@ -25,7 +25,7 @@ object makedata_02_writeredis {
     // 1 读取参数
     val param = ParamUtils.parseArgs(args)
     val partitionPrefix = param.getOrElse("partitionPrefix", "dt=")
-    val tablePart = param.getOrElse("tablePart", "16").toInt
+    val tablePart = param.getOrElse("tablePart", "64").toInt
     val ifUser = param.getOrDefault("ifUser", "False").toBoolean
     val ifVideo = param.getOrDefault("ifVideo", "False").toBoolean
     val date = param.getOrDefault("date", "20231220")
@@ -145,23 +145,23 @@ object makedata_02_writeredis {
     val reqContext: RequestContextOffline = new RequestContextOffline()
 
     //---------todo 有特征不在表里 临时修复---------
-    val i_title_len =  if (record.getString("title") != null) record.getString("title").length.toString else ""
-    val i_days_since_upload = if (record.getDatetime("gmt_create") != null){
-      val format = new SimpleDateFormat("yyyyMMdd")
-      val dateOld = format.format(record.getDatetime("gmt_create"))
-      val dayDiff = MyDateUtils.calculateDateDifference(dateOld, date)
-      dayDiff.toString
-    }else{
-      ""
-    }
-    if (i_title_len.nonEmpty){
-      val d = reqContext.bucketRatioFeature(i_title_len.toDouble)
-      reqContext.featureMap.put("i_title_len", d.toString)
-    }
-    if (i_days_since_upload.nonEmpty) {
-      val d = reqContext.bucketRatioFeature(i_days_since_upload.toDouble)
-      reqContext.featureMap.put("i_days_since_upload", d.toString)
-    }
+//    val i_title_len =  if (record.getString("title") != null) record.getString("title").length.toString else ""
+//    val i_days_since_upload = if (record.getDatetime("gmt_create") != null){
+//      val format = new SimpleDateFormat("yyyyMMdd")
+//      val dateOld = format.format(record.getDatetime("gmt_create"))
+//      val dayDiff = MyDateUtils.calculateDateDifference(dateOld, date)
+//      dayDiff.toString
+//    }else{
+//      ""
+//    }
+//    if (i_title_len.nonEmpty){
+//      val d = reqContext.bucketRatioFeature(i_title_len.toDouble)
+//      reqContext.featureMap.put("i_title_len", d.toString)
+//    }
+//    if (i_days_since_upload.nonEmpty) {
+//      val d = reqContext.bucketRatioFeature(i_days_since_upload.toDouble)
+//      reqContext.featureMap.put("i_days_since_upload", d.toString)
+//    }
     //------修复完成---------
 
     reqContext.putItemFeature(record)