@@ -10,6 +10,7 @@ import org.apache.spark.sql.SparkSession
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
+import scala.util.Random


 /*
@@ -33,6 +34,8 @@ object makedata_ad_33_bucketData_20240729 {
     val repartition = param.getOrElse("repartition", "100").toInt
     val filterNames = param.getOrElse("filterNames", "").split(",").toSet
     val whatLabel = param.getOrElse("whatLabel", "ad_is_conversion")
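+    // cid whitelist for this dataset; the sampling filter below branches on these same two ids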
+    val cidSet = param.getOrElse("cidSet", "cid_3319,cid_3024").split(",").toSet
     val cidCountThreshold = param.getOrElse("cidCountThreshold", "20000").toInt

     val loader = getClass.getClassLoader
@@ -50,18 +53,17 @@ object makedata_ad_33_bucketData_20240729 {
     val bucketsMap = buckets.split("\n")
       .map(r => r.replace(" ", "").replaceAll("\n", ""))
       .filter(r => r.nonEmpty)
-      .map(r =>{
+      .map(r => {
         val rList = r.split("\t")
         (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
       }).toMap
     val bucketsMap_br = sc.broadcast(bucketsMap)

     val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
-
     val cidCountMap = scala.collection.mutable.Map[String, Int]()
     for (date <- dateRange) {
       println("start processing: " + date)
-      val data = sc.textFile(readPath + "/" + date + "*").map(r=>{
+      val data = sc.textFile(readPath + "/" + date + "*").map(r => {
         val rList = r.split("\t")
         val logKey = rList(0)
         val labelKey = rList(1)
@@ -72,23 +74,35 @@
         })
         (logKey, labelKey, features)
       })
-        .filter{
+        .filter {
           case (logKey, labelKey, features) =>
            val logKeyList = logKey.split(",")
            val apptype = logKeyList(0)
            !Set("12", "13").contains(apptype)
-        }.filter{
+        }.filter {
          case (logKey, labelKey, features) =>
            var key = ""
            for (elem <- features) {
-              if (elem._1.contains("cid_")){
+              if (elem._1.contains("cid_")) {
                key = elem._1
              }
            }
-            val count = cidCountMap.getOrElse(key, 0) + 1
-            cidCountMap.put(key, count)
-            count < cidCountThreshold
-        }.map{
+
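+            // keep every cid_3319 row, downsample cid_3024 to ~1%, and drop all other cids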
+            if (key.equals("cid_3319")) {
+              true
+            } else if (key.equals("cid_3024")) {
+              // create a Random instance
+              val rand = new Random()
+
+              // draw a random double in [0, 1)
+              val randomDouble = rand.nextDouble()
+
+              randomDouble < 0.01
+            } else {
+              false
+            }
+        }.map {
          case (logKey, labelKey, features) =>
            val label = JSON.parseObject(labelKey).getOrDefault(whatLabel, "0").toString
            (label, features)
@@ -96,17 +110,20 @@
        .mapPartitions(row => {
          val result = new ArrayBuffer[String]()
          val bucketsMap = bucketsMap_br.value
-          row.foreach{
+          row.foreach {
            case (label, features) =>
-              val featuresBucket = features.map{
+              val featuresBucket = features.map {
                case (name, score) =>
                  var ifFilter = false
-                  if (filterNames.nonEmpty){
-                    filterNames.foreach(r=> if (!ifFilter && name.contains(r)) {ifFilter = true} )
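+                  // flag features whose name contains any filterNames entry; flagged features are emitted as ""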
+                  if (filterNames.nonEmpty) {
+                    filterNames.foreach(r => if (!ifFilter && name.contains(r)) {
+                      ifFilter = true
+                    })
                  }
-                  if (ifFilter){
+                  if (ifFilter) {
                    ""
-                  }else{
+                  } else {
                    if (score > 1E-8) {
                      if (bucketsMap.contains(name)) {
                        val (bucketsNum, buckets) = bucketsMap(name)
@@ -137,6 +154,5 @@
    }


-
  }
}