jch 1 mese fa
parent
commit
d38cbcd6e6

+ 182 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/makedata_recsys_83_originData_20250317.scala

@@ -0,0 +1,182 @@
+package com.aliyun.odps.spark.examples.makedata_recsys_r_rate
+
+import com.aliyun.odps.data.Record
+import com.aliyun.odps.spark.examples.myUtils._
+import examples.utils.SimilarityUtils
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+/*
+   20250317 提取特征
+ */
+
+/**
+ * Spark job that, for every dt/hh returned by MyDateUtils.getDateHourRange, loads the sample table
+ * plus two video feature tables from ODPS, resamples the samples, attaches video statistics and the
+ * user's history-video sequence, converts each sample to a feature line and saves the lines.
+ */
+object makedata_recsys_83_originData_20250317 {
+  // Label columns handed to DataUtils.getLabels for every sample.
+  private val labelNames = List(
+    "is_share", "share_cnt",
+    "is_return_1", "return_1_uv",
+    "is_return_n", "return_n_uv",
+    "is_return_noself", "return_1_uv_noself",
+    "is_return_n_noself", "return_n_uv_noself"
+  )
+
+  /**
+   * Keys each video record by `vid`, stores the vid inside its parsed `feature` map and keeps a
+   * single map per vid (the one with more entries when a vid occurs more than once).
+   */
+  private def parseVideoRdd(videoRdd: RDD[Record]): RDD[(String, java.util.Map[String, String])] = {
+    videoRdd
+      .map { record =>
+        val vid = record.getString("vid")
+        val feature = ConvertUtils.getRecordCol(record, "feature")
+        feature.put("vid", vid)
+        (vid, feature)
+      }
+      .reduceByKey((a, b) => if (a.size() > b.size()) a else b)
+  }
+
+  /**
+   * Left-joins the samples with the per-vid statistics string and, when a statistic exists,
+   * stores it in the sample map under `b0_feature`. Samples without a match pass through unchanged.
+   */
+  private def joinVideoStat(logRdd: RDD[java.util.Map[String, String]], videoStat: RDD[(String, String)]): RDD[java.util.Map[String, String]] = {
+    logRdd
+      .map(raw => (raw.getOrElse("vid", ""), raw))
+      .leftOuterJoin(videoStat)
+      .map { case (_, (info, statOpt)) =>
+        statOpt.foreach(stat => info.put("b0_feature", stat))
+        info
+      }
+  }
+
+  /**
+   * Produces (vid, mid) pairs: one `c9_feature` value is kept per non-empty mid and expanded into
+   * the vids returned by ConvertUtils.getVidList.
+   */
+  private def getVidMidRdd(logRdd: RDD[java.util.Map[String, String]]): RDD[(String, String)] = {
+    logRdd
+      .map(raw => (raw.getOrElse("mid", ""), raw.getOrElse("c9_feature", "")))
+      .filter(_._1.nonEmpty)
+      .reduceByKey((first, _) => first)
+      .flatMap { case (mid, c9) =>
+        val pairs = new ArrayBuffer[(String, String)]
+        for (hVid <- ConvertUtils.getVidList(c9)) {
+          pairs += ((hVid, mid)) // (vid, mid)
+        }
+        pairs
+      }
+  }
+
+  /**
+   * Joins (vid, mid) pairs with the video feature maps and groups the matched maps by mid.
+   */
+  private def getMidSeqRdd(vidMidRdd: RDD[(String, String)], videoRdd: RDD[(String, java.util.Map[String, String])]): RDD[(String, List[java.util.Map[String, String]])] = {
+    vidMidRdd
+      .join(videoRdd)
+      .map { case (_, (mid, video)) => (mid, video) }
+      .groupByKey()
+      .map { case (mid, videos) => (mid, videos.toList) }
+  }
+
+  /**
+   * Left-joins every sample with the video sequence of its mid; samples whose mid has no sequence
+   * are paired with an empty list.
+   */
+  private def joinMidSeq(logRdd: RDD[java.util.Map[String, String]], midSeqRdd: RDD[(String, List[java.util.Map[String, String]])]): RDD[(java.util.Map[String, String], List[java.util.Map[String, String]])] = {
+    logRdd
+      .map(raw => (raw.getOrElse("mid", ""), raw))
+      .leftOuterJoin(midSeqRdd)
+      .map { case (_, (record, seqOpt)) =>
+        (record, seqOpt.getOrElse(List.empty[java.util.Map[String, String]]))
+      }
+  }
+
+  /**
+   * Turns every (sample, video sequence) pair into one tab-separated output line:
+   * logKey, labels, scoresMap, features. SimilarityUtils.init() is invoked once per partition.
+   */
+  private def getFeature(rdd: RDD[(java.util.Map[String, String], List[java.util.Map[String, String]])]): RDD[String] = {
+    rdd.mapPartitions(partition => {
+      SimilarityUtils.init()
+      partition.map { case (record, videoSeq) =>
+        val logKey = DataUtils.getLogKey(record)
+        val labels = DataUtils.getLabels(labelNames, record).toString
+        val features = ConvertV2.getFeature(record, videoSeq, 6).toString
+        val scoresMap = DataUtils.getSubJson(record, "extend_alg", "scoresMap").toString
+        logKey + "\t" + labels + "\t" + scoresMap + "\t" + features
+      }
+    })
+  }
+
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 1. Parse job parameters.
+    val param = ParamUtils.parseArgs(args)
+    val project = param.getOrElse("project", "loghubods")
+    val table = param.getOrElse("table", "dwd_recsys_alg_sample_all_20250212")
+    val tablePart = param.getOrElse("tablePart", "64").toInt
+    val beginStr = param.getOrElse("beginStr", "2025031700")
+    val endStr = param.getOrElse("endStr", "2025031700")
+    val whatLabel = param.getOrElse("whatLabel", "is_share")
+    val fuSampleRate = param.getOrElse("fuSampleRate", "0.04").toDouble
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/83_origin_data/")
+    val repartition = param.getOrElse("repartition", "64").toInt
+
+    // 2. Process each dt/hh in the requested range.
+    val timeRange = MyDateUtils.getDateHourRange(beginStr, endStr)
+    for (dt_hh <- timeRange) {
+      // 2.1 Partition spec.
+      val dt = dt_hh.substring(0, 8)
+      val hh = dt_hh.substring(8, 10)
+      val partition = "dt=%s,hh=%s".format(dt, hh)
+      println("开始执行partition:" + partition)
+
+      // 2.2 Load video information.
+      val originVideo = DataUtils.getODPSData(sc, project, "alg_recsys_feature_user_behavior_video", partition, tablePart)
+      val uniqVideo = parseVideoRdd(originVideo)
+
+      // 2.3 Load video statistics.
+      val videoStat = DataUtils.getODPSData(sc, project, "alg_recsys_feature_video_clean_stat", partition, tablePart)
+        .map(record => (record.getString("vid"), record.getString("feature")))
+
+      // 2.4 Load sample data.
+      val odpsData = DataUtils.getODPSData(sc, project, table, partition, tablePart)
+
+      // 2.5 Resample the samples.
+      val resampleData = DataUtils.resampleWithoutInterception(whatLabel, fuSampleRate, odpsData)
+        .map(record => ConvertUtils.record2Map(record))
+
+      // 2.6 Join video stat.
+      // The resample step draws fresh random numbers on every evaluation and sampleData feeds two
+      // lineages below (2.7 and 2.9). Persisting it makes both lineages see the same sample, so the
+      // history sequences are built for exactly the rows that get written.
+      val sampleData = joinVideoStat(resampleData, videoStat)
+        .persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK)
+
+      try {
+        // 2.7 Get vid-mid rdd.
+        val vidMidRdd = getVidMidRdd(sampleData)
+
+        // 2.8 Get mid-seq rdd.
+        val midSeqRdd = getMidSeqRdd(vidMidRdd, uniqVideo)
+
+        // 2.9 Attach history-behavior videos to the samples.
+        val seqSampleData = joinMidSeq(sampleData, midSeqRdd)
+
+        // 2.10 Feature conversion.
+        val featureData = getFeature(seqSampleData)
+
+        // 2.11 Save data; a trailing '/' on savePath must not produce a '//' in the output path.
+        val hdfsPath = "%s/%s%s".format(savePath.stripSuffix("/"), dt, hh)
+        DataUtils.saveData(featureData, hdfsPath, repartition)
+      } finally {
+        sampleData.unpersist()
+      }
+    }
+  }
+}

+ 1 - 1
src/main/scala/com/aliyun/odps/spark/examples/myUtils/ConvertUtils.java

@@ -188,7 +188,7 @@ public class ConvertUtils {
         return map;
     }
 
-    private static JSONObject filterAndTruncate(Map<String, Double> featMap, int scale) {
+    public static JSONObject filterAndTruncate(Map<String, Double> featMap, int scale) {
         JSONObject jsonObject = new JSONObject();
         for (Map.Entry<String, Double> entry : featMap.entrySet()) {
             double value = entry.getValue();

+ 100 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/ConvertV2.java

@@ -0,0 +1,100 @@
+package com.aliyun.odps.spark.examples.myUtils;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ConvertV2 {
+    public static JSONObject getFeature(Map<String, String> record, List<Map<String, String>> videoSeq, int scale) {
+        Map<String, Double> featMap = new HashMap<>();
+
+        // origin info
+        String ts = record.get("ts");
+        long currentMs = Long.parseLong(ts) * 1000;
+        String vid = record.get("vid");
+        Map<String, String> headInfo = ConvertUtils.getRecordCol(record, "v2_feature");
+        Map<String, String> rankInfo = ConvertUtils.getRecordCol(record, "v1_feature");
+        Map<String, Map<String, String>> userOriginInfo = getUserOriginInfo(record);
+        Map<String, Map<String, Map<String, String>>> videoOriginInfo = getVideoOriginInfo(record);
+
+        // parse info
+        Map<String, String> c9Map = userOriginInfo.get("alg_recsys_feature_user_share_return_stat");
+        UserShareReturnProfile userProfile = JSON.parseObject(ConvertUtils.toJson(c9Map), UserShareReturnProfile.class);
+        Map<String, Map<String, String>> historyVideoMap = ConvertUtils.list2Map(videoSeq);
+
+        Map<String, Map<String, String[]>> c7Map = FeatureTransformV2.parseUCFScore(userOriginInfo.get("alg_mid_feature_sharecf"));
+        Map<String, Map<String, String[]>> c8Map = FeatureTransformV2.parseUCFScore(userOriginInfo.get("alg_mid_feature_returncf"));
+
+        // context feature
+        FeatureTransformV2.getContextFeature(currentMs, featMap);
+
+        // head video feature
+        FeatureTransformV2.getVideoBaseFeature("h", currentMs, headInfo, featMap);
+
+        // user feature
+        FeatureTransformV2.getUserFeature(userOriginInfo, featMap);
+        FeatureTransformV2.getUserProfileFeature(userProfile, record, featMap);
+
+        // user & video feature
+        FeatureTransformV2.getUserTagsCrossVideoFeature("c5", rankInfo, userOriginInfo.get("alg_mid_feature_return_tags"), featMap);
+        FeatureTransformV2.getUserTagsCrossVideoFeature("c6", rankInfo, userOriginInfo.get("alg_mid_feature_share_tags"), featMap);
+        FeatureTransformV2.getUserCFFeature("c7", vid, c7Map, featMap);
+        FeatureTransformV2.getUserCFFeature("c8", vid, c8Map, featMap);
+
+        // rank video feature
+        FeatureTransformV2.getVideoBaseFeature("r", currentMs, rankInfo, featMap);
+        FeatureTransformV2.getVideoFeature(vid, videoOriginInfo, featMap);
+
+        // head&rank cross feature
+        FeatureTransformV2.getHeadRankVideoCrossFeature(headInfo, rankInfo, featMap);
+
+        // user profile & rank cross
+        FeatureTransformV2.getProfileVideoCrossFeature(currentMs, userProfile, rankInfo, historyVideoMap, featMap);
+
+        return ConvertUtils.filterAndTruncate(featMap, scale);
+    }
+
+    private static Map<String, Map<String, String>> getUserOriginInfo(Map<String, String> record) {
+        Map<String, Map<String, String>> map = new HashMap<>();
+        map.put("mid_global_feature_20250212", ConvertUtils.getRecordCol(record, "c1_feature"));
+        map.put("mid_u2u_friend_index_feature_20250212", ConvertUtils.getRecordCol(record, "c4_feature"));
+        map.put("alg_mid_feature_return_tags", ConvertUtils.getRecordCol(record, "c5_feature"));
+        map.put("alg_mid_feature_share_tags", ConvertUtils.getRecordCol(record, "c6_feature"));
+        map.put("alg_mid_feature_sharecf", ConvertUtils.getRecordCol(record, "c7_feature"));
+        map.put("alg_mid_feature_returncf", ConvertUtils.getRecordCol(record, "c8_feature"));
+        map.put("alg_recsys_feature_user_share_return_stat", ConvertUtils.getRecordCol(record, "c9_feature"));
+        return map;
+    }
+
+    private static Map<String, Map<String, Map<String, String>>> getVideoOriginInfo(Map<String, String> record) {
+        Map<String, Map<String, String>> map = new HashMap<>();
+        map.put("alg_recsys_feature_video_clean_stat", ConvertUtils.getRecordCol(record, "b0_feature"));
+        map.put("alg_vid_global_feature_20250212", ConvertUtils.getRecordCol(record, "b1_feature"));
+        map.put("alg_vid_recommend_exp_feature_20250212", ConvertUtils.getRecordCol(record, "b2_feature"));
+        map.put("alg_vid_recommend_flowpool_exp_feature_20250212", ConvertUtils.getRecordCol(record, "b3_feature"));
+        map.put("alg_vid_apptype_recommend_exp_feature_20250212", ConvertUtils.getRecordCol(record, "b4_feature"));
+        map.put("alg_vid_province_recommend_exp_feature_20250212", ConvertUtils.getRecordCol(record, "b5_feature"));
+        map.put("alg_vid_brand_recommend_exp_feature_20250212", ConvertUtils.getRecordCol(record, "b6_feature"));
+        map.put("alg_vid_hotsencetype_recommend_exp_feature_20250212", ConvertUtils.getRecordCol(record, "b7_feature"));
+        map.put("alg_merge_cate1_recommend_exp_feature_20250212", ConvertUtils.getRecordCol(record, "b8_feature"));
+        map.put("alg_merge_cate2_recommend_exp_feature_20250212", ConvertUtils.getRecordCol(record, "b9_feature"));
+        map.put("alg_channel_recommend_exp_feature_20250212", ConvertUtils.getRecordCol(record, "b10_feature"));
+        map.put("alg_festive_recommend_exp_feature_20250212", ConvertUtils.getRecordCol(record, "b11_feature"));
+        //map.put("alg_vid_long_period_recommend_exp_feature_20250212", ConvertUtils.getRecordCol(record, "b12_feature"));
+        map.put("alg_video_unionid_recommend_exp_feature_20250212", ConvertUtils.getRecordCol(record, "b13_feature"));
+        map.put("scene_type_vid_cf_feature_20250212", ConvertUtils.getRecordCol(record, "d1_feature"));
+        map.put("vid_click_cf_feature_20250212", ConvertUtils.getRecordCol(record, "d2_feature"));
+        map.put("alg_recsys_feature_cf_i2i_v2", ConvertUtils.getRecordCol(record, "d3_feature"));
+        // 特征
+        map.put("mid_merge_cate1_feature_20250212", ConvertUtils.getRecordCol(record, "c2_feature"));
+        map.put("mid_merge_cate2_feature_20250212", ConvertUtils.getRecordCol(record, "c3_feature"));
+
+        Map<String, Map<String, Map<String, String>>> allMap = new HashMap<>();
+        String vid = record.get("vid");
+        allMap.put(vid, map);
+        return allMap;
+    }
+}

+ 18 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/DataUtils.scala

@@ -27,6 +27,13 @@ object DataUtils {
     })
   }
 
+  /**
+   * Keeps every record whose `name` label is positive and down-samples the rest: a non-positive
+   * record survives only if a uniform draw is <= negRate and it is not flagged by
+   * interception(record, "extend", "animationSceneType", "interception").
+   *
+   * One Random is created per partition instead of per record, and the random draw is evaluated
+   * before the interception check so the JSON in `extend` is parsed only for records that the
+   * draw would keep.
+   *
+   * @param name    label column to test
+   * @param negRate keep probability for records whose label is not positive
+   * @param rdd     input records
+   * @return the filtered records
+   */
+  def resampleWithoutInterception(name: String, negRate: Double, rdd: RDD[Record]): RDD[Record] = {
+    rdd.mapPartitions(records => {
+      val random = new Random()
+      records.filter(record => {
+        val label = getStringValue(record, name, "0").toDouble
+        label > 0 || (random.nextDouble() <= negRate && !interception(record, "extend", "animationSceneType", "interception"))
+      })
+    }, preservesPartitioning = true)
+  }
+
   def getLogKey(record: java.util.Map[String, String]): String = {
     val apptype = record.getOrElse("apptype", "")
     val page = record.getOrElse("page", "")
@@ -83,4 +90,15 @@ object DataUtils {
     }
     new JSONObject()
   }
+
+  /**
+   * Tells whether the JSON object stored in column `key1` carries `value2` under `key2`.
+   *
+   * Returns false when the column is null, when the parsed object is null (presumably what the
+   * JSON parser yields for blank text -- verify against the parser in use), when `key2` is absent,
+   * or when its value is null. Parse errors raised by JSON.parseObject still propagate.
+   *
+   * @param record record to inspect
+   * @param key1   name of the column holding a JSON object string
+   * @param key2   key looked up inside that object
+   * @param value2 expected string value
+   * @return true only when the value under `key2` equals `value2`
+   */
+  def interception(record: Record, key1: String, key2: String, value2: String): Boolean = {
+    if (record.isNull(key1)) {
+      false
+    } else {
+      val obj = JSON.parseObject(record.getString(key1))
+      // Guard both the parsed object and the looked-up value against null instead of dereferencing them.
+      null != obj && obj.containsKey(key2) && Option(obj.getString(key2)).exists(_ == value2)
+    }
+  }
 }

+ 500 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/FeatureTransformV2.java

@@ -0,0 +1,500 @@
+package com.aliyun.odps.spark.examples.myUtils;
+
+import examples.utils.SimilarityUtils;
+
+import java.util.*;
+
+public class FeatureTransformV2 {
+    private static final int seqMaxN = 2;
+    private static final int seqLastN = 2;
+    private static final double smoothPlus = 5.0;
+    private static final double log1Scale = 10.0;
+    private static final List<String> c1Periods = Arrays.asList("72h", "168h");
+    private static final List<String> c4Periods = Arrays.asList("72h", "168h");
+    private static final List<String> b0Periods = Arrays.asList("1h", "3h", "6h", "12h");
+    private static final List<String> b1Periods = Arrays.asList("1h", "3h", "24h", "72h", "168h");
+    private static final List<String> b2Periods = Arrays.asList("1h", "3h", "24h");
+    private static final List<String> b3Periods = Arrays.asList("24h", "168h");
+    private static final List<String> b4Periods = Arrays.asList("1h", "12h");
+    private static final List<String> b5Periods = Arrays.asList("72h", "168h");
+    private static final List<String> b6Periods = Arrays.asList("1h", "24h");
+    private static final List<String> b7Periods = Arrays.asList("24h", "168h");
+    private static final List<String> b8Periods = Arrays.asList("24h");
+    private static final List<String> b9Periods = Arrays.asList("24h");
+    private static final List<String> b10Periods = Arrays.asList("1h", "12h");
+    private static final List<String> b11Periods = Arrays.asList("12h", "168h");
+    private static final List<String> b13Periods = Arrays.asList("24h", "168h");
+    private static final List<String> dayPeriods = Arrays.asList("7d", "14d", "30d", "60d");
+    private static final List<String> videoCateAttrs = Arrays.asList(FeatureUtils.cate1Attr, FeatureUtils.cate2Attr, FeatureUtils.festive1Attr);
+    private static final List<String> videoSimAttrs = Arrays.asList("title", "cate2", "cate2_list", "keywords");
+    private static final List<String> hVideoSimAttrs = Arrays.asList("title");
+    private static final List<String> cfList = Arrays.asList("share", "return");
+    private static final List<String> userAttrList = Arrays.asList("province", "city", "model", "brand", "system");
+
+    /**
+     * Writes the calendar context of the sample time into the feature map.
+     *
+     * @param currentMs  sample time in epoch milliseconds
+     * @param featureMap receives "week" (Calendar.DAY_OF_WEEK) and "hour" (Calendar.HOUR_OF_DAY + 1)
+     */
+    public static void getContextFeature(long currentMs, Map<String, Double> featureMap) {
+        Calendar sampleTime = Calendar.getInstance();
+        sampleTime.setTimeInMillis(currentMs);
+
+        // Widen the int calendar fields to double; the hour is shifted by one.
+        double dayOfWeek = sampleTime.get(Calendar.DAY_OF_WEEK);
+        double shiftedHour = sampleTime.get(Calendar.HOUR_OF_DAY) + 1;
+        featureMap.put("week", dayOfWeek);
+        featureMap.put("hour", shiftedHour);
+    }
+
+    public static void getUserFeature(Map<String, Map<String, String>> userOriginInfo, Map<String, Double> featMap) {
+        oneTypeStatFeature("c1", "return_1_uv", c1Periods, userOriginInfo.get("mid_global_feature_20250212"), featMap);
+        Map<String, String> c4Map = userOriginInfo.get("mid_u2u_friend_index_feature_20250212");
+        for (String calType : Arrays.asList("avg_", "max_", "min_")) {
+            getRateStatFeature("c4", calType, c4Periods, c4Map, featMap);
+        }
+    }
+
+    public static void getUserProfileFeature(UserShareReturnProfile profile, Map<String, String> userInfo, Map<String, Double> featMap) {
+        if (null != profile) {
+            long s_pv = profile.getS_pv();              // share_pv(分享pv)
+            long s_cnt = profile.getS_cnt();            // share_cnt(分享次数)
+            long r_pv = profile.getR_pv();              // return_pv(回流pv)
+            long r_uv = profile.getR_uv();              // return_uv(回流uv)
+            long m_s_cnt = profile.getM_s_cnt();        // max_share_cnt(最大分享次数)
+            long m_r_uv = profile.getM_r_uv();          // max_return_uv(最大回流uv)
+            if (s_pv > 0) {
+                double s_pv_s = FeatureUtils.log1(s_pv, log1Scale);
+                double s_cnt_s = FeatureUtils.log1(s_cnt, log1Scale);
+                double r_pv_s = FeatureUtils.log1(r_pv, log1Scale);
+                double r_uv_s = FeatureUtils.log1(r_uv, log1Scale);
+                double m_s_cnt_s = FeatureUtils.log1(m_s_cnt, log1Scale);
+                double m_r_uv_s = FeatureUtils.log1(m_r_uv, log1Scale);
+                double ros_one = FeatureUtils.wilsonScore(r_pv, s_pv);
+                double ros = FeatureUtils.plusSmooth(r_uv, s_pv, smoothPlus);
+                double ros_minus = FeatureUtils.plusSmooth(r_uv, r_pv, smoothPlus);
+                featMap.put("c9_s_pv", s_pv_s);
+                featMap.put("c9_s_cnt", s_cnt_s);
+                featMap.put("c9_r_pv", r_pv_s);
+                featMap.put("c9_r_uv", r_uv_s);
+                featMap.put("c9_m_s_cnt", m_s_cnt_s);
+                featMap.put("c9_m_r_uv", m_r_uv_s);
+                featMap.put("c9_ros_one", ros_one);
+                featMap.put("c9_ros", ros);
+                featMap.put("c9_ros_minus", ros_minus);
+            }
+        }
+        if (null != userInfo && !userInfo.isEmpty()) {
+            for (String attr : userAttrList) {
+                if (userInfo.containsKey(attr)) {
+                    String value = userInfo.get(attr).trim().replaceAll("(\\s+|\\t|:)", "_");
+                    if (!value.isEmpty()) {
+                        String key = String.format("%s@%s", attr, value.toLowerCase());
+                        featMap.put(key, 1.0);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Crosses the user's tag strings ("tags_1d", "tags_3d", "tags_7d") with the video title.
+     * For every non-empty tag string the three values returned by
+     * FeatureUtils.funcC34567ForTagsNew are stored as prefix_period_matchnum / _maxscore / _avgscore.
+     * Nothing is written when either map is null/empty or the title is empty.
+     */
+    public static void getUserTagsCrossVideoFeature(String prefix, Map<String, String> videoInfo, Map<String, String> infoMap, Map<String, Double> featMap) {
+        boolean videoMissing = null == videoInfo || videoInfo.isEmpty();
+        if (videoMissing || null == infoMap || infoMap.isEmpty()) {
+            return;
+        }
+        String title = videoInfo.getOrDefault("title", "");
+        if (title.isEmpty()) {
+            return;
+        }
+        String[] periods = {"tags_1d", "tags_3d", "tags_7d"};
+        for (String period : periods) {
+            String tags = infoMap.getOrDefault(period, "");
+            if (tags.isEmpty()) {
+                continue;
+            }
+            Double[] stats = FeatureUtils.funcC34567ForTagsNew(tags, title);
+            String keyBase = prefix + "_" + period;
+            featMap.put(keyBase + "_matchnum", stats[0]);
+            featMap.put(keyBase + "_maxscore", stats[1]);
+            featMap.put(keyBase + "_avgscore", stats[2]);
+        }
+    }
+
+    /**
+     * Adds the user collaborative-filtering scores of the given video for every type in cfList.
+     *
+     * For a vid found in the per-type score map, scores[0] is stored as prefix_type_score,
+     * scores[1] as prefix_type_num, and the reciprocal of scores[2] (0 when scores[2] <= 0) as
+     * prefix_type_rank. A null or empty vid, a missing type, a missing vid entry, or a score array
+     * that is null or shorter than three elements are all skipped without error.
+     *
+     * @param prefix  feature name prefix
+     * @param vid     video id to look up, may be null
+     * @param infoMap {cf type -> {vid -> score strings}}, may be null
+     * @param featMap receives the features
+     * @throws NumberFormatException if a present score string is not a valid double
+     */
+    public static void getUserCFFeature(String prefix, String vid, Map<String, Map<String, String[]>> infoMap, Map<String, Double> featMap) {
+        if (null == vid || vid.isEmpty() || null == infoMap || infoMap.isEmpty()) {
+            return;
+        }
+        for (String cfType : cfList) {
+            Map<String, String[]> cfScoresMap = infoMap.get(cfType);
+            if (null == cfScoresMap) {
+                continue;
+            }
+            String[] scores = cfScoresMap.get(vid);
+            if (null == scores || scores.length < 3) {
+                continue;
+            }
+            double score = Double.parseDouble(scores[0]);
+            double num = Double.parseDouble(scores[1]);
+            // Parse the rank once and reuse it for both the sign test and the reciprocal.
+            double rank = Double.parseDouble(scores[2]);
+            String keyBase = prefix + "_" + cfType;
+            featMap.put(keyBase + "_score", score);
+            featMap.put(keyBase + "_num", num);
+            featMap.put(keyBase + "_rank", rank <= 0 ? 0D : 1.0 / rank);
+        }
+    }
+
+    public static void getVideoFeature(String vid, Map<String, Map<String, Map<String, String>>> videoOriginInfo, Map<String, Double> featMap) {
+        oneTypeStatFeature("b0", b0Periods, videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_recsys_feature_video_clean_stat"), featMap);
+        oneTypeStatFeature("b1", "return_1_uv", b1Periods, videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_vid_global_feature_20250212"), featMap);
+        oneTypeStatFeature("b2", "return_n_uv", b2Periods, videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_vid_recommend_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b3", "return_n_uv", b3Periods, videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_vid_recommend_flowpool_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b4", "return_n_uv", b4Periods, videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_vid_apptype_recommend_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b5", "return_n_uv", b5Periods, videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_vid_province_recommend_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b6", "return_n_uv", b6Periods, videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_vid_brand_recommend_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b7", "return_n_uv", b7Periods, videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_vid_hotsencetype_recommend_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b8", "return_n_uv", b8Periods, videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_merge_cate1_recommend_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b9", "return_n_uv", b9Periods, videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_merge_cate2_recommend_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b10", "return_n_uv", b10Periods, videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_channel_recommend_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b11", "return_n_uv", b11Periods, videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_festive_recommend_exp_feature_20250212"), featMap);
+        //getRateStatFeature("b12", "", dayPeriods, videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_vid_long_period_recommend_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b13", "return_n_uv", b13Periods, videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_video_unionid_recommend_exp_feature_20250212"), featMap);
+
+        // head video cf
+        headVideoCFD1Feature("d1", videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("scene_type_vid_cf_feature_20250212"), featMap);
+        headVideoCFD2Feature("d2", videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("vid_click_cf_feature_20250212"), featMap);
+        headVideoCFD3Feature("d3", videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_recsys_feature_cf_i2i_v2"), featMap);
+
+        // 特殊mid * cate
+        oneTypeStatFeature("c2", "return_n_uv", c1Periods, videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("mid_merge_cate1_feature_20250212"), featMap);
+        oneTypeStatFeature("c3", "return_n_uv", c1Periods, videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("mid_merge_cate2_feature_20250212"), featMap);
+    }
+
+    /**
+     * Adds the basic features of one video under the given prefix: total_time, bit_rate, the
+     * category/keyword features, the title time-type flag and a creation-time feature.
+     * Nothing is written when videoInfo is null or empty. Any exception raised while computing the
+     * creation-time feature is ignored and that feature is simply left out.
+     */
+    public static void getVideoBaseFeature(String prefix, long currentMs, Map<String, String> videoInfo, Map<String, Double> featMap) {
+        if (null == videoInfo || videoInfo.isEmpty()) {
+            return;
+        }
+        double totalTime = Double.parseDouble(videoInfo.getOrDefault("total_time", "0"));
+        featMap.put(prefix + "@total_time", totalTime);
+        double bitRate = Double.parseDouble(videoInfo.getOrDefault("bit_rate", "0"));
+        featMap.put(prefix + "@bit_rate", bitRate);
+
+        // category and keyword features
+        getVideoCateFeature(prefix, videoInfo, featMap);
+
+        // time type derived from the title; only positive ids become a feature
+        if (videoInfo.containsKey("title")) {
+            int timeTypeId = FeatureUtils.judgeVideoTimeType(videoInfo.get("title"));
+            if (timeTypeId > 0) {
+                featMap.put(String.format("%s@%s@%d", prefix, "tt", timeTypeId), 1.0);
+            }
+        }
+
+        // creation time relative to the sample time
+        try {
+            if (videoInfo.containsKey("gmt_create_timestamp")) {
+                long createMs = Long.parseLong(videoInfo.get("gmt_create_timestamp"));
+                double elapsed = FeatureUtils.getTimeDiff(currentMs, createMs);
+                featMap.put(prefix + "@ts", 1 - elapsed);
+            }
+        } catch (Exception ignored) {
+        }
+    }
+
+    /**
+     * Adds the head-video x rank-video similarity features ("hr_sim_*") for every attribute in
+     * videoSimAttrs.
+     */
+    public static void getHeadRankVideoCrossFeature(Map<String, String> headInfo, Map<String, String> rankInfo, Map<String, Double> featMap) {
+        final String simPrefix = "hr_sim";
+        getTwoVideoCrossFeature(simPrefix, videoSimAttrs, headInfo, rankInfo, featMap);
+    }
+
+    public static void getProfileVideoCrossFeature(long currentMs, UserShareReturnProfile profile, Map<String, String> rankVideo, Map<String, Map<String, String>> hVideoMap, Map<String, Double> featMap) {
+        if (null == profile) {
+            return;
+        }
+        getRSCrossFeature("c9_mss", currentMs, seqMaxN, profile.getM_s_s(), rankVideo, hVideoMap, featMap);
+        getRSCrossFeature("c9_mrs", currentMs, seqMaxN, profile.getM_r_s(), rankVideo, hVideoMap, featMap);
+        getRSCrossFeature("c9_lss", currentMs, seqLastN, profile.getL_s_s(), rankVideo, hVideoMap, featMap);
+        getRSCrossFeature("c9_lrs", currentMs, seqLastN, profile.getL_r_s(), rankVideo, hVideoMap, featMap);
+
+        if (null == rankVideo || rankVideo.isEmpty()) {
+            return;
+        }
+        getVideoAttrSRCrossFeature("c9_c1s", rankVideo.getOrDefault("merge_first_level_cate", ""), profile.getC1_s(), featMap);
+        getVideoAttrSRCrossFeature("c9_c2s", rankVideo.getOrDefault("merge_second_level_cate", ""), profile.getC2_s(), featMap);
+        getVideoAttrSRCrossFeature("c9_l1s", rankVideo.getOrDefault("festive_label1", ""), profile.getL1_s(), featMap);
+        getVideoAttrSRCrossFeature("c9_l2s", rankVideo.getOrDefault("festive_label2", ""), profile.getL2_s(), featMap);
+    }
+
+    /**
+     * Adds features for the first maxN entries of a profile sequence. Entries that are null or
+     * whose id is not positive are skipped. For the others, the positive cnt/uv/ts values are
+     * stored under "prefix@position@..." (position is 1-based), and when the history-video map
+     * contains the entry's id its category features and its cross features with the rank video
+     * are added as well.
+     */
+    private static void getRSCrossFeature(String prefix, long currentMs, int maxN, List<UserSRBO> list, Map<String, String> rankVideo, Map<String, Map<String, String>> hVideoMap, Map<String, Double> featMap) {
+        if (null == list || list.isEmpty()) {
+            return;
+        }
+        int limit = Math.min(list.size(), maxN);
+        for (int pos = 0; pos < limit; pos++) {
+            UserSRBO entry = list.get(pos);
+            if (null == entry) {
+                continue;
+            }
+            long id = entry.getId();
+            long cnt = entry.getCnt();
+            long uv = entry.getUv();
+            long ts = entry.getTs();
+            if (id <= 0) {
+                continue;
+            }
+            String hVid = String.valueOf(id);
+            String baseKey = String.format("%s@%d", prefix, pos + 1);
+            if (cnt > 0) {
+                featMap.put(baseKey + "@cnt", FeatureUtils.log1(cnt, log1Scale));
+            }
+            if (uv > 0) {
+                featMap.put(baseKey + "@uv", FeatureUtils.log1(uv, log1Scale));
+            }
+            if (ts > 0) {
+                // ts is multiplied by 1000 before being compared with currentMs
+                featMap.put(baseKey + "@ts", 1 - FeatureUtils.getTimeDiff(currentMs, ts * 1000));
+            }
+            if (null == hVideoMap || !hVideoMap.containsKey(hVid)) {
+                continue;
+            }
+            Map<String, String> hVideo = hVideoMap.get(hVid);
+            getVideoCateFeature(baseKey, hVideo, featMap);
+            getTwoVideoCrossFeature(baseKey, hVideoSimAttrs, hVideo, rankVideo, featMap);
+        }
+    }
+
+    /**
+     * Adds the profile statistics recorded for one video attribute value. The trimmed attribute is
+     * looked up in attrMap; when an entry exists and its sp value is positive, the log-scaled
+     * sp/rp/ru/mu values and three ratio features are stored under "prefix@...".
+     */
+    private static void getVideoAttrSRCrossFeature(String prefix, String attr, Map<String, VideoAttrSRBO> attrMap, Map<String, Double> featMap) {
+        if (null == attrMap || attrMap.isEmpty()) {
+            return;
+        }
+        VideoAttrSRBO stat = attrMap.get(attr.trim());
+        if (null == stat) {
+            return;
+        }
+        long sp = stat.getSp();    // share_pv
+        long rp = stat.getRp();    // return_n_pv_noself
+        long ru = stat.getRu();    // return_n_uv_noself
+        long mu = stat.getMu();    // max_return_uv
+        if (sp <= 0) {
+            return;
+        }
+
+        featMap.put(prefix + "@sp", FeatureUtils.log1(sp, log1Scale));
+        featMap.put(prefix + "@rp", FeatureUtils.log1(rp, log1Scale));
+        featMap.put(prefix + "@ru", FeatureUtils.log1(ru, log1Scale));
+        featMap.put(prefix + "@mu", FeatureUtils.log1(mu, log1Scale));
+        featMap.put(prefix + "@ros_one", FeatureUtils.wilsonScore(rp, sp));
+        featMap.put(prefix + "@ros", FeatureUtils.plusSmooth(ru, sp, smoothPlus));
+        featMap.put(prefix + "@ros_minus", FeatureUtils.plusSmooth(ru, rp, smoothPlus));
+    }
+
+    /**
+     * Adds one-hot category and keyword features of a video. For each attribute in videoCateAttrs
+     * whose trimmed value resolves to a positive id via FeatureUtils.getAttrId, the feature
+     * "prefix@attr@id" is set to 1.0; every non-empty comma-separated keyword yields
+     * "prefix@kw@keyword" = 1.0.
+     */
+    private static void getVideoCateFeature(String prefix, Map<String, String> videoInfo, Map<String, Double> featMap) {
+        if (null == videoInfo || videoInfo.isEmpty()) {
+            return;
+        }
+        for (String attr : videoCateAttrs) {
+            String attrVal = videoInfo.getOrDefault(attr, "").trim();
+            int attrId = FeatureUtils.getAttrId(attr, attrVal);
+            if (attrId > 0) {
+                featMap.put(String.format("%s@%s@%d", prefix, attr, attrId), 1.0);
+            }
+        }
+
+        String keywords = videoInfo.get("keywords");
+        if (null == keywords || keywords.isEmpty()) {
+            return;
+        }
+        for (String keyword : keywords.split(",")) {
+            if (keyword.isEmpty()) {
+                continue;
+            }
+            featMap.put(String.format("%s@kw@%s", prefix, keyword), 1.0);
+        }
+    }
+
+    /**
+     * Emits a similarity feature for each attribute that both videos carry a usable value for.
+     *
+     * A value is usable when it is non-null, non-empty and not the literal "unknown". For each
+     * such attribute the score returned by {@code SimilarityUtils.word2VecSimilarity} is stored
+     * under the key {@code prefix_attr}.
+     *
+     * @param prefix  leading part of every emitted feature key
+     * @param attrs   attribute names to compare
+     * @param video1  attributes of the first video; null or empty means nothing is emitted
+     * @param video2  attributes of the second video; null or empty means nothing is emitted
+     * @param featMap output map that receives the features
+     */
+    private static void getTwoVideoCrossFeature(String prefix, List<String> attrs, Map<String, String> video1, Map<String, String> video2, Map<String, Double> featMap) {
+        if (null == video1 || video1.isEmpty() || null == video2 || video2.isEmpty()) {
+            return;
+        }
+        for (String attr : attrs) {
+            String attr1 = video1.get(attr);
+            String attr2 = video2.get(attr);
+            // Explicit null checks: a key mapped to null would otherwise pass the
+            // "".equals / "unknown".equals tests and reach the similarity call as null.
+            boolean usable1 = null != attr1 && !attr1.isEmpty() && !"unknown".equals(attr1);
+            boolean usable2 = null != attr2 && !attr2.isEmpty() && !"unknown".equals(attr2);
+            if (usable1 && usable2) {
+                double simScore = SimilarityUtils.word2VecSimilarity(attr1, attr2);
+                featMap.put(prefix + "_" + attr, simScore);
+            }
+        }
+    }
+
+    private static void headVideoCFD1Feature(String prefix, Map<String, String> infoMap, Map<String, Double> featMap) {
+        double ros_cf_score = getOneInfo("ros_cf_score", infoMap);
+        double ros_cf_rank = getOneInfo("ros_cf_rank", infoMap);
+        double rov_cf_score = getOneInfo("rov_cf_score", infoMap);
+        double rov_cf_rank = getOneInfo("rov_cf_rank", infoMap);
+        featMap.put(prefix + "_ros_cf_score", ros_cf_score);
+        featMap.put(prefix + "_ros_cf_rank", ros_cf_rank);
+        featMap.put(prefix + "_rov_cf_score", rov_cf_score);
+        featMap.put(prefix + "_rov_cf_rank", rov_cf_rank);
+    }
+
+    private static void headVideoCFD2Feature(String prefix, Map<String, String> infoMap, Map<String, Double> featMap) {
+        double score = getOneInfo("score", infoMap);
+        double rank = getOneInfo("rank", infoMap);
+        double onlines = getOneInfo("onlines", infoMap);
+        featMap.put(prefix + "_score", score);
+        featMap.put(prefix + "_rank", rank);
+        featMap.put(prefix + "_onlines", onlines);
+    }
+
+    /**
+     * Emits the "exp", "return_n" and "rovn" fields of {@code infoMap} as features.
+     *
+     * "exp" and "return_n" are passed through {@code FeatureUtils.log1} with {@code log1Scale};
+     * "rovn" is stored as read. Keys have the form {@code prefix_field}.
+     *
+     * @param prefix  leading part of every emitted feature key
+     * @param infoMap source of the raw string values
+     * @param featMap output map that receives the features
+     */
+    private static void headVideoCFD3Feature(String prefix, Map<String, String> infoMap, Map<String, Double> featMap) {
+        double expRaw = getOneInfo("exp", infoMap);
+        double returnNRaw = getOneInfo("return_n", infoMap);
+        double rovnRaw = getOneInfo("rovn", infoMap);
+
+        double expScaled = FeatureUtils.log1(expRaw, log1Scale);
+        double returnNScaled = FeatureUtils.log1(returnNRaw, log1Scale);
+
+        featMap.put(prefix + "_exp", expScaled);
+        featMap.put(prefix + "_return_n", returnNScaled);
+        featMap.put(prefix + "_rovn", rovnRaw);
+    }
+
+    /**
+     * Parses the per-type CF score strings found in {@code mapInfo}.
+     *
+     * For every type in {@code cfList}, the value is split on "," into entries and each entry on
+     * ":". An entry with at least four fields contributes {@code field0 -> [field1, field2, field3]};
+     * shorter entries are skipped and fields beyond the fourth are ignored. Types that yield no
+     * valid entry are left out of the result.
+     *
+     * @param mapInfo CF type to encoded score string; may be null or empty
+     * @return CF type to (key to three-element value array); never null, possibly empty
+     */
+    public static Map<String, Map<String, String[]>> parseUCFScore(Map<String, String> mapInfo) {
+        Map<String, Map<String, String[]>> allScoresMap = new HashMap<>();
+        if (null == mapInfo || mapInfo.isEmpty()) {
+            return allScoresMap;
+        }
+        for (String cfType : cfList) {
+            // get() covers both a missing key and a key mapped to null.
+            String data = mapInfo.get(cfType);
+            if (null == data || data.isEmpty()) {
+                continue;
+            }
+            Map<String, String[]> oneScoresMap = new HashMap<>();
+            for (String entry : data.split(",")) {
+                String[] rList = entry.split(":");
+                // Make sure the split produced at least four fields: the key plus three values.
+                if (rList.length >= 4) {
+                    oneScoresMap.put(rList[0], new String[]{rList[1], rList[2], rList[3]});
+                }
+            }
+            if (!oneScoresMap.isEmpty()) {
+                allScoresMap.put(cfType, oneScoresMap);
+            }
+        }
+        return allScoresMap;
+    }
+
+    private static void getRateStatFeature(String prefix, String calType, List<String> periods, Map<String, String> infoMap, Map<String, Double> featMap) {
+        if (null == infoMap || infoMap.isEmpty()) {
+            return;
+        }
+        for (String period : periods) {
+            double str_one = getOneInfo(calType + "str_one_" + period, infoMap);
+            double ros_one = getOneInfo(calType + "ros_one_" + period, infoMap);
+            double str = getOneInfo(calType + "str_" + period, infoMap);
+            double ros = getOneInfo(calType + "ros_" + period, infoMap);
+            double str_plus = getOneInfo(calType + "str_plus_" + period, infoMap);
+            double ros_minus = getOneInfo(calType + "ros_minus_" + period, infoMap);
+            double rovn = getOneInfo(calType + "rovn_" + period, infoMap);
+
+            featMap.put(prefix + "_" + period + "_" + calType + "str_one", str_one);
+            featMap.put(prefix + "_" + period + "_" + calType + "ros_one", ros_one);
+            featMap.put(prefix + "_" + period + "_" + calType + "str", str);
+            featMap.put(prefix + "_" + period + "_" + calType + "ros", ros);
+            featMap.put(prefix + "_" + period + "_" + calType + "str_plus", str_plus);
+            featMap.put(prefix + "_" + period + "_" + calType + "ros_minus", ros_minus);
+            featMap.put(prefix + "_" + period + "_" + calType + "rovn", rovn);
+        }
+    }
+
+    /**
+     * Derives per-period statistic features from raw counters in {@code infoMap}.
+     *
+     * For each period the counters "exp_", "is_share_", "share_cnt_", "is_return_1_" and
+     * {@code uvPrefix + "_"} (each followed by the period) are read with {@code getOneInfo}.
+     * A period whose "exp" counter fails {@code FeatureUtils.greaterThanZero} is skipped.
+     * Otherwise the method emits, under {@code prefix_period_name}: the five counters passed
+     * through {@code FeatureUtils.log1}, three {@code FeatureUtils.wilsonScore} ratios and four
+     * {@code FeatureUtils.plusSmooth} ratios.
+     *
+     * @param prefix   leading part of every emitted feature key
+     * @param uvPrefix name of the counter used as the "return_n_uv" input
+     * @param periods  period identifiers to iterate over
+     * @param infoMap  source of the raw string values; null or empty means nothing is emitted
+     * @param featMap  output map that receives the features
+     */
+    private static void oneTypeStatFeature(String prefix, String uvPrefix, List<String> periods, Map<String, String> infoMap, Map<String, Double> featMap) {
+        if (infoMap == null || infoMap.isEmpty()) {
+            return;
+        }
+        for (String period : periods) {
+            double expRaw = getOneInfo("exp_" + period, infoMap);
+            if (!FeatureUtils.greaterThanZero(expRaw)) {
+                continue;
+            }
+            double shareRaw = getOneInfo("is_share_" + period, infoMap);
+            double shareCntRaw = getOneInfo("share_cnt_" + period, infoMap);
+            double return1Raw = getOneInfo("is_return_1_" + period, infoMap);
+            double returnNUvRaw = getOneInfo(uvPrefix + "_" + period, infoMap);
+
+            // Log-scaled counters.
+            double expLog = FeatureUtils.log1(expRaw, log1Scale);
+            double shareLog = FeatureUtils.log1(shareRaw, log1Scale);
+            double shareCntLog = FeatureUtils.log1(shareCntRaw, log1Scale);
+            double return1Log = FeatureUtils.log1(return1Raw, log1Scale);
+            double returnNUvLog = FeatureUtils.log1(returnNUvRaw, log1Scale);
+
+            // Wilson-score ratios.
+            double strVal = FeatureUtils.wilsonScore(shareRaw, expRaw);
+            double strPlusVal = FeatureUtils.wilsonScore(return1Raw, expRaw);
+            double rosOneVal = FeatureUtils.wilsonScore(return1Raw, shareRaw);
+
+            // Plus-smoothed ratios of the return_n_uv counter.
+            double rovnVal = FeatureUtils.plusSmooth(returnNUvRaw, expRaw, smoothPlus);
+            double rosVal = FeatureUtils.plusSmooth(returnNUvRaw, shareRaw, smoothPlus);
+            double rosNVal = FeatureUtils.plusSmooth(returnNUvRaw, shareCntRaw, smoothPlus);
+            double rosMinusVal = FeatureUtils.plusSmooth(returnNUvRaw, return1Raw, smoothPlus);
+
+            // Every key for this period shares the same leading part.
+            String keyBase = prefix + "_" + period + "_";
+            featMap.put(keyBase + "exp", expLog);
+            featMap.put(keyBase + "is_share", shareLog);
+            featMap.put(keyBase + "share_cnt", shareCntLog);
+            featMap.put(keyBase + "is_return_1", return1Log);
+            featMap.put(keyBase + "return_n_uv", returnNUvLog);
+            featMap.put(keyBase + "str", strVal);
+            featMap.put(keyBase + "str_plus", strPlusVal);
+            featMap.put(keyBase + "ros_one", rosOneVal);
+            featMap.put(keyBase + "rovn", rovnVal);
+            featMap.put(keyBase + "ros", rosVal);
+            featMap.put(keyBase + "ros_n", rosNVal);
+            featMap.put(keyBase + "ros_minus", rosMinusVal);
+        }
+    }
+
+    /**
+     * Derives per-period statistic features from raw counters in {@code infoMap}, covering both
+     * the "return_1_uv" and "return_n_uv" counters.
+     *
+     * For each period the counters "exp_", "is_share_", "share_cnt_", "is_return_1_",
+     * "return_1_uv_" and "return_n_uv_" (each followed by the period) are read with
+     * {@code getOneInfo}. A period whose "exp" counter fails {@code FeatureUtils.greaterThanZero}
+     * is skipped. Otherwise the method emits, under {@code prefix_period_name}: the six counters
+     * passed through {@code FeatureUtils.log1}, three {@code FeatureUtils.wilsonScore} ratios and
+     * two groups of four {@code FeatureUtils.plusSmooth} ratios (suffix "1" for return_1_uv).
+     *
+     * @param prefix  leading part of every emitted feature key
+     * @param periods period identifiers to iterate over
+     * @param infoMap source of the raw string values; null or empty means nothing is emitted
+     * @param featMap output map that receives the features
+     */
+    private static void oneTypeStatFeature(String prefix, List<String> periods, Map<String, String> infoMap, Map<String, Double> featMap) {
+        if (infoMap == null || infoMap.isEmpty()) {
+            return;
+        }
+        for (String period : periods) {
+            double expRaw = getOneInfo("exp_" + period, infoMap);
+            if (!FeatureUtils.greaterThanZero(expRaw)) {
+                continue;
+            }
+            double shareRaw = getOneInfo("is_share_" + period, infoMap);
+            double shareCntRaw = getOneInfo("share_cnt_" + period, infoMap);
+            double return1Raw = getOneInfo("is_return_1_" + period, infoMap);
+            double return1UvRaw = getOneInfo("return_1_uv_" + period, infoMap);
+            double returnNUvRaw = getOneInfo("return_n_uv_" + period, infoMap);
+
+            // Log-scaled counters.
+            double expLog = FeatureUtils.log1(expRaw, log1Scale);
+            double shareLog = FeatureUtils.log1(shareRaw, log1Scale);
+            double shareCntLog = FeatureUtils.log1(shareCntRaw, log1Scale);
+            double return1Log = FeatureUtils.log1(return1Raw, log1Scale);
+            double return1UvLog = FeatureUtils.log1(return1UvRaw, log1Scale);
+            double returnNUvLog = FeatureUtils.log1(returnNUvRaw, log1Scale);
+
+            // Wilson-score ratios.
+            double strVal = FeatureUtils.wilsonScore(shareRaw, expRaw);
+            double strPlusVal = FeatureUtils.wilsonScore(return1Raw, expRaw);
+            double rosOneVal = FeatureUtils.wilsonScore(return1Raw, shareRaw);
+
+            // Plus-smoothed ratios of the return_1_uv counter.
+            double rovn1Val = FeatureUtils.plusSmooth(return1UvRaw, expRaw, smoothPlus);
+            double ros1Val = FeatureUtils.plusSmooth(return1UvRaw, shareRaw, smoothPlus);
+            double rosN1Val = FeatureUtils.plusSmooth(return1UvRaw, shareCntRaw, smoothPlus);
+            double rosMinus1Val = FeatureUtils.plusSmooth(return1UvRaw, return1Raw, smoothPlus);
+
+            // Plus-smoothed ratios of the return_n_uv counter.
+            double rovnVal = FeatureUtils.plusSmooth(returnNUvRaw, expRaw, smoothPlus);
+            double rosVal = FeatureUtils.plusSmooth(returnNUvRaw, shareRaw, smoothPlus);
+            double rosNVal = FeatureUtils.plusSmooth(returnNUvRaw, shareCntRaw, smoothPlus);
+            double rosMinusVal = FeatureUtils.plusSmooth(returnNUvRaw, return1Raw, smoothPlus);
+
+            // Every key for this period shares the same leading part.
+            String keyBase = prefix + "_" + period + "_";
+            featMap.put(keyBase + "exp", expLog);
+            featMap.put(keyBase + "is_share", shareLog);
+            featMap.put(keyBase + "share_cnt", shareCntLog);
+            featMap.put(keyBase + "is_return_1", return1Log);
+            featMap.put(keyBase + "return_1_uv", return1UvLog);
+            featMap.put(keyBase + "return_n_uv", returnNUvLog);
+            featMap.put(keyBase + "str", strVal);
+            featMap.put(keyBase + "str_plus", strPlusVal);
+            featMap.put(keyBase + "ros_one", rosOneVal);
+            featMap.put(keyBase + "rovn1", rovn1Val);
+            featMap.put(keyBase + "ros1", ros1Val);
+            featMap.put(keyBase + "ros_n1", rosN1Val);
+            featMap.put(keyBase + "ros_minus1", rosMinus1Val);
+            featMap.put(keyBase + "rovn", rovnVal);
+            featMap.put(keyBase + "ros", rosVal);
+            featMap.put(keyBase + "ros_n", rosNVal);
+            featMap.put(keyBase + "ros_minus", rosMinusVal);
+        }
+    }
+
+    /**
+     * Reads a numeric value stored as a string in {@code map}.
+     *
+     * @param name key to look up
+     * @param map  source map; may be null or empty
+     * @return the parsed value, or 0.0 when the map is null or empty, the key is absent or
+     *         mapped to null, or the stored text is not a valid double
+     */
+    private static double getOneInfo(String name, Map<String, String> map) {
+        if (null == map || map.isEmpty()) {
+            return 0.0;
+        }
+        // get() returns null both for a missing key and for a key mapped to null;
+        // Double.parseDouble(null) would throw a NullPointerException.
+        String raw = map.get(name);
+        if (null == raw) {
+            return 0.0;
+        }
+        try {
+            return Double.parseDouble(raw);
+        } catch (NumberFormatException e) {
+            // Empty or malformed text gets the same default as a missing key,
+            // so one bad record cannot abort feature extraction.
+            return 0.0;
+        }
+    }
+}

+ 7 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/FeatureUtils.java

@@ -123,6 +123,13 @@ public class FeatureUtils {
         return Math.log(data + 1.0);
     }
 
+    /**
+     * Computes {@code ln(data + 1)} divided by {@code scale}.
+     *
+     * @param data  input value
+     * @param scale divisor applied to the logarithm
+     * @return 0 when {@code data <= 0}, otherwise {@code Math.log(data + 1.0) / scale}
+     */
+    public static double log1(double data, double scale) {
+        return data <= 0 ? 0D : Math.log(data + 1.0) / scale;
+    }
+
     public static double plusSmooth(double a, double b, double plus) {
         if (a == 0 || b == 0) {
             return 0D;