@@ -1,7 +1,7 @@
 package com.aliyun.odps.spark.examples.makedata

 import com.alibaba.fastjson.JSON
-import com.aliyun.odps.spark.examples.myUtils.{MyHdfsUtils, ParamUtils}
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils}
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.sql.SparkSession

@@ -41,49 +41,53 @@ object makedata_14_valueData_20240608 {

     // 1 Read parameters
     val param = ParamUtils.parseArgs(args)
-    val partitionPrefix = param.getOrElse("partitionPrefix", "dt=20240607,hh=00")
-    val date = param.getOrElse("date", "20240607")
+    val beginStr = param.getOrElse("beginStr", "20230101")
+    val endStr = param.getOrElse("endStr", "20230101")
     val readPath = param.getOrElse("readPath", "/dw/recommend/model/13_sample_data/")
     val savePath = param.getOrElse("savePath", "/dw/recommend/model/14_feature_data/")
     val repartition = param.getOrElse("repartition", "200").toInt
+    val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
+    for (date <- dateRange) {
+      val data = sc.textFile(readPath + "/" + date + "*")
+      val data1 = data.map(r => {
+        val rList = r.split("\t")
+        val logKey = rList(0)
+        val labelKey = rList(1)
+        val featureKey = rList(2)
+        (logKey, labelKey, featureKey)
+      }).filter(r =>
+        r._1.split(",")(6).equals("0")
+      ).mapPartitions(row => {
+        val result = new ArrayBuffer[String]()
+        val contentList = contentList_bc.value
+        row.foreach {
+          case (logKey, labelKey, featureKey) =>
+            val label = JSON.parseObject(labelKey).getOrDefault("is_return", "0").toString
+            val featureJson = JSON.parseObject(featureKey)

-    val data = sc.textFile(readPath + partitionPrefix + "*")
-    val data1 = data.map(r => {
-      val rList = r.split("\t")
-      val logKey = rList(0)
-      val labelKey = rList(1)
-      val featureKey = rList(2)
-      (logKey, labelKey, featureKey)
-    }).filter(r=>
-      r._1.split(",")(6).equals("0")
-    ).mapPartitions(row => {
-      val result = new ArrayBuffer[String]()
-      val contentList = contentList_bc.value
-      row.foreach{
-        case (logKey, labelKey, featureKey) =>
-          val label = JSON.parseObject(labelKey).getOrDefault("is_return", "0").toString
-          val featureJson = JSON.parseObject(featureKey)
+            val featureValues = contentList.map(key => {
+              if (featureJson.containsKey(key)) {
+                featureJson.getDouble(key)
+              } else {
+                0.0
+              }
+            })
+            result.add(label + "\t" + featureValues.mkString(","))
+        }
+        result.iterator
+      })

-          val featureValues = contentList.map(key=>{
-            if (featureJson.containsKey(key)){
-              featureJson.getDouble(key)
-            }else{
-              0.0
-            }
-          })
-          result.add(label + "\t" + featureValues.mkString(","))
+      // 4 Save data to HDFS
+      val hdfsPath = savePath + "/" + date
+      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+        println("Deleting path and starting data write: " + hdfsPath)
+        MyHdfsUtils.delete_hdfs_path(hdfsPath)
+        data1.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+      } else {
+        println("Path is invalid, cannot write: " + hdfsPath)
       }
-      result.iterator
-    })
-
-    // 4 Save data to HDFS
-    val hdfsPath = savePath + "/" + date
-    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
-      println("Deleting path and starting data write: " + hdfsPath)
-      MyHdfsUtils.delete_hdfs_path(hdfsPath)
-      data1.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
-    } else {
-      println("Path is invalid, cannot write: " + hdfsPath)
     }
+
+
   }
 }
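
Note: this change relies on a MyDateUtils.getDateRange(beginStr, endStr) helper that expands the yyyyMMdd begin/end strings into the inclusive list of dates iterated by the new for loop. That helper lives in the myUtils package and is not shown in this diff; the sketch below is only an illustration of the assumed behavior (object name and signature are hypothetical).

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.collection.mutable.ArrayBuffer

// Illustrative stand-in for the assumed MyDateUtils.getDateRange: expands an
// inclusive yyyyMMdd range, e.g. ("20230101", "20230103") ->
// List("20230101", "20230102", "20230103").
object DateRangeSketch {
  private val fmt = DateTimeFormatter.ofPattern("yyyyMMdd")

  def getDateRange(beginStr: String, endStr: String): List[String] = {
    val begin = LocalDate.parse(beginStr, fmt)
    val end = LocalDate.parse(endStr, fmt)
    val dates = new ArrayBuffer[String]()
    var d = begin
    while (!d.isAfter(end)) { // inclusive of endStr, matching the beginStr == endStr default case
      dates += d.format(fmt)
      d = d.plusDays(1)
    }
    dates.toList
  }
}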