|
@@ -5,6 +5,7 @@ import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUt
|
|
|
import examples.extractor.ExtractorUtils
|
|
|
import org.apache.hadoop.io.compress.GzipCodec
|
|
|
import org.apache.spark.sql.SparkSession
|
|
|
+import com.alibaba.fastjson.{JSON, JSONObject}
|
|
|
|
|
|
import scala.collection.JavaConversions._
|
|
|
import scala.collection.mutable.ArrayBuffer
|
|
@@ -76,10 +77,12 @@ object makedata_16_bucketData_20240609_check {
|
|
|
val logKey = rList(0)
|
|
|
val labelKey = rList(1)
|
|
|
val features = rList(2).split(",").map(_.toDouble)
|
|
|
- (logKey, labelKey, features)
|
|
|
+ val allFeature: JSONObject = if (rList(3).equals("\\N")) new JSONObject() else
|
|
|
+ JSON.parseObject(rList(3))
|
|
|
+ (logKey, labelKey, features, allFeature)
|
|
|
})
|
|
|
.filter{
|
|
|
- case (logKey, labelKey, features) =>
|
|
|
+ case (logKey, labelKey, features, allFeature) =>
|
|
|
val logKeyList = logKey.split(",")
|
|
|
val apptype = logKeyList(0)
|
|
|
val pagesource = logKeyList(1)
|
|
@@ -89,25 +92,35 @@ object makedata_16_bucketData_20240609_check {
|
|
|
ABSETS.contains(abcode) && level.equals("0")
|
|
|
}
|
|
|
.map{
|
|
|
- case (logKey, labelKey, features) =>
|
|
|
+ case (logKey, labelKey, features, allFeature) =>
|
|
|
val label = JSON.parseObject(labelKey).getOrDefault("is_return", "0").toString
|
|
|
- (label, features)
|
|
|
+ (label, features, allFeature)
|
|
|
}
|
|
|
.mapPartitions(row => {
|
|
|
val result = new ArrayBuffer[String]()
|
|
|
val contentList = contentList_br.value
|
|
|
val bucketsMap = bucketsMap_br.value
|
|
|
row.foreach{
|
|
|
- case (label, features) =>
|
|
|
+ case (label, features, allFeature) =>
|
|
|
val featuresBucket = contentList.indices.map(i =>{
|
|
|
val featureName = contentList(i)
|
|
|
val score = features(i)
|
|
|
- if (score > 1E-8){
|
|
|
- val (bucketNum, buckets) = bucketsMap(featureName)
|
|
|
- val scoreNew = 1.0 / bucketNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
|
|
|
- featureName + ":" + scoreNew.toString
|
|
|
- }else{
|
|
|
- ""
|
|
|
+ // 用户
|
|
|
+ if (featureName.startsWith("c")) {
|
|
|
+ val scoreNew = allFeature.getOrDefault(featureName, "").toString
|
|
|
+ if (scoreNew.equals("")) {
|
|
|
+ ""
|
|
|
+ } else {
|
|
|
+ featureName + ":" + scoreNew.toString
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if (score > 1E-8) {
|
|
|
+ val (bucketNum, buckets) = bucketsMap(featureName)
|
|
|
+ val scoreNew = 1.0 / bucketNum * (ExtractorUtils.findInsertPosition(buckets, score).toDouble + 1.0)
|
|
|
+ featureName + ":" + scoreNew.toString
|
|
|
+ } else {
|
|
|
+ ""
|
|
|
+ }
|
|
|
}
|
|
|
}).filter(_.nonEmpty)
|
|
|
result.add(label + "\t" + featuresBucket.mkString("\t"))
|