zhangbo 1 rok pred
rodič
commit
f3b1a014fd

+ 23 - 22
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_01_readhdfs.scala

@@ -3,10 +3,10 @@ package com.aliyun.odps.spark.examples
 import org.apache.spark.sql.SparkSession
 import com.aliyun.odps.TableSchema
 import com.aliyun.odps.data.Record
-import com.aliyun.odps.spark.examples.myUtils.ParamUtils
-import com.aliyun.odps.spark.examples.myUtils.env
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, ParamUtils, env}
 import examples.dataloader.RequestContextOffline
 import examples.dataloader.OfflineVlogShareLRFeatureExtractor
+import org.apache.hadoop.io.compress.GzipCodec
 
 import scala.collection.JavaConversions._
 
@@ -20,32 +20,35 @@ object makedata_01_readhdfs {
 
     // 1 读取参数
     val param = ParamUtils.parseArgs(args)
-    val partition = param.getOrElse("partition", "dt=20231220")
+    val partitionPrefix = param.getOrElse("partitionPrefix", "dt=")
     val tablePart = param.getOrElse("tablePart", "16").toInt
+    val beginStr = param.getOrElse("beginStr", "")
+    val endStr = param.getOrElse("endStr", "")
 
 
-    // 2 读取训练数据
+    // 2 读取odps+表信息
     val odpsOps = env.getODPS(sc)
     val project = "loghubods"
     val table = "alg_recsys_view_sample"
-    val odpsData = odpsOps.readTable(project = project,
-      table = table,
-      partition = partition,
-      transfer = read,
-      numPartition = tablePart)
-    val hdfsPath = "/dw/recommend/model/share_ratio_samples/" + partition
-    odpsData.saveAsTextFile(hdfsPath)
+
+    // 3 循环执行数据生产
+    val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
+    for (date <- dateRange) {
+      val partition = partitionPrefix + date
+      println("执行partiton:" + partition)
+      val odpsData = odpsOps.readTable(project = project,
+        table = table,
+        partition = partition,
+        transfer = func,
+        numPartition = tablePart)
+      val hdfsPath = "/dw/recommend/model/share_ratio_samples/" + partition
+      odpsData.saveAsTextFile(hdfsPath, classOf[GzipCodec])
+      println("数据写入完成:" + hdfsPath)
+    }
   }
 
-  def read(record: Record, schema: TableSchema): String = {
-    val labelName = "share_ornot"
-    val label = record.getString(labelName)
-    val newLabel = if (label == null || label == "1")
-      "0"
-    else
-      "1"
-    val result = singleParse(record)
-    result
+  def func(record: Record, schema: TableSchema): String = {
+    singleParse(record)
   }
 
   def singleParse(record: Record): String = {
@@ -59,8 +62,6 @@ object makedata_01_readhdfs {
     reqContext.putSceneFeature(record)
     val bytesFeatureExtractor = new OfflineVlogShareLRFeatureExtractor()
     bytesFeatureExtractor.makeFeature(reqContext.featureMap)
-
-
     val featureMap = bytesFeatureExtractor.featureMap
     newLabel + "\t" + featureMap.entries().map(r => r.getValue.getIdentifier + ":1").mkString("\t")
   }

+ 202 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/MyDateUtils.scala

@@ -0,0 +1,202 @@
+package com.aliyun.odps.spark.examples.myUtils
+import java.text.SimpleDateFormat
+import java.util.{Calendar, Date}
+
+import org.apache.commons.lang.time.DateUtils
+import org.apache.commons.lang3.time.DateUtils.addDays
+
+import scala.collection.mutable.ArrayBuffer
+
+object MyDateUtils {
+
+  val date_sdf = getYesterday()
+  val date_sdf_ = getYesterday_()
+  val date_sdf_full = ""
+
+
+
+
+  // 今天日期
+  def getNowDate(): String = {
+    var now: Date = new Date()
+    var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyyMMdd")
+    var hehe = dateFormat.format(now)
+    hehe
+  }
+  def getNowDate_(): String = {
+    var now: Date = new Date()
+    var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
+    var hehe = dateFormat.format(now)
+    hehe
+  }
+
+  // 昨天日期
+  def getYesterday(): String = {
+    var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyyMMdd")
+    var cal: Calendar = Calendar.getInstance()
+    cal.add(Calendar.DATE, -1)
+    var yesterday = dateFormat.format(cal.getTime())
+    yesterday
+  }
+  def getYesterday_(): String = {
+    var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
+    var cal: Calendar = Calendar.getInstance()
+    cal.add(Calendar.DATE, -1)
+    var yesterday = dateFormat.format(cal.getTime())
+    yesterday
+  }
+
+  //本周第一天的日期
+  def getNowWeekStart(): String = {
+    var period: String = ""
+    var cal: Calendar = Calendar.getInstance()
+    var df: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
+    cal.set(Calendar.DAY_OF_WEEK, Calendar.MONDAY)
+    //获取本周一的日期
+    period = df.format(cal.getTime())
+    period
+  }
+
+  // 本周末的日期
+  def getNowWeekEnd(): String = {
+    var period: String = ""
+    var cal: Calendar = Calendar.getInstance();
+    var df: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
+    cal.set(Calendar.DAY_OF_WEEK, Calendar.SUNDAY); //这种输出的是上个星期周日的日期,因为老外把周日当成第一天
+    cal.add(Calendar.WEEK_OF_YEAR, 1) // 增加一个星期,才是我们中国人的本周日的日期
+    period = df.format(cal.getTime())
+    period
+  }
+
+  // 本月的第一天
+  def getNowMonthStart(): String = {
+    var period: String = ""
+    var cal: Calendar = Calendar.getInstance();
+    var df: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
+    cal.set(Calendar.DATE, 1)
+    period = df.format(cal.getTime()) //本月第一天
+    period
+  }
+
+  // 本月最后一天
+  def getNowMonthEnd(): String = {
+    var period: String = ""
+    var cal: Calendar = Calendar.getInstance();
+    var df: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
+    cal.set(Calendar.DATE, 1)
+    cal.roll(Calendar.DATE, -1)
+    period = df.format(cal.getTime()) //本月最后一天
+    period
+  }
+
+  // "秒"时间戳 转 日期
+  def DateFormat(time:String):String={
+    var sdf:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
+    var date:String = sdf.format(new Date((time.toLong*1000l)))
+    date
+  }
+
+  // "秒"时间戳 转 日期
+  def DateFormat_yyyyMMdd(time:String):String={
+    var sdf:SimpleDateFormat = new SimpleDateFormat("yyyyMMdd")
+    var date:String = sdf.format(new Date((time.toLong*1000l)))
+    date
+  }
+
+  // "秒"时间戳 转 当天时间
+  def timeFormat(time:String):String={
+    var sdf:SimpleDateFormat = new SimpleDateFormat("HH:mm:ss")
+    var date:String = sdf.format(new Date((time.toLong*1000l)))
+    date
+  }
+
+  // date-time格式转成秒
+  def tranTimeToLong(tm:String) :Long={
+    val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+    val dt = fm.parse(tm)
+    val aa = fm.format(dt)
+    val tim: Long = dt.getTime()
+    tim / 1000
+  }
+
+  // 日期格式转成秒
+  def tranTimeString_yyyyMMdd_ToLong(tm:String) :Long={
+    val fm = new SimpleDateFormat("yyyyMMdd")
+    val dt = fm.parse(tm)
+    val aa = fm.format(dt)
+    val tim: Long = dt.getTime()
+    tim / 1000
+  }
+
+  // 秒转成日期
+  def formatDateMillToMut(mill:Long)= {
+    val date = new Date(mill)
+    date
+  }
+
+  //时间推移
+  def getNumDaysBefore(dt:String,num:Int, pattern:String = "yyyyMMdd"): String ={
+    val sdf = new SimpleDateFormat(pattern)
+    val enddate= sdf.parse(dt)
+    val rightNow = Calendar.getInstance()
+    rightNow.setTime(enddate)
+    rightNow.add(Calendar.DAY_OF_YEAR,-num);//日期减30天
+    val begindate =rightNow.getTime()
+    val time_begin = sdf.format(begindate)
+    time_begin
+  }
+
+  def getNumDaysAfter(dt:String,num:Int, pattern:String = "yyyyMMdd"): String ={
+    val sdf = new SimpleDateFormat(pattern)
+    val enddate= sdf.parse(dt)
+    val rightNow = Calendar.getInstance()
+    rightNow.setTime(enddate)
+    rightNow.add(Calendar.DAY_OF_YEAR,num);//日期减30天
+    val begindate =rightNow.getTime()
+    val time_begin = sdf.format(begindate)
+    time_begin
+  }
+
+  // "20190101"转"2019-01-01"
+  def dt2Dt(dt:String) : String={
+    dt.substring(0, 4) + "-" + dt.substring(4, 6) +"-" +dt.substring(6, 8)
+  }
+
+  // 日期区间生产1:从beginStr到endDate
+  def fromBeginDate2EndDate(beginStr:String, endStr:String): Array[String] ={
+    val date_format = new SimpleDateFormat("yyyyMMdd")
+    var from = DateUtils.parseDate(beginStr, Array[String]("yyyyMMdd"))
+    val to = DateUtils.parseDate(endStr, Array[String]("yyyyMMdd"))
+    var result = new ArrayBuffer[String]()
+    while (from.compareTo(to) <= 0) {
+      val dateStr = date_format.format(from)
+      result.append(dateStr)
+      from = DateUtils.addDays(from, 1)
+    }
+    result.toArray
+  }
+  // 日期区间生产2:
+  def getDateRange(beginStr: String, endStr: String, format: String = "yyyyMMdd"): ArrayBuffer[String] = {
+    val ranges = ArrayBuffer[String]()
+    val sdf = new SimpleDateFormat(format)
+    var dateBegin = sdf.parse(beginStr)
+    var dateEnd = sdf.parse(endStr)
+    while (dateBegin.compareTo(dateEnd) <= 0) {
+      ranges += sdf.format(dateBegin)
+      dateBegin = addDays(dateBegin, 1)
+    }
+    ranges
+  }
+
+
+  def main(args: Array[String]): Unit = {
+    var from = DateUtils.parseDate("2019-09-01", Array[String]("yyyy-MM-dd"))
+    var to = DateUtils.parseDate("2019-09-10", Array[String]("yyyy-MM-dd"))
+
+    val a = from.getTime / 3600
+    val b = to.getTime / 3600
+    println(b-a)
+    //    val date = "2019-05-01"
+    //    println(dt2Dt("20190101"))
+  }
+}