@@ -58,16 +58,16 @@ object makedata_ad_33_bucketData_20240718 {
     for (date <- dateRange) {
       println("开始执行:" + date)
       val data = sc.textFile(readPath + "/" + date + "*").map(r=>{
-      val rList = r.split("\t")
-      val logKey = rList(0)
-      val labelKey = rList(1)
-      val jsons = JSON.parseObject(rList(2))
-      val features = scala.collection.mutable.Map[String, Double]()
-      jsons.foreach(r => {
-        features.put(r._1, jsons.getDoubleValue(r._1))
-      })
-      (logKey, labelKey, features)
+        val rList = r.split("\t")
+        val logKey = rList(0)
+        val labelKey = rList(1)
+        val jsons = JSON.parseObject(rList(2))
+        val features = scala.collection.mutable.Map[String, Double]()
+        jsons.foreach(r => {
+          features.put(r._1, jsons.getDoubleValue(r._1))
+        })
+        (logKey, labelKey, features)
       })
         .filter{
           case (logKey, labelKey, features) =>
            val logKeyList = logKey.split(",")
@@ -109,7 +109,7 @@ object makedata_ad_33_bucketData_20240718 {
            result.add(label + "\t" + featuresBucket.mkString("\t"))
          }
          result.iterator
-      })
+        })

      // 4 保存数据到hdfs
      val hdfsPath = savePath + "/" + date
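
For reference, a minimal self-contained sketch of what the reworked map stage in the first hunk computes: each input row is a tab-separated <logKey>\t<labelKey>\t<feature JSON> record, and the JSON object is flattened into a mutable Map[String, Double]. It assumes fastjson (com.alibaba.fastjson) on the classpath, as implied by JSON.parseObject / getDoubleValue in the patch; the object name ParseRowSketch, the helper parseRow, and the sample row are illustrative only and not part of the original file.

import com.alibaba.fastjson.JSON
import scala.collection.JavaConverters._

object ParseRowSketch {
  // Parse one TSV row into (logKey, labelKey, feature map), mirroring the map body in the hunk.
  def parseRow(r: String): (String, String, scala.collection.mutable.Map[String, Double]) = {
    val rList = r.split("\t")
    val logKey = rList(0)
    val labelKey = rList(1)
    val jsons = JSON.parseObject(rList(2))
    val features = scala.collection.mutable.Map[String, Double]()
    // JSONObject is a java.util.Map, so iterate its key set explicitly here
    jsons.keySet().asScala.foreach { k =>
      features.put(k, jsons.getDoubleValue(k))
    }
    (logKey, labelKey, features)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample row, used only to show the expected input layout
    val sample = "1001,20240718\t1\t{\"ctr\":0.12,\"cvr\":0.03}"
    println(parseRow(sample))
  }
}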