zhangbo, 1 year ago
Parent
Current commit
04db917aac

+ 3 - 1
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_01_readtable2hdfs.scala

@@ -3,7 +3,7 @@ package com.aliyun.odps.spark.examples.makedata
 import org.apache.spark.sql.SparkSession
 import com.aliyun.odps.TableSchema
 import com.aliyun.odps.data.Record
-import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, ParamUtils, env}
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
 import examples.dataloader.RequestContextOffline
 import examples.dataloader.OfflineVlogShareLRFeatureExtractor
 import org.apache.hadoop.io.compress.GzipCodec
@@ -42,8 +42,10 @@ object makedata_01_readtable2hdfs {
         transfer = func,
         numPartition = tablePart)
       val hdfsPath = "/dw/recommend/model/share_ratio_samples/" + partition
+      MyHdfsUtils.delete_hdfs_path(hdfsPath)
       odpsData.saveAsTextFile(hdfsPath, classOf[GzipCodec])
       println("数据写入完成:" + hdfsPath)
+      println("数据量:" + odpsData.count())
     }
   }
 

+ 148 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/MyHdfsUtils.scala

@@ -0,0 +1,148 @@
+package com.aliyun.odps.spark.examples.myUtils
+
+/**
+ * Author: zhangbo58
+ * Description: HDFS helpers for listing, existence/data checks, deletion, date-based retention cleanup, and permissions.
+ *
+ */
+import org.apache.commons.lang.time.DateUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+
+import scala.collection.mutable.ArrayBuffer
+
+object MyHdfsUtils {
+  def main(args: Array[String]): Unit = {
+    val path = "zhangbo58/"
+    // exercise the listing helpers below
+    println("First-level files and directories under the path:")
+    getFilesAndDirs(path).foreach(println)
+    println("First-level files under the path:")
+    getFiles(path).foreach(println)
+    println("First-level directories under the path:")
+    getDirs(path).foreach(println)
+    println("All files under the path (recursive):")
+    getAllFiles(path).foreach(println)
+  }
+
+  // Returns a FileSystem built from the default Configuration; the path argument is currently unused.
+  def getHdfs(path: String): FileSystem = {
+    val conf = new Configuration()
+    val fs = FileSystem.get(conf)
+    fs
+  }
+
+  // first-level files and directories under a path
+  def getFilesAndDirs(path: String): Array[Path] = {
+    val fs = getHdfs(path).listStatus(new Path(path))
+    FileUtil.stat2Paths(fs)
+  }
+  // first-level files directly under a path
+  def getFiles(path: String): Array[String] = {
+    getFilesAndDirs(path).filter(getHdfs(path).getFileStatus(_).isFile())
+      .map(_.toString)
+  }
+  // first-level directories directly under a path
+  def getDirs(path: String): Array[String] = {
+    getFilesAndDirs(path).filter(getHdfs(path).getFileStatus(_).isDirectory)
+      .map(_.toString)
+  }
+  // all files under a path, recursing into subdirectories
+  def getAllFiles(path: String): ArrayBuffer[String] = {
+    val arr = ArrayBuffer[String]()
+    val hdfs = getHdfs(path)
+    val getPath = getFilesAndDirs(path)
+    getPath.foreach(patha => {
+      if (hdfs.getFileStatus(patha).isFile())
+        arr += patha.toString
+      else {
+        arr ++= getAllFiles(patha.toString())
+      }
+    })
+    arr
+  }
+  def ifHDFSHasData(path: String): Boolean = {
+    val hdfs_path = new org.apache.hadoop.fs.Path(path.toString)
+    val hdfs = org.apache.hadoop.fs.FileSystem.get(new org.apache.hadoop.conf.Configuration())
+
+    var rst = false
+    if (hdfs.exists(hdfs_path)) {
+      // the path exists; treat it as having data only if a part-* or _SUCCESS entry is non-empty
+      val statusList = hdfs.listStatus(hdfs_path)
+      for (status <- statusList if !rst && (status.getPath.toString.contains("part-") || status.getPath.toString.contains("_SUCCESS"))) {
+        if (status.getLen > 0) {
+          rst = true
+        }
+      }
+    }
+    rst
+  }
+
+  def delete_hdfs_path(path: String): Unit = {
+    val hdfs_path = new org.apache.hadoop.fs.Path(path.toString)
+    val hdfs = org.apache.hadoop.fs.FileSystem.get(new org.apache.hadoop.conf.Configuration())
+
+    if (hdfs.exists(hdfs_path)) {
+      hdfs.delete(hdfs_path, true)
+    }
+  }
+
+  def hdfs_exits(path:String): Boolean = {
+    val hdfs_path = new org.apache.hadoop.fs.Path(path.toString)
+    val hdfs = org.apache.hadoop.fs.FileSystem.get(new org.apache.hadoop.conf.Configuration())
+
+    hdfs.exists(hdfs_path)
+  }
+
+
+  /* Delete everything under a path whose date falls outside the retention window.
+   * Example: keepDays=2, dateStr=20191015 => keep the data for 20191015 and 20191014, delete the rest.
+   */
+  def hdfs_delete_not_keep_days(
+                                 path:String,
+                                 keepDays:Int,
+                                 dateStr:String,
+                                 pattern:String = "yyyyMMdd"
+                               ): Unit ={
+    val file_list = this.getFiles(path)
+    println("hdfs_delete_not_keep_days-file_list")
+    file_list.foreach(println)
+
+    for (file <- file_list){
+      var flag = true
+      val date_early = MyDateUtils.getNumDaysBefore(dateStr, keepDays, pattern)
+      try{
+        val file_split_strs = file.split("/")
+        val len = file_split_strs.length
+        var file_date = file_split_strs(len-1)
+        if (file_date.equals("")){
+          file_date = file_split_strs(len-2)
+        }
+        var date1 = DateUtils.parseDate(file_date, Array[String](pattern)) // date parsed from the file name
+        var date2 = DateUtils.parseDate(date_early, Array[String](pattern)) // the retention boundary: dateStr minus keepDays
+        if (date1.compareTo(date2) >= 0){ // within the retention window, so keep it; anything older is deleted below
+          flag = false
+        }
+      }catch {
+        case e:Exception =>
+          flag = false
+      }
+
+      if (flag){
+        MyHdfsUtils.delete_hdfs_path(file.toString)
+      }
+    }
+  }
+
+  /**
+   * @Author: zhangbo
+   * @Description: set permissions on an HDFS path (hard-coded to 777)
+   *
+   */
+
+  def give_hdfs_permission(path:String): Unit ={
+    getHdfs(path).setPermission(new Path(path), new FsPermission("777"))
+  }
+}
+
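For context, a minimal usage sketch of the new helper (the demo object name and partition path are illustrative, not part of the commit); it mirrors the delete-before-write pattern used in makedata_01_readtable2hdfs:

import com.aliyun.odps.spark.examples.myUtils.MyHdfsUtils

object MyHdfsUtilsDemo {
  def main(args: Array[String]): Unit = {
    // illustrative partition path; the real job builds it from the ODPS partition spec
    val hdfsPath = "/dw/recommend/model/share_ratio_samples/dt=20231216"

    // drop the partition before rewriting it, so reruns do not collide with stale output
    if (MyHdfsUtils.hdfs_exits(hdfsPath)) {
      MyHdfsUtils.delete_hdfs_path(hdfsPath)
    }

    // a non-empty part-* or _SUCCESS entry means the Spark write actually landed
    println("has data: " + MyHdfsUtils.ifHDFSHasData(hdfsPath))

    // recursive listing of everything under the path
    MyHdfsUtils.getAllFiles(hdfsPath).foreach(println)
  }
}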

+ 5 - 0
zhangbo/01_train.sh

@@ -0,0 +1,5 @@
+day=$1
+HADOOP="/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop"
+$HADOOP fs -cat /dw/recommend/model/share_ratio_samples/dt=$day/* | /root/sunmingze/alphaFM/bin/fm_train -m model/model_share_$day.txt -dim 1,1,0 -core 8
+
+

+ 14 - 0
zhangbo/02_train_go.sh

@@ -0,0 +1,14 @@
+day=$1
+day_yes=$(date -d"yesterday $day" +%Y%m%d)
+
+echo "today is: $day"
+
+echo "yesterday is: $day_yes"
+
+MODEL_PATH="/root/spark-data/model/"
+SAMPLE_PATH="/dw/recommend/model/share_ratio_samples/"
+HADOOP="/opt/apps/HADOOP-COMMON/hadoop-common-current/bin/hadoop"
+FM_TRAIN="/root/sunmingze/alphaFM/bin/fm_train"
+
+
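+# -im warm-starts training from yesterday's model, so each daily run is incremental (assumes alphaFM's -im loads an initial model)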
+$HADOOP fs -cat ${SAMPLE_PATH}/dt=$day/* | ${FM_TRAIN} -m $MODEL_PATH/model_share_$day.txt -dim 1,1,0 -core 8 -im $MODEL_PATH/model_share_${day_yes}.txt

+ 28 - 0
zhangbo/train.sh

@@ -0,0 +1,28 @@
+#!/bin/sh
+
+#MVN_PACKAGE="mvn clean install  -T 2C -Dmaven.test.skip=true -Dmaven.compile.fork=true"
+JAVA_PATH="/usr/bin/java"
+PYTHON_PATH="/usr/bin/python"
+UPLOAD_PY_PATH="/root/algo/upload.py"
+JAR_PATH="/root/algo/recommend-server/recommend-server-service/target/recommend-server-service.jar"
+FM_PATH="/root/algo/alphaFM/bin"
+MODEL_PATH="/root/algo/LR_MODEL/"
+YESTERDAY="$(date -d '2 days ago' +%Y%m%d)"
+LAST30DAY="$(date -d '2 days ago' +%Y%m%d)"
+MAIN_CLASS="com.tzld.piaoquan.recommend.server.dataloader.OfflineShareSamplesLoader"
+TABLE_NAME="loghubods.alg_recsys_view_sample"
+LABEL="share_ornot"
+#OSSPATH=""
+
+
+# Train
+#mkdir -p ${MODEL_PATH}/${YESTERDAY}
+#${JAVA_PATH} -jar ${JAR_PATH} ${TABLE_NAME} ${LAST30DAY} ${YESTERDAY} ${LABEL} | ${FM_PATH}/fm_train -m ${MODEL_PATH}/${YESTERDAY}/model_${YESTERDAY}.txt -dim 0,1,0 -core 8
+
+#cat ${MODEL_PATH}/${YESTERDAY}/model_${YESTERDAY}.txt | awk -F " " '{print $1,"\t",$2}' > ${MODEL_PATH}/${YESTERDAY}/model_${YESTERDAY}_new.txt
+
+# Upload
+#${UPLOAD_PY_PATH} ${MODEL_PATH}/${YESTERDAY}/model_${YESTERDAY}_new.txt ${OSSPATH}
+
+# Predict
+java -jar ${JAR_PATH} ${TABLE_NAME} 20231211 20231211 ${LABEL} | ${FM_PATH}/fm_predict -m ${MODEL_PATH}/20231210/model_20231210.txt -dim 0 -core 8 -out ${MODEL_PATH}/predict_1211.txt

+ 14 - 0
zhangbo/up.sh

@@ -0,0 +1,14 @@
+#!/bin/sh
+
+day=$1
+root_path="/root/spark-data"
+oss_hdfs_path="oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/"
+model_path="$root_path/model"
+model_online="$model_path/online"
+
+
+cat $model_path/model_ctr_$day.txt | sed '1d' | awk -F " " '{if($2!="0") print $1"\t"$2}' > $model_online/model_ad_ctr.txt
+
+hdfs dfs -rmr ${oss_hdfs_path}/ad_ctr_model/model_ad_ctr.txt
+
+hdfs dfs -put $model_online/model_ad_ctr.txt ${oss_hdfs_path}/ad_ctr_model/

+ 10 - 0
zhangbo/up2.sh

@@ -0,0 +1,10 @@
+root_path="/root/spark-data"
+oss_hdfs_path="oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/"
+model_path=$root_path/model
+day=$1
+
+cat /root/spark-data/model/model_share_20231216.txt | sed '1d' | awk -F " " '{if($2!="0") print $1"\t"$2}' > /root/spark-data/model/model_share_now.txt
+
+hdfs dfs -put /root/spark-data/model/model_share_now.txt oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/video_model
+
+hdfs dfs -put $model_path/model_share_$day.txt ${oss_hdfs_path}/video_str_model