zhangbo преди 1 година
родител
ревизия
4aa5d81c35

+ 110 - 56
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_13_originData_20240529.scala

@@ -5,17 +5,14 @@ import com.alibaba.fastjson.JSON
 import com.aliyun.odps.TableSchema
 import com.aliyun.odps.data.Record
 import com.aliyun.odps.spark.examples.myUtils.{MyDateUtils, MyHdfsUtils, ParamUtils, env}
-import examples.extractor.{RankExtractorItemFeatureV2, RankExtractorUserFeatureV2}
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.sql.SparkSession
-
-import java.util
-import java.util.{HashMap, Map}
 import scala.collection.JavaConversions._
-import scala.collection.mutable
 import examples.extractor.RankExtractorFeature_20240530
+import org.xm.Similarity
+import scala.collection.mutable.ArrayBuffer
 /*
-   所有获取不到的特征,给默认值0.
+   20240608 提取特征
  */
 
 object makedata_13_originData_20240529 {
@@ -35,7 +32,7 @@ object makedata_13_originData_20240529 {
     val savePath = param.getOrElse("savePath", "/dw/recommend/model/13_sample_data/")
     val project = param.getOrElse("project", "loghubods")
     val table = param.getOrElse("table", "XXXX")
-
+    val repartition = param.getOrElse("repartition", "20").toInt
 
     // 2 读取odps+表信息
     val odpsOps = env.getODPS(sc)
@@ -53,6 +50,9 @@ object makedata_13_originData_20240529 {
         transfer = func,
         numPartition = tablePart)
         .map(record => {
+
+          val featureMap = new JSONObject()
+
           // a 视频特征
           val b1: JSONObject = if (record.isNull("b1_feature")) new JSONObject() else
             JSON.parseObject(record.getString("b1_feature"))
@@ -84,7 +84,7 @@ object makedata_13_originData_20240529 {
           val b19: JSONObject = if (record.isNull("b19_feature")) new JSONObject() else
             JSON.parseObject(record.getString("b19_feature"))
 
-          val featureMap = new util.HashMap[String, Double]()
+
           val origin_data = List(
             (b1, b2, b3, "b123"), (b1, b6, b7, "b167"),
             (b8, b9, b10, "b8910"), (b11, b12, b13, "b111213"),
@@ -117,21 +117,79 @@ object makedata_13_originData_20240529 {
 
           val c1: JSONObject = if (record.isNull("c1_feature")) new JSONObject() else
             JSON.parseObject(record.getString("c1_feature"))
-          featureMap.put("playcnt_6h", if (c1.containsKey("playcnt_6h")) c1.getIntValue("playcnt_6h").toDouble else 0D)
-          featureMap.put("playcnt_1d", if (c1.containsKey("playcnt_1d")) c1.getIntValue("playcnt_1d").toDouble else 0D)
-          featureMap.put("playcnt_3d", if (c1.containsKey("playcnt_3d")) c1.getIntValue("playcnt_3d").toDouble else 0D)
-          featureMap.put("playcnt_7d", if (c1.containsKey("playcnt_7d")) c1.getIntValue("playcnt_7d").toDouble else 0D)
+          if (c1.nonEmpty){
+            featureMap.put("playcnt_6h", if (c1.containsKey("playcnt_6h")) c1.getIntValue("playcnt_6h").toDouble else 0D)
+            featureMap.put("playcnt_1d", if (c1.containsKey("playcnt_1d")) c1.getIntValue("playcnt_1d").toDouble else 0D)
+            featureMap.put("playcnt_3d", if (c1.containsKey("playcnt_3d")) c1.getIntValue("playcnt_3d").toDouble else 0D)
+            featureMap.put("playcnt_7d", if (c1.containsKey("playcnt_7d")) c1.getIntValue("playcnt_7d").toDouble else 0D)
+          }
+          val c2: JSONObject = if (record.isNull("c2_feature")) new JSONObject() else
+            JSON.parseObject(record.getString("c2_feature"))
+          if (c2.nonEmpty){
+            featureMap.put("share_pv_12h", if (c2.containsKey("share_pv_12h")) c2.getIntValue("share_pv_12h").toDouble else 0D)
+            featureMap.put("share_pv_1d", if (c2.containsKey("share_pv_1d")) c2.getIntValue("share_pv_1d").toDouble else 0D)
+            featureMap.put("share_pv_3d", if (c2.containsKey("share_pv_3d")) c2.getIntValue("share_pv_3d").toDouble else 0D)
+            featureMap.put("share_pv_7d", if (c2.containsKey("share_pv_7d")) c2.getIntValue("share_pv_7d").toDouble else 0D)
+            featureMap.put("return_uv_12h", if (c2.containsKey("return_uv_12h")) c2.getIntValue("return_uv_12h").toDouble else 0D)
+            featureMap.put("return_uv_1d", if (c2.containsKey("return_uv_1d")) c2.getIntValue("return_uv_1d").toDouble else 0D)
+            featureMap.put("return_uv_3d", if (c2.containsKey("return_uv_3d")) c2.getIntValue("return_uv_3d").toDouble else 0D)
+            featureMap.put("return_uv_7d", if (c2.containsKey("return_uv_7d")) c2.getIntValue("return_uv_7d").toDouble else 0D)
+          }
+
+          val title = if (video_info.containsKey("title")) video_info.getString("title") else ""
+          if (!title.equals("")){
+            for (key_feature <- List("c3_feature", "c4_feature", "c5_feature", "c6_feature", "c7_feature")){
+              val c34567: JSONObject = if (record.isNull(key_feature)) new JSONObject() else
+                JSON.parseObject(record.getString(key_feature))
+              for (key_time <- List("tags_1d", "tags_3d", "tags_7d")) {
+                val tags = if (c34567.containsKey(key_time)) c34567.getString(key_time) else ""
+                if (!tags.equals("")){
+                  val (f1, f2, f3, f4) = funcC34567ForTags(tags, title)
+                  featureMap.put(key_feature + "_" + key_time + "_matchnum", f1)
+                  featureMap.put(key_feature + "_" + key_time + "_maxscore", f3)
+                  featureMap.put(key_feature + "_" + key_time + "_avgscore", f4)
+                }
+              }
+            }
+          }
+
+          val vid = if (record.isNull("vid")) "" else record.getString("vid")
+          if (!vid.equals("")){
+            for (key_feature <- List("c8_feature", "c9_feature")){
+              val c89: JSONObject = if (record.isNull(key_feature)) new JSONObject() else
+                JSON.parseObject(record.getString(key_feature))
+              for (key_action <- List("share", "return")){
+                  val cfListStr = if (c89.containsKey(key_action)) c89.getString(key_action) else ""
+                  if (!cfListStr.equals("")){
+                    val cfMap = cfListStr.split(",").map(r =>{
+                      val rList = r.split(":")
+                      (rList(0), (rList(1), rList(2), rList(3)))
+                    }).toMap
+                    if (cfMap.contains(vid)){
+                      val (score, num, rank) = cfMap(vid)
+                      featureMap.put(key_feature + "_" + key_action + "_score", score.toDouble)
+                      featureMap.put(key_feature + "_" + key_action + "_num", num.toDouble)
+                      featureMap.put(key_feature + "_" + key_action + "_rank", 1.0 / rank.toDouble)
+                    }
+                  }
+              }
+            }
+          }
 
           val d1: JSONObject = if (record.isNull("d1_feature")) new JSONObject() else
             JSON.parseObject(record.getString("d1_feature"))
-          featureMap.put("return_n", if (c1.containsKey("return_n")) c1.getString("return_n").toDouble else 0D)
-          featureMap.put("rovn", if (c1.containsKey("rovn")) c1.getString("rovn").toDouble else 0D)
+          if (d1.nonEmpty){
+            featureMap.put("d1_exp", if (d1.containsKey("exp")) d1.getString("exp").toDouble else 0D)
+            featureMap.put("d1_return_n", if (d1.containsKey("return_n")) d1.getString("return_n").toDouble else 0D)
+            featureMap.put("d1_rovn", if (d1.containsKey("rovn")) d1.getString("rovn").toDouble else 0D)
+          }
 
 
           /*
           视频:
           视频时长、比特率
 
+          视频:
           曝光使用pv 分享使用pv 回流使用uv --> 1h 2h 3h 4h 12h 1d 3d 7d
           STR log(share) ROV log(return) ROV*log(return)
           40个特征组合
@@ -141,8 +199,10 @@ object makedata_13_originData_20240529 {
           人:
           播放次数 --> 6h 1d 3d 7d --> 4个
           带回来的分享pv 回流uv --> 12h 1d 3d 7d --> 8个
-          播放点 回流点 --> 2h 1d 3d --> 匹配数量 匹配词 语义最高相似度分 语义平均相似度分
-          分享点 曝光点 (回流点) --> 1d 3d 7d 14d --> 匹配数量 匹配词 语义最高相似度分 语义平均相似度分
+          人+vid-title:
+          播放点/回流点/分享点/累积分享/累积回流 --> 1d 3d 7d --> 匹配数量 匹配词 语义最高相似度分 语义平均相似度分 --> 60个
+          人+vid-cf
+          基于分享行为/基于回流行为 -->  “分享cf”+”回流点击cf“ 相似分 相似数量 相似rank的倒数 --> 12个
 
           头部视频:
           曝光 回流 ROVn 3个特征
@@ -152,50 +212,39 @@ object makedata_13_originData_20240529 {
            */
 
 
-          // b
-
-
-
 
           //4 处理label信息。
-          val labels = Set(
-            "pagesource", "recommend_page_type", "pagesource_change",
-            "abcode",
-            "is_play", "playtime",
-            "is_share", "share_cnt_pv", "share_ts_list",
-            "is_return", "return_cnt_pv", "return_cnt_uv", "return_mid_ts_list"
-          )
-          val labelNew = new JSONObject
-          val labelMap = getFeatureFromSet(labels, record)
-          labels.foreach(r => {
-            if (labelMap.containsKey(r)) {
-              labelNew.put(r, labelMap(r))
+          val labels = new JSONObject
+          for (labelKey <- List(
+            "is_play", "is_share", "is_return", "noself_is_return", "return_uv", "noself_return_uv", "total_return_uv",
+            "share_pv", "total_share_uv"
+          )){
+            if (!record.isNull(labelKey)){
+              labels.put(labelKey, record.getString(labelKey))
             }
-          })
+          }
           //5 处理log key表头。
-          val mid = record.getString("mid")
-          val videoid = record.getString("videoid")
-          val logtimestamp = record.getString("logtimestamp")
           val apptype = record.getString("apptype")
-          val pagesource_change = record.getString("pagesource_change")
+          val pagesource = record.getString("pagesource")
+          val mid = record.getString("mid")
+          // vid 已经提取了
+          val ts = record.getString("ts")
           val abcode = record.getString("abcode")
-          val video_recommend = if (!record.isNull("video_recommend")) record.getString("video_recommend") else "111"
-
-          val logKey = (mid, videoid, logtimestamp, apptype, pagesource_change, abcode, video_recommend).productIterator.mkString(":")
-          val labelKey = labelNew.toString()
-          val featureKey = "".toString()
+          val level = if (record.isNull("level")) "0" else record.getString("level")
+          val logKey = (apptype, pagesource, mid, vid, ts, abcode, level).productIterator.mkString(",")
+          val labelKey = labels.toString()
+          val featureKey = featureMap.toString()
           //6 拼接数据,保存。
           logKey + "\t" + labelKey + "\t" + featureKey
 
         })
 
-
       // 4 保存数据到hdfs
       val hdfsPath = savePath + "/" + partition
       if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")){
         println("删除路径并开始数据写入:" + hdfsPath)
         MyHdfsUtils.delete_hdfs_path(hdfsPath)
-        odpsData.saveAsTextFile(hdfsPath, classOf[GzipCodec])
+        odpsData.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
       }else{
         println("路径不合法,无法写入:" + hdfsPath)
       }
@@ -205,18 +254,23 @@ object makedata_13_originData_20240529 {
   def func(record: Record, schema: TableSchema): Record = {
     record
   }
-
-  def getFeatureFromSet(set: Set[String], record: Record): mutable.HashMap[String, String] = {
-    val result = mutable.HashMap[String, String]()
-    set.foreach(r =>{
-      if (!record.isNull(r)){
-        try{
-          result.put(r, record.getString(r))
-        }catch {
-          case _ => result.put(r, String.valueOf(record.getBigint(r)))
-        }
+  def funcC34567ForTags(tags: String, title: String): Tuple4[Double, String, Double, Double] = {
+    // 匹配数量 匹配词 语义最高相似度分 语义平均相似度分
+    val tagsList = tags.split(",")
+    var d1 = 0.0
+    val d2 = ArrayBuffer()
+    var d3 = 0.0
+    var d4 = 0.0
+    for (tag <- tagsList){
+      if (title.contains(tag)){
+        d1 = d1 + 1.0
+        d2.add(tag)
       }
-    })
-    result
+      val score = Similarity.conceptSimilarity(tag, title)
+      d3 = if (score > d3) score else d3
+      d4 = d4 + score
+    }
+    d4 = if (tagsList.nonEmpty) d4 / tagsList.size else d4
+    (d1, d2.mkString(","), d3, d4)
   }
 }

+ 2 - 2
src/main/scala/com/aliyun/odps/spark/examples/临时记录的脚本【分析】

@@ -3,6 +3,6 @@ nohup /opt/apps/SPARK2/spark-2.4.8-hadoop3.2-1.0.8/bin/spark-class2 org.apache.s
 --master yarn --driver-memory 1G --executor-memory 1G --executor-cores 1 --num-executors 32 \
 ./target/spark-examples-1.0.0-SNAPSHOT-shaded.jar \
 tablePart:5 \
-beginStr:2024060208 endStr:2024060223 \
-vidSelect:21006075 cidsSelect:1902,1310 apptype:0 \
+beginStr:2024060211 endStr:2024060211 \
+vidSelect:21006075 cidsSelect:1155,1902 apptype:0 \
 > p01_ana.log 2>&1 &