|
@@ -0,0 +1,374 @@
|
|
|
+package com.aliyun.odps.spark.examples.makedata_dssm
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON
|
|
|
+import com.aliyun.odps.TableSchema
|
|
|
+import com.aliyun.odps.data.Record
|
|
|
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
|
|
|
+import org.apache.hadoop.io.compress.GzipCodec
|
|
|
+import org.apache.spark.sql.SparkSession
|
|
|
+import com.aliyun.odps.spark.examples.makedata_dssm.makedata_i2i_05_trainData_20241129.{getOnehotValue, getDenseBucketValue}
|
|
|
+import scala.collection.mutable
|
|
|
+import scala.collection.JavaConversions._
|
|
|
+import scala.collection.mutable.ArrayBuffer
|
|
|
+import scala.io.Source
|
|
|
+
|
|
|
object makedata_i2i_06_itemPred_20241206 {

  /**
   * Identity transfer for ODPS table reads: hands the raw Record through so the
   * downstream .map can extract the columns it needs itself.
   */
  def func(record: Record, schema: TableSchema): Record = {
    record
  }

  // (json key, one-hot id prefix) for the 15 sparse tag features read from the
  // static vid feature json. Order matters: it defines the output column order.
  private val onehotKeys: Seq[(String, String)] = Seq(
    ("category1", "cate1:"),
    ("category2_1", "cate2:"),
    ("video_style", "video_style:"),
    ("valid_time", "valid_time:"),
    ("captions_color", "captions_color:"),
    ("audience_age_group", "audience_age_group:"),
    ("audience_value_type", "audience_value_type:"),
    ("font_size", "font_size:"),
    ("cover_persons_num", "cover_persons_num:"),
    ("audience_gender", "audience_gender:"),
    ("sentiment_tendency", "sentiment_tendency:"),
    ("video_type", "video_type:"),
    ("background_music_type", "background_music_type:"),
    ("captions", "captions:"),
    ("has_end_credit_guide", "has_end_credit_guide:")
  )

  // Dense stat keys taken from the vid-level action feature json (15 keys).
  // Order matters: it defines the output column order.
  private val actionDenseKeys: Seq[String] = Seq(
    "str_day1", "rov_day1", "ros_day1",
    "str_day7", "rov_day7", "ros_day7",
    "str_day21", "rov_day21", "ros_day21",
    "str_day336", "rov_day336", "ros_day336",
    "vovd1_day7", "vovd1_day21", "vovd1_day336"
  )

  // Dense stat keys taken from the category-level feature jsons (16 keys,
  // shared by cate1 and cate2). Order matters.
  private val cateDenseKeys: Seq[String] = Seq(
    "str_day1", "rov_day1", "ros_day1",
    "str_day3", "rov_day3", "ros_day3",
    "str_day7", "rov_day7", "ros_day7",
    "str_day30", "rov_day30", "ros_day30",
    "vovd1_day1", "vovd1_day3", "vovd1_day7", "vovd1_day30"
  )

  /**
   * Parses `featureJson` and, for every key in `keys`, appends the bucketized
   * sparse id to `sparse` and the two dense transforms to `dense1`/`dense2`.
   * `prefix + key` is the bucket-table lookup name (e.g. "action:str_day1").
   * Replaces 47 hand-copied call sites in the original.
   */
  private def appendDenseBuckets(featureJson: String,
                                 keys: Seq[String],
                                 prefix: String,
                                 bucketsMap: Map[String, (Double, Array[Double])],
                                 sparse: ArrayBuffer[String],
                                 dense1: ArrayBuffer[String],
                                 dense2: ArrayBuffer[String]): Unit = {
    val json = JSON.parseObject(featureJson)
    keys.foreach { key =>
      val (sparseId, d1, d2) = getDenseBucketValue(json, bucketsMap, key, prefix + key)
      sparse += sparseId.toString
      dense1 += d1.toString
      dense2 += d2.toString
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getName)
      .getOrCreate()
    val sc = spark.sparkContext

    // 1 read job parameters
    val param = ParamUtils.parseArgs(args)
    val tablePart = param.getOrElse("tablePart", "64").toInt
    val dt = param.getOrElse("dt", "20240620")
    val onehotPath = param.getOrElse("onehotPath", "/dw/recommend/model/53_dssm_i2i_onehot/20241128")
    val bucketFile = param.getOrElse("bucketFile", "20241128_recsys_i2i_bucket_47_v2.txt")
    val savePath = param.getOrElse("savePath", "/dw/recommend/model/56_dssm_i2i_itempredData/")
    val project = param.getOrElse("project", "loghubods")
    val repartition = param.getOrElse("repartition", "100").toInt

    // 2 broadcast the one-hot id table ("name\tid" per line)
    val onehotMap_br = sc.broadcast(
      sc.textFile(onehotPath).map(r => {
        val rList = r.split("\t")
        (rList(0), rList(1))
      }).collectAsMap()
    )

    // 3 load the dense bucket boundaries from the classpath resource.
    // BUGFIX: the original opened a SECOND Source just to call close() on it,
    // leaking the Source it actually read from; read and close the same instance.
    val resourceUrlBucket = this.getClass.getClassLoader.getResource(bucketFile)
    val buckets =
      if (resourceUrlBucket != null) {
        val source = Source.fromURL(resourceUrlBucket)
        try source.getLines().mkString("\n")
        finally source.close()
      } else {
        ""
      }
    println(buckets)
    // bucket line format: name \t rawBound \t comma-separated bucket boundaries
    val bucketsMap_br = sc.broadcast(
      buckets.split("\n")
        .map(r => r.replace(" ", "").replaceAll("\n", ""))
        .filter(r => r.nonEmpty)
        .map(r => {
          val rList = r.split("\t")
          (rList(0), (rList(1).toDouble, rList(2).split(",").map(_.toDouble)))
        }).toMap
    )

    val odpsOps = env.getODPS(sc)
    // category-level stat features: small enough to collect and broadcast as maps
    val category1_br = sc.broadcast(
      odpsOps.readTable(project = project,
        table = "t_vid_l1_cat_stat_feature",
        partition = s"dt=$dt",
        transfer = func,
        numPartition = tablePart)
        .map(record => (record.getString("category1"), record.getString("feature")))
        .collectAsMap()
    )
    val category2_br = sc.broadcast(
      odpsOps.readTable(project = project,
        table = "t_vid_l2_cat_stat_feature",
        partition = s"dt=$dt",
        transfer = func,
        numPartition = tablePart)
        .map(record => (record.getString("category2"), record.getString("feature")))
        .collectAsMap()
    )

    // 4 vid-level features, joined on vid (static tags left, action stats right)
    val vidStaticFeature = odpsOps.readTable(project = project,
      table = "t_vid_tag_feature",
      partition = s"dt=$dt",
      transfer = func,
      numPartition = tablePart)
      .map(record => (record.getString("vid"), record.getString("feature")))
    val vidActionFeature = odpsOps.readTable(project = project,
      table = "t_vid_stat_feature",
      partition = s"dt=$dt",
      transfer = func,
      numPartition = tablePart)
      .map(record => (record.getString("vid"), record.getString("feature")))

    val data = vidStaticFeature.leftOuterJoin(vidActionFeature).map {
      // vids with no action stats fall back to an empty json
      case (vid, (feature, actionOpt)) => (vid, (feature, actionOpt.getOrElse("{}")))
    }.mapPartitions(rows => {
      // attach the category-level feature jsons looked up from the broadcasts
      val category1 = category1_br.value
      val category2 = category2_br.value
      rows.map { case (vid, (feature, feature_action)) =>
        val staticJson = JSON.parseObject(feature)
        val cate1 = staticJson.getOrDefault("category1", "无").toString
        val cate2 = staticJson.getOrDefault("category2_1", "无").toString
        (vid, (feature, feature_action,
          category1.getOrElse(cate1, "{}"), category2.getOrElse(cate2, "{}")))
      }
    }).mapPartitions(rows => {
      val onehotMap = onehotMap_br.value
      val bucketsMap = bucketsMap_br.value
      rows.map { case (vid, (feature, feature_action, feature_cate1, feature_cate2)) =>
        val sparse = new ArrayBuffer[String]()
        val dense1 = new ArrayBuffer[String]()
        val dense2 = new ArrayBuffer[String]()
        // 16 sparse one-hot features: vid itself plus the 15 static tag fields
        sparse += onehotMap.getOrElse("vid:" + vid, "0")
        val staticJson = JSON.parseObject(feature)
        onehotKeys.foreach { case (key, prefix) =>
          sparse += getOnehotValue(staticJson, onehotMap, key, prefix)
        }
        // 47 dense features (15 action + 16 cate1 + 16 cate2), each yielding one
        // bucketized sparse id plus two dense values
        appendDenseBuckets(feature_action, actionDenseKeys, "action:", bucketsMap, sparse, dense1, dense2)
        appendDenseBuckets(feature_cate1, cateDenseKeys, "cate1:", bucketsMap, sparse, dense1, dense2)
        appendDenseBuckets(feature_cate2, cateDenseKeys, "cate2:", bucketsMap, sparse, dense1, dense2)
        // final width: 16 + 47*3 = 157 comma-joined columns, "vid \t features"
        sparse ++= dense1
        sparse ++= dense2
        vid + "\t" + sparse.mkString(",")
      }
    })

    // 5 write out; path guard prevents deleting anything outside the model dir
    val hdfsPath = savePath + "/" + dt
    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
      println("删除路径并开始数据写入:" + hdfsPath)
      MyHdfsUtils.delete_hdfs_path(hdfsPath)
      data.coalesce(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
    } else {
      println("路径不合法,无法写入:" + hdfsPath)
    }
  }
}
|