
Remove duplicate data

jch 17 hours ago
parent
commit
a9a45529d6

+ 15 - 10
src/main/scala/com/aliyun/odps/spark/examples/makedata_ad/v20240718/makedata_ad_33_bucketDataFromOriginToHive_20260120.scala

@@ -151,16 +151,10 @@ object makedata_ad_33_bucketDataFromOriginToHive_20260120 {
               val adverId = record.getString("adverid")
               !filterAdverIds.contains(adverId)
             })
-            .map(record => {
-              val pqtid = record.getString("pqtid")
-              (pqtid, record)
-            })
-            .reduceByKey((a, b) => a)
-            .map(_._2)
-            .filter(record => {
-              val label = record.getString(whatLabel).toInt
-              label > 0 || Random.nextDouble() < negSampleRate
-            })
+            //            .filter(record => {
+            //              val label = record.getString(whatLabel).toInt
+            //              label > 0 || Random.nextDouble() < negSampleRate
+            //            })
             .map(record => {
               val featureMap = new JSONObject()
               val ts = record.getString("ts").toInt
@@ -660,6 +654,17 @@ object makedata_ad_33_bucketDataFromOriginToHive_20260120 {
               val labelKey = labels.toString()
               (logKey, labelKey, featureMap)
             })
+            .map { case (logKey, labelKey, jsons) =>
+              val pqtid = jsons.getString("pqtid")
+              (pqtid, (logKey, labelKey, jsons))
+            }
+            .reduceByKey((a, b) => a)
+            .map(_._2)
+            .filter { case (logKey, labelKey, jsons) =>
+              val labelObject = JSON.parseObject(labelKey)
+              val label = labelObject.getString("ad_is_conversion").toInt
+              label > 0 || Random.nextDouble() < negSampleRate
+            }
           odpsData
         }).reduce(_ union _)
         .map { case (logKey, labelKey, jsons) =>
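
For reference, a minimal sketch of the deduplication-plus-negative-sampling step that this commit moves behind the feature-map construction. The element type RDD[(String, String, JSONObject)] and the helper name dedupByPqtidAndSample are assumptions for illustration; pqtid, ad_is_conversion and negSampleRate are taken from the diff above.

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.spark.rdd.RDD
import scala.util.Random

object DedupSketch {
  // Collapse duplicate samples that share a pqtid, then keep every positive
  // and a random fraction (negSampleRate) of the negatives.
  def dedupByPqtidAndSample(samples: RDD[(String, String, JSONObject)],
                            negSampleRate: Double): RDD[(String, String, JSONObject)] = {
    samples
      .map { case (logKey, labelKey, jsons) =>
        // key by pqtid so identical requests collapse to a single record
        (jsons.getString("pqtid"), (logKey, labelKey, jsons))
      }
      .reduceByKey((a, b) => a) // one record survives per pqtid; Spark gives no ordering guarantee about which
      .map(_._2)
      .filter { case (_, labelKey, _) =>
        val label = JSON.parseObject(labelKey).getString("ad_is_conversion").toInt
        label > 0 || Random.nextDouble() < negSampleRate
      }
  }
}

Because the sampling now runs after deduplication, each pqtid contributes at most one record; negatives are then dropped with probability 1 - negSampleRate while positives always pass the filter.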