فهرست منبع

feat:修改分析脚本

zhaohaipeng 2 ماه پیش
والد
کامیت
d40781dfc1

+ 86 - 0
src/main/java/examples/utils/StatisticsUtil.java

@@ -0,0 +1,86 @@
+package examples.utils;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.spark.broadcast.Broadcast;
+import scala.collection.immutable.Set;
+
+import java.util.Map;
+
+public class StatisticsUtil {
+
+    public static void featureCoverRate(String record, Broadcast<Set<String>> featureNames, Map<String, Long> allMap, Map<String, Long> isShareMap, Map<String, Long> isReturnNoSelfMap) {
+        String[] rSplit = record.split("\t");
+        // 异常样本统计
+        if (rSplit.length != 3) {
+            StatisticsUtil.mapKeyAddOne(allMap, "errorSampleNum");
+            StatisticsUtil.mapKeyAddOne(isShareMap, "errorSampleNum");
+            StatisticsUtil.mapKeyAddOne(isReturnNoSelfMap, "errorSampleNum");
+            return;
+        }
+
+        // 是否推荐场景
+        JSONObject logKey = JSON.parseObject(rSplit[0]);
+        String page = logKey.getString("page");
+        String recommendPageType = logKey.getString("recommendpagetype");
+
+        JSONObject labelKey = JSON.parseObject(rSplit[1]);
+        String isShare = labelKey.getString("is_share");
+        String isReturnNoSelf = logKey.getString("is_return_noself");
+
+        if (StatisticsUtil.isRecommendScene(page, recommendPageType)) {
+            StatisticsUtil.mapKeyAddOne(allMap, "recommendSampleNum");
+            if (StringUtils.equals("1", isShare)) {
+                StatisticsUtil.mapKeyAddOne(isShareMap, "recommendSampleNum");
+            }
+            if (StringUtils.equals("1", isReturnNoSelf)) {
+                StatisticsUtil.mapKeyAddOne(isReturnNoSelfMap, "recommendSampleNum");
+            }
+
+        }
+
+        // 正常数据统计
+        StatisticsUtil.mapKeyAddOne(allMap, "normalSampleNum");
+        if (StringUtils.equals("1", isShare)) {
+            StatisticsUtil.mapKeyAddOne(isShareMap, "normalSampleNum");
+        }
+        if (StringUtils.equals("1", isReturnNoSelf)) {
+            StatisticsUtil.mapKeyAddOne(isReturnNoSelfMap, "normalSampleNum");
+        }
+
+        // 特征大于0统计
+        JSONObject feature = JSON.parseObject(rSplit[2]);
+        for (String featureName : featureNames) {
+            if (feature.containsKey(featureName) && feature.getDoubleValue(featureName) > 0) {
+                StatisticsUtil.mapKeyAddOne(allMap, featureName);
+
+                if (StringUtils.equals("1", isShare)) {
+                    StatisticsUtil.mapKeyAddOne(isShareMap, featureName);
+                }
+                if (StringUtils.equals("1", isReturnNoSelf)) {
+                    StatisticsUtil.mapKeyAddOne(isReturnNoSelfMap, featureName);
+                }
+
+            }
+        }
+    }
+
+    private static boolean isRecommendScene(String page, String recommendPageType) {
+        if (StringUtils.equals("详情后沉浸页", page)) {
+            return true;
+        } else if (StringUtils.equals("回流后沉浸页&内页feed", page) && StringUtils.isNotBlank(recommendPageType) && recommendPageType.endsWith("-pages/user-videos-share-recommend-detail")) {
+            return true;
+        }
+        return false;
+    }
+
+    private static void mapKeyAddOne(Map<String, Long> resultMap, String key) {
+        if (!resultMap.containsKey(key)) {
+            resultMap.put(key, 0L);
+        }
+
+        resultMap.put(key, resultMap.get(key) + 1);
+    }
+
+}

+ 39 - 31
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys/makedata_recsys_45_feature_cover_degree.scala

@@ -1,8 +1,10 @@
-package com.aliyun.odps.spark.examples.makedata_recsys
-
 import com.aliyun.odps.spark.examples.myUtils.{FileUtils, ParamUtils}
+import examples.utils.StatisticsUtil
 import org.apache.spark.sql.SparkSession
 
+import java.util
+import scala.collection.JavaConverters._
+
 object makedata_recsys_45_feature_cover_degree {
   def main(args: Array[String]): Unit = {
     val spark = SparkSession
@@ -28,37 +30,43 @@ object makedata_recsys_45_feature_cover_degree {
     // 读取数据
     val data = sc.textFile(readPath)
 
-    // 过滤无效数据(以 tab 分割后长度不为 3)
-    val validData = data.filter(_.split("\t").length == 3)
-    println("问题数据数量:" + (data.count() - validData.count()))
-    println(s"正常数据量: ${validData.count()}")
+    // 使用 Accumulator 代替 HashMap
+    val allMapAcc = sc.collectionAccumulator[(String, Long)]("AllMap")
+    val isShareMapAcc = sc.collectionAccumulator[(String, Long)]("IsShareMap")
+    val isReturnNoSelfMapAcc = sc.collectionAccumulator[(String, Long)]("IsReturnNoSelfMap")
 
-    // 统计 feature 出现且 > 0 的次数
-    val featureCounts = validData.map { line =>
-      val fields = line.split("\t")
-      val jsonStr = fields(2)
+    data.foreach { line =>
+      val allMap = new util.HashMap[String, java.lang.Long]()
+      val isShareMap = new util.HashMap[String, java.lang.Long]()
+      val isReturnNoSelfMap = new util.HashMap[String, java.lang.Long]()
 
-      // 解析 JSON(转换为 Map[String, Int])
-      val featureMap = jsonStr
-        .replaceAll("[{}\"]", "") // 去掉大括号和引号
-        .split(",")
-        .map(_.split(":"))
-        .collect { case Array(k, v) if v.trim.forall(_.isDigit) => k.trim -> v.trim.toInt }
-        .toMap
+      // 调用 Java 方法处理
+      StatisticsUtil.featureCoverRate(line, featureNameSet_br, allMap, isShareMap, isReturnNoSelfMap)
 
-      // 统计 featureNameSet 中的 key 出现且 > 0 的次数
-      featureNameSet_br.value.map { key =>
-        key -> (if (featureMap.getOrElse(key, 0) > 0) 1 else 0)
-      }.toMap
-    }.reduce((map1, map2) =>
-      (map1.keys ++ map2.keys).map { key =>
-        key -> (map1.getOrElse(key, 0) + map2.getOrElse(key, 0))
-      }.toMap
-    )
-
-    // 打印结果
-    featureCounts.foreach { case (feature, count) =>
-      println(s"$feature -> $count")
+      // 将结果累加到 Accumulator
+      allMap.asScala.foreach { case (key, value) => allMapAcc.add((key, value)) }
+      isShareMap.asScala.foreach { case (key, value) => isShareMapAcc.add((key, value)) }
+      isReturnNoSelfMap.asScala.foreach { case (key, value) => isReturnNoSelfMapAcc.add((key, value)) }
     }
+
+    // **转换 Accumulator 数据**
+    val allMapSeq = allMapAcc.value.asScala.toSeq
+    val isShareMapSeq = isShareMapAcc.value.asScala.toSeq
+    val isReturnNoSelfMapSeq = isReturnNoSelfMapAcc.value.asScala.toSeq
+
+    // **使用 groupBy 聚合**
+    val finalAllMap = allMapSeq.groupBy(_._1).mapValues(_.map(_._2).sum).toMap
+    val finalIsShareMap = isShareMapSeq.groupBy(_._1).mapValues(_.map(_._2).sum).toMap
+    val finalIsReturnNoSelfMap = isReturnNoSelfMapSeq.groupBy(_._1).mapValues(_.map(_._2).sum).toMap
+
+    // **输出最终结果**
+    println("AllMap 结果:")
+    finalAllMap.foreach { case (key, value) => println(s"$key -> $value") }
+
+    println("IsShareMap 结果:")
+    finalIsShareMap.foreach { case (key, value) => println(s"$key -> $value") }
+
+    println("IsReturnNoSelfMap 结果:")
+    finalIsReturnNoSelfMap.foreach { case (key, value) => println(s"$key -> $value") }
   }
-}
+}