
i2i sample creation, step 1.

zhangbo, 5 months ago
Parent
Current commit
bbfab1a74d

+ 3 - 2
src/main/scala/com/aliyun/odps/spark/examples/makedata_dssm/makedata_i2i_01_originData_20241127.scala

@@ -25,10 +25,11 @@ object makedata_i2i_01_originData_20241127 {
     val tablePart = param.getOrElse("tablePart", "64").toInt
     val beginStr = param.getOrElse("beginStr", "2024062008")
     val endStr = param.getOrElse("endStr", "2024062023")
-    val savePath = param.getOrElse("savePath", "/dw/recommend/model/41_dssm_i2i_sample/")
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/51_dssm_i2i_sample/")
     val project = param.getOrElse("project", "loghubods")
     val repartition = param.getOrElse("repartition", "100").toInt
     val filterHours = param.getOrElse("filterHours", "25").split(",").toSet
+    val negCnt = param.getOrElse("negCnt", "20").toInt
    // 2 Read ODPS table info
     val odpsOps = env.getODPS(sc)
    // 3 Loop over the time range and produce data
@@ -70,7 +71,7 @@ object makedata_i2i_01_originData_20241127 {
             val vids = vids_br.value
             row.foreach {
               case (logKey, vid_left, vid_right) =>
-                val negs = Random.shuffle(vids).take(20).filter(r => !r.equals(vid_left) && !r.equals(vid_right))
+                val negs = Random.shuffle(vids).take(negCnt).filter(r => !r.equals(vid_left) && !r.equals(vid_right))
                 negs.foreach(negVid =>{
                   result.add((logKey, "0", vid_left, negVid).productIterator.mkString("\t"))
                 })
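
A note on the sampling change above: the filter runs after take(negCnt), so a row can end up with fewer than negCnt negatives whenever vid_left or vid_right falls inside the sampled slice. A minimal sketch of the reverse order (exclude first, then take), assuming the same vids, vid_left, vid_right, negCnt and result as in the hunk:

    // Exclude the anchor and the positive vid before sampling, so that
    // exactly negCnt negatives are drawn whenever enough candidates exist.
    val negs = Random.shuffle(vids.filter(r => !r.equals(vid_left) && !r.equals(vid_right)))
      .take(negCnt)
    negs.foreach(negVid => {
      result.add((logKey, "0", vid_left, negVid).productIterator.mkString("\t"))
    })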

+ 153 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_dssm/makedata_i2i_02_joinFeatureData_20241128.scala

@@ -0,0 +1,153 @@
+package com.aliyun.odps.spark.examples.makedata_dssm
+
+import com.alibaba.fastjson.JSON
+import com.aliyun.odps.TableSchema
+import com.aliyun.odps.data.Record
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+object makedata_i2i_02_joinFeatureData_20241128 {
+  def func(record: Record, schema: TableSchema): Record = {
+    record
+  }
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 1 Read parameters
+    val param = ParamUtils.parseArgs(args)
+    val tablePart = param.getOrElse("tablePart", "64").toInt
+    val beginStr = param.getOrElse("beginStr", "2024062008")
+    val endStr = param.getOrElse("endStr", "2024062023")
+    val readPath = param.getOrElse("readPath", "/dw/recommend/model/51_dssm_i2i_sample/")
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/52_dssm_i2i_joinfeature/")
+    val project = param.getOrElse("project", "loghubods")
+    val repartition = param.getOrElse("repartition", "100").toInt
+    val filterHours = param.getOrElse("filterHours", "25").split(",").toSet
+    // 2 Read ODPS table info
+    val odpsOps = env.getODPS(sc)
+    // 3 Loop over the time range and produce data
+    val timeRange = MyDateUtils.getDateHourRange(beginStr, endStr)
+    for (dt_hh <- timeRange) {
+      val dt = dt_hh.substring(0, 8)
+      val hh = dt_hh.substring(8, 10)
+      val partition = s"dt=$dt,hh=$hh"
+      // 1 Category features via broadcast
+      val category1_br = sc.broadcast(
+        odpsOps.readTable(project = project,
+          table = "t_vid_l1_cat_stat_feature",
+          partition = s"dt=$dt",
+          transfer = func,
+          numPartition = tablePart)
+          .map(record =>{
+            val category = record.getString("category1")
+            val feature = record.getString("feature")
+            (category, feature)
+        }).collectAsMap()
+      )
+      val category2_br = sc.broadcast(
+        odpsOps.readTable(project = project,
+            table = "t_vid_l2_cat_stat_feature",
+            partition = s"dt=$dt",
+            transfer = func,
+            numPartition = tablePart)
+          .map(record => {
+            val category = record.getString("category2")
+            val feature = record.getString("feature")
+            (category, feature)
+          }).collectAsMap()
+      )
+      // 2 Video features via join
+      val vidStaticFeature = odpsOps.readTable(project = project,
+        table = "t_vid_tag_feature",
+        partition = s"dt=$dt",
+        transfer = func,
+        numPartition = tablePart)
+      .map(record => {
+        val vid = record.getString("vid")
+        val feature = record.getString("feature")
+        (vid, feature)
+      })
+      val vidActionFeature = odpsOps.readTable(project = project,
+          table = "t_vid_stat_feature",
+          partition = s"dt=$dt",
+          transfer = func,
+          numPartition = tablePart)
+        .map(record => {
+          val vid = record.getString("vid")
+          val feature = record.getString("feature")
+          (vid, feature)
+        })
+
+      if (filterHours.nonEmpty && filterHours.contains(hh)) {
+        println("不执行partiton:" + partition)
+      } else {
+        println("开始执行partiton:" + partition)
+        val savePartition = dt + hh
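+        // Four chained leftOuterJoins below: static (tag) features for the
+        // left vid, then for the right vid, then action (stat) features for
+        // the left vid, then for the right vid; the pair is re-keyed on the
+        // opposite vid between joins, and a missing feature defaults to "{}".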
+        val sampleData1 = sc.textFile( readPath + "/" + savePartition).map(r=>{
+          val rList = r.split("\t")
+          val logKey = rList(0)
+          val label = rList(1)
+          val vid_left = rList(2)
+          val vid_right = rList(3)
+          (vid_left, (logKey, label, vid_right))
+        }).leftOuterJoin(vidStaticFeature).map{
+          case (vid_left, ((logKey, label, vid_right), Some(feature))) =>
+            (vid_right, (logKey, label, vid_left, feature))
+          case (vid_left, ((logKey, label, vid_right), None)) =>
+            (vid_right, (logKey, label, vid_left, "{}"))
+        }.leftOuterJoin(vidStaticFeature).map{
+          case (vid_right, ((logKey, label, vid_left, feature_left), Some(feature_right))) =>
+            (vid_left, (logKey, label, vid_right, feature_left, feature_right))
+          case (vid_right, ((logKey, label, vid_left, feature_left), None)) =>
+            (vid_left, (logKey, label, vid_right, feature_left, "{}"))
+        }.leftOuterJoin(vidActionFeature).map{
+          case (vid_left, ((logKey, label, vid_right, feature_left, feature_right), Some(feature))) =>
+            (vid_right, (logKey, label, vid_left, feature_left, feature_right, feature))
+          case (vid_left, ((logKey, label, vid_right, feature_left, feature_right), None)) =>
+            (vid_right, (logKey, label, vid_left, feature_left, feature_right, "{}"))
+        }.leftOuterJoin(vidActionFeature).map{
+          case (vid_right, ((logKey, label, vid_left, feature_left, feature_right, feature_left_action), Some(feature))) =>
+            (logKey, label, vid_left, vid_right, feature_left, feature_right, feature_left_action, feature)
+          case (vid_right, ((logKey, label, vid_left, feature_left, feature_right, feature_left_action), None)) =>
+            (logKey, label, vid_left, vid_right, feature_left, feature_right, feature_left_action, "{}")
+        }.mapPartitions(row =>{
+          val result = new ArrayBuffer[String]()
+          val category1 = category1_br.value
+          val category2 = category2_br.value
+          row.foreach{
+            case (logKey, label, vid_left, vid_right, feature_left, feature_right, feature_left_action, feature_right_action) =>
+              val cate1_left = JSON.parseObject(feature_left).getOrDefault("category1", "无").toString
+              val cate2_left = JSON.parseObject(feature_left).getOrDefault("category2_1", "无").toString
+              val cate1_right = JSON.parseObject(feature_right).getOrDefault("category1", "无").toString
+              val cate2_right = JSON.parseObject(feature_right).getOrDefault("category2_1", "无").toString
+              val feature_left_cate1 = category1.getOrElse(cate1_left, "{}")
+              val feature_left_cate2 = category2.getOrElse(cate2_left, "{}")
+              val feature_right_cate1 = category1.getOrElse(cate1_right, "{}")
+              val feature_right_cate2 = category2.getOrElse(cate2_right, "{}")
+              result.add((logKey, label, vid_left, vid_right, feature_left, feature_right, feature_left_action, feature_right_action,
+                feature_left_cate1, feature_right_cate1, feature_left_cate2, feature_right_cate2).productIterator.mkString("\t"))
+          }
+          result.iterator
+        })
+        val hdfsPath = savePath + "/" + savePartition
+        if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+          println("删除路径并开始数据写入:" + hdfsPath)
+          MyHdfsUtils.delete_hdfs_path(hdfsPath)
+          sampleData1.coalesce(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+        } else {
+          println("路径不合法,无法写入:" + hdfsPath)
+        }
+
+      }
+    }
+  }
+}
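
Each output line written by makedata_i2i_02_joinFeatureData_20241128 is a tab-separated row of 12 fields: logKey, label, vid_left, vid_right, the two static features, the two action features, and the four category features. A minimal parsing sketch for downstream steps; the case class I2ISample and the helper parseRow are illustrative names, not part of the repo:

    case class I2ISample(logKey: String, label: String, vidLeft: String, vidRight: String,
                         featureLeft: String, featureRight: String,
                         featureLeftAction: String, featureRightAction: String,
                         featureLeftCate1: String, featureRightCate1: String,
                         featureLeftCate2: String, featureRightCate2: String)

    // Split with limit -1 so trailing empty feature strings are preserved.
    def parseRow(line: String): I2ISample = {
      val f = line.split("\t", -1)
      I2ISample(f(0), f(1), f(2), f(3), f(4), f(5), f(6), f(7), f(8), f(9), f(10), f(11))
    }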

+ 6 - 0
src/main/scala/com/aliyun/odps/spark/examples/临时记录的脚本-I2I

@@ -0,0 +1,6 @@
+nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+--class com.aliyun.odps.spark.examples.makedata_dssm.makedata_i2i_01_originData_20241127 \
+--master yarn --driver-memory 2G --executor-memory 4G --executor-cores 1 --num-executors 16 \
+./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+beginStr:2024112612 endStr:2024112612 negCnt:20 \
+tablePart:64 savePath:/dw/recommend/model/51_dssm_i2i_sample/ > p51.log 2>&1 &