浏览代码

新样本数据生产

zhangbo 1 年之前
父节点
当前提交
da92a9afe5

+ 23 - 10
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_12_rosData_v3.scala

@@ -48,9 +48,9 @@ object makedata_12_rosData_v3 {
         val feaStr = rList(2)
         val labelJson = JSON.parseObject(labelStr)
         val is_share = labelJson.getString("is_share")
-        (logKeyStr, labelJson, feaStr, is_share, pagesource_change, video_recommend, apptype, logtimestamp.toLong)
+        (mid, logKeyStr, labelJson, feaStr, is_share, pagesource_change, video_recommend, apptype, logtimestamp.toLong)
       }).filter({
-        case (logKeyStr, labelJson, feaStr, is_share, pagesource_change, video_recommend, apptype, logtimestamp) =>
+        case (mid, logKeyStr, labelJson, feaStr, is_share, pagesource_change, video_recommend, apptype, logtimestamp) =>
           val pages = Set("2")
           val video_status = Set("-6")
           val apps = Set("0", "4", "5", "21", "3", "6")
@@ -59,7 +59,7 @@ object makedata_12_rosData_v3 {
 
       //2 样本采样(多个回流的样本复制,等价回流量的加权)
       val data2 = data1.flatMap({
-        case (logKeyStr, labelJson, feaStr, is_share, pagesource_change, video_recommend, apptype, logtimestamp) =>
+        case (mid, logKeyStr, labelJson, feaStr, is_share, pagesource_change, video_recommend, apptype, logtimestamp) =>
           val res = ArrayBuffer[(String, JSONObject)]()
           val feaJson = JSON.parseObject(feaStr)
           val is_return = labelJson.getString("is_return")
@@ -70,15 +70,28 @@ object makedata_12_rosData_v3 {
               val midReturn = r.split(":")(0)
               val ts = r.split(":")(1).toLong
               (midReturn, ts)
-            }).sortBy(_._2)
-            var midSet = scala.collection.mutable.HashSet[String]()
-            for ((midReturn, tsReturn) <- return_mid_ts_list){
-              if (!midSet.contains(midReturn)){
-                midSet.add(midReturn)
-                if ((tsReturn / 1000 - logtimestamp / 1000) <= 3600 && tsReturn - logtimestamp > 0){
-                  res.add(("1", feaJson))
+            }).filter(!_._1.equals(mid)).sortBy(_._2)
+            // 样本中做了一个必要的过滤,如果是自己的回流,过滤掉。
+
+            if (return_mid_ts_list.nonEmpty){
+              var flag = true
+              val midSet = scala.collection.mutable.HashSet[String]()
+              for ((midReturn, tsReturn) <- return_mid_ts_list) {
+                if (!midSet.contains(midReturn)) {
+                  midSet.add(midReturn)
+                  if ((tsReturn / 1000 - logtimestamp / 1000) <= 3600 && tsReturn - logtimestamp > 0) {
+                    res.add(("1", feaJson))
+                    flag = false
+                  }
                 }
               }
+              if (flag) {
+                // 如果上面一个正样本都没添加,那么添加一个负样本。代表近一个小时内没有回流。
+                res.add(("0", feaJson))
+              }
+            }else {
+              // 如果把自己的回流过滤掉了之后,没有其他回流,那么是负样本。
+              res.add(("0", feaJson))
             }
           }
           res.iterator

+ 37 - 8
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_12_rosData_v3_noweight.scala

@@ -48,9 +48,9 @@ object makedata_12_rosData_v3_noweight {
         val feaStr = rList(2)
         val labelJson = JSON.parseObject(labelStr)
         val is_share = labelJson.getString("is_share")
-        (logKeyStr, labelJson, feaStr, is_share, pagesource_change, video_recommend, apptype, logtimestamp.toLong)
+        (mid, logKeyStr, labelJson, feaStr, is_share, pagesource_change, video_recommend, apptype, logtimestamp.toLong)
       }).filter({
-        case (logKeyStr, labelJson, feaStr, is_share, pagesource_change, video_recommend, apptype, logtimestamp) =>
+        case (mid, logKeyStr, labelJson, feaStr, is_share, pagesource_change, video_recommend, apptype, logtimestamp) =>
           val pages = Set("2")
           val video_status = Set("-6")
           val apps = Set("0", "4", "5", "21", "3", "6")
@@ -58,15 +58,44 @@ object makedata_12_rosData_v3_noweight {
       })
 
       //2 样本采样
-      val data2 = data1.map({
-        case (logKeyStr, labelJson, feaStr, is_share, pagesource_change, video_recommend, apptype, logtimestamp) =>
+      val data2 = data1.flatMap({
+        case (mid, logKeyStr, labelJson, feaStr, is_share, pagesource_change, video_recommend, apptype, logtimestamp) =>
+          val res = ArrayBuffer[(String, JSONObject)]()
           val feaJson = JSON.parseObject(feaStr)
           val is_return = labelJson.getString("is_return")
-          if ("0".equals(is_return)){
-            ("0", feaJson)
-          }else{
-            ("1", feaJson)
+          if ("0".equals(is_return)) {
+            res.add(("0", feaJson))
+          } else {
+            val return_mid_ts_list = labelJson.getString("return_mid_ts_list").split(",").map(r => {
+              val midReturn = r.split(":")(0)
+              val ts = r.split(":")(1).toLong
+              (midReturn, ts)
+            }).filter(!_._1.equals(mid)).sortBy(_._2)
+            // 样本中做了一个必要的过滤,如果是自己的回流,过滤掉。
+
+            if (return_mid_ts_list.nonEmpty) {
+              var flag = true
+              val midSet = scala.collection.mutable.HashSet[String]()
+              for ((midReturn, tsReturn) <- return_mid_ts_list) {
+                if (flag && !midSet.contains(midReturn)) {
+                  // 通过flag的变化,只添加一条正样本。实现不加权。
+                  midSet.add(midReturn)
+                  if ((tsReturn / 1000 - logtimestamp / 1000) <= 3600 && tsReturn - logtimestamp > 0) {
+                    res.add(("1", feaJson))
+                    flag = false
+                  }
+                }
+              }
+              if (flag) {
+                // 如果上面一个正样本都没添加,那么添加一个负样本。代表近一个小时内没有回流。
+                res.add(("0", feaJson))
+              }
+            } else {
+              // 如果把自己的回流过滤掉了之后,没有其他回流,那么是负样本。
+              res.add(("0", feaJson))
+            }
           }
+          res.iterator
       })
 
       //3 保留一份原始样本的中间数据

+ 9 - 2
src/main/scala/com/aliyun/odps/spark/examples/临时记录的脚本

@@ -12,8 +12,15 @@ nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.s
 --class com.aliyun.odps.spark.examples.makedata.makedata_12_rosData_v3 \
 --master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 32 \
 ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-savePath:/dw/recommend/model/12_ros_data_v3/ beginStr:20240222 endStr:20240225 ifRepart:10 \
-> p12.log 2>&1 &
+savePath:/dw/recommend/model/12_ros_data_v3/ beginStr:20240226 endStr:20240226 ifRepart:10 \
+> p12_1.log 2>&1 &
+
+nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+--class com.aliyun.odps.spark.examples.makedata.makedata_12_rosData_v3_noweight \
+--master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 32 \
+./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+savePath:/dw/recommend/model/12_ros_data_v3_noweight/ beginStr:20240222 endStr:20240225 ifRepart:10 \
+> p12_2.log 2>&1 &
 
 
 

+ 2 - 0
zhangbo/01_train.sh

@@ -15,6 +15,8 @@ $HADOOP fs -text ${train_path}/dt=$day/* | /root/sunmingze/alphaFM/bin/fm_train
 
 # nohup sh 01_train.sh 20240222 /dw/recommend/model/12_ros_data_v3 model_ros_tom >p1_train.log 2>&1 &
 # nohup sh 01_train.sh 20240222 /dw/recommend/model/11_str_data_v3 model_str_tom >p1_train.log 2>&1 &
+# nohup sh 01_train.sh 20240222 /dw/recommend/model/12_ros_data_v3_noweight model_ros_tom_noweight >p1_train.log 2>&1 &
+
 
 
 # nohup sh 01_train.sh 20231214 /dw/recommend/model/share_ratio_samples_v2/ model_sharev2 >p1.log 2>&1 &

+ 2 - 0
zhangbo/02_train_go.sh

@@ -22,3 +22,5 @@ while [[ "$current_date" != "$end_date" ]]; do
 done
 
 # nohup sh 02_train_go.sh 20240225 20240226 model_ros_tom /dw/recommend/model/12_ros_data_v3/ >p2.log 2>&1 &
+# nohup sh 02_train_go.sh 20240223 20240226 model_str_tom /dw/recommend/model/11_str_data_v3/ >p2.log 2>&1 &
+# nohup sh 02_train_go.sh 20240223 20240226 model_ros_tom_noweight /dw/recommend/model/12_ros_data_v3_noweight/ >p2.log 2>&1 &

+ 4 - 0
zhangbo/03_predict.sh

@@ -11,6 +11,10 @@ $HADOOP fs -text ${train_path}/dt=$day/* | /root/sunmingze/alphaFM/bin/fm_predic
 cat predict/${output_file}_$day.txt | /root/sunmingze/AUC/AUC
 
 # nohup sh 03_predict.sh 20240226 /dw/recommend/model/12_ros_data_v3/ model_ros_tom_20240225.txt model_ros_tom >p3_pred.log 2>&1 &
+# nohup sh 03_predict.sh 20240226 /dw/recommend/model/12_ros_data_v3_noweight/ model_ros_tom_noweight_20240225.txt model_ros_tom_noweight2 >p3_pred.log 2>&1 &
+# nohup sh 03_predict.sh 20240226 /dw/recommend/model/11_str_data_v3/ model_str_tom_20240222.txt model_str_tom >p3.log 2>&1 &
+
+# nohup sh 03_predict.sh 20240226 /dw/recommend/model/11_str_data_v3/ model_str_tom_20240225.txt model_str_tom >p3.123.log 2>&1 &
 
 # str:
 # nohup sh 03_predict.sh 20240115 /dw/recommend/model/01_str_data/ model_str_big_20240114.txt model_str_big >p1_pred.log 2>&1 &