@@ -7,7 +7,6 @@ import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUt
 import examples.extractor.v20250218.ExtractFeature20250218
 import examples.utils.{FestiveUtil, SimilarityUtils}
 import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 
 import java.time.LocalDateTime
@@ -46,16 +45,12 @@ object makedata_recsys_41_vid_ros_train_data_20250324 {
 
     // 3 循环执行数据生产
     val timeRange = MyDateUtils.getDateHourRange(beginStr, endStr)
-    val partitions = timeRange.map { dt_hh =>
+    for (dt_hh <- timeRange) {
       val dt = dt_hh.substring(0, 8)
       val hh = dt_hh.substring(8, 10)
-      s"dt=$dt,hh=$hh"
-    }
-
-    var odpsData: RDD[String] = sc.emptyRDD[String] // 初始化空RDD
-    for (partition <- partitions) {
+      val partition = s"dt=$dt,hh=$hh"
       println(s"开始读取分区: $partition")
-      val partitionData = odpsOps.readTable(
+      val odpsData = odpsOps.readTable(
         project = project,
         table = table,
         partition = partition,
@@ -66,10 +61,6 @@ object makedata_recsys_41_vid_ros_train_data_20250324 {
         FestiveUtil.init()
         p.map(record => {
           val featureMap = new JSONObject()
-          val vid = if (record.isNull("vid")) "" else record.getString("vid")
-
-          val hh = record.getString("hh").toInt
-
           // a 视频特征
           val b1: JSONObject = if (record.isNull("b1_feature")) new JSONObject() else JSON.parseObject(record.getString("b1_feature"))
           val b2: JSONObject = if (record.isNull("b2_feature")) new JSONObject() else JSON.parseObject(record.getString("b2_feature"))
@@ -120,17 +111,17 @@ object makedata_recsys_41_vid_ros_train_data_20250324 {
 
         })
       })
-      odpsData = odpsData.union(partitionData)
-    }
-
-    // 4 保存数据到hdfs
-    val hdfsPath = savePath
-    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
-      println("删除路径并开始数据写入:" + hdfsPath)
-      MyHdfsUtils.delete_hdfs_path(hdfsPath)
-      odpsData.coalesce(repartition, shuffle = true).saveAsTextFile(hdfsPath, classOf[GzipCodec])
-    } else {
-      println("路径不合法,无法写入:" + hdfsPath)
+
+      // 4 保存数据到hdfs
+      val savePartition = dt + hh
+      val hdfsPath = savePath + "/" + savePartition
+      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+        println("删除路径并开始数据写入:" + hdfsPath)
+        MyHdfsUtils.delete_hdfs_path(hdfsPath)
+        odpsData.coalesce(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+      } else {
+        println("路径不合法,无法写入:" + hdfsPath)
+      }
     }
   }
 }
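Net effect of the diff, as a minimal standalone sketch (not the production job): each hour of timeRange is now read and written inside the loop, landing in its own sub-directory under savePath, instead of unioning every partition into one RDD and writing once at the end. The object name, the timeRange/savePath/repartition literals and the readPartition stub below are illustrative placeholders; the real job gets these values via ParamUtils/MyDateUtils, reads records with odpsOps.readTable, and clears the target directory with MyHdfsUtils.delete_hdfs_path before writing.

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object PerHourDumpSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("per_hour_dump_sketch").getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical stand-ins for values the real job derives from its parameters.
    val timeRange: Seq[String] = Seq("2025032400", "2025032401") // "yyyyMMddHH"
    val savePath = "/dw/recommend/model/41_recsys_ros_train_data" // illustrative root
    val repartition = 64

    for (dt_hh <- timeRange) {
      val dt = dt_hh.substring(0, 8)
      val hh = dt_hh.substring(8, 10)
      val partition = s"dt=$dt,hh=$hh"

      // Placeholder for odpsOps.readTable(...): one RDD of serialized samples per dt/hh partition.
      val odpsData: RDD[String] = readPartition(sc, partition)

      // Each hour is written to its own sub-directory, e.g. <savePath>/2025032400.
      // The real job first clears the target via MyHdfsUtils.delete_hdfs_path(hdfsPath).
      val hdfsPath = savePath + "/" + dt + hh
      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
        odpsData.coalesce(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
      }
    }
  }

  // Hypothetical reader used only to keep the sketch self-contained.
  private def readPartition(sc: org.apache.spark.SparkContext, partition: String): RDD[String] =
    sc.parallelize(Seq(s"sample record from $partition"))
}

Writing per hour keeps each hour's output independent, so the mutable accumulator RDD and its union are no longer needed, which is why the org.apache.spark.rdd.RDD import and the shuffle = true coalesce disappear in this diff.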