@@ -0,0 +1,84 @@
+package com.tzld.piaoquan.ad.spark
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+import org.apache.spark.ml.feature.{VectorAssembler, StringIndexer, PCA}
+import org.apache.spark.ml.functions.vector_to_array
+import org.apache.spark.sql.expressions.Window
+
+object AdExposureUserGroupVisualizer {
+  def main(args: Array[String]): Unit = {
+    // Create the SparkSession with Hive support
+    val spark = SparkSession.builder()
+      .appName("Ad User PCA Analysis")
+      .enableHiveSupport()
+      .getOrCreate()
+
+    // Read the ad sample table from Hive for the given date
+    val dt = "20250204" // analysis date
+    val dataSql =
+      s"""
+        |SELECT advercode, machineinfo, extend, allfeaturemap FROM loghubods.alg_recsys_ad_sample_all
+        |WHERE dt = '$dt'
+        |AND GET_JSON_OBJECT(scoremap, '$$.ctcvrScore') IS NOT NULL
+        |""".stripMargin
+    val df = spark.sql(dataSql)
+
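+    // machineinfo and extend are assumed to be JSON strings here (like scoremap in
+    // the query above), so get_json_object is used rather than struct getField.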
+    // Extract brand and region
+    val dfParsed = df.withColumn("brand", upper(get_json_object(col("machineinfo"), "$.brand")))
+      .withColumn("region", get_json_object(col("extend"), "$.region"))
+      .drop("machineinfo", "extend")
+
+    // User-side feature names to pull out of allfeaturemap
+    val userFeatureNames = List(
+      "ctr_all", "viewAll", "e1_tags_14d_avgscore", "e1_tags_14d_maxscore", "incomeAll",
+      "clickAll", "e2_tags_14d_avgscore", "e2_tags_14d_maxscore", "ecpm_all", "converAll",
+      "e2_tags_7d_avgscore", "e1_tags_7d_avgscore", "e2_tags_3d_avgscore", "e1_tags_7d_maxscore", "cvr_all",
+      "ctcvr_all", "e2_tags_7d_maxscore", "e2_tags_3d_maxscore", "e1_tags_3d_avgscore", "e1_tags_3d_maxscore")
+
+    // Encode brand and region as numeric indices; "keep" buckets nulls and unseen
+    // labels instead of failing the job
+    val indexerBrand = new StringIndexer().setInputCol("brand").setOutputCol("brand_index").setHandleInvalid("keep")
+    val indexerRegion = new StringIndexer().setInputCol("region").setOutputCol("region_index").setHandleInvalid("keep")
+    val dfIndexed = indexerRegion.fit(dfParsed).transform(dfParsed)
+    val dfFinal = indexerBrand.fit(dfIndexed).transform(dfIndexed)
+
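+    // Caveat: StringIndexer assigns frequency-ranked ordinal codes, so brand_index
+    // and region_index have no metric meaning when PCA later treats them as numbers.
+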
+    // Pull each feature out of allfeaturemap, cast to double, and fill nulls with 0
+    val extractFeature = userFeatureNames.map(f => coalesce(col("allfeaturemap").getItem(f).cast("double"), lit(0.0)).alias(f))
+    val finalFeatureColumns = List(col("advercode"), col("brand_index"), col("region_index")) ++ extractFeature
+    val dfFeatures = dfFinal.select(finalFeatureColumns: _*)
+
+    // Keep at most 5000 random rows per advercode
+    val windowSpec = Window.partitionBy("advercode").orderBy(rand())
+    val dfSampled = dfFeatures.withColumn("row_num", row_number().over(windowSpec))
+      .filter(col("row_num") <= 5000)
+      .drop("row_num")
+
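+    // The row_number-over-random-order pattern caps each advercode at a fixed row
+    // count, unlike DataFrame.sample, which only draws a global fraction.
+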
+    // Assemble the encoded and numeric columns into a single feature vector
+    val assembler = new VectorAssembler()
+      .setInputCols(Array("brand_index", "region_index") ++ userFeatureNames)
+      .setOutputCol("features")
+
+    val dfVector = assembler.transform(dfSampled)
+
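+    // Note: the features go into PCA unscaled, so large-range counters (viewAll,
+    // clickAll, ...) will dominate the components; a StandardScaler stage before
+    // PCA may be worth adding.
+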
+    // Reduce the feature vectors to 2 dimensions with PCA
+    val pca = new PCA().setInputCol("features").setOutputCol("pca_features").setK(2)
+    val dfPCA = pca.fit(dfVector).transform(dfVector)
+
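+    // Optionally, the fitted PCAModel could be kept to log explainedVariance and
+    // confirm that two components retain enough of the signal.
+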
+    // Select advercode plus the two principal components as x/y (z fixed at 0)
+    val dfOutput = dfPCA
+      .withColumn("pca_array", vector_to_array(col("pca_features")))
+      .select(col("advercode"), col("pca_array").getItem(0).alias("x"), col("pca_array").getItem(1).alias("y"), lit(0.0).alias("z"))
+
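+    // vector_to_array is available from Spark 3.0; on older clusters a UDF calling
+    // Vector.toArray would be needed instead.
+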
+    // Create the output Hive table if it does not exist
+    val outputHiveTable = "loghubods.tmp_ad_exposure_user_pca_vector"
+    spark.sql(
+      s"""
+        |CREATE TABLE IF NOT EXISTS $outputHiveTable (
+        |  advercode STRING,
+        |  x DOUBLE,
+        |  y DOUBLE,
+        |  z DOUBLE
+        |) PARTITIONED BY (version STRING)
+        |STORED AS ALIORC
+        |""".stripMargin)
+
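+    // STORED AS ALIORC targets Aliyun MaxCompute (ODPS); plain Hive would use
+    // STORED AS ORC here.
+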
+    // Write the PCA coordinates into the dt partition of the output table
+    dfOutput.createOrReplaceTempView("output_temp_view")
+    spark.sql(
+      s"""
+        |INSERT OVERWRITE TABLE $outputHiveTable PARTITION (version='$dt')
+        |SELECT * FROM output_temp_view
+        |""".stripMargin)
+
+    spark.stop()
+  }
+}