|
@@ -5,6 +5,7 @@ import examples.extractor.ExtractorUtils
|
|
|
import org.apache.hadoop.io.compress.GzipCodec
|
|
|
import org.apache.spark.sql.SparkSession
|
|
|
|
|
|
+import com.alibaba.fastjson.JSON
|
|
|
import scala.collection.JavaConversions._
|
|
|
import scala.collection.mutable.ArrayBuffer
|
|
|
import scala.io.Source
|
|
@@ -70,10 +71,24 @@ object makedata_16_bucketData_20240609 {
|
|
|
println("开始执行:" + date)
|
|
|
val data = sc.textFile(readPath + date).map(r=>{
|
|
|
val rList = r.split("\t")
|
|
|
- val label = rList(0)
|
|
|
- val features = rList(1).split(",").map(_.toDouble)
|
|
|
- (label, features)
|
|
|
- }).mapPartitions(row => {
|
|
|
+ val logKey = rList(0)
|
|
|
+ val labelKey = rList(1)
|
|
|
+ val features = rList(2).split(",").map(_.toDouble)
|
|
|
+ (logKey, labelKey, features)
|
|
|
+ })
|
|
|
+ .filter{
|
|
|
+ case (logKey, labelKey, features) =>
|
|
|
+ val logKeyList = logKey.split(",")
|
|
|
+ val apptype = logKeyList(0)
|
|
|
+ val pagesource = logKeyList(1)
|
|
|
+ Set("0", "4", "5", "21", "3", "6").contains(apptype) && pagesource.endsWith("recommend")
|
|
|
+ }
|
|
|
+ .map{
|
|
|
+ case (logKey, labelKey, features) =>
|
|
|
+ val label = JSON.parseObject(labelKey).getOrDefault("is_return", "0").toString
|
|
|
+ (label, features)
|
|
|
+ }
|
|
|
+ .mapPartitions(row => {
|
|
|
val result = new ArrayBuffer[String]()
|
|
|
val contentList = contentList_br.value
|
|
|
val bucketsMap = bucketsMap_br.value
|