package com.aliyun.odps.spark.examples.makedata_recsys_r_rate

import com.alibaba.fastjson.{JSON, JSONObject}
import com.aliyun.odps.TableSchema
import com.aliyun.odps.data.Record
import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
import examples.utils.SimilarityUtils
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConversions._

/*
   20250109 extract features
 */
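
/*
  Pipeline summary: for each hour partition in [beginStr, endStr], read
  statistics_log_hour from ODPS, keep apptype-31 records that carry an
  online "RovXGBScore", rebuild the sample line (log key, label, scores,
  features) from allfeaturemap, and write the result to HDFS as gzip text.
 */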

object make_rov_online_feature {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getName)
      .getOrCreate()
    val sc = spark.sparkContext

    // 1 Read job parameters
    val param = ParamUtils.parseArgs(args)

    val beginStr = param.getOrElse("beginStr", "2025011015")
    val endStr = param.getOrElse("endStr", "2025011015")
    val project = param.getOrElse("project", "loghubods")
    val table = param.getOrElse("table", "statistics_log_hour")
    val tablePart = param.getOrElse("tablePart", "64").toInt
    val savePath = param.getOrElse("savePath", "/dw/recommend/model/61_recsys_rov_train_data/")
    val repartition = param.getOrElse("repartition", "100").toInt
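
    // Example invocation (a sketch only; the exact argument syntax depends on
    // how ParamUtils.parseArgs tokenizes args, assumed here to be key:value
    // pairs, and the jar name is illustrative):
    //   spark-submit \
    //     --class com.aliyun.odps.spark.examples.makedata_recsys_r_rate.make_rov_online_feature \
    //     your-job.jar beginStr:2025011015 endStr:2025011016 tablePart:64 repartition:100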

    // 2 ODPS client
    val odpsOps = env.getODPS(sc)

    // 3 Loop over the hour range and produce data
    val timeRange = MyDateUtils.getDateHourRange(beginStr, endStr)
    for (dt_hh <- timeRange) {
      val dt = dt_hh.substring(0, 8)
      val hh = dt_hh.substring(8, 10)
      val partition = s"dt=$dt$hh"
      println("Processing partition: " + partition)
      val odpsData = odpsOps.readTable(project = project,
          table = table,
          partition = partition,
          transfer = func,
          numPartition = tablePart)
        .filter(record => {
          val apptype = record.getString("apptype")
          // scoresmap may be absent; treat a null column as an empty JSON object
          val scoresMapStr = record.getString("scoresmap")
          val scoresMap = if (scoresMapStr == null) new JSONObject() else JSON.parseObject(scoresMapStr)
          "31".equals(apptype) && scoresMap.containsKey("RovXGBScore")
        })
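        // Keep only app type 31 records that were actually scored online by
        // the rov model (their scoresmap contains a "RovXGBScore" entry).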
        .mapPartitions(p => {
          SimilarityUtils.init()
          p.map(record => {
            // a Parse allfeaturemap into name -> Double feature scores
            val jsons = getJsonObject(record, "allfeaturemap")
            val features = scala.collection.mutable.Map[String, Double]()
            jsons.foreach(r => {
              features.put(r._1, jsons.getDoubleValue(r._1))
            })

            val featuresBucket = features.map {
              case (name, score) =>
                name + ":" + score.toString
            }.filter(_.nonEmpty)
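
            // Each feature is serialized as a "name:score" token; the tokens
            // are joined with tabs when the sample line is assembled below.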

            // b Build the log key header
            val apptype = record.getString("apptype")
            val pagesource = record.getString("pagesource")
            val mid = record.getString("mid")
            val vid = if (record.isNull("videoid")) "" else record.getString("videoid")

            val recommendtraceid = record.getString("recommendtraceid")
            val traceid = record.getString("traceid")
            val level = if (record.isNull("level")) "0" else record.getString("level")
            val logKey = (apptype, pagesource, mid, vid, recommendtraceid, traceid, level).productIterator.mkString(",")
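
            // productIterator joins the tuple without naming each field; note
            // that any null field would serialize as the literal string "null".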

            val scoresMap = JSON.parseObject(record.getString("scoresmap")).toString()
            // c Assemble the sample line: logKey, constant label "1", scores, features
            logKey + "\t" + "1" + "\t" + scoresMap + "\t" + featuresBucket.mkString("\t")
          })
        })

      // 4 Save the results to HDFS
      val savePartition = dt
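      // Note: the save path is keyed by dt only, so every hour of the same
      // day deletes and rewrites the same directory; a run spanning multiple
      // hours of one day keeps only the last hour's output.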
      val hdfsPath = savePath + "/" + savePartition
      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
        println("Deleting existing path and starting write: " + hdfsPath)
        MyHdfsUtils.delete_hdfs_path(hdfsPath)
        odpsData.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
      } else {
        println("Invalid path, cannot write: " + hdfsPath)
      }
    }
  }

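  // Identity transfer for odpsOps.readTable: hand each ODPS Record through
  // unchanged so columns can be read lazily downstream.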
  def func(record: Record, schema: TableSchema): Record = {
    record
  }

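  // Read a JSON column into a JSONObject. A missing column yields an empty
  // object, and entries with null values are dropped so that later
  // getDoubleValue calls never see null.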
  def getJsonObject(record: Record, key: String): JSONObject = {
    val data = if (record.isNull(key)) new JSONObject() else JSON.parseObject(record.getString(key))
    val data2 = new JSONObject()
    data.foreach(r => {
      if (r._2 != null) {
        data2.put(r._1, r._2)
      }
    })
    data2
  }
}
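
// Sample emitted line (all field values illustrative only):
//   31,homepage,mid_abc,vid_123,rtid_x,tid_y,0\t1\t{"RovXGBScore":0.42}\tctr_1d:0.05\tstr_1d:0.02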