@@ -0,0 +1,95 @@
+package com.aliyun.odps.spark.examples.makedata_recsys_r_rate
+
+import com.alibaba.fastjson.JSON
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils}
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
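+/**
+ * Counts how often each feature key appears in the samples under readPath for a date range,
+ * skips mid@/vid@ prefixed keys, and writes "key<TAB>count" lines to savePath as gzip text.
+ */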
+object feature_stat {
+  def main(args: Array[String]): Unit = {
+    // 1. Read job parameters
+    val param = ParamUtils.parseArgs(args)
+    val readPath = param.getOrElse("readPath", "/dw/recommend/model/83_origin_data/")
+    val beginStr = param.getOrElse("beginStr", "20250317")
+    val endStr = param.getOrElse("endStr", "20250317")
+    val repartition = param.getOrElse("repartition", "10").toInt
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/83_recsys_feature/")
+
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 2. Process the data
+    val dateRange = MyDateUtils.getDateRange(beginStr, endStr)
+    val pathArray = getPathArray(readPath, dateRange)
+    val data = sc.textFile(pathArray.mkString(","))
+      .map(r => {
+        // logKey + "\t" + labelKey + "\t" + scoresMap + "\t" + featureKey
+        val rList = r.split("\t")
+        val featData = rList(3)
+        parseFeature(featData)
+      })
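+      // emit (featureKey, 1) for every key of every sample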
+      .flatMap(features => {
+        features.map {
+          case (key, _) =>
+            (key, 1)
+        }
+      })
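+      // drop mid@/vid@ prefixed keys so only the general feature keys are counted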
+      .filter(raw => {
+        val key = raw._1
+        if (key.startsWith("mid@") || key.startsWith("vid@")) {
+          false
+        } else {
+          true
+        }
+      })
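+      // sum the occurrences per key and format each record as "key<TAB>count"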
+      .reduceByKey((a, b) => a + b)
+      .map(raw => {
+        raw._1 + "\t" + raw._2.toString
+      })
+
+    // 3. Save the result to HDFS
+    val hdfsPath = savePath + "/" + dateRange.last
+    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+      println("Deleting path and starting data write: " + hdfsPath)
+      MyHdfsUtils.delete_hdfs_path(hdfsPath)
+      data.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+    } else {
+      println("Invalid path, cannot write: " + hdfsPath)
+    }
+  }
+
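+  // parseFeature treats the featureKey field as a flat JSON object and reads every entry back as a Double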
+  private def parseFeature(data: String): scala.collection.mutable.Map[String, Double] = {
+    val features = scala.collection.mutable.Map[String, Double]()
+    if (data.nonEmpty) {
+      val obj = JSON.parseObject(data)
+      obj.foreach(r => {
+        features.put(r._1, obj.getDoubleValue(r._1))
+      })
+    }
+    features
+  }
+
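+  // builds one glob path per date, e.g. basePath/<yyyyMMdd>*, for sc.textFile to expand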
+  def getPathArray(basePath: String, dateRange: ArrayBuffer[String]): ArrayBuffer[String] = {
+    val pathArray = new ArrayBuffer[String]
+    for (date <- dateRange) {
+      val path = basePath + "/" + date + "*"
+      pathArray += path
+    }
+    pathArray
+  }
+}