zhangbo 1 jaar geleden
bovenliggende
commit
cdd737f4c5

+ 36 - 30
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_09_user2redis_freq.scala

@@ -30,6 +30,7 @@ object makedata_09_user2redis_freq {
     val tablePart = param.getOrElse("tablePart", "64").toInt
     val date = param.getOrDefault("date", "20231220")
     val expireDay = param.getOrDefault("expireDay", "3").toInt
+    val ifUser = param.getOrDefault("ifUser", "False").toBoolean
     val ifWriteRedisUser = param.getOrDefault("ifWriteRedisUser", "False").toBoolean
     val partition = partitionPrefix + date
     val savePathUser = param.getOrDefault("savePathUser", "")
@@ -42,37 +43,42 @@ object makedata_09_user2redis_freq {
     val userRedisKeyPrefix = "user_info_4video_"
 
 
-    //3 特征处理
-    println("user特征处理")
-    val userData = odpsOps.readTable(project = project, table = tableUser, partition = partition,
-        transfer = func, numPartition = tablePart)
-      .map(record => {
-        val mid = record.getString("mids")
-        val originFeatureName = Set(
-          "gender", "machineinfo_brand", "machineinfo_model", "machineinfo_platform", "machineinfo_sdkversion",
-          "machineinfo_system", "machineinfo_wechatversion",
-          //"gmt_create_user",
-          "u_1day_exp_cnt", "u_1day_click_cnt", "u_1day_share_cnt", "u_1day_return_cnt",
-          "u_3day_exp_cnt", "u_3day_click_cnt", "u_3day_share_cnt", "u_3day_return_cnt",
-          "u_7day_exp_cnt", "u_7day_click_cnt", "u_7day_share_cnt", "u_7day_return_cnt",
-          "u_3month_exp_cnt", "u_3month_click_cnt", "u_3month_share_cnt", "u_3month_return_cnt"
-        )
-        val originFeatureMap = getFeatureFromSet(originFeatureName, record)
-        val resultNew = new JSONObject
-        originFeatureName.foreach(r => {
-          if (originFeatureMap.containsKey(r)) {
-            val v = originFeatureMap(r)
-            resultNew.put(r, v)
-          }
+
+    if (ifUser){
+      //3 特征处理
+      println("user特征处理")
+      val userData = odpsOps.readTable(project = project, table = tableUser, partition = partition,
+          transfer = func, numPartition = tablePart)
+        .map(record => {
+          val mid = record.getString("mids")
+          val originFeatureName = Set(
+            "gender", "machineinfo_brand", "machineinfo_model", "machineinfo_platform", "machineinfo_sdkversion",
+            "machineinfo_system", "machineinfo_wechatversion",
+            //"gmt_create_user",
+            "u_1day_exp_cnt", "u_1day_click_cnt", "u_1day_share_cnt", "u_1day_return_cnt",
+            "u_3day_exp_cnt", "u_3day_click_cnt", "u_3day_share_cnt", "u_3day_return_cnt",
+            "u_7day_exp_cnt", "u_7day_click_cnt", "u_7day_share_cnt", "u_7day_return_cnt",
+            "u_3month_exp_cnt", "u_3month_click_cnt", "u_3month_share_cnt", "u_3month_return_cnt"
+          )
+          val originFeatureMap = getFeatureFromSet(originFeatureName, record)
+          val resultNew = new JSONObject
+          originFeatureName.foreach(r => {
+            if (originFeatureMap.containsKey(r)) {
+              val v = originFeatureMap(r)
+              resultNew.put(r, v)
+            }
+          })
+          (mid, resultNew.toString())
         })
-        (mid, resultNew.toString())
-      })
-    //3 特征原始文件保存
-    if (savePathUser.nonEmpty && savePathUser.startsWith("/dw/recommend/model/")) {
-      val savePathPart = savePathUser + "/all/" + partition
-      MyHdfsUtils.delete_hdfs_path(savePathPart)
-      userData.map(r => r._1 + "\t" + r._2).saveAsTextFile(savePathPart, classOf[GzipCodec])
+      //3 特征原始文件保存
+      if (savePathUser.nonEmpty && savePathUser.startsWith("/dw/recommend/model/")) {
+        val savePathPart = savePathUser + "/all/" + partition
+        MyHdfsUtils.delete_hdfs_path(savePathPart)
+        userData.map(r => r._1 + "\t" + r._2).saveAsTextFile(savePathPart, classOf[GzipCodec])
+      }
     }
+
+
     //4 近期用户统计
     val dateEarly = MyDateUtils.getNumDaysBefore(date, 0)
     val midRdd = odpsOps.readTable(project = "loghubods", table = "mid_uid",
@@ -108,7 +114,7 @@ object makedata_09_user2redis_freq {
       userDataReadFalse.saveAsTextFile(p2, classOf[GzipCodec])
     }
 
-
+    //6 redis
     if (ifWriteRedisUser) {
       println("开始处理redis写入")
       val p1 = savePathUser + "/true/" + partition

+ 4 - 5
src/main/scala/com/aliyun/odps/spark/examples/临时记录的脚本

@@ -33,12 +33,11 @@ savePath:/dw/recommend/model/11_str_data_v3/ beginStr:20240222 endStr:20240225 i
 
 
 
-/opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
 --class com.aliyun.odps.spark.examples.makedata.makedata_09_user2redis_freq \
 --name makedata_09_user2redis_freq \
---master yarn --driver-memory 1G --executor-memory 3G --executor-cores 1 --num-executors 32 \
+--master yarn --driver-memory 1G --executor-memory 4G --executor-cores 1 --num-executors 32 \
 --conf spark.yarn.executor.memoryoverhead=1024 \
 /root/zhangbo/recommend-emr-dataprocess/target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-date:20230227 tablePart:32 expireDay:3  \
-ifWriteRedisUser:True midDays:7 \
-savePathUser:/dw/recommend/model/09_feature/user/
+date:20240227 tablePart:32 expireDay:3 ifWriteRedisUser:False ifUser:False midDays:7 \
+savePathUser:/dw/recommend/model/09_feature/user/ > p09.log 2>&1 &