Sfoglia il codice sorgente

feat:添加分析脚本

zhaohaipeng 2 mesi fa
parent
commit
1b50fda442

+ 63 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys/makedata_recsys_45_feature_cover_degree.scala

@@ -0,0 +1,63 @@
+package com.aliyun.odps.spark.examples.makedata_recsys
+
+import com.aliyun.odps.spark.examples.myUtils.{FileUtil, ParamUtils}
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Spark job that reports, for every feature name listed in a classpath resource, how many
+ * input records carry that feature with a value greater than zero.
+ *
+ * Arguments (parsed by `ParamUtils.parseArgs`):
+ *  - `readPath`: path pattern of the text input
+ *  - `featureNameFile`: classpath resource holding one feature name per line
+ *
+ * A record is considered valid when it splits into exactly three tab-separated fields; the
+ * third field is treated as a flat JSON-like object of `"name": value` pairs.
+ */
+object makedata_recsys_45_feature_cover_degree {
+  import scala.util.Try
+
+  /** Number of tab-separated fields a well-formed record must have. */
+  private val ExpectedFieldCount = 3
+
+  /**
+   * Parses a flat JSON-like object into a map of feature name to numeric value.
+   *
+   * This is a deliberately lightweight parser: braces and double quotes are stripped and the
+   * remainder is split on `,` and `:`. Entries that do not split into exactly one key and one
+   * value, or whose value is not numeric, are skipped. Nested objects and string values that
+   * contain `,` or `:` are therefore not supported.
+   *
+   * Values are parsed as Double so that decimal feature values are kept and blank or oversized
+   * values are skipped instead of throwing.
+   *
+   * @param jsonStr raw text of the feature field
+   * @return numeric features found in `jsonStr`; for duplicated keys the last numeric value wins
+   */
+  private def parseFeatureValues(jsonStr: String): Map[String, Double] =
+    jsonStr
+      .replaceAll("[{}\"]", "") // drop braces and quotes
+      .split(",")
+      .iterator
+      .map(_.split(":"))
+      .collect { case Array(k, v) => (k.trim, Try(v.trim.toDouble).toOption) }
+      .collect { case (name, Some(value)) => name -> value }
+      .toMap
+
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+
+    try {
+      val sc = spark.sparkContext
+
+      val param = ParamUtils.parseArgs(args)
+      val readPath = param.getOrElse("readPath", "/dw/recommend/model/41_recsys_origin_date/*")
+      val featureNameFile = param.getOrElse("featureNameFile", "feature_name_20250218.txt")
+
+      // Fail fast when the resource is missing: otherwise the job would scan the whole input
+      // only to print nothing.
+      val resource = Option(getClass.getClassLoader.getResource(featureNameFile)).getOrElse(
+        throw new IllegalArgumentException(s"featureNameFile not found on classpath: $featureNameFile")
+      )
+
+      // Parse the feature names: one per line, blank lines ignored.
+      val featureNameSet = FileUtil.readFile(resource)
+        .split("\n")
+        .map(_.trim)
+        .filter(_.nonEmpty)
+        .toSet
+      require(featureNameSet.nonEmpty, s"no feature names could be read from $featureNameFile")
+      val featureNameSet_br = sc.broadcast(featureNameSet)
+
+      // Read the input.
+      val data = sc.textFile(readPath)
+
+      // Count malformed records (not exactly three tab-separated fields) in a single pass.
+      val invalidCount = data.filter(_.split("\t").length != ExpectedFieldCount).count()
+      println("问题数据数量:" + invalidCount)
+
+      // For every valid record emit the tracked feature names whose value is > 0, then count
+      // the occurrences per name. countByValue returns an empty map for empty input, so the
+      // job no longer fails when there are no valid records.
+      val positiveCounts = data
+        .map(_.split("\t"))
+        .filter(_.length == ExpectedFieldCount)
+        .flatMap { fields =>
+          val tracked = featureNameSet_br.value
+          parseFeatureValues(fields(2)).collect {
+            case (name, value) if value > 0 && tracked.contains(name) => name
+          }
+        }
+        .countByValue()
+
+      // Print one line per tracked feature (0 when it never occurred with a positive value),
+      // sorted by name so that the output is deterministic.
+      featureNameSet.toSeq.sorted.foreach { feature =>
+        val count = positiveCounts.getOrElse(feature, 0L)
+        println(s"$feature -> $count")
+      }
+    } finally {
+      spark.stop()
+    }
+  }
+}

+ 21 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/FileUtil.scala

@@ -0,0 +1,21 @@
+package com.aliyun.odps.spark.examples.myUtils
+
+import java.net.URL
+import scala.io.Source
+
+/** Helpers for reading text resources. */
+object FileUtil {
+  import scala.io.Codec
+  import scala.util.control.NonFatal
+
+  /**
+   * Reads the whole resource behind `path` as text.
+   *
+   * The content is decoded as UTF-8 regardless of the platform default charset, and lines are
+   * re-joined with "\n" (line terminators are normalised and a trailing newline is dropped).
+   * Reading is best-effort: when `path` is null or the resource cannot be read, the failure is
+   * printed and an empty string is returned.
+   *
+   * @param path URL of the resource to read; may be null
+   * @return the text content, or "" when nothing could be read
+   */
+  def readFile(path: URL): String = {
+    if (path == null) {
+      // getResource returns null for a missing resource; report that explicitly instead of
+      // relying on a NullPointerException.
+      println(s"读取文件: ${path}, 发生未知错误: path is null")
+      ""
+    } else {
+      try {
+        val source = Source.fromURL(path)(Codec.UTF8)
+        try {
+          source.getLines().mkString("\n")
+        } finally {
+          source.close()
+        }
+      } catch {
+        // NonFatal lets fatal errors such as OutOfMemoryError propagate.
+        case NonFatal(e) =>
+          println(s"读取文件: ${path}, 发生未知错误: ${e.getMessage}")
+          ""
+      }
+    }
+  }
+}