|
@@ -91,6 +91,10 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
|
|
.filter(record => {
|
|
.filter(record => {
|
|
AdUtil.isApi(record)
|
|
AdUtil.isApi(record)
|
|
})
|
|
})
|
|
|
|
+ // Down-sample before the following map: keep every positive record
+ // (label > 0, the same threshold the removed post-map filter applied) and
+ // keep any other record only when a uniform draw in [0, 1) falls below
+ // negSampleRate.
+ .filter(record => {
|
|
|
|
+ // NOTE(review): toInt throws on a null or non-numeric value — confirm the
+ // whatLabel column is always populated with an integer string.
+ val label = record.getString(whatLabel).toInt
|
|
|
|
+ // `||` short-circuits, so the random draw happens only for non-positives.
+ label > 0 || Random.nextDouble() < negSampleRate
|
|
|
|
+ })
|
|
.map(record => {
|
|
.map(record => {
|
|
val ts = record.getString("ts").toInt
|
|
val ts = record.getString("ts").toInt
|
|
val cid = record.getString("cid")
|
|
val cid = record.getString("cid")
|
|
@@ -523,9 +527,8 @@ object makedata_ad_33_bucketDataFromOriginToHive_20250228 {
|
|
})
|
|
})
|
|
resultMap += ("has_conversion" -> label)
|
|
resultMap += ("has_conversion" -> label)
|
|
resultMap += ("logkey" -> logKey)
|
|
resultMap += ("logkey" -> logKey)
|
|
- (label.toInt, resultMap, Random.nextDouble())
|
|
|
|
- }.filter(r => r._3 < negSampleRate || r._1 > 0)
|
|
|
|
- .map(r => r._2).coalesce(128)
|
|
|
|
|
|
+ resultMap
|
|
|
|
+ }.coalesce(128)
|
|
|
|
|
|
val partition = s"dt=$dt"
|
|
val partition = s"dt=$dt"
|
|
odpsOps.saveToTable(project, outputTable, partition, recordRdd, write, defaultCreate = true, overwrite = true)
|
|
odpsOps.saveToTable(project, outputTable, partition, recordRdd, write, defaultCreate = true, overwrite = true)
|