zhangbo 8 mesiacov pred
rodič
commit
dfc061d30e

+ 246 - 0
recommend-model-produce/src/main/scala/com/tzld/piaoquan/recommend/model/MyDateUtils.scala

@@ -0,0 +1,246 @@
+package com.tzld.piaoquan.recommend.model
+
+import org.apache.commons.lang.time.DateUtils
+import org.apache.commons.lang3.time.DateUtils.addDays
+
+import java.text.SimpleDateFormat
+import java.util.{Calendar, Date}
+import scala.collection.mutable.ArrayBuffer
+
+object MyDateUtils {
+
+  val date_sdf = getYesterday()
+  val date_sdf_ = getYesterday_()
+  val date_sdf_full = ""
+
+
+
+
+  // 今天日期
+  def getNowDate(): String = {
+    var now: Date = new Date()
+    var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyyMMdd")
+    var hehe = dateFormat.format(now)
+    hehe
+  }
+  def getNowDate_(): String = {
+    var now: Date = new Date()
+    var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
+    var hehe = dateFormat.format(now)
+    hehe
+  }
+
+  // 昨天日期
+  def getYesterday(): String = {
+    var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyyMMdd")
+    var cal: Calendar = Calendar.getInstance()
+    cal.add(Calendar.DATE, -1)
+    var yesterday = dateFormat.format(cal.getTime())
+    yesterday
+  }
+  def getYesterday_(): String = {
+    var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
+    var cal: Calendar = Calendar.getInstance()
+    cal.add(Calendar.DATE, -1)
+    var yesterday = dateFormat.format(cal.getTime())
+    yesterday
+  }
+
+  //本周第一天的日期
+  def getNowWeekStart(): String = {
+    var period: String = ""
+    var cal: Calendar = Calendar.getInstance()
+    var df: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
+    cal.set(Calendar.DAY_OF_WEEK, Calendar.MONDAY)
+    //获取本周一的日期
+    period = df.format(cal.getTime())
+    period
+  }
+
+  // 本周末的日期
+  def getNowWeekEnd(): String = {
+    var period: String = ""
+    var cal: Calendar = Calendar.getInstance();
+    var df: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
+    cal.set(Calendar.DAY_OF_WEEK, Calendar.SUNDAY); //这种输出的是上个星期周日的日期,因为老外把周日当成第一天
+    cal.add(Calendar.WEEK_OF_YEAR, 1) // 增加一个星期,才是我们中国人的本周日的日期
+    period = df.format(cal.getTime())
+    period
+  }
+
+  // 本月的第一天
+  def getNowMonthStart(): String = {
+    var period: String = ""
+    var cal: Calendar = Calendar.getInstance();
+    var df: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
+    cal.set(Calendar.DATE, 1)
+    period = df.format(cal.getTime()) //本月第一天
+    period
+  }
+
+  // 本月最后一天
+  def getNowMonthEnd(): String = {
+    var period: String = ""
+    var cal: Calendar = Calendar.getInstance();
+    var df: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
+    cal.set(Calendar.DATE, 1)
+    cal.roll(Calendar.DATE, -1)
+    period = df.format(cal.getTime()) //本月最后一天
+    period
+  }
+
+  // "秒"时间戳 转 日期
+  def DateFormat(time:String):String={
+    var sdf:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
+    var date:String = sdf.format(new Date((time.toLong*1000l)))
+    date
+  }
+
+  // "秒"时间戳 转 日期
+  def DateFormat_yyyyMMdd(time:String):String={
+    var sdf:SimpleDateFormat = new SimpleDateFormat("yyyyMMdd")
+    var date:String = sdf.format(new Date((time.toLong*1000l)))
+    date
+  }
+
+  // "秒"时间戳 转 当天时间
+  def timeFormat(time:String):String={
+    var sdf:SimpleDateFormat = new SimpleDateFormat("HH:mm:ss")
+    var date:String = sdf.format(new Date((time.toLong*1000l)))
+    date
+  }
+
+  // date-time格式转成秒
+  def tranTimeToLong(tm:String) :Long={
+    val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+    val dt = fm.parse(tm)
+    val aa = fm.format(dt)
+    val tim: Long = dt.getTime()
+    tim / 1000
+  }
+
+  // 日期格式转成秒
+  def tranTimeString_yyyyMMdd_ToLong(tm:String) :Long={
+    val fm = new SimpleDateFormat("yyyyMMdd")
+    val dt = fm.parse(tm)
+    val aa = fm.format(dt)
+    val tim: Long = dt.getTime()
+    tim / 1000
+  }
+
+  // 秒转成日期
+  def formatDateMillToMut(mill:Long)= {
+    val date = new Date(mill)
+    date
+  }
+
+  //时间推移
+  def getNumDaysBefore(dt:String,num:Int, pattern:String = "yyyyMMdd"): String ={
+    val sdf = new SimpleDateFormat(pattern)
+    val enddate= sdf.parse(dt)
+    val rightNow = Calendar.getInstance()
+    rightNow.setTime(enddate)
+    rightNow.add(Calendar.DAY_OF_YEAR,-num);//日期减30天
+    val begindate =rightNow.getTime()
+    val time_begin = sdf.format(begindate)
+    time_begin
+  }
+
+  def getNumDaysAfter(dt:String,num:Int, pattern:String = "yyyyMMdd"): String ={
+    val sdf = new SimpleDateFormat(pattern)
+    val enddate= sdf.parse(dt)
+    val rightNow = Calendar.getInstance()
+    rightNow.setTime(enddate)
+    rightNow.add(Calendar.DAY_OF_YEAR,num);//日期减30天
+    val begindate =rightNow.getTime()
+    val time_begin = sdf.format(begindate)
+    time_begin
+  }
+
+  // "20190101"转"2019-01-01"
+  def dt2Dt(dt:String) : String={
+    dt.substring(0, 4) + "-" + dt.substring(4, 6) +"-" +dt.substring(6, 8)
+  }
+
+  // 日期区间生产1:从beginStr到endDate
+  def fromBeginDate2EndDate(beginStr:String, endStr:String): Array[String] ={
+    val date_format = new SimpleDateFormat("yyyyMMdd")
+    var from = DateUtils.parseDate(beginStr, Array[String]("yyyyMMdd"))
+    val to = DateUtils.parseDate(endStr, Array[String]("yyyyMMdd"))
+    var result = new ArrayBuffer[String]()
+    while (from.compareTo(to) <= 0) {
+      val dateStr = date_format.format(from)
+      result.append(dateStr)
+      from = DateUtils.addDays(from, 1)
+    }
+    result.toArray
+  }
+  // 日期区间生产2:
+  def getDateRange(beginStr: String, endStr: String, format: String = "yyyyMMdd"): ArrayBuffer[String] = {
+    val ranges = ArrayBuffer[String]()
+    val sdf = new SimpleDateFormat(format)
+    var dateBegin = sdf.parse(beginStr)
+    var dateEnd = sdf.parse(endStr)
+    while (dateBegin.compareTo(dateEnd) <= 0) {
+      ranges += sdf.format(dateBegin)
+      dateBegin = addDays(dateBegin, 1)
+    }
+    ranges
+  }
+
+  // 日期+小时 时间区间生成
+  def getDateHourRange(beginStr: String, endStr: String, format: String = "yyyyMMddHH"): ArrayBuffer[String] = {
+    val ranges = ArrayBuffer[String]()
+    val sdf = new SimpleDateFormat(format)
+    var dateBegin = sdf.parse(beginStr)
+    val dateEnd = sdf.parse(endStr)
+
+    while (dateBegin.compareTo(dateEnd) <= 0) {
+      ranges += sdf.format(dateBegin)
+      // 将开始时间增加一小时
+      dateBegin = addHours(dateBegin, 1)
+    }
+    ranges
+  }
+
+  import java.util.Date
+
+  // 辅助函数,用于给定的日期增加小时
+  def addHours(date: Date, hours: Int): Date = {
+    val cal = Calendar.getInstance()
+    cal.setTime(date)
+    cal.add(java.util.Calendar.HOUR_OF_DAY, hours)
+    cal.getTime
+  }
+
+  import java.time.LocalDate
+  import java.time.temporal.ChronoUnit
+  def calculateDateDifference(startDate: String, endDate: String): Long = {
+    val start = LocalDate.parse(startDate, java.time.format.DateTimeFormatter.BASIC_ISO_DATE)
+    val end = LocalDate.parse(endDate, java.time.format.DateTimeFormatter.BASIC_ISO_DATE)
+    val days = ChronoUnit.DAYS.between(start, end)
+    days
+  }
+
+
+  def main(args: Array[String]): Unit = {
+//    var from = DateUtils.parseDate("2019-09-01", Array[String]("yyyy-MM-dd"))
+//    var to = DateUtils.parseDate("2019-09-10", Array[String]("yyyy-MM-dd"))
+//
+//    val a = from.getTime / 3600
+//    val b = to.getTime / 3600
+//    println(b-a)
+
+    var from = getDateHourRange("2024050123", "2024050203")
+    from.foreach(println)
+
+    val partitionPrefix = "dt={},hh={}"
+    println(partitionPrefix.stripMargin.format("XX", "YY"))
+
+    val stdxx = "2024050116"
+    val dt = stdxx.substring(0, 8)
+    val hh = stdxx.substring(8, 10)
+    println(dt)
+    println(hh)
+  }
+}

+ 148 - 0
recommend-model-produce/src/main/scala/com/tzld/piaoquan/recommend/model/MyHdfsUtils.scala

@@ -0,0 +1,148 @@
+package com.tzld.piaoquan.recommend.model
+
+/**
+ * Author: zhangbo58
+ * Description:
+ *
+ */
+import org.apache.commons.lang.time.DateUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+
+import scala.collection.mutable.ArrayBuffer
+
+object MyHdfsUtils {
+  def main(args: Array[String]): Unit = {
+    val path = "zhangbo58/"
+    //生成FileSystem
+    println("获取目录下的一级文件和目录")
+    getFilesAndDirs(path).foreach(println)
+    println("获取目录下的一级文件")
+    getFiles(path).foreach(println)
+    println("获取目录下的一级目录")
+    getDirs(path).foreach(println)
+    println("获取目录下所有文件")
+    getAllFiles(path).foreach(println)
+  }
+
+  def getHdfs(path: String): FileSystem = {
+    val conf = new Configuration()
+    //    FileSystem.get(URI.create(path), conf)
+    val fs = org.apache.hadoop.fs.FileSystem.get(new org.apache.hadoop.conf.Configuration())
+    fs
+  }
+
+  //获取目录下的一级文件和目录
+  def getFilesAndDirs(path: String): Array[Path] = {
+    val fs = getHdfs(path).listStatus(new Path(path))
+    FileUtil.stat2Paths(fs)
+  }
+  //获取目录下的一级文件
+  def getFiles(path: String): Array[String] = {
+    getFilesAndDirs(path).filter(getHdfs(path).getFileStatus(_).isFile())
+      .map(_.toString)
+  }
+  //获取目录下的一级目录
+  def getDirs(path: String): Array[String] = {
+    getFilesAndDirs(path).filter(getHdfs(path).getFileStatus(_).isDirectory)
+      .map(_.toString)
+  }
+  //获取目录下的所有文件
+  def getAllFiles(path: String): ArrayBuffer[String] = {
+    val arr = ArrayBuffer[String]()
+    val hdfs = getHdfs(path)
+    val getPath = getFilesAndDirs(path)
+    getPath.foreach(patha => {
+      if (hdfs.getFileStatus(patha).isFile())
+        arr += patha.toString
+      else {
+        arr ++= getAllFiles(patha.toString())
+      }
+    })
+    arr
+  }
+  def ifHDFSHasData(path: String): Boolean = {
+    val hdfs_path = new org.apache.hadoop.fs.Path(path.toString)
+    val hdfs = org.apache.hadoop.fs.FileSystem.get(new org.apache.hadoop.conf.Configuration())
+
+    var rst = false
+    if (hdfs.exists(hdfs_path)) {
+      //路径存在且不为空
+      val statusList = hdfs.listStatus(hdfs_path)
+      for (status <- statusList if !rst && (status.getPath.toString.contains("part-") || status.getPath.toString.contains("_SUCCESS"))) {
+        if (status.getLen > 0) {
+          rst = true
+        }
+      }
+    }
+    rst
+  }
+
+  def delete_hdfs_path(path: String): Unit = {
+    val hdfs_path = new org.apache.hadoop.fs.Path(path.toString)
+    val hdfs = org.apache.hadoop.fs.FileSystem.get(new org.apache.hadoop.conf.Configuration())
+
+    if (hdfs.exists(hdfs_path)) {
+      hdfs.delete(hdfs_path, true)
+    }
+  }
+
+  def hdfs_exits(path:String): Boolean = {
+    val hdfs_path = new org.apache.hadoop.fs.Path(path.toString)
+    val hdfs = org.apache.hadoop.fs.FileSystem.get(new org.apache.hadoop.conf.Configuration())
+
+    hdfs.exists(hdfs_path)
+  }
+
+
+  /* 删除某路径下,不在某段时间内的所有数据。
+  * 举例:keepDays=2 dateStr=20191015 => 保留20191015和20191014两天的数据
+  */
+  def hdfs_delete_not_keep_days(
+                                 path:String,
+                                 keepDays:Int,
+                                 dateStr:String,
+                                 pattern:String = "yyyyMMdd"
+                               ): Unit ={
+    val file_list = this.getFiles(path)
+    println("hdfs_delete_not_keep_days-file_list")
+    file_list.foreach(println)
+
+    for (file <- file_list){
+      var flag = true
+      val date_early = MyDateUtils.getNumDaysBefore(dateStr, keepDays, pattern)
+      try{
+        val file_split_strs = file.split("/")
+        val len = file_split_strs.length
+        var file_date = file_split_strs(len-1)
+        if (file_date.equals("")){
+          file_date = file_split_strs(len-2)
+        }
+        var date1 = DateUtils.parseDate(file_date, Array[String](pattern)) // 文件中的日期
+        var date2 = DateUtils.parseDate(date_early, Array[String](pattern)) // 请求的日期前keepDays的日期
+        if (date1.compareTo(date2) >= 0){ // 这个日期之前的数据 全部删除
+          flag = false
+        }
+      }catch {
+        case e:Exception =>
+          flag = false
+      }
+
+      if (flag){
+        MyHdfsUtils.delete_hdfs_path(file.toString)
+      }
+    }
+  }
+
+  /**
+   * @Author: zhangbo
+   * @Description: 给某hdfs路径加权限
+   *
+   */
+
+  def give_hdfs_permission(path:String): Unit ={
+    getHdfs(path).setPermission(new Path(path), new FsPermission("777"))
+  }
+}
+

+ 10 - 1
recommend-model-produce/src/main/scala/com/tzld/piaoquan/recommend/model/train_01_xgb_ad_20240808.scala

@@ -3,6 +3,7 @@ package com.tzld.piaoquan.recommend.model
 import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
 import org.apache.commons.lang.math.NumberUtils
 import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
 import org.apache.spark.ml.feature.VectorAssembler
 import org.apache.spark.rdd.RDD
@@ -83,7 +84,15 @@ object train_01_xgb_ad_20240808{
       .map(r =>{
         (r.get(0), r.get(1), r.get(2), r.get(3), r.get(4)).productIterator.mkString("\t")
     })
-    saveData.repartition(1).saveAsTextFile("/dw/recommend/model/checkpoint_xgbtest")
+    val hdfsPath = "/dw/recommend/model/checkpoint_xgbtest"
+    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+      println("删除路径并开始数据写入:" + hdfsPath)
+      MyHdfsUtils.delete_hdfs_path(hdfsPath)
+      saveData.repartition(100).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+    } else {
+      println("路径不合法,无法写入:" + hdfsPath)
+    }
+
 
 
     val evaluator = new BinaryClassificationEvaluator()