|
@@ -463,69 +463,70 @@ object makedata_31_bucketDataPrint_20240821 {
|
|
println("路径不合法,无法写入:" + hdfsPath)
|
|
println("路径不合法,无法写入:" + hdfsPath)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- val data2 = sc.textFile(savePath + "/" + readDate + "*").mapPartitions(row => {
|
|
|
|
- val result = new ArrayBuffer[(String, List[String], List[String], List[String])]()
|
|
|
|
- val contentList = contentList_br.value
|
|
|
|
- // 680实验,517个特征
|
|
|
|
- row.foreach(r => {
|
|
|
|
- val rList = r.split("\t")
|
|
|
|
- val label = rList(5).toString
|
|
|
|
- val allFeatureMap = JSON.parseObject(rList(6)).toMap.map(r => (r._1, r._2.toString))
|
|
|
|
- val offlineFeature = rList(7).split(",").map(r => (r.split(":")(0), r.split(":")(1))).toMap
|
|
|
|
- val offlineFeatureList = contentList.map(name => {
|
|
|
|
- name + ":" + offlineFeature(name)
|
|
|
|
- }).filter(_.nonEmpty)
|
|
|
|
-
|
|
|
|
- val allFeatureV1 = allFeatureMap.map {
|
|
|
|
- case (key, value) =>
|
|
|
|
- key + ":" + value
|
|
|
|
- }.toList
|
|
|
|
-
|
|
|
|
- val allFeatureV2 = allFeatureMap.map {
|
|
|
|
- case (key, value) =>
|
|
|
|
- val b9FeatureSet = Set("b9_1h_ctr", "b9_1h_ctcvr", "b9_1h_cvr", "b9_1h_conver", "b9_1h_click", "b9_1h_conver*log(view)", "b9_1h_conver*ctcvr", "b9_2h_ctr", "b9_2h_ctcvr", "b9_2h_cvr", "b9_2h_conver", "b9_2h_click", "b9_2h_conver*log(view)", "b9_2h_conver*ctcvr", "b9_3h_ctr", "b9_3h_ctcvr", "b9_3h_cvr", "b9_3h_conver", "b9_3h_click", "b9_3h_conver*log(view)", "b9_3h_conver*ctcvr", "b9_6h_ctr", "b9_6h_ctcvr", "b9_6h_cvr", "b9_6h_conver", "b9_6h_click", "b9_6h_conver*log(view)", "b9_6h_conver*ctcvr", "b9_12h_ctr", "b9_12h_ctcvr", "b9_12h_cvr", "b9_12h_conver", "b9_12h_click", "b9_12h_conver*log(view)", "b9_12h_conver*ctcvr", "b9_1d_ctr", "b9_1d_ctcvr", "b9_1d_cvr", "b9_1d_conver", "b9_1d_click", "b9_1d_conver*log(view)", "b9_1d_conver*ctcvr", "b9_3d_ctr", "b9_3d_ctcvr", "b9_3d_cvr", "b9_3d_conver", "b9_3d_click", "b9_3d_conver*log(view)", "b9_3d_conver*ctcvr", "b9_7d_ctr", "b9_7d_ctcvr", "b9_7d_cvr", "b9_7d_conver", "b9_7d_click", "b9_7d_conver*log(view)", "b9_7d_conver*ctcvr", "b9_yesterday_ctr", "b9_yesterday_ctcvr", "b9_yesterday_cvr", "b9_yesterday_conver", "b9_yesterday_click", "b9_yesterday_conver*log(view)", "b9_yesterday_conver*ctcvr", "b9_today_ctr", "b9_today_ctcvr", "b9_today_cvr", "b9_today_conver", "b9_today_click", "b9_today_conver*log(view)", "b9_today_conver*ctcvr")
|
|
|
|
- if (b9FeatureSet.contains(key) && offlineFeature.contains(key)) {
|
|
|
|
- key + ":" + offlineFeature(key)
|
|
|
|
- } else {
|
|
|
|
- key + ":" + value
|
|
|
|
- }
|
|
|
|
- }.filter(_.nonEmpty).toList
|
|
|
|
|
|
|
|
- result.add((label, offlineFeatureList, allFeatureV1, allFeatureV2))
|
|
|
|
- })
|
|
|
|
|
|
+ val data2 = sc.textFile(savePath + "/" + readDate + "*").mapPartitions(row => {
|
|
|
|
+ val result = new ArrayBuffer[(String, List[String], List[String], List[String])]()
|
|
|
|
+ val contentList = contentList_br.value
|
|
|
|
+ // 680实验,517个特征
|
|
|
|
+ row.foreach(r => {
|
|
|
|
+ val rList = r.split("\t")
|
|
|
|
+ val label = rList(5).toString
|
|
|
|
+ val allFeatureMap = JSON.parseObject(rList(6)).toMap.map(r => (r._1, r._2.toString))
|
|
|
|
+ val offlineFeature = rList(7).split(",").map(r => (r.split(":")(0), r.split(":")(1))).toMap
|
|
|
|
+ val offlineFeatureList = contentList.map(name => {
|
|
|
|
+ name + ":" + offlineFeature(name)
|
|
|
|
+ }).filter(_.nonEmpty)
|
|
|
|
+
|
|
|
|
+ val allFeatureV1 = allFeatureMap.map {
|
|
|
|
+ case (key, value) =>
|
|
|
|
+ key + ":" + value
|
|
|
|
+ }.toList
|
|
|
|
+
|
|
|
|
+ val allFeatureV2 = allFeatureMap.map {
|
|
|
|
+ case (key, value) =>
|
|
|
|
+ val b9FeatureSet = Set("b9_1h_ctr", "b9_1h_ctcvr", "b9_1h_cvr", "b9_1h_conver", "b9_1h_click", "b9_1h_conver*log(view)", "b9_1h_conver*ctcvr", "b9_2h_ctr", "b9_2h_ctcvr", "b9_2h_cvr", "b9_2h_conver", "b9_2h_click", "b9_2h_conver*log(view)", "b9_2h_conver*ctcvr", "b9_3h_ctr", "b9_3h_ctcvr", "b9_3h_cvr", "b9_3h_conver", "b9_3h_click", "b9_3h_conver*log(view)", "b9_3h_conver*ctcvr", "b9_6h_ctr", "b9_6h_ctcvr", "b9_6h_cvr", "b9_6h_conver", "b9_6h_click", "b9_6h_conver*log(view)", "b9_6h_conver*ctcvr", "b9_12h_ctr", "b9_12h_ctcvr", "b9_12h_cvr", "b9_12h_conver", "b9_12h_click", "b9_12h_conver*log(view)", "b9_12h_conver*ctcvr", "b9_1d_ctr", "b9_1d_ctcvr", "b9_1d_cvr", "b9_1d_conver", "b9_1d_click", "b9_1d_conver*log(view)", "b9_1d_conver*ctcvr", "b9_3d_ctr", "b9_3d_ctcvr", "b9_3d_cvr", "b9_3d_conver", "b9_3d_click", "b9_3d_conver*log(view)", "b9_3d_conver*ctcvr", "b9_7d_ctr", "b9_7d_ctcvr", "b9_7d_cvr", "b9_7d_conver", "b9_7d_click", "b9_7d_conver*log(view)", "b9_7d_conver*ctcvr", "b9_yesterday_ctr", "b9_yesterday_ctcvr", "b9_yesterday_cvr", "b9_yesterday_conver", "b9_yesterday_click", "b9_yesterday_conver*log(view)", "b9_yesterday_conver*ctcvr", "b9_today_ctr", "b9_today_ctcvr", "b9_today_cvr", "b9_today_conver", "b9_today_click", "b9_today_conver*log(view)", "b9_today_conver*ctcvr")
|
|
|
|
+ if (b9FeatureSet.contains(key) && offlineFeature.contains(key)) {
|
|
|
|
+ key + ":" + offlineFeature(key)
|
|
|
|
+ } else {
|
|
|
|
+ key + ":" + value
|
|
|
|
+ }
|
|
|
|
+ }.filter(_.nonEmpty).toList
|
|
|
|
|
|
- result.iterator
|
|
|
|
|
|
+ result.add((label, offlineFeatureList, allFeatureV1, allFeatureV2))
|
|
})
|
|
})
|
|
|
|
|
|
- val offlineSave = "/dw/recommend/model/33_for_check_offline/" + readDate
|
|
|
|
- if (offlineSave.nonEmpty && offlineSave.startsWith("/dw/recommend/model/")) {
|
|
|
|
- println("删除路径并开始数据写入:" + offlineSave)
|
|
|
|
- MyHdfsUtils.delete_hdfs_path(offlineSave)
|
|
|
|
- data2.map(r => r._1 + "\t" + r._2.mkString("\t")).saveAsTextFile(offlineSave, classOf[GzipCodec])
|
|
|
|
- } else {
|
|
|
|
- println("路径不合法,无法写入:" + offlineSave)
|
|
|
|
- }
|
|
|
|
|
|
+ result.iterator
|
|
|
|
+ })
|
|
|
|
|
|
- val allFeatureV1 = "/dw/recommend/model/33_for_check_all_v1/" + readDate
|
|
|
|
- if (allFeatureV1.nonEmpty && allFeatureV1.startsWith("/dw/recommend/model/")) {
|
|
|
|
- println("删除路径并开始数据写入:" + allFeatureV1)
|
|
|
|
- MyHdfsUtils.delete_hdfs_path(allFeatureV1)
|
|
|
|
- data2.map(r => r._1 + "\t" + r._3.mkString("\t")).saveAsTextFile(allFeatureV1, classOf[GzipCodec])
|
|
|
|
- } else {
|
|
|
|
- println("路径不合法,无法写入:" + allFeatureV1)
|
|
|
|
- }
|
|
|
|
|
|
+ val offlineSave = "/dw/recommend/model/33_for_check_offline/" + readDate
|
|
|
|
+ if (offlineSave.nonEmpty && offlineSave.startsWith("/dw/recommend/model/")) {
|
|
|
|
+ println("删除路径并开始数据写入:" + offlineSave)
|
|
|
|
+ MyHdfsUtils.delete_hdfs_path(offlineSave)
|
|
|
|
+ data2.map(r => r._1 + "\t" + r._2.mkString("\t")).saveAsTextFile(offlineSave, classOf[GzipCodec])
|
|
|
|
+ } else {
|
|
|
|
+ println("路径不合法,无法写入:" + offlineSave)
|
|
|
|
+ }
|
|
|
|
|
|
- val allFeatureV2 = "/dw/recommend/model/33_for_check_all_v2/" + readDate
|
|
|
|
- if (allFeatureV2.nonEmpty && allFeatureV2.startsWith("/dw/recommend/model/")) {
|
|
|
|
- println("删除路径并开始数据写入:" + allFeatureV2)
|
|
|
|
- MyHdfsUtils.delete_hdfs_path(allFeatureV2)
|
|
|
|
- data2.map(r => r._1 + "\t" + r._4.mkString("\t")).saveAsTextFile(allFeatureV2, classOf[GzipCodec])
|
|
|
|
- } else {
|
|
|
|
- println("路径不合法,无法写入:" + allFeatureV2)
|
|
|
|
- }
|
|
|
|
|
|
+ val allFeatureV1 = "/dw/recommend/model/33_for_check_all_v1/" + readDate
|
|
|
|
+ if (allFeatureV1.nonEmpty && allFeatureV1.startsWith("/dw/recommend/model/")) {
|
|
|
|
+ println("删除路径并开始数据写入:" + allFeatureV1)
|
|
|
|
+ MyHdfsUtils.delete_hdfs_path(allFeatureV1)
|
|
|
|
+ data2.map(r => r._1 + "\t" + r._3.mkString("\t")).saveAsTextFile(allFeatureV1, classOf[GzipCodec])
|
|
|
|
+ } else {
|
|
|
|
+ println("路径不合法,无法写入:" + allFeatureV1)
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ val allFeatureV2 = "/dw/recommend/model/33_for_check_all_v2/" + readDate
|
|
|
|
+ if (allFeatureV2.nonEmpty && allFeatureV2.startsWith("/dw/recommend/model/")) {
|
|
|
|
+ println("删除路径并开始数据写入:" + allFeatureV2)
|
|
|
|
+ MyHdfsUtils.delete_hdfs_path(allFeatureV2)
|
|
|
|
+ data2.map(r => r._1 + "\t" + r._4.mkString("\t")).saveAsTextFile(allFeatureV2, classOf[GzipCodec])
|
|
|
|
+ } else {
|
|
|
|
+ println("路径不合法,无法写入:" + allFeatureV2)
|
|
}
|
|
}
|
|
|
|
+
|
|
}
|
|
}
|
|
|
|
|
|
def func(record: Record, schema: TableSchema): Record = {
|
|
def func(record: Record, schema: TableSchema): Record = {
|