
New model features + analysis of Thomson data

zhangbo 11 months ago
parent
commit
4be14d915b

+ 27 - 0
pom.xml

@@ -17,6 +17,11 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
+    <parent>
+        <groupId>com.tzld.commons</groupId>
+        <artifactId>supom</artifactId>
+        <version>1.0.9</version>
+    </parent>
 
     <properties>
         <spark.version>2.3.0</spark.version>
@@ -36,6 +41,28 @@
     <packaging>jar</packaging>
 
     <dependencies>
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>3.12.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.hankcs</groupId>
+            <artifactId>hanlp</artifactId>
+            <version>portable-1.8.2</version>
+        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>com.medallia.word2vec</groupId>-->
+<!--            <artifactId>word2vec</artifactId>-->
+<!--            <version>0.1.42</version>-->
+<!--        </dependency>-->
+
+        <dependency>
+            <groupId>org.xm</groupId>
+            <artifactId>similarity</artifactId>
+            <version>1.1</version>
+        </dependency>
 
         <dependency>
             <groupId>com.alibaba</groupId>

+ 33 - 0
src/main/java/examples/extractor/RankExtractorFeature_20240530.java

@@ -0,0 +1,33 @@
+package examples.extractor;
+
+public class RankExtractorFeature_20240530 {
+
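+    // Safe division: returns 0 if either operand is 0, so missing features default to 0.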
+    public static Double calDiv(double a, double b){
+        if (a == 0 || b == 0){
+            return 0D;
+        }
+        return a / b;
+    }
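+    // Smoothed log: returns log(a + 1) for positive a, otherwise 0.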
+    public static Double calLog(double a){
+        if (a <= 0){
+            return 0D;
+        }
+        return Math.log(a + 1.0);
+    }
+
+    public static void main(String[] args) {
+        System.out.println(Math.log(10));
+        System.out.println(Math.log(100));
+        System.out.println(Math.log(1000));
+        System.out.println(Math.log(10000));
+        System.out.println(Math.log(100000));
+
+        System.out.println(Math.log10(10));
+        System.out.println(Math.log10(100));
+        System.out.println(Math.log10(1000));
+        System.out.println(Math.log10(10000));
+        System.out.println(Math.log10(100000));
+    }
+}
+
+

+ 122 - 0
src/main/scala/com/aliyun/odps/spark/examples/ana/ana_01_cidvidpk.scala

@@ -0,0 +1,122 @@
+package com.aliyun.odps.spark.examples.ana
+
+
+import com.alibaba.fastjson.JSONObject
+import com.alibaba.fastjson.JSON
+import com.aliyun.odps.TableSchema
+import com.aliyun.odps.data.Record
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+
+import java.util
+import java.util.{HashMap, Map}
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+/*
+   All features that cannot be retrieved default to 0.
+ */
+
+object ana_01_cidvidpk {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 1 Read parameters
+    val param = ParamUtils.parseArgs(args)
+    val tablePart = param.getOrElse("tablePart", "64").toInt
+    val project = param.getOrElse("project", "loghubods")
+    val table = param.getOrElse("table", "ad_engine_statistics_log_per5min")
+    val beginStr = param.getOrElse("beginStr", "2024060208")
+    val endStr = param.getOrElse("endStr", "2024060223")
+    val vidSelect = param.getOrElse("vidSelect", "")
+    val cidsSelect = param.getOrElse("cidsSelect", "").split(",").toSet
+    val apptypeSelect = param.getOrElse("apptype", "")
+
+    // 2 Read ODPS table info
+    val odpsOps = env.getODPS(sc)
+    val timeRange = MyDateUtils.getDateHourRange(beginStr, endStr)
+    val partitions = new ArrayBuffer[String]()
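+    // each hour is split into twelve 5-minute partitions: <yyyyMMddHH> + {0000, 0500, ..., 5500}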
+    for (dt_hh <- timeRange) {
+      for (mi <- List (
+        "0000", "0500", "1000", "1500", "2000", "2500",
+        "3000", "3500", "4000", "4500", "5000", "5500"
+      )){
+        val partition = dt_hh + mi
+        partitions.add(partition)
+      }
+    }
+    val rdds = partitions.map(p => {
+      odpsOps.readTable(project = project,
+        table = table,
+        partition = p,
+        transfer = func,
+        numPartition = tablePart)
+    }).reduce((r1, r2) => r1.union(r2))
+
+    val data = rdds.map(record=>{
+      val vid = if (record.isNull("videoid")) "" else record.getString("videoid")
+      val recalls = if (record.isNull("creativelist")) "" else record.getString("creativelist")
+      val ranks = if (record.isNull("scoreresult")) "" else record.getString("scoreresult")
+      val apptype = if (record.isNull("apptype")) "" else record.getString("apptype")
+      val abcode = if (record.isNull("adabgroup")) "" else record.getString("adabgroup")
+      (apptype, abcode, vid, recalls, ranks)
+    }).filter(r => r._1.equals(apptypeSelect) && !r._3.equals("") && !r._4.equals("") && !r._5.equals(""))
+      .filter(r=> r._3.equals(vidSelect)) // filter by the selected vid
+      .map{
+        case (apptype, abcode, vid, recalls, ranks) =>
+          val recalls_json = JSON.parseArray(recalls).map(r=>{
+            val j = JSON.parseObject(r.toString)
+            j.getOrElse("creativeId", 0).toString
+          }).filter(!_.equals("0")).toSet
+          val ranks_json = JSON.parseArray(ranks).map(r => {
+            val j = JSON.parseObject(r.toString)
+            val adId = j.getOrElse("adId", 0).toString
+            val score = j.getOrElse("score", 0.0)
+            (adId, score.toString.toDouble)
+          })
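+          // pick the adId with the highest score as the ranking winner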
+          var rankId = ranks_json.get(0)._1
+          var score = ranks_json.get(0)._2
+          for (i <- 1 until ranks_json.size){
+            val item = ranks_json.get(i)
+            if (item._2 > score){
+              rankId = item._1
+              score = item._2
+            }
+          }
+          (apptype, abcode, vid, recalls_json, rankId)
+      }.flatMap({
+        case (apptype, abcode, vid, recalls_json, rankId) =>
+          recalls_json.map(recallId=> {
+            (apptype, abcode, vid, recallId, rankId, recalls_json)
+          })
+      }).filter(r=> cidsSelect.contains(r._4)) // filter by the selected cids
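+      // Per (apptype, abcode, vid, creativeId) count:
+      //   x1 = requests where this cid was recalled
+      //   x2 = requests where this cid also won ranking (highest score)
+      //   x3 = requests where all selected cids were recalled together
+      //   x4 = requests where all selected cids were co-recalled and this cid won ranking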
+      .map({
+        case (apptype, abcode, vid, recallId, rankId, recalls_json) =>
+          val x1 = 1
+          val x2 = if (recallId.equals(rankId)) 1 else 0
+          val x3 = if (cidsSelect.subsetOf(recalls_json)) 1 else 0
+          val x4 = if (cidsSelect.subsetOf(recalls_json) && recallId.equals(rankId)) 1 else 0
+          ((apptype, abcode, vid, recallId), (x1, x2, x3, x4))
+      }).aggregateByKey(
+        (0, 0, 0, 0)
+      )(
+        seqOp = (runningSum, x) => (runningSum._1 + x._1, runningSum._2 + x._2, runningSum._3 + x._3, runningSum._4 + x._4),
+        combOp = (sum1, sum2) => (sum1._1 + sum2._1, sum1._2 + sum2._2, sum1._3 + sum2._3, sum1._4 + sum2._4)
+      )
+
+    data.collect().foreach(r => println("result\t" + r._1.productIterator.mkString("\t") + "\t" + r._2.productIterator.mkString("\t")))
+
+  }
+
+
+
+  def func(record: Record, schema: TableSchema): Record = {
+    record
+  }
+
+}

+ 222 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_13_originData_20240529.scala

@@ -0,0 +1,222 @@
+package com.aliyun.odps.spark.examples.makedata
+
+import com.alibaba.fastjson.JSONObject
+import com.alibaba.fastjson.JSON
+import com.aliyun.odps.TableSchema
+import com.aliyun.odps.data.Record
+import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
+import examples.extractor.{RankExtractorItemFeatureV2, RankExtractorUserFeatureV2}
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.sql.SparkSession
+
+import java.util
+import java.util.{HashMap, Map}
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import examples.extractor.RankExtractorFeature_20240530
+/*
+   All features that cannot be retrieved default to 0.
+ */
+
+object makedata_13_originData_20240529 {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 1 Read parameters
+    val param = ParamUtils.parseArgs(args)
+    val tablePart = param.getOrElse("tablePart", "64").toInt
+    val partitionPrefix = param.getOrElse("partitionPrefix", "dt={},hh={}")
+    val beginStr = param.getOrElse("beginStr", "2023010100")
+    val endStr = param.getOrElse("endStr", "2023010123")
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/13_sample_data/")
+    val project = param.getOrElse("project", "loghubods")
+    val table = param.getOrElse("table", "XXXX")
+
+
+    // 2 Read ODPS table info
+    val odpsOps = env.getODPS(sc)
+
+    // 3 Loop over the time range and generate data per partition
+    val timeRange = MyDateUtils.getDateHourRange(beginStr, endStr)
+    for (dt_hh <- timeRange) {
+      val dt = dt_hh.substring(0, 8)
+      val hh = dt_hh.substring(8, 10)
+      val partition = s"dt=$dt,hh=$hh"
+      println("Start processing partition: " + partition)
+      val odpsData = odpsOps.readTable(project = project,
+        table = table,
+        partition = partition,
+        transfer = func,
+        numPartition = tablePart)
+        .map(record => {
+          // a Video features
+          val b1: JSONObject = if (record.isNull("b1_feature")) new JSONObject() else
+            JSON.parseObject(record.getString("b1_feature"))
+          val b2: JSONObject = if (record.isNull("b2_feature")) new JSONObject() else
+            JSON.parseObject(record.getString("b2_feature"))
+          val b3: JSONObject = if (record.isNull("b3_feature")) new JSONObject() else
+            JSON.parseObject(record.getString("b3_feature"))
+          val b6: JSONObject = if (record.isNull("b6_feature")) new JSONObject() else
+            JSON.parseObject(record.getString("b6_feature"))
+          val b7: JSONObject = if (record.isNull("b7_feature")) new JSONObject() else
+            JSON.parseObject(record.getString("b7_feature"))
+
+          val b8: JSONObject = if (record.isNull("b8_feature")) new JSONObject() else
+            JSON.parseObject(record.getString("b8_feature"))
+          val b9: JSONObject = if (record.isNull("b9_feature")) new JSONObject() else
+            JSON.parseObject(record.getString("b9_feature"))
+          val b10: JSONObject = if (record.isNull("b10_feature")) new JSONObject() else
+            JSON.parseObject(record.getString("b10_feature"))
+          val b11: JSONObject = if (record.isNull("b11_feature")) new JSONObject() else
+            JSON.parseObject(record.getString("b11_feature"))
+          val b12: JSONObject = if (record.isNull("b12_feature")) new JSONObject() else
+            JSON.parseObject(record.getString("b12_feature"))
+          val b13: JSONObject = if (record.isNull("b13_feature")) new JSONObject() else
+            JSON.parseObject(record.getString("b13_feature"))
+          val b17: JSONObject = if (record.isNull("b17_feature")) new JSONObject() else
+            JSON.parseObject(record.getString("b17_feature"))
+          val b18: JSONObject = if (record.isNull("b18_feature")) new JSONObject() else
+            JSON.parseObject(record.getString("b18_feature"))
+          val b19: JSONObject = if (record.isNull("b19_feature")) new JSONObject() else
+            JSON.parseObject(record.getString("b19_feature"))
+
+          val featureMap = new util.HashMap[String, Double]()
+          val origin_data = List(
+            (b1, b2, b3, "b123"), (b1, b6, b7, "b167"),
+            (b8, b9, b10, "b8910"), (b11, b12, b13, "b111213"),
+            (b17, b18, b19, "b171819")
+          )
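+          // For each (exposure, share, return) group and time window, derive:
+          //   STR = share_pv / exp_pv, ROV = return_uv / exp_pv,
+          //   log(share), log(return), and ROV * log(return)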
+          for ((b_1, b_2, b_3, prefix1) <- origin_data){
+            for (prefix2 <- List(
+              "1h", "2h", "3h", "4h", "12h", "1d", "3d", "7d"
+            )){
+              val exp = if (b_1.isEmpty) 0D else b_1.getIntValue("exp_pv_" + prefix2).toDouble
+              val share = if (b_2.isEmpty) 0D else b_2.getIntValue("share_pv_" + prefix2).toDouble
+              val returns = if (b_3.isEmpty) 0D else b_3.getIntValue("return_uv_" + prefix2).toDouble
+              val f1 = RankExtractorFeature_20240530.calDiv(share, exp)
+              val f2 = RankExtractorFeature_20240530.calLog(share)
+              val f3 = RankExtractorFeature_20240530.calDiv(returns, exp)
+              val f4 = RankExtractorFeature_20240530.calLog(returns)
+              val f5 = f3 * f4
+              featureMap.put(prefix1 + "_" + prefix2 + "_" + "STR", f1)
+              featureMap.put(prefix1 + "_" + prefix2 + "_" + "log(share)", f2)
+              featureMap.put(prefix1 + "_" + prefix2 + "_" + "ROV", f3)
+              featureMap.put(prefix1 + "_" + prefix2 + "_" + "log(return)", f4)
+              featureMap.put(prefix1 + "_" + prefix2 + "_" + "ROV*log(return)", f5)
+            }
+          }
+
+          val video_info: JSONObject = if (record.isNull("t_v_info_feature")) new JSONObject() else
+            JSON.parseObject(record.getString("t_v_info_feature"))
+          featureMap.put("total_time", if (video_info.containsKey("total_time")) video_info.getIntValue("total_time").toDouble else 0D)
+          featureMap.put("bit_rate", if (video_info.containsKey("bit_rate")) video_info.getIntValue("bit_rate").toDouble else 0D)
+
+          val c1: JSONObject = if (record.isNull("c1_feature")) new JSONObject() else
+            JSON.parseObject(record.getString("c1_feature"))
+          featureMap.put("playcnt_6h", if (c1.containsKey("playcnt_6h")) c1.getIntValue("playcnt_6h").toDouble else 0D)
+          featureMap.put("playcnt_1d", if (c1.containsKey("playcnt_1d")) c1.getIntValue("playcnt_1d").toDouble else 0D)
+          featureMap.put("playcnt_3d", if (c1.containsKey("playcnt_3d")) c1.getIntValue("playcnt_3d").toDouble else 0D)
+          featureMap.put("playcnt_7d", if (c1.containsKey("playcnt_7d")) c1.getIntValue("playcnt_7d").toDouble else 0D)
+
+          val d1: JSONObject = if (record.isNull("d1_feature")) new JSONObject() else
+            JSON.parseObject(record.getString("d1_feature"))
+          featureMap.put("return_n", if (d1.containsKey("return_n")) d1.getString("return_n").toDouble else 0D)
+          featureMap.put("rovn", if (d1.containsKey("rovn")) d1.getString("rovn").toDouble else 0D)
+
+
+          /*
+          Video:
+          video duration, bit rate
+
+          exposure uses pv, share uses pv, return uses uv --> 1h 2h 3h 4h 12h 1d 3d 7d
+          STR log(share) ROV log(return) ROV*log(return)
+          40 feature combinations
+          overall, overall-exposure-matched, recommend non-cold-start root, recommend cold-start root, per-province root
+          200 feature values
+
+          User:
+          play count --> 6h 1d 3d 7d --> 4
+          share pv and return uv brought back --> 12h 1d 3d 7d --> 8
+          play tags, return tags --> 2h 1d 3d --> match count, matched words, max semantic similarity, mean semantic similarity
+          share tags, exposure tags (return tags) --> 1d 3d 7d 14d --> match count, matched words, max semantic similarity, mean semantic similarity
+
+          Head videos:
+          exposure, return, ROVn: 3 features
+
+          Context:
+          hour, weekday, apptype, city, province, pagesource, device model
+           */
+
+
+          // b
+
+
+
+
+          // 4 Process label info.
+          val labels = Set(
+            "pagesource", "recommend_page_type", "pagesource_change",
+            "abcode",
+            "is_play", "playtime",
+            "is_share", "share_cnt_pv", "share_ts_list",
+            "is_return", "return_cnt_pv", "return_cnt_uv", "return_mid_ts_list"
+          )
+          val labelNew = new JSONObject
+          val labelMap = getFeatureFromSet(labels, record)
+          labels.foreach(r => {
+            if (labelMap.containsKey(r)) {
+              labelNew.put(r, labelMap(r))
+            }
+          })
+          // 5 Build the log key fields.
+          val mid = record.getString("mid")
+          val videoid = record.getString("videoid")
+          val logtimestamp = record.getString("logtimestamp")
+          val apptype = record.getString("apptype")
+          val pagesource_change = record.getString("pagesource_change")
+          val abcode = record.getString("abcode")
+          val video_recommend = if (!record.isNull("video_recommend")) record.getString("video_recommend") else "111"
+
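+          // logKey = mid:videoid:logtimestamp:apptype:pagesource_change:abcode:video_recommend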
+          val logKey = (mid, videoid, logtimestamp, apptype, pagesource_change, abcode, video_recommend).productIterator.mkString(":")
+          val labelKey = labelNew.toString()
+          val featureKey = ""
+          // 6 Concatenate the fields and save.
+          logKey + "\t" + labelKey + "\t" + featureKey
+
+        })
+
+
+      // 4 Save data to HDFS
+      val hdfsPath = savePath + "/" + partition
+      if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")){
+        println("Deleting existing path and writing data: " + hdfsPath)
+        MyHdfsUtils.delete_hdfs_path(hdfsPath)
+        odpsData.saveAsTextFile(hdfsPath, classOf[GzipCodec])
+      }else{
+        println("Invalid path, cannot write: " + hdfsPath)
+      }
+    }
+  }
+
+  def func(record: Record, schema: TableSchema): Record = {
+    record
+  }
+
+  def getFeatureFromSet(set: Set[String], record: Record): mutable.HashMap[String, String] = {
+    val result = mutable.HashMap[String, String]()
+    set.foreach(r =>{
+      if (!record.isNull(r)){
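+        // prefer reading the column as a string; fall back to bigint converted to string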
+        try{
+          result.put(r, record.getString(r))
+        }catch {
+          case _: Exception => result.put(r, String.valueOf(record.getBigint(r)))
+        }
+      }
+    })
+    result
+  }
+}

+ 36 - 1
src/main/scala/com/aliyun/odps/spark/examples/myUtils/MyDateUtils.scala

@@ -188,6 +188,31 @@ object MyDateUtils {
     ranges
   }
 
+  // Generate the list of date+hour strings between beginStr and endStr (inclusive)
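+  // e.g. getDateHourRange("2024050123", "2024050203") -> 2024050123, 2024050200, 2024050201, 2024050202, 2024050203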
+  def getDateHourRange(beginStr: String, endStr: String, format: String = "yyyyMMddHH"): ArrayBuffer[String] = {
+    val ranges = ArrayBuffer[String]()
+    val sdf = new SimpleDateFormat(format)
+    var dateBegin = sdf.parse(beginStr)
+    val dateEnd = sdf.parse(endStr)
+
+    while (dateBegin.compareTo(dateEnd) <= 0) {
+      ranges += sdf.format(dateBegin)
+      // advance the start time by one hour
+      dateBegin = addHours(dateBegin, 1)
+    }
+    ranges
+  }
+
+  import java.util.Date
+
+  // Helper: add the given number of hours to a date
+  def addHours(date: Date, hours: Int): Date = {
+    val cal = Calendar.getInstance()
+    cal.setTime(date)
+    cal.add(Calendar.HOUR_OF_DAY, hours)
+    cal.getTime
+  }
+
   import java.time.LocalDate
   import java.time.temporal.ChronoUnit
   def calculateDateDifference(startDate: String, endDate: String): Long = {
@@ -206,6 +231,16 @@ object MyDateUtils {
 //    val b = to.getTime / 3600
 //    println(b-a)
 
-    var from = DateUtils.parseDate("20240228", Array[String]("yyyyMMdd")).getTime / 1000
+    var from = getDateHourRange("2024050123", "2024050203")
+    from.foreach(println)
+
+    val partitionPrefix = "dt={},hh={}"
+    println(partitionPrefix.stripMargin.format("XX", "YY"))
+
+    val stdxx = "2024050116"
+    val dt = stdxx.substring(0, 8)
+    val hh = stdxx.substring(8, 10)
+    println(dt)
+    println(hh)
   }
 }

+ 29 - 5
src/main/scala/com/aliyun/odps/spark/examples/临时记录的脚本

@@ -1,18 +1,17 @@
+
+[New upstream sample generation]
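+Arguments are key:value pairs parsed by ParamUtils.parseArgs, e.g. tablePart:64 beginStr:20240227.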
 nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
 --class com.aliyun.odps.spark.examples.makedata.makedata_10_originData_v3 \
 --master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 64 \
 ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
 tablePart:64 savePath:/dw/recommend/model/10_sample_data_v3/ beginStr:20240227 endStr:20240227 > p10_.log 2>&1 &
 
-
-
-
 [ROS sample generation]
 nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
 --class com.aliyun.odps.spark.examples.makedata.makedata_12_rosData_v3 \
 --master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 32 \
 ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
-savePath:/dw/recommend/model/12_ros_data_v3_test/ beginStr:20240228 endStr:20240228 ifRepart:10 \
+savePath:/dw/recommend/model/12_ros_data_v3/ beginStr:20240228 endStr:20240228 ifRepart:10 \
 > p12_1.log 2>&1 &
 
 nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
@@ -39,4 +38,29 @@ nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.s
 --conf spark.yarn.executor.memoryoverhead=1024 \
 /root/zhangbo/recommend-emr-dataprocess/target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
 date:20240302 tablePart:96 expireDay:3 ifWriteRedisUser:True ifUser:True midDays:14 redisLimit:80000000 \
-savePathUser:/dw/recommend/model/09_feature/user/ > p09.log 2>&1 &
+savePathUser:/dw/recommend/model/09_feature/user/ > p09.log 2>&1 &
+
+
+
+--------------
+[Old STR upstream samples]
+nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+--class com.aliyun.odps.spark.examples.makedata.makedata_06_originData \
+--master yarn --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 32 \
+--conf spark.yarn.executor.memoryoverhead=1024 \
+--conf spark.shuffle.service.enabled=true \
+--conf spark.shuffle.service.port=7337 \
+--conf spark.shuffle.consolidateFiles=true \
+--conf spark.shuffle.manager=sort \
+--conf spark.storage.memoryFraction=0.4 \
+--conf spark.shuffle.memoryFraction=0.5 \
+--conf spark.default.parallelism=200 \
+./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+tablePart:64 savePath:/dw/recommend/model/00_sample_data/ beginStr:20240311 endStr:20240312 > p6.log 2>&1 &
+[Old STR training data]
+nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.spark.deploy.SparkSubmit \
+--class com.aliyun.odps.spark.examples.makedata.makedata_07_strData \
+--master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 32 \
+./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
+savePath:/dw/recommend/model/04_str_data/ beginStr:20240311 endStr:20240312 featureVersion:v4 ifRepart:100 \
+> p7.log 2>&1 &

+ 3 - 0
zhangbo/02_train_go.sh

@@ -24,3 +24,6 @@ done
 
 # nohup sh 02_train_go.sh 20240226 20240228 model_tom /dw/recommend/model/11_str_data_v3/ 0,1,0 >p2_model_tom.log 2>&1 &
 # nohup sh 02_train_go.sh 20240226 20240228 model_jerry /dw/recommend/model/12_ros_data_v3/ 0,1,0 >p2_model_jerry.log 2>&1 &
+
+
+# nohup sh 02_train_go.sh 20240311 20240314 model_str_mid /dw/recommend/model/04_str_data/ 1,1,0 >p2_model_str_mid.log 2>&1 &

+ 4 - 2
zhangbo/04_upload.sh

@@ -1,6 +1,6 @@
 
-cat /root/zhangbo/recommend-emr-dataprocess/zhangbo/model/model_sharev2_20231220.txt | sed '1d' | awk -F " " '{if($2!="0") print $1"\t"$2}' > /root/zhangbo/recommend-emr-dataprocess/zhangbo/model/model_sharev2_20231220_change.txt
-dfs -put /root/zhangbo/recommend-emr-dataprocess/zhangbo/model/model_sharev2_20231220_change.txt oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/video_str_model/
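+# Drop the header line and keep only non-zero weights ("feature<TAB>weight") before uploading to OSS.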
+cat /root/zhangbo/recommend-emr-dataprocess/zhangbo/model/model_str_mid_20240313.txt | sed '1d' | awk -F " " '{if($2!="0") print $1"\t"$2}' > /root/zhangbo/recommend-emr-dataprocess/zhangbo/model/model_str_mid_20240313_change.txt
+dfs -put /root/zhangbo/recommend-emr-dataprocess/zhangbo/model/model_str_mid_20240313_change.txt oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/video_str_model/model_str_mid.txt
 
 
 cat /root/zhangbo/recommend-emr-dataprocess/zhangbo/model/model_sharev2_20231220.txt | sed '1d' | awk -F " " '{if($2!="0") print $1"\t"$2}' > /root/zhangbo/recommend-emr-dataprocess/zhangbo/model/model_sharev2_20231220_change.txt
@@ -25,3 +25,5 @@ dfs -put /root/zhangbo/recommend-emr-dataprocess/zhangbo/model/model_tom_2024022
 
 cat /root/zhangbo/recommend-emr-dataprocess/zhangbo/model/model_jerry_20240225.txt | sed '1d' | awk -F " " '{if($2!="0") print $1"\t"$2}' > /root/zhangbo/recommend-emr-dataprocess/zhangbo/model/model_jerry_20240225_change.txt
 dfs -put /root/zhangbo/recommend-emr-dataprocess/zhangbo/model/model_jerry_20240225_change.txt oss://art-recommend.oss-cn-hangzhou.aliyuncs.com/zhangbo/model_jerry.txt
+
+