
Behavior-based gender prediction

jch, 3 weeks ago
Commit 61f161c44e

+ 75 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/makedata_profile_20251114.scala

@@ -0,0 +1,75 @@
+package com.aliyun.odps.spark.examples.makedata_recsys_r_rate
+
+import com.aliyun.odps.spark.examples.myUtils._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConversions._
+
+object makedata_profile_20251114 {
+  private val o2oMap = Map(
+    "creative" -> "creativeInfo"
+  )
+
+  private def getFeature(rdd: RDD[java.util.Map[String, String]], currentMs: Long): RDD[String] = {
+    rdd.mapPartitions(partition => {
+        partition.map(raw => {
+          val mid = raw.getOrElse("mid", "")
+          val labels = raw.getOrElse("labels", "")
+          val features = ConvertV1.getFeature(currentMs, raw, 6).toString
+          if (mid.nonEmpty && labels.nonEmpty && features.nonEmpty) {
+            mid + "\t" + labels + "\t" + features
+          } else {
+            ""
+          }
+        })
+      })
+      .filter(_.nonEmpty)
+  }
+
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 1. Parse arguments
+    val param = ParamUtils.parseArgs(args)
+    val project = param.getOrElse("project", "loghubods")
+    val table = param.getOrElse("table", "alg_recsys_feature_behavior_profile")
+    val year = param.getOrElse("year", "2025")
+    val suffixSet = param.getOrElse("suffix", "y,8,4,0,e,a,c,k,o,w,g,s,u,q,i,m").split(",").toSet
+    val ts = param.getOrElse("ts", "0").toLong
+    val tablePart = param.getOrElse("tablePart", "64").toInt
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/user_profile/data/")
+    val repartition = param.getOrElse("repartition", "64").toInt
+
+    val currentMs = if (ts != 0) ts * 1000 else System.currentTimeMillis()
+
+    // 2. Process data
+    for (suffix <- suffixSet) {
+      // 2.1 Partition
+      val partition = "year=%s,suffix=%s".format(year, suffix)
+      println("开始执行partition:" + partition)
+
+      // 2.2 Load sample data
+      val odpsData = DataUtils.getODPSData(sc, project, table, partition, tablePart)
+        .map(record => {
+          OnlineLogUtils.log2Map(record, o2oMap)
+        })
+
+      // 2.3 Feature conversion
+      val featureData = getFeature(odpsData, currentMs)
+
+      // 2.4 Save data
+      val hdfsPath = "%s/%s_%s".format(savePath, year, suffix)
+      DataUtils.saveData(featureData, hdfsPath, repartition)
+    }
+  }
+}
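
Note: each output line is a TSV record, mid, the raw labels payload, and the JSON feature map built by ConvertV1.getFeature. A minimal sketch of consuming one such line (the sample values are hypothetical, the real payloads come from the upstream table):

    val line = "mid_123\t{\"gender\":\"1\"}\t{\"cnt\":42.0,\"cate1@news\":0.31}"
    val Array(mid, labels, features) = line.split("\t")
    // mid = "mid_123"; labels and features are JSON strings

This is the format the sampling job below splits on "\t" to recover.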

+ 102 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/makedata_profile_gender_sample_20251114.scala

@@ -0,0 +1,102 @@
+package com.aliyun.odps.spark.examples.makedata_recsys_r_rate
+
+import com.aliyun.odps.spark.examples.myUtils.{DataUtils, MyHdfsUtils, ParamUtils}
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.io.Source
+
+
+object makedata_profile_gender_sample_20251114 {
+  def main(args: Array[String]): Unit = {
+    // 1. Parse arguments
+    val param = ParamUtils.parseArgs(args)
+    val readPath = param.getOrElse("readPath", "/dw/recommend/model/user_profile/data/")
+    val year = param.getOrElse("year", "2025")
+    val suffixSet = param.getOrElse("suffix", "y,8,4,0,e,a,c,k,o,w,g,s,u,q,i,m").split(",").toSet
+    val whatLabel = param.getOrElse("whatLabel", "gender")
+    val classSet = param.getOrElse("class", "1,2").split(",").toSet
+    val featureFile = param.getOrElse("featureFile", "20241209_recsys_nor_name.txt")
+    val minCnt = param.getOrElse("minCnt", "10").toDouble
+    val repartition = param.getOrElse("repartition", "100").toInt
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/user_profile/gender/sample/")
+
+    // 2. Spark context
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 3. Process data
+    val loader = getClass.getClassLoader
+    val featureSet = loadFeatureNames(featureFile)
+    val featureBucketMap = DataUtils.loadUseFeatureBuckets(loader, 1, "")
+    val bucketsMap_br = sc.broadcast(featureBucketMap)
+    for (suffix <- suffixSet) {
+      val partition = "%s_%s".format(year, suffix)
+      println("开始执行:" + partition)
+      val data = sc.textFile(readPath + "/" + partition + "*").map(row => {
+          val cells = row.split("\t")
+          val mid = cells(0)
+          val labels = cells(1)
+          val featData = cells(2)
+          (mid, labels, featData)
+        })
+        .filter {
+          case (mid, labels, featData) =>
+            val label = DataUtils.parseLabel(labels, whatLabel)
+            classSet.contains(label)
+        }
+        .map {
+          case (mid, labels, featData) =>
+            var label = DataUtils.parseLabel(labels, whatLabel).toInt
+            val features = DataUtils.parseFeature(featData)
+            if (label > 0) {
+              label -= 1
+            }
+            (mid, label, features)
+        }
+        .filter {
+          case (mid, label, features) =>
+            val cnt = features.getOrElse("cnt", 0d)
+            cnt >= minCnt
+        }
+        .mapPartitions(row => {
+          val result = new ArrayBuffer[String]()
+          row.foreach {
+            case (mid, label, features) =>
+              val bucketsMap = bucketsMap_br.value
+              val featuresBucket = DataUtils.bucketFeature(featureSet, bucketsMap, features)
+              result += mid + "\t" + label + "\t" + featuresBucket.mkString("\t")
+          }
+          result.iterator
+        })
+
+      // 4. Save data to HDFS
+      val hdfsPath = savePath + "/" + partition
+      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+        println("删除路径并开始数据写入:" + hdfsPath)
+        MyHdfsUtils.delete_hdfs_path(hdfsPath)
+        data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+      } else {
+        println("路径不合法,无法写入:" + hdfsPath)
+      }
+    }
+  }
+
+  def loadFeatureNames(nameFile: String): Set[String] = {
+    val buffer = Source.fromFile(nameFile)
+    val featSet = buffer.getLines()
+      .map(_.replace(" ", ""))
+      .filter(_.nonEmpty)
+      .toSet
+    buffer.close()
+    println("featSet.size=" + featSet.size)
+    println(featSet)
+    featSet
+  }
+}
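
Note on the label remap above: parseLabel returns the raw class id, the classSet filter keeps only "1" and "2", and the `label -= 1` step shifts them into the 0/1 range a binary classifier expects. A self-contained sketch of that step (the class id is made up):

    val rawClass = "2".toInt                                 // class id from the label payload (hypothetical)
    val label = if (rawClass > 0) rawClass - 1 else rawClass // 1 -> 0, 2 -> 1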

+ 101 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/parse_gender_to_hive.scala

@@ -0,0 +1,101 @@
+package com.aliyun.odps.spark.examples.makedata_recsys_r_rate
+
+import com.aliyun.odps.TableSchema
+import com.aliyun.odps.data.Record
+import com.aliyun.odps.spark.examples.myUtils.{ParamUtils, env}
+import org.apache.spark.sql.SparkSession
+
+object parse_gender_to_hive {
+  def main(args: Array[String]): Unit = {
+    // 1. Parse arguments
+    val param = ParamUtils.parseArgs(args)
+    val inputPath = param.getOrElse("inputPath", "/dw/recommend/model/user_profile/gender/result/")
+    val year = param.getOrElse("year", "2025")
+    val suffixSet = param.getOrElse("suffix", "y,8,4,0,e,a,c,k,o,w,g,s,u,q,i,m").split(",").toSet
+    val project = param.getOrElse("project", "loghubods")
+    val outputTable = param.getOrElse("outputTable", "alg_recsys_user_behavior_gender_tmp")
+    val version = param.getOrElse("version", "20251119")
+
+    // 2. sc
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 3. Get the ODPS handle
+    val odpsOps = env.getODPS(sc)
+
+    // 4. Process data
+    for (suffix <- suffixSet) {
+      val subPath = "%s_%s".format(year, suffix)
+      println("开始执行: " + subPath)
+      val predictRdd = sc.textFile(inputPath + "/" + subPath + "*")
+        .map(line => {
+          val cells = line.split("\t")
+          val fake = cells(0)
+          val logit = cells(1)
+          val score = parseScore(cells(2))
+          val mid = cells(3)
+          val total = cells(4)
+          (mid, total, score)
+        })
+        .map {
+          case (mid, total, score) =>
+            val colsMap = scala.collection.mutable.Map[String, Any]()
+            colsMap.put("mid", mid)
+            colsMap.put("total", total)
+            colsMap.put("score", score)
+            colsMap
+        }
+
+      // 5. write to table
+      val partition = s"version=$version,suffix=$suffix"
+      odpsOps.saveToTable(project, outputTable, partition, predictRdd, write, defaultCreate = true, overwrite = true)
+    }
+  }
+
+  private def parseScore(data: String): String = {
+    if (data.nonEmpty) {
+      val pair = data.replace("[", "").replace("]", "").split(",")
+      if (pair.length > 1) {
+        return pair(1).toDouble.formatted("%.4f")
+      }
+    }
+    "-1"
+  }
+
+  private def write(map: scala.collection.mutable.Map[String, Any], record: Record, schema: TableSchema): Unit = {
+    for ((key, value) <- map) {
+      try {
+        // Look up the column index in the table schema
+        val columnIndex = schema.getColumnIndex(key.toLowerCase)
+        // Get the column type
+        val columnType = schema.getColumn(columnIndex).getTypeInfo
+        try {
+          columnType.getTypeName match {
+            case "STRING" =>
+              record.setString(columnIndex, value.toString)
+            case "BIGINT" =>
+              record.setBigint(columnIndex, value.toString.toLong)
+            case "DOUBLE" =>
+              record.setDouble(columnIndex, value.toString.toDouble)
+            case "BOOLEAN" =>
+              record.setBoolean(columnIndex, value.toString.toBoolean)
+            case other =>
+              throw new IllegalArgumentException(s"Unsupported column type: $other")
+          }
+        } catch {
+          case e: NumberFormatException =>
+            println(s"Error converting value $value to type ${columnType.getTypeName} for column $key: ${e.getMessage}")
+          case e: Exception =>
+            println(s"Unexpected error writing value $value to column $key: ${e.getMessage}")
+        }
+      } catch {
+        case e: IllegalArgumentException => {
+          println(e.getMessage)
+        }
+      }
+    }
+  }
+}
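
Note: parseScore assumes the predictor output in column 2 is a bracketed probability pair like "[p0,p1]" and keeps the positive-class probability at index 1, formatted to four decimals; empty or malformed input falls through to "-1". Illustrative calls from inside the object (inputs are made up):

    parseScore("[0.1234, 0.8766]") // => "0.8766"
    parseScore("")                 // => "-1"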

+ 85 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/stat_origin_feature.scala

@@ -0,0 +1,85 @@
+package com.aliyun.odps.spark.examples.makedata_recsys_r_rate
+
+import com.alibaba.fastjson.JSON
+import com.aliyun.odps.spark.examples.myUtils.{DataUtils, MyHdfsUtils, ParamUtils}
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+object stat_origin_feature {
+  def main(args: Array[String]): Unit = {
+    // 1. Parse arguments
+    val param = ParamUtils.parseArgs(args)
+    val readPath = param.getOrElse("readPath", "/dw/recommend/model/user_profile/data/")
+    val year = param.getOrElse("year", "2025")
+    val suffixSet = param.getOrElse("suffix", "y,8,4,0,e,a,c,k,o,w,g,s,u,q,i,m").split(",").toSet
+    val whatLabel = param.getOrElse("whatLabel", "gender")
+    val classSet = param.getOrElse("class", "1,2").split(",").toSet
+    val featureIndex = param.getOrElse("featureIndex", "2").toInt
+    val repartition = param.getOrElse("repartition", "10").toInt
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/832_recsys_analysis_data")
+
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 2. Process data
+    val pathArray = getPathArray(readPath, year, suffixSet)
+    val data = sc.textFile(pathArray.mkString(","))
+      .filter(r => {
+        val cells = r.split("\t")
+        val label = DataUtils.parseLabel(cells(1), whatLabel)
+        classSet.contains(label)
+      })
+      .map(r => {
+        // line format: mid \t labels \t features
+        val rList = r.split("\t")
+        val featData = rList(featureIndex)
+        parseFeature(featData)
+      })
+      .flatMap(features => {
+        features.map {
+          case (key, value) =>
+            (key, 1)
+        }
+      })
+      .reduceByKey((a, b) => a + b)
+      .map(raw => {
+        raw._1 + "\t" + raw._2.toString
+      })
+
+    // 3. Save data to HDFS
+    val hdfsPath = savePath + "/" + year
+    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+      println("删除路径并开始数据写入:" + hdfsPath)
+      MyHdfsUtils.delete_hdfs_path(hdfsPath)
+      data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+    } else {
+      println("路径不合法,无法写入:" + hdfsPath)
+    }
+  }
+
+  private def parseFeature(data: String): scala.collection.mutable.Map[String, Double] = {
+    val features = scala.collection.mutable.Map[String, Double]()
+    if (data.nonEmpty) {
+      val obj = JSON.parseObject(data)
+      obj.foreach(r => {
+        features.put(r._1, obj.getDoubleValue(r._1))
+      })
+    }
+    features
+  }
+
+  def getPathArray(basePath: String, year: String, suffixSet: Set[String]): ArrayBuffer[String] = {
+    val pathArray = new ArrayBuffer[String]
+    for (suffix <- suffixSet) {
+      val path = "%s/%s_%s/*".format(basePath, year, suffix)
+      pathArray += path
+    }
+    pathArray
+  }
+}
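
Note: the flatMap/reduceByKey pair is a plain document-frequency count: each filtered user contributes 1 per distinct feature key, so an output line `cate1@news\t1234` means 1234 users carry that feature. A local equivalent on toy data:

    val users = Seq(
      Map("cnt" -> 1.0, "cate1@news" -> 0.3),
      Map("cnt" -> 2.0)
    )
    val freq = users.flatMap(_.keys.map(k => (k, 1))).groupBy(_._1).mapValues(_.size)
    // freq: Map(cnt -> 2, cate1@news -> 1)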

+ 41 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/ConvertV1.java

@@ -0,0 +1,41 @@
+package com.aliyun.odps.spark.examples.myUtils;
+
+import com.alibaba.fastjson.JSONObject;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ConvertV1 {
+    public static JSONObject getFeature(long currentMs, Map<String, String> record, int accurate) {
+        Map<String, Double> featMap = new HashMap<>();
+        try {
+            // base
+            // String mid = record.get("mid");
+            Map<String, String> total_stat = ConvertUtils.getRecordCol(record, "total_stat");
+            double total = Double.parseDouble(total_stat.get("total"));
+            long behaviorMs = Long.parseLong(total_stat.get("ts")) * 1000;
+
+            // context
+            FeatureTransformV1.getContextFeature(currentMs, behaviorMs, total, featMap);
+
+            // cate1
+            Map<String, String> cate1_stat = ConvertUtils.getRecordCol(record, "cate1_stat");
+            FeatureTransformV1.getCateFeature("cate1", total, cate1_stat, featMap);
+
+            // cate2
+            Map<String, String> cate2_stat = ConvertUtils.getRecordCol(record, "cate2_stat");
+            FeatureTransformV1.getCateFeature("cate2", total, cate2_stat, featMap);
+
+            // title keywords
+            Map<String, String> title_keywords_stat = ConvertUtils.getRecordCol(record, "title_keywords_stat");
+            FeatureTransformV1.getCateFeature("title_kw", total, title_keywords_stat, featMap);
+
+            // video keywords
+            Map<String, String> video_keywords_stat = ConvertUtils.getRecordCol(record, "video_keywords_stat");
+            FeatureTransformV1.getCateFeature("video_kw", total, video_keywords_stat, featMap);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return ConvertUtils.filterAndTruncate(featMap, accurate);
+    }
+}

+ 34 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/FeatureTransformV1.java

@@ -0,0 +1,34 @@
+package com.aliyun.odps.spark.examples.myUtils;
+
+import java.util.Map;
+
+public class FeatureTransformV1 {
+    public static final double maxDiffMs = 12 * 30 * 24 * 3600 * 1000.0; // ~360 days (12 * 30d) in milliseconds
+
+    public static void getContextFeature(long currentMs, long behaviorMs, double total, Map<String, Double> featureMap) {
+        double diff = (currentMs - behaviorMs) / maxDiffMs;
+        if (diff > 1.0) {
+            diff = 1.0;
+        }
+        featureMap.put("ms", diff);
+
+        double value = FeatureUtils.log1(total, 10);
+        featureMap.put("cnt", total);
+        featureMap.put("total", value);
+    }
+
+    public static void getCateFeature(String prefix, double total, Map<String, String> stat, Map<String, Double> featMap) {
+        if (null != stat) {
+            for (Map.Entry<String, String> entry : stat.entrySet()) {
+                String name = entry.getKey();
+                String cleanName = name.replaceAll("(\\s+|\\t|:)", "");
+                if (!cleanName.isEmpty() && !cleanName.equals("unknown")) {
+                    double cnt = Double.parseDouble(entry.getValue());
+                    double value = FeatureUtils.wilsonScore(cnt, total);
+                    String key = String.format("%s@%s", prefix, cleanName);
+                    featMap.put(key, value);
+                }
+            }
+        }
+    }
+}
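
Note: getContextFeature normalizes recency against a roughly one-year window (12 * 30 days in milliseconds) and clamps at 1.0, so "ms" is near 0 for a just-active user and 1 for anyone inactive a year or more. A worked check of the same arithmetic in Scala (timestamps are hypothetical):

    val maxDiffMs = 12 * 30 * 24 * 3600 * 1000.0        // ~360 days in ms
    val currentMs = 1763510400000L                      // hypothetical "now"
    val behaviorMs = currentMs - 90L * 24 * 3600 * 1000 // last behavior 90 days ago
    val ms = math.min((currentMs - behaviorMs) / maxDiffMs, 1.0)
    // ms = 0.25, a quarter of the window has elapsed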