Kaynağa Gözat

new feature

jch 2 ay önce
ebeveyn
işleme
faca8fa75a

+ 1 - 1
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/makedata_recsys_81_originData_20250217.scala

@@ -306,7 +306,7 @@ object makedata_recsys_81_originData_20250217 {
             if (null != ts && ts.nonEmpty && rankVideo.containsKey("gmt_create_timestamp")) {
               val currentMs = ts.toLong * 1000L
               val createMs = rankVideo.getString("gmt_create_timestamp").toLong
-              createTime = FeatureUtils.getCreateTime(currentMs, createMs)
+              createTime = FeatureUtils.getTimeDiff(currentMs, createMs)
             }
             featureMap.put("createTime", createTime)
 

+ 131 - 0
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/makedata_recsys_82_originData_20250221.scala

@@ -0,0 +1,131 @@
+package com.aliyun.odps.spark.examples.makedata_recsys_r_rate
+
+import com.aliyun.odps.data.Record
+import com.aliyun.odps.spark.examples.myUtils._
+import examples.utils.SimilarityUtils
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+/*
+   20250221 提取特征
+ */
+
+/**
+ * Spark job that turns hourly recommendation samples into feature lines.
+ *
+ * For every hour in the requested range it loads the video table and the sample table from ODPS,
+ * down-samples the samples, attaches the feature maps of the videos listed in each user's
+ * `c9_feature`, converts every record into one tab-separated text line and writes the result to HDFS.
+ */
+object makedata_recsys_82_originData_20250221 {
+  // Label columns copied from each sample record into the label JSON of the output line.
+  private val labelNames = List(
+    "is_share", "share_cnt",
+    "is_return_1", "return_1_uv",
+    "is_return_n", "return_n_uv",
+    "is_return_noself", "return_1_uv_noself",
+    "is_return_n_noself", "return_n_uv_noself"
+  )
+
+  /**
+   * Keys the video table by vid, keeping a single feature map per vid.
+   *
+   * The "feature" column is parsed into a string map and the vid is added to it under "vid".
+   * When a vid occurs more than once, the map with strictly more entries wins; on a tie the
+   * second operand of the reduce is kept.
+   */
+  private def parseVideoRdd(videoRdd: RDD[Record]): RDD[(String, java.util.Map[String, String])] = {
+    videoRdd
+      .map { record =>
+        val vid = record.getString("vid")
+        val feature = ConvertUtils.getRecordCol(record, "feature")
+        feature.put("vid", vid)
+        (vid, feature)
+      }
+      .reduceByKey((a, b) => if (a.size() > b.size()) a else b)
+  }
+
+  /**
+   * Pairs every log record with the feature maps of the videos referenced by its user's history.
+   *
+   * Records are keyed by `mid`. The vids extracted from `c9_feature` are joined against `videoRdd`
+   * and regrouped per mid, then left-outer-joined back onto the records. A record whose mid is
+   * null/empty, or whose history matched no video, is kept and paired with a null list.
+   *
+   * @param logRdd   sample records
+   * @param videoRdd (vid, feature map) pairs, one per vid
+   * @return (record, matched video feature maps or null)
+   */
+  private def joinVideoMap(logRdd: RDD[Record], videoRdd: RDD[(String, java.util.Map[String, String])]): RDD[(Record, List[java.util.Map[String, String]])] = {
+    val midLogRdd = logRdd.map(record => (record.getString("mid"), record))
+
+    val midSeqRdd = midLogRdd
+      // getString may return null; calling nonEmpty on a null String throws, so test for null first.
+      .filter { case (mid, _) => null != mid && mid.nonEmpty }
+      .flatMap { case (mid, record) =>
+        val pairs = new ArrayBuffer[(String, String)]
+        for (hVid <- ConvertUtils.getVidList(record.getString("c9_feature"))) {
+          // Explicit tuple: do not rely on argument auto-tupling of `+=`.
+          pairs += ((hVid, mid)) // (vid, mid)
+        }
+        pairs
+      }
+      .join(videoRdd) // (vid, (mid, map))
+      .map { case (_, (mid, videoMap)) => (mid, videoMap) }
+      .groupByKey()
+      .map { case (mid, videoMaps) => (mid, videoMaps.toList) }
+
+    midLogRdd.leftOuterJoin(midSeqRdd)
+      // orNull is deliberate: the list is handed to Java code that null-checks it.
+      .map { case (_, (record, videoSeq)) => (record, videoSeq.orNull) }
+  }
+
+  /**
+   * Renders each (record, video list) pair as one output line:
+   * `logKey \t labels \t scoresMap \t features`.
+   *
+   * `SimilarityUtils.init()` is called once per partition before any record is converted.
+   */
+  private def getFeature(rdd: RDD[(Record, List[java.util.Map[String, String]])]): RDD[String] = {
+    rdd.mapPartitions { partition =>
+      SimilarityUtils.init()
+      partition.map { case (record, videoSeq) =>
+        val logKey = DataUtils.getLogKey(record)
+        val labels = DataUtils.getLabels(labelNames, record).toString
+        val features = ConvertUtils.getFeature(record, videoSeq, 6).toString
+        val scoresMap = DataUtils.getSubJson(record, "extend_alg", "scoresMap").toString
+        Seq(logKey, labels, scoresMap, features).mkString("\t")
+      }
+    }
+  }
+
+  /**
+   * Entry point. Recognised arguments (all optional, defaults in the code below): project, table,
+   * tablePart, beginStr, endStr, whatLabel, fuSampleRate, savePath, repartition.
+   */
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getName)
+      .getOrCreate()
+    val sc = spark.sparkContext
+
+    // 1. Parse arguments
+    val param = ParamUtils.parseArgs(args)
+    val project = param.getOrElse("project", "loghubods")
+    val table = param.getOrElse("table", "dwd_recsys_alg_sample_all_20250212")
+    val tablePart = param.getOrElse("tablePart", "64").toInt
+    val beginStr = param.getOrElse("beginStr", "2025022317")
+    val endStr = param.getOrElse("endStr", "2025022317")
+    val whatLabel = param.getOrElse("whatLabel", "is_return_n_noself")
+    val fuSampleRate = param.getOrElse("fuSampleRate", "0.1").toDouble
+    val savePath = param.getOrElse("savePath", "/dw/recommend/model/82_origin_data/")
+    val repartition = param.getOrElse("repartition", "32").toInt
+
+    // 2. Process the data, one hour partition at a time
+    val timeRange = MyDateUtils.getDateHourRange(beginStr, endStr)
+    for (dt_hh <- timeRange) {
+      // a. Build the partition spec from the yyyyMMddHH string
+      val dt = dt_hh.substring(0, 8)
+      val hh = dt_hh.substring(8, 10)
+      val partition = "dt=%s,hh=%s".format(dt, hh)
+      println("开始执行partition:" + partition)
+
+      // b. Load the video info table
+      val originVideo = DataUtils.getODPSData(sc, project, "alg_vid_feature_basic_info", partition, tablePart)
+      val uniqVideo = parseVideoRdd(originVideo)
+
+      // c. Load the sample data
+      val odpsData = DataUtils.getODPSData(sc, project, table, partition, tablePart)
+
+      // d. Down-sample the samples
+      val resampleData = DataUtils.resample(whatLabel, fuSampleRate, odpsData)
+
+      // e. Attach the videos from the user's history
+      val seqSampleData = joinVideoMap(resampleData, uniqVideo)
+
+      // f. Convert to feature lines
+      val featureData = getFeature(seqSampleData)
+
+      // g. Save
+      val hdfsPath = "%s/%s%s".format(savePath, dt, hh)
+      DataUtils.saveData(featureData, hdfsPath, repartition)
+    }
+  }
+}

+ 168 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/ConvertUtils.java

@@ -0,0 +1,168 @@
+package com.aliyun.odps.spark.examples.myUtils;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.aliyun.odps.data.Record;
+import com.google.gson.Gson;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.*;
+
+/**
+ * Converts one ODPS sample record (plus the user's history videos) into a flat feature JSON.
+ */
+public class ConvertUtils {
+    // Gson instances are thread-safe, so one shared instance replaces the former per-record allocation.
+    private static final Gson GSON = new Gson();
+
+    /**
+     * Builds the feature JSON for one sample record.
+     *
+     * @param record   sample record; its "ts" column is parsed with Long.parseLong and multiplied by 1000
+     * @param videoSeq feature maps of the user's history videos, may be null
+     * @param scale    number of decimal places kept for non-integral feature values
+     * @return the features that survive {@link #filterAndTruncate}
+     * @throws NumberFormatException if the "ts" column is null or not a number
+     */
+    public static JSONObject getFeature(Record record, List<Map<String, String>> videoSeq, int scale) {
+        Map<String, Double> featMap = new HashMap<>();
+
+        // origin info
+        String ts = record.getString("ts");
+        long currentMs = Long.parseLong(ts) * 1000;
+        String vid = record.getString("vid");
+        Map<String, String> headInfo = getRecordCol(record, "v2_feature");
+        Map<String, String> rankInfo = getRecordCol(record, "v1_feature");
+        Map<String, Map<String, String>> userOriginInfo = getUserOriginInfo(record);
+        Map<String, Map<String, Map<String, String>>> videoOriginInfo = getVideoOriginInfo(record);
+
+        // parse info: the c9 column map is serialised back to JSON and bound to the profile bean
+        Map<String, String> c9Map = userOriginInfo.get("alg_recsys_feature_user_share_return_stat");
+        String c9Str = GSON.toJson(c9Map);
+        UserShareReturnProfile userProfile = JSON.parseObject(c9Str, UserShareReturnProfile.class);
+        Map<String, Map<String, String>> historyVideoMap = list2Map(videoSeq);
+
+        List<String> cfList = Arrays.asList("share", "return");
+        Map<String, Map<String, String[]>> c7Map = FeatureTransform.parseUCFScore(cfList, userOriginInfo.get("alg_mid_feature_sharecf"));
+        Map<String, Map<String, String[]>> c8Map = FeatureTransform.parseUCFScore(cfList, userOriginInfo.get("alg_mid_feature_returncf"));
+
+        // context feature
+        FeatureTransform.getContextFeature(currentMs, featMap);
+
+        // head video feature
+        FeatureTransform.getVideoBaseFeature("h", currentMs, headInfo, featMap);
+
+        // user feature
+        FeatureTransform.getUserFeature(userOriginInfo, featMap);
+        FeatureTransform.getUserProfileFeature(userProfile, featMap);
+
+        // user & video feature
+        FeatureTransform.getUserTagsCrossVideoFeature("c5", rankInfo, userOriginInfo.get("alg_mid_feature_return_tags"), featMap);
+        FeatureTransform.getUserTagsCrossVideoFeature("c6", rankInfo, userOriginInfo.get("alg_mid_feature_share_tags"), featMap);
+        FeatureTransform.getUserCFFeature("c7", vid, cfList, c7Map, featMap);
+        FeatureTransform.getUserCFFeature("c8", vid, cfList, c8Map, featMap);
+
+        // rank video feature
+        FeatureTransform.getVideoBaseFeature("r", currentMs, rankInfo, featMap);
+        FeatureTransform.getVideoFeature(vid, videoOriginInfo, featMap);
+
+        // head&rank cross feature
+        FeatureTransform.getHeadRankVideoCrossFeature(headInfo, rankInfo, featMap);
+
+        // user profile & rank cross
+        FeatureTransform.getProfileVideoCrossFeature(currentMs, userProfile, rankInfo, historyVideoMap, featMap);
+
+        return filterAndTruncate(featMap, scale);
+    }
+
+    /** Parses the user-side columns c1_feature..c9_feature and stores each under its feature-table name. */
+    private static Map<String, Map<String, String>> getUserOriginInfo(Record record) {
+        Map<String, Map<String, String>> map = new HashMap<>();
+        map.put("mid_global_feature_20250212", getRecordCol(record, "c1_feature"));
+        map.put("mid_merge_cate1_feature_20250212", getRecordCol(record, "c2_feature"));
+        map.put("mid_merge_cate2_feature_20250212", getRecordCol(record, "c3_feature"));
+        map.put("mid_u2u_friend_index_feature_20250212", getRecordCol(record, "c4_feature"));
+        map.put("alg_mid_feature_return_tags", getRecordCol(record, "c5_feature"));
+        map.put("alg_mid_feature_share_tags", getRecordCol(record, "c6_feature"));
+        map.put("alg_mid_feature_sharecf", getRecordCol(record, "c7_feature"));
+        map.put("alg_mid_feature_returncf", getRecordCol(record, "c8_feature"));
+        map.put("alg_recsys_feature_user_share_return_stat", getRecordCol(record, "c9_feature"));
+        return map;
+    }
+
+    /**
+     * Parses the video-side columns b1_feature..b13_feature and d1_feature..d3_feature, keyed by
+     * feature-table name, and wraps them in a single-entry map keyed by the record's vid.
+     */
+    private static Map<String, Map<String, Map<String, String>>> getVideoOriginInfo(Record record) {
+        Map<String, Map<String, String>> map = new HashMap<>();
+        map.put("alg_vid_global_feature_20250212", getRecordCol(record, "b1_feature"));
+        map.put("alg_vid_recommend_exp_feature_20250212", getRecordCol(record, "b2_feature"));
+        map.put("alg_vid_recommend_flowpool_exp_feature_20250212", getRecordCol(record, "b3_feature"));
+        map.put("alg_vid_apptype_recommend_exp_feature_20250212", getRecordCol(record, "b4_feature"));
+        map.put("alg_vid_province_recommend_exp_feature_20250212", getRecordCol(record, "b5_feature"));
+        map.put("alg_vid_brand_recommend_exp_feature_20250212", getRecordCol(record, "b6_feature"));
+        map.put("alg_vid_hotsencetype_recommend_exp_feature_20250212", getRecordCol(record, "b7_feature"));
+        map.put("alg_merge_cate1_recommend_exp_feature_20250212", getRecordCol(record, "b8_feature"));
+        map.put("alg_merge_cate2_recommend_exp_feature_20250212", getRecordCol(record, "b9_feature"));
+        map.put("alg_channel_recommend_exp_feature_20250212", getRecordCol(record, "b10_feature"));
+        map.put("alg_festive_recommend_exp_feature_20250212", getRecordCol(record, "b11_feature"));
+        map.put("alg_vid_long_period_recommend_exp_feature_20250212", getRecordCol(record, "b12_feature"));
+        map.put("alg_video_unionid_recommend_exp_feature_20250212", getRecordCol(record, "b13_feature"));
+        map.put("scene_type_vid_cf_feature_20250212", getRecordCol(record, "d1_feature"));
+        map.put("vid_click_cf_feature_20250212", getRecordCol(record, "d2_feature"));
+        map.put("alg_recsys_feature_cf_i2i_v2", getRecordCol(record, "d3_feature"));
+
+        Map<String, Map<String, Map<String, String>>> allMap = new HashMap<>();
+        String vid = record.getString("vid");
+        allMap.put(vid, map);
+        return allMap;
+    }
+
+    /**
+     * Collects the distinct video ids found in the m_s_s, m_r_s, l_s_s and l_r_s lists of a
+     * serialised user share/return profile.
+     *
+     * @param data profile JSON, may be null or empty
+     * @return ids rendered as strings; empty when nothing could be extracted
+     */
+    public static Set<String> getVidList(String data) {
+        Set<String> vidList = new HashSet<>();
+        if (null == data || data.isEmpty()) {
+            return vidList;
+        }
+        UserShareReturnProfile user = JSON.parseObject(data, UserShareReturnProfile.class);
+        if (null == user) {
+            return vidList;
+        }
+        for (List<UserSRBO> list : Arrays.asList(user.getM_s_s(), user.getM_r_s(), user.getL_s_s(), user.getL_r_s())) {
+            if (null == list) {
+                continue;
+            }
+            for (UserSRBO u : list) {
+                if (null != u) {
+                    vidList.add(u.getId() + "");
+                }
+            }
+        }
+        return vidList;
+    }
+
+    /**
+     * Parses a JSON-object column into a string map.
+     *
+     * Entries whose value is null are skipped; every other value is stored via toString().
+     *
+     * @return a new mutable map; empty when the record is null, the column is null or parsing yields null
+     */
+    public static Map<String, String> getRecordCol(Record record, String col) {
+        Map<String, String> colMap = new HashMap<>();
+        if (null != record && !record.isNull(col)) {
+            JSONObject json = JSON.parseObject(record.getString(col));
+            if (null != json) {
+                for (Map.Entry<String, Object> entry : json.entrySet()) {
+                    Object obj = entry.getValue();
+                    if (null != obj) {
+                        colMap.put(entry.getKey(), obj.toString());
+                    }
+                }
+            }
+        }
+        return colMap;
+    }
+
+    /** Indexes video feature maps by their "vid" entry, ignoring null/empty maps and missing/empty vids. */
+    private static Map<String, Map<String, String>> list2Map(List<Map<String, String>> videSeq) {
+        Map<String, Map<String, String>> map = new HashMap<>();
+        if (null != videSeq && !videSeq.isEmpty()) {
+            for (Map<String, String> video : videSeq) {
+                if (null != video && !video.isEmpty()) {
+                    String vid = video.get("vid");
+                    if (null != vid && !vid.isEmpty()) {
+                        // Reuse the value already fetched instead of looking it up a second time.
+                        map.put(vid, video);
+                    }
+                }
+            }
+        }
+        return map;
+    }
+
+    /**
+     * Drops features whose value is not greater than 1E-8 and rounds the remaining non-integral
+     * values to {@code scale} decimals (HALF_UP). Integral values are stored unchanged.
+     */
+    private static JSONObject filterAndTruncate(Map<String, Double> featMap, int scale) {
+        JSONObject jsonObject = new JSONObject();
+        for (Map.Entry<String, Double> entry : featMap.entrySet()) {
+            double value = entry.getValue();
+            if (value > 1E-8) {
+                if (value == Math.floor(value)) {
+                    jsonObject.put(entry.getKey(), entry.getValue());
+                } else {
+                    // new BigDecimal(double) rounds the exact binary value; kept as-is so emitted values do not shift.
+                    double newValue = new BigDecimal(value).setScale(scale, RoundingMode.HALF_UP).doubleValue();
+                    jsonObject.put(entry.getKey(), newValue);
+                }
+            }
+        }
+        return jsonObject;
+    }
+}

+ 86 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/DataUtils.scala

@@ -0,0 +1,86 @@
+package com.aliyun.odps.spark.examples.myUtils
+
+import com.alibaba.fastjson.{JSON, JSONObject}
+import com.aliyun.odps.TableSchema
+import com.aliyun.odps.data.Record
+import org.apache.hadoop.io.compress.GzipCodec
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+
+import scala.collection.JavaConversions.mapAsScalaMap
+import scala.util.Random
+
+/** Helpers for reading ODPS tables, sampling records, formatting output fields and writing to HDFS. */
+object DataUtils {
+  /**
+   * Reads one partition of an ODPS table as raw records.
+   *
+   * @param partition    partition spec passed to readTable, e.g. "dt=20250223,hh=17"
+   * @param numPartition forwarded to readTable as its numPartition argument
+   */
+  def getODPSData(sc: SparkContext, project: String, table: String, partition: String, numPartition: Int): RDD[Record] = {
+    val odpsOps = env.getODPS(sc)
+    odpsOps.readTable(project = project,
+      table = table,
+      partition = partition,
+      transfer = func,
+      numPartition = numPartition)
+  }
+
+  /**
+   * Keeps every record whose label `name` is greater than 0 and keeps each remaining record with
+   * probability `negRate`. A null label is treated as "0".
+   */
+  def resample(name: String, negRate: Double, rdd: RDD[Record]): RDD[Record] = {
+    rdd.mapPartitions(records => {
+      // One generator per partition instead of one allocation per record.
+      val rng = new Random()
+      records.filter(record => {
+        val label = getStringValue(record, name, "0").toDouble
+        label > 0 || rng.nextDouble() <= negRate
+      })
+    }, preservesPartitioning = true)
+  }
+
+  /**
+   * Builds the comma-separated log key:
+   * apptype,page,pagesource,recommendpagetype,flowpool,abcode,mid,vid,level,ts.
+   *
+   * apptype, abcode, mid and ts are read directly, so a null column is rendered as "null";
+   * the other fields fall back to "" ("0" for level) and are trimmed.
+   */
+  def getLogKey(record: Record): String = {
+    val apptype = record.getString("apptype")
+    val page = getStringValue(record, "page")
+    val pagesource = getStringValue(record, "pagesource")
+    val recommendpagetype = getStringValue(record, "recommendpagetype")
+    val flowpool = getStringValue(record, "flowpool")
+    val abcode = record.getString("abcode")
+    val mid = record.getString("mid")
+    val vid = getStringValue(record, "vid")
+    val level = getStringValue(record, "level", "0")
+    val ts = record.getString("ts")
+    Seq(apptype, page, pagesource, recommendpagetype, flowpool, abcode, mid, vid, level, ts).mkString(",")
+  }
+
+  /** Copies every non-null label column listed in `names` into a new JSON object as a string. */
+  def getLabels(names: List[String], record: Record): JSONObject = {
+    val labels = new JSONObject
+    names
+      .filter(name => !record.isNull(name))
+      .foreach(name => labels.put(name, record.getString(name)))
+    labels
+  }
+
+  /**
+   * Writes the lines gzip-compressed to `hdfsPath` after deleting whatever is there.
+   *
+   * Only paths under "/dw/recommend/model/" are accepted; any other path is reported on stdout
+   * and nothing is deleted or written.
+   */
+  def saveData(rdd: RDD[String], hdfsPath: String, repartition: Int = 64): Unit = {
+    if (hdfsPath.nonEmpty && hdfsPath.startsWith("/dw/recommend/model/")) {
+      println("删除路径并开始数据写入:" + hdfsPath)
+      MyHdfsUtils.delete_hdfs_path(hdfsPath)
+      rdd.repartition(repartition).saveAsTextFile(hdfsPath, classOf[GzipCodec])
+    } else {
+      println("路径不合法,无法写入:" + hdfsPath)
+    }
+  }
+
+  /** Identity transfer function handed to readTable: returns the record untouched. */
+  def func(record: Record, schema: TableSchema): Record = {
+    record
+  }
+
+  /** Returns the trimmed string value of column `key`, or `default` when the column is null. */
+  def getStringValue(record: Record, key: String, default: String = ""): String = {
+    if (record.isNull(key)) default
+    else record.getString(key).trim
+  }
+
+  /**
+   * Parses column `key1` as JSON, takes its `key2` entry as a string, strips all backslashes and
+   * parses that string as JSON again.
+   *
+   * An empty JSON object is returned whenever a step yields nothing: the column is null, either
+   * parse returns null, or `key2` is absent or null. Malformed JSON still raises the parser's exception.
+   */
+  def getSubJson(record: Record, key1: String, key2: String): JSONObject = {
+    val nested =
+      if (record.isNull(key1)) None
+      else {
+        for {
+          // parseObject may return null (e.g. for an empty string), hence Option(...).
+          outer <- Option(JSON.parseObject(record.getString(key1)))
+          inner <- Option(outer.getString(key2))
+          parsed <- Option(JSON.parseObject(inner.replace("\\", "")))
+        } yield parsed
+      }
+    nested.getOrElse(new JSONObject())
+  }
+}

+ 389 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/FeatureTransform.java

@@ -0,0 +1,389 @@
+package com.aliyun.odps.spark.examples.myUtils;
+
+import examples.utils.SimilarityUtils;
+
+import java.util.*;
+
+public class FeatureTransform {
+    private static final double smoothPlus = 5.0;
+    private static final List<String> periods = Arrays.asList("1h", "3h", "6h", "12h", "24h", "72h", "168h");
+    private static final List<String> videoCateAttrs = Arrays.asList(FeatureUtils.cate1Attr, FeatureUtils.cate2Attr, FeatureUtils.festive1Attr);
+    private static final List<String> videoSimAttrs = Arrays.asList("title", "cate2", "cate2_list", "keywords", "cate1_list", "topic");
+    private static final List<String> hVideoSimAttrs = Arrays.asList("title");
+
+    /**
+     * Adds calendar context features for the given instant.
+     *
+     * Writes "week" (the value of Calendar.DAY_OF_WEEK) and "hour" (Calendar.HOUR_OF_DAY plus one),
+     * both evaluated on Calendar.getInstance(), i.e. in the JVM's default time zone and locale.
+     *
+     * @param currentMs  instant in epoch milliseconds
+     * @param featureMap output map that receives the two features
+     */
+    public static void getContextFeature(long currentMs, Map<String, Double> featureMap) {
+        Calendar now = Calendar.getInstance();
+        now.setTimeInMillis(currentMs);
+
+        double dayOfWeek = now.get(Calendar.DAY_OF_WEEK);
+        double hourPlusOne = now.get(Calendar.HOUR_OF_DAY) + 1;
+
+        featureMap.put("week", dayOfWeek);
+        featureMap.put("hour", hourPlusOne);
+    }
+
+    public static void getUserFeature(Map<String, Map<String, String>> userOriginInfo, Map<String, Double> featMap) {
+        oneTypeStatFeature("c1", "return_1_uv", userOriginInfo.get("mid_global_feature_20250212"), featMap);
+        oneTypeStatFeature("c2", "return_n_uv", userOriginInfo.get("mid_merge_cate1_feature_20250212"), featMap);
+        oneTypeStatFeature("c3", "return_n_uv", userOriginInfo.get("mid_merge_cate2_feature_20250212"), featMap);
+        u2uFeature("c4", userOriginInfo.get("mid_u2u_friend_index_feature_20250212"), featMap);
+    }
+
+    /**
+     * Emits the "up@*" features from the user's share/return profile.
+     *
+     * Nothing is written when the profile is null or its share pv is not positive. Otherwise the six
+     * counters are passed through FeatureUtils.log1 and three ratios are added: up@ros_one
+     * (wilsonScore of return pv over share pv), up@ros (plusSmooth of return uv over share pv) and
+     * up@ros_minus (plusSmooth of return uv over return pv).
+     */
+    public static void getUserProfileFeature(UserShareReturnProfile profile, Map<String, Double> featMap) {
+        if (null == profile) {
+            return;
+        }
+        long sharePv = profile.getS_pv();           // share_pv
+        long shareCnt = profile.getS_cnt();         // share_cnt
+        long returnPv = profile.getR_pv();          // return_pv
+        long returnUv = profile.getR_uv();          // return_uv
+        long maxShareCnt = profile.getM_s_cnt();    // max_share_cnt
+        long maxReturnUv = profile.getM_r_uv();     // max_return_uv
+        if (sharePv <= 0) {
+            return;
+        }
+
+        featMap.put("up@s_pv", FeatureUtils.log1(sharePv));
+        featMap.put("up@s_cnt", FeatureUtils.log1(shareCnt));
+        featMap.put("up@r_pv", FeatureUtils.log1(returnPv));
+        featMap.put("up@r_uv", FeatureUtils.log1(returnUv));
+        featMap.put("up@m_s_cnt", FeatureUtils.log1(maxShareCnt));
+        featMap.put("up@m_r_uv", FeatureUtils.log1(maxReturnUv));
+        featMap.put("up@ros_one", FeatureUtils.wilsonScore(returnPv, sharePv));
+        featMap.put("up@ros", FeatureUtils.plusSmooth(returnUv, sharePv, smoothPlus));
+        featMap.put("up@ros_minus", FeatureUtils.plusSmooth(returnUv, returnPv, smoothPlus));
+    }
+
+    /**
+     * Crosses the user's tag strings with the video title.
+     *
+     * For each of "tags_1d", "tags_3d" and "tags_7d" that is present and non-empty in infoMap, the
+     * three values returned by FeatureUtils.funcC34567ForTagsNew(tags, title) are stored as
+     * prefix_period_matchnum, prefix_period_maxscore and prefix_period_avgscore. Nothing is written
+     * when either map is null/empty or the video has no title.
+     */
+    public static void getUserTagsCrossVideoFeature(String prefix, Map<String, String> videoInfo, Map<String, String> infoMap, Map<String, Double> featMap) {
+        boolean noVideo = null == videoInfo || videoInfo.isEmpty();
+        boolean noTags = null == infoMap || infoMap.isEmpty();
+        if (noVideo || noTags) {
+            return;
+        }
+        String title = videoInfo.getOrDefault("title", "");
+        if (title.isEmpty()) {
+            return;
+        }
+        String[] windows = {"tags_1d", "tags_3d", "tags_7d"};
+        for (String window : windows) {
+            String tags = infoMap.getOrDefault(window, "");
+            if (tags.isEmpty()) {
+                continue;
+            }
+            Double[] stats = FeatureUtils.funcC34567ForTagsNew(tags, title);
+            String keyBase = prefix + "_" + window;
+            featMap.put(keyBase + "_matchnum", stats[0]);
+            featMap.put(keyBase + "_maxscore", stats[1]);
+            featMap.put(keyBase + "_avgscore", stats[2]);
+        }
+    }
+
+    /**
+     * Emits the user collaborative-filtering features of the ranked video.
+     *
+     * For every cf type in cfList that has an entry for vid, the score triple is written as
+     * prefix_type_score (scores[0]), prefix_type_num (scores[1]) and prefix_type_rank
+     * (1 / scores[2], or 0 when scores[2] is not positive).
+     *
+     * Nothing is written when vid is null/empty or infoMap is null/empty. An entry whose score
+     * array is null or has fewer than three elements is skipped instead of failing.
+     *
+     * @throws NumberFormatException if one of the three scores is not a parsable number
+     */
+    public static void getUserCFFeature(String prefix, String vid, List<String> cfList, Map<String, Map<String, String[]>> infoMap, Map<String, Double> featMap) {
+        // vid comes straight from a record column and may be null.
+        if (null == vid || vid.isEmpty() || null == infoMap || infoMap.isEmpty()) {
+            return;
+        }
+        for (String cfType : cfList) {
+            Map<String, String[]> cfScoresMap = infoMap.get(cfType);
+            if (null == cfScoresMap) {
+                continue;
+            }
+            String[] scores = cfScoresMap.get(vid);
+            if (null == scores || scores.length < 3) {
+                continue;
+            }
+            double score = Double.parseDouble(scores[0]);
+            double num = Double.parseDouble(scores[1]);
+            // Parse the rank once; a non-positive rank maps to 0, otherwise to its reciprocal.
+            double rank = Double.parseDouble(scores[2]);
+            double rankFeature = rank <= 0 ? 0D : 1.0 / rank;
+
+            String keyBase = prefix + "_" + cfType;
+            featMap.put(keyBase + "_score", score);
+            featMap.put(keyBase + "_num", num);
+            featMap.put(keyBase + "_rank", rankFeature);
+        }
+    }
+
+    public static void getVideoFeature(String vid, Map<String, Map<String, Map<String, String>>> videoOriginInfo, Map<String, Double> featMap) {
+        oneTypeStatFeature("b1", "return_1_uv", videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_vid_global_feature_20250212"), featMap);
+        oneTypeStatFeature("b2", "return_n_uv", videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_vid_recommend_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b3", "return_n_uv", videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_vid_recommend_flowpool_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b4", "return_n_uv", videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_vid_apptype_recommend_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b5", "return_n_uv", videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_vid_province_recommend_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b6", "return_n_uv", videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_vid_brand_recommend_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b7", "return_n_uv", videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_vid_hotsencetype_recommend_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b8", "return_n_uv", videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_merge_cate1_recommend_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b9", "return_n_uv", videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_merge_cate2_recommend_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b10", "return_n_uv", videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_channel_recommend_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b11", "return_n_uv", videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_festive_recommend_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b12", "return_n_uv", videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_vid_long_period_recommend_exp_feature_20250212"), featMap);
+        oneTypeStatFeature("b13", "return_n_uv", videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_video_unionid_recommend_exp_feature_20250212"), featMap);
+
+        // head video cf
+        headVideoCFD1Feature("d1", videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("scene_type_vid_cf_feature_20250212"), featMap);
+        headVideoCFD2Feature("d2", videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("vid_click_cf_feature_20250212"), featMap);
+        headVideoCFD3Feature("d3", videoOriginInfo.getOrDefault(vid, new HashMap<>()).get("alg_recsys_feature_cf_i2i_v2"), featMap);
+    }
+
+    /**
+     * Emits the basic features of one video under the given prefix.
+     *
+     * Writes prefix@total_time and prefix@bit_rate (missing keys count as "0"), the category
+     * one-hot features, and prefix@ts. prefix@ts is FeatureUtils.getTimeDiff(currentMs, createMs)
+     * when "gmt_create_timestamp" is present and usable, and stays at 1 otherwise: any exception
+     * raised while reading it is deliberately swallowed. Nothing is written for a null/empty map.
+     *
+     * @throws NumberFormatException if total_time or bit_rate is present but not a parsable number
+     */
+    public static void getVideoBaseFeature(String prefix, long currentMs, Map<String, String> videoInfo, Map<String, Double> featMap) {
+        if (null == videoInfo || videoInfo.isEmpty()) {
+            return;
+        }
+        double totalTime = Double.parseDouble(videoInfo.getOrDefault("total_time", "0"));
+        featMap.put(prefix + "@total_time", totalTime);
+        double bitRate = Double.parseDouble(videoInfo.getOrDefault("bit_rate", "0"));
+        featMap.put(prefix + "@bit_rate", bitRate);
+
+        // cate
+        getVideoCateFeature(prefix, videoInfo, featMap);
+
+        // time
+        double age = 1D;
+        if (videoInfo.containsKey("gmt_create_timestamp")) {
+            try {
+                long createMs = Long.parseLong(videoInfo.get("gmt_create_timestamp"));
+                age = FeatureUtils.getTimeDiff(currentMs, createMs);
+            } catch (Exception ignored) {
+                // best effort: keep the default of 1
+            }
+        }
+        featMap.put(prefix + "@ts", age);
+    }
+
+    /**
+     * Emits the "hr_sim_*" similarity features between the head video and the ranked video,
+     * one per attribute in videoSimAttrs.
+     */
+    public static void getHeadRankVideoCrossFeature(Map<String, String> headInfo, Map<String, String> rankInfo, Map<String, Double> featMap) {
+        final String crossPrefix = "hr_sim";
+        getTwoVideoCrossFeature(crossPrefix, videoSimAttrs, headInfo, rankInfo, featMap);
+    }
+
+    public static void getProfileVideoCrossFeature(long currentMs, UserShareReturnProfile profile, Map<String, String> rankVideo, Map<String, Map<String, String>> hVideoMap, Map<String, Double> featMap) {
+        if (null == profile) {
+            return;
+        }
+
+        int maxN = 2;
+        getRSCrossFeature("mss", currentMs, maxN, profile.getM_s_s(), rankVideo, hVideoMap, featMap);
+        getRSCrossFeature("mrs", currentMs, maxN, profile.getM_r_s(), rankVideo, hVideoMap, featMap);
+        getRSCrossFeature("lss", currentMs, maxN, profile.getL_s_s(), rankVideo, hVideoMap, featMap);
+        getRSCrossFeature("lrs", currentMs, maxN, profile.getL_r_s(), rankVideo, hVideoMap, featMap);
+
+        if (null == rankVideo || rankVideo.isEmpty()) {
+            return;
+        }
+        getVideoAttrSRCrossFeature("c1s", rankVideo.getOrDefault("merge_first_level_cate", ""), profile.getC1_s(), featMap);
+        getVideoAttrSRCrossFeature("c2s", rankVideo.getOrDefault("merge_second_level_cate", ""), profile.getC2_s(), featMap);
+        getVideoAttrSRCrossFeature("l1s", rankVideo.getOrDefault("festive_label1", ""), profile.getL1_s(), featMap);
+        getVideoAttrSRCrossFeature("l2s", rankVideo.getOrDefault("festive_label2", ""), profile.getL2_s(), featMap);
+    }
+
+    /**
+     * Emits features for the first maxN entries of one share/return history list.
+     *
+     * For slot i (1-based) with a positive id, the keys prefix@i@cnt, prefix@i@uv (both via
+     * FeatureUtils.log1) and prefix@i@ts (getTimeDiff against ts * 1000) are written when the
+     * respective raw value is positive. If the history video is known in hVideoMap, its category
+     * features and its similarity to the ranked video are added as well.
+     *
+     * NOTE(review): the category and similarity features are keyed by the bare prefix, not by the
+     * per-slot key, so all slots of one list share those keys and a later slot overwrites an
+     * earlier one. Confirm whether that is intended.
+     */
+    private static void getRSCrossFeature(String prefix, long currentMs, int maxN, List<UserSRBO> list, Map<String, String> rankVideo, Map<String, Map<String, String>> hVideoMap, Map<String, Double> featMap) {
+        if (null == list || list.isEmpty()) {
+            return;
+        }
+        int limit = Math.min(list.size(), maxN);
+        for (int slot = 0; slot < limit; slot++) {
+            UserSRBO item = list.get(slot);
+            if (null == item) {
+                continue;
+            }
+            long id = item.getId();
+            long cnt = item.getCnt();
+            long uv = item.getUv();
+            long ts = item.getTs();
+            if (id <= 0) {
+                continue;
+            }
+
+            String slotKey = String.format("%s@%d", prefix, slot + 1);
+            if (cnt > 0) {
+                featMap.put(slotKey + "@cnt", FeatureUtils.log1(cnt));
+            }
+            if (uv > 0) {
+                featMap.put(slotKey + "@uv", FeatureUtils.log1(uv));
+            }
+            if (ts > 0) {
+                featMap.put(slotKey + "@ts", FeatureUtils.getTimeDiff(currentMs, ts * 1000));
+            }
+
+            String vid = String.valueOf(id);
+            if (null != hVideoMap && hVideoMap.containsKey(vid)) {
+                Map<String, String> historyVideo = hVideoMap.get(vid);
+                getVideoCateFeature(prefix, historyVideo, featMap);
+                getTwoVideoCrossFeature(prefix, hVideoSimAttrs, historyVideo, rankVideo, featMap);
+            }
+        }
+    }
+
+    /**
+     * Emits the user's share/return statistics for one attribute value of the ranked video.
+     *
+     * Looks attr up in attrMap; when an entry exists and its share pv is positive, writes
+     * prefix@sp, prefix@rp, prefix@ru, prefix@mu (via FeatureUtils.log1) plus prefix@ros_one
+     * (wilsonScore of rp over sp), prefix@ros (plusSmooth of ru over sp) and prefix@ros_minus
+     * (plusSmooth of ru over rp).
+     */
+    private static void getVideoAttrSRCrossFeature(String prefix, String attr, Map<String, VideoAttrSRBO> attrMap, Map<String, Double> featMap) {
+        if (null == attrMap || attrMap.isEmpty() || !attrMap.containsKey(attr)) {
+            return;
+        }
+        VideoAttrSRBO stat = attrMap.get(attr);
+        if (null == stat) {
+            return;
+        }
+        long sharePv = stat.getSp();      // share_pv
+        long returnPv = stat.getRp();     // return_n_pv_noself
+        long returnUv = stat.getRu();     // return_n_uv_noself
+        long maxUv = stat.getMu();        // max_return_uv
+        if (sharePv <= 0) {
+            return;
+        }
+
+        featMap.put(prefix + "@sp", FeatureUtils.log1(sharePv));
+        featMap.put(prefix + "@rp", FeatureUtils.log1(returnPv));
+        featMap.put(prefix + "@ru", FeatureUtils.log1(returnUv));
+        featMap.put(prefix + "@mu", FeatureUtils.log1(maxUv));
+        featMap.put(prefix + "@ros_one", FeatureUtils.wilsonScore(returnPv, sharePv));
+        featMap.put(prefix + "@ros", FeatureUtils.plusSmooth(returnUv, sharePv, smoothPlus));
+        featMap.put(prefix + "@ros_minus", FeatureUtils.plusSmooth(returnUv, returnPv, smoothPlus));
+    }
+
+    /**
+     * Emits one-hot category features for a video.
+     *
+     * For every attribute in videoCateAttrs, the attribute value (missing key counts as "") is
+     * mapped to an id by FeatureUtils.getAttrId; a positive id yields the key prefix@attr@id
+     * with value 1.0. A null/empty video map produces nothing.
+     */
+    private static void getVideoCateFeature(String prefix, Map<String, String> videoInfo, Map<String, Double> featMap) {
+        boolean hasInfo = null != videoInfo && !videoInfo.isEmpty();
+        if (!hasInfo) {
+            return;
+        }
+        for (String attr : videoCateAttrs) {
+            int attrId = FeatureUtils.getAttrId(attr, videoInfo.getOrDefault(attr, ""));
+            if (attrId <= 0) {
+                continue;
+            }
+            featMap.put(String.format("%s@%s@%d", prefix, attr, attrId), 1.0);
+        }
+    }
+
+    /**
+     * Emits prefix_attr similarity features between two videos.
+     *
+     * For each attribute, the two values (missing key counts as "") are compared with
+     * SimilarityUtils.word2VecSimilarity unless either of them equals "" or "unknown".
+     * Nothing is written when either video map is null or empty.
+     */
+    private static void getTwoVideoCrossFeature(String prefix, List<String> attrs, Map<String, String> video1, Map<String, String> video2, Map<String, Double> featMap) {
+        if (null == video1 || null == video2 || video1.isEmpty() || video2.isEmpty()) {
+            return;
+        }
+        for (String attr : attrs) {
+            String left = video1.getOrDefault(attr, "");
+            String right = video2.getOrDefault(attr, "");
+            boolean leftMissing = "".equals(left) || "unknown".equals(left);
+            boolean rightMissing = "".equals(right) || "unknown".equals(right);
+            if (leftMissing || rightMissing) {
+                continue;
+            }
+            featMap.put(prefix + "_" + attr, SimilarityUtils.word2VecSimilarity(left, right));
+        }
+    }
+
+    private static void headVideoCFD1Feature(String prefix, Map<String, String> infoMap, Map<String, Double> featMap) {
+        double ros_cf_score = getOneInfo("ros_cf_score", infoMap);
+        double ros_cf_rank = getOneInfo("ros_cf_rank", infoMap);
+        double rov_cf_score = getOneInfo("rov_cf_score", infoMap);
+        double rov_cf_rank = getOneInfo("rov_cf_rank", infoMap);
+        featMap.put(prefix + "_ros_cf_score", ros_cf_score);
+        featMap.put(prefix + "_ros_cf_rank", ros_cf_rank);
+        featMap.put(prefix + "_rov_cf_score", rov_cf_score);
+        featMap.put(prefix + "_rov_cf_rank", rov_cf_rank);
+    }
+
+    private static void headVideoCFD2Feature(String prefix, Map<String, String> infoMap, Map<String, Double> featMap) {
+        double score = getOneInfo("score", infoMap);
+        double rank = getOneInfo("rank", infoMap);
+        double onlines = getOneInfo("onlines", infoMap);
+        featMap.put(prefix + "_score", score);
+        featMap.put(prefix + "_rank", rank);
+        featMap.put(prefix + "_onlines", onlines);
+    }
+
+    private static void headVideoCFD3Feature(String prefix, Map<String, String> infoMap, Map<String, Double> featMap) {
+        double exp = getOneInfo("exp", infoMap);
+        double return_n = getOneInfo("return_n", infoMap);
+        double rovn = getOneInfo("rovn", infoMap);
+        featMap.put(prefix + "_exp", FeatureUtils.log1(exp));
+        featMap.put(prefix + "_return_n", FeatureUtils.log1(return_n));
+        featMap.put(prefix + "_rovn", rovn);
+    }
+
+    /**
+     * Parses collaborative-filtering score strings into nested lookup maps.
+     * <p>
+     * For every type in {@code cfList} the text stored in {@code mapInfo} is split on
+     * {@code ","} into entries and each entry on {@code ":"} into fields. An entry with at
+     * least four fields contributes {@code field[0] -> {field[1], field[2], field[3]}};
+     * shorter entries are ignored, and fields beyond the fourth are dropped. A type is only
+     * present in the result when at least one of its entries was accepted.
+     *
+     * @param cfList  types to look up; {@code null} yields an empty result
+     * @param mapInfo type to raw score text; {@code null} yields an empty result, and a
+     *                missing, null or empty text skips that type
+     * @return type to (key to three-element value array); never {@code null}
+     */
+    public static Map<String, Map<String, String[]>> parseUCFScore(List<String> cfList, Map<String, String> mapInfo) {
+        Map<String, Map<String, String[]>> allScoresMap = new HashMap<>();
+        if (null == cfList || null == mapInfo) {
+            return allScoresMap;
+        }
+        for (String cfType : cfList) {
+            // get() instead of getOrDefault(): a key explicitly mapped to null must be skipped
+            // as well, otherwise the isEmpty() call below would throw a NullPointerException.
+            String data = mapInfo.get(cfType);
+            if (null == data || data.isEmpty()) {
+                continue;
+            }
+            Map<String, String[]> oneScoresMap = new HashMap<>();
+            for (String entry : data.split(",")) {
+                String[] rList = entry.split(":");
+                if (rList.length >= 4) { // make sure the split produced at least four fields
+                    oneScoresMap.put(rList[0], new String[]{rList[1], rList[2], rList[3]});
+                }
+            }
+            if (!oneScoresMap.isEmpty()) {
+                allScoresMap.put(cfType, oneScoresMap);
+            }
+        }
+        return allScoresMap;
+    }
+
+    private static void u2uFeature(String prefix, Map<String, String> infoMap, Map<String, Double> featMap) {
+        if (null == infoMap || infoMap.isEmpty()) {
+            return;
+        }
+        for (String calType : Arrays.asList("avg", "max", "min")) {
+            for (String period : periods) {
+                double str_one = getOneInfo(calType + "_str_one_" + period, infoMap);
+                double ros_one = getOneInfo(calType + "_ros_one_" + period, infoMap);
+                double str = getOneInfo(calType + "_str_" + period, infoMap);
+                double ros = getOneInfo(calType + "_ros_" + period, infoMap);
+                double str_plus = getOneInfo(calType + "_str_plus_" + period, infoMap);
+                double ros_minus = getOneInfo(calType + "_ros_minus_" + period, infoMap);
+                double rovn = getOneInfo(calType + "_rovn_" + period, infoMap);
+
+                featMap.put(prefix + "_" + period + "_" + calType + "_str_one_", str_one);
+                featMap.put(prefix + "_" + period + "_" + calType + "_ros_one_", ros_one);
+                featMap.put(prefix + "_" + period + "_" + calType + "_str_", str);
+                featMap.put(prefix + "_" + period + "_" + calType + "_ros_", ros);
+                featMap.put(prefix + "_" + period + "_" + calType + "_str_plus_", str_plus);
+                featMap.put(prefix + "_" + period + "_" + calType + "_ros_minus_", ros_minus);
+                featMap.put(prefix + "_" + period + "_" + calType + "_rovn_", rovn);
+            }
+        }
+    }
+
+    private static void oneTypeStatFeature(String prefix, String uvPrefix, Map<String, String> infoMap, Map<String, Double> featMap) {
+        if (null == infoMap || infoMap.isEmpty()) {
+            return;
+        }
+        for (String period : periods) {
+            double exp = getOneInfo("exp_" + period, infoMap);
+            double is_share = getOneInfo("is_share_" + period, infoMap);
+            double share_cnt = getOneInfo("share_cnt_" + period, infoMap);
+            double is_return_1 = getOneInfo("is_return_1_" + period, infoMap);
+            double return_n_uv = getOneInfo(uvPrefix + "_" + period, infoMap);
+
+            double exp_s = FeatureUtils.log1(exp);
+            double is_share_s = FeatureUtils.log1(is_share);
+            double share_cnt_s = FeatureUtils.log1(share_cnt);
+            double is_return_1_s = FeatureUtils.log1(is_return_1);
+            double return_n_uv_s = FeatureUtils.log1(return_n_uv);
+
+            double str = FeatureUtils.wilsonScore(is_share, exp);
+            double str_plus = FeatureUtils.wilsonScore(is_return_1, exp);
+            double ros_one = FeatureUtils.wilsonScore(is_return_1, is_share);
+
+            double rovn = FeatureUtils.plusSmooth(return_n_uv, exp, smoothPlus);
+            double ros = FeatureUtils.plusSmooth(return_n_uv, is_share, smoothPlus);
+            double ros_n = FeatureUtils.plusSmooth(return_n_uv, share_cnt, smoothPlus);
+            double ros_minus = FeatureUtils.plusSmooth(return_n_uv, is_return_1, smoothPlus);
+
+            featMap.put(prefix + "_" + period + "_" + "exp", exp_s);
+            featMap.put(prefix + "_" + period + "_" + "is_share", is_share_s);
+            featMap.put(prefix + "_" + period + "_" + "share_cnt", share_cnt_s);
+            featMap.put(prefix + "_" + period + "_" + "is_return_1", is_return_1_s);
+            featMap.put(prefix + "_" + period + "_" + "return_n_uv", return_n_uv_s);
+            featMap.put(prefix + "_" + period + "_" + "str", str);
+            featMap.put(prefix + "_" + period + "_" + "str_plus", str_plus);
+            featMap.put(prefix + "_" + period + "_" + "ros_one", ros_one);
+            featMap.put(prefix + "_" + period + "_" + "rovn", rovn);
+            featMap.put(prefix + "_" + period + "_" + "ros", ros);
+            featMap.put(prefix + "_" + period + "_" + "ros_n", ros_n);
+            featMap.put(prefix + "_" + period + "_" + "ros_minus", ros_minus);
+        }
+    }
+
+    /**
+     * Reads a numeric value that is stored as text in {@code map}.
+     *
+     * @param name key to look up
+     * @param map  source values; may be {@code null} or empty
+     * @return the parsed value, or 0.0 when the map is null or empty, the key is absent, or
+     *         the stored text is null or blank
+     * @throws NumberFormatException if the stored text is non-blank but not a valid double
+     */
+    private static double getOneInfo(String name, Map<String, String> map) {
+        if (null == map || map.isEmpty()) {
+            return 0.0;
+        }
+        // get() instead of getOrDefault(): a key explicitly mapped to null must also fall back
+        // to 0.0, otherwise Double.parseDouble(null) throws a NullPointerException.
+        String raw = map.get(name);
+        if (null == raw || raw.trim().isEmpty()) {
+            return 0.0;
+        }
+        return Double.parseDouble(raw);
+    }
+}

+ 63 - 2
src/main/scala/com/aliyun/odps/spark/examples/myUtils/FeatureUtils.java

@@ -1,6 +1,10 @@
 package com.aliyun.odps.spark.examples.myUtils;
 
+import examples.utils.SimilarityUtils;
+
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 public class FeatureUtils {
@@ -77,11 +81,68 @@ public class FeatureUtils {
         }
     }
 
-    public static double getCreateTime(long currentMs, long createMs) {
-        double diff = (currentMs - createMs) / oneYearMs;
+    public static double getTimeDiff(long currentMs, long historyMs) { // returns (currentMs - historyMs) in units of oneYearMs, capped at 1.0; no lower bound is applied, so the result is negative when historyMs is later than currentMs
+        double diff = (currentMs - historyMs) / oneYearMs; // oneYearMs is declared outside this hunk; NOTE(review): if it is an integral type this division truncates before the widening to double - confirm
         if (diff > 1.0) {
             diff = 1.0;
         }
         return diff;
     }
+
+    /**
+     * Log-compresses a value as {@code ln(data + 1)}.
+     *
+     * @param data value to compress
+     * @return 0 when {@code data} is zero or negative, otherwise {@code Math.log(data + 1.0)}
+     */
+    public static double log1(double data) {
+        return (data <= 0) ? 0D : Math.log(data + 1.0);
+    }
+
+    /**
+     * Ratio with an additive term in the denominator: {@code a / (b + plus)}.
+     *
+     * @param a    numerator
+     * @param b    denominator before smoothing
+     * @param plus value added to the denominator
+     * @return 0 when either {@code a} or {@code b} equals zero, otherwise {@code a / (b + plus)}
+     */
+    public static double plusSmooth(double a, double b, double plus) {
+        if (a != 0 && b != 0) {
+            return a / (b + plus);
+        }
+        return 0D;
+    }
+
+    /**
+     * Lower bound of the Wilson score interval for the rate {@code click / exposure},
+     * evaluated with z = 1.96.
+     *
+     * @param click    number of positive events
+     * @param exposure number of trials
+     * @return 0.0 when {@code exposure} is zero or negative; otherwise the lower bound, with
+     *         the observed rate capped at 1.0 before the bound is computed
+     */
+    public static double wilsonScore(double click, double exposure) {
+        if (exposure <= 0) {
+            return 0.0;
+        }
+
+        final double z = 1.96;
+        final double zSquared = Math.pow(z, 2);
+        // The observed rate is never allowed to exceed 1.0.
+        final double rate = Math.min(click / exposure, 1.0);
+
+        final double centre = rate + zSquared / (2 * exposure);
+        final double margin = z * Math.sqrt((rate * (1 - rate)) / exposure + Math.pow(z / (2 * exposure), 2));
+        final double scale = 1.0 + zSquared / exposure;
+        return (centre - margin) / scale;
+    }
+
+    /**
+     * Computes tag-versus-title match features.
+     * <p>
+     * {@code tags} is split on commas. The result always has three elements:
+     * <ol>
+     *   <li>the number of non-empty tags that occur verbatim in {@code title};</li>
+     *   <li>the largest {@code SimilarityUtils.word2VecSimilarity(tag, title)} score, never
+     *       below 0.0 because the running maximum starts at 0.0;</li>
+     *   <li>the mean of those scores over all split tags.</li>
+     * </ol>
+     *
+     * @param tags  comma-separated tag list; {@code null} yields all zeros
+     * @param title text the tags are compared against; {@code null} yields all zeros
+     * @return a three-element array: match count, maximum similarity, mean similarity
+     */
+    public static Double[] funcC34567ForTagsNew(String tags, String title) {
+        if (null == tags || null == title) {
+            return new Double[]{0.0, 0.0, 0.0};
+        }
+        String[] tagsList = tags.split(",");
+        int matchCount = 0;
+        double maxScore = 0.0;
+        double scoreSum = 0.0;
+
+        for (String tag : tagsList) {
+            // String.contains("") is always true, so an empty tag must not count as a match.
+            if (!tag.isEmpty() && title.contains(tag)) {
+                matchCount++;
+            }
+            double score = SimilarityUtils.word2VecSimilarity(tag, title);
+            if (score > maxScore) {
+                maxScore = score;
+            }
+            scoreSum += score;
+        }
+
+        // split() drops trailing empty strings, so input such as "," produces no tags at all.
+        double meanScore = (tagsList.length > 0) ? scoreSum / tagsList.length : scoreSum;
+
+        // An array is used to return the three values together.
+        return new Double[]{(double) matchCount, maxScore, meanScore};
+    }
 }

+ 13 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/UserSRBO.java

@@ -0,0 +1,13 @@
+package com.aliyun.odps.spark.examples.myUtils;
+
+import lombok.Data;
+
+@Data
+public class UserSRBO { // element type that UserShareReturnProfile passes to JSON.parseArray for its share/return sequences
+    // Share-sequence element shape:  JSON_OBJECT("id",vid,"cnt",share_cnt,"ts",ts)
+    // Return-sequence element shape: JSON_OBJECT("id",vid,"uv",return_n_uv_noself,"ts",ts)
+    private long id;   // "id": the vid in both shapes above (originally noted as "return vid")
+    private long cnt;  // "cnt": share_cnt; only present in the share shape above
+    private long uv;   // "uv": return_n_uv_noself; only present in the return shape above
+    private long ts;   // "ts": originally noted as "view ts"; the time unit is not shown here - TODO confirm
+}

+ 73 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/UserShareReturnProfile.java

@@ -0,0 +1,73 @@
+package com.aliyun.odps.spark.examples.myUtils;
+
+import com.alibaba.fastjson.JSON;
+import lombok.Data;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Share/return profile of a user.
+ * <p>
+ * The sequence fields and the per-attribute maps have hand-written setters that take the raw
+ * JSON text and parse it with fastjson, so the stored value is already structured.
+ */
+@Data
+public class UserShareReturnProfile {
+    private long s_pv;                          // share_pv (share pv)
+    private long s_cnt;                         // share_cnt (number of shares)
+    private long r_pv;                          // return_pv (return pv)
+    private long r_uv;                          // return_uv (return uv)
+    private long m_s_cnt;                       // max_share_cnt (maximum share count)
+    private long m_r_uv;                        // max_return_uv (maximum return uv)
+    private List<UserSRBO> m_s_s;               // max_share_seq (maximum share sequence)
+    private List<UserSRBO> m_r_s;               // max_return_seq (maximum return sequence)
+    private List<UserSRBO> l_s_s;               // last_share_seq (most recent share sequence)
+    private List<UserSRBO> l_r_s;               // last_return_seq (most recent return sequence)
+    private Map<String, VideoAttrSRBO> c1_s;    // cate1_seq (merge_first_level_cate sequence - return rate)
+    private Map<String, VideoAttrSRBO> c2_s;    // cate2_seq (merge_second_level_cate sequence - return rate)
+    private Map<String, VideoAttrSRBO> l1_s;    // label1_seq (festive_label1 sequence - return rate)
+    private Map<String, VideoAttrSRBO> l2_s;    // label2_seq (festive_label2 sequence - return rate)
+
+    /** Parses {@code data} as a JSON array and stores it as the max share sequence. */
+    public void setM_s_s(String data) {
+        this.m_s_s = toSequence(data);
+    }
+
+    /** Parses {@code data} as a JSON array and stores it as the max return sequence. */
+    public void setM_r_s(String data) {
+        this.m_r_s = toSequence(data);
+    }
+
+    /** Parses {@code data} as a JSON array and stores it as the last share sequence. */
+    public void setL_s_s(String data) {
+        this.l_s_s = toSequence(data);
+    }
+
+    /** Parses {@code data} as a JSON array and stores it as the last return sequence. */
+    public void setL_r_s(String data) {
+        this.l_r_s = toSequence(data);
+    }
+
+    /** Parses {@code data} and stores the entries keyed by attribute name as cate1_seq. */
+    public void setC1_s(String data) {
+        this.c1_s = parseVideoAttrSR(data);
+    }
+
+    /** Parses {@code data} and stores the entries keyed by attribute name as cate2_seq. */
+    public void setC2_s(String data) {
+        this.c2_s = parseVideoAttrSR(data);
+    }
+
+    /** Parses {@code data} and stores the entries keyed by attribute name as label1_seq. */
+    public void setL1_s(String data) {
+        this.l1_s = parseVideoAttrSR(data);
+    }
+
+    /** Parses {@code data} and stores the entries keyed by attribute name as label2_seq. */
+    public void setL2_s(String data) {
+        this.l2_s = parseVideoAttrSR(data);
+    }
+
+    /** Hands {@code data} to fastjson unchanged and returns the resulting element list. */
+    private static List<UserSRBO> toSequence(String data) {
+        return JSON.parseArray(data, UserSRBO.class);
+    }
+
+    /**
+     * Parses a JSON array of {@link VideoAttrSRBO} and indexes the entries by
+     * {@code getNa()}. Null or empty input, a null parse result and null elements all
+     * contribute nothing, so the returned map is never {@code null}.
+     */
+    private Map<String, VideoAttrSRBO> parseVideoAttrSR(String data) {
+        Map<String, VideoAttrSRBO> byName = new HashMap<>();
+        if (null == data || data.isEmpty()) {
+            return byName;
+        }
+        List<VideoAttrSRBO> parsed = JSON.parseArray(data, VideoAttrSRBO.class);
+        if (null == parsed) {
+            return byName;
+        }
+        for (VideoAttrSRBO item : parsed) {
+            if (null == item) {
+                continue;
+            }
+            byName.put(item.getNa(), item);
+        }
+        return byName;
+    }
+}

+ 16 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/VideoAttrSRBO.java

@@ -0,0 +1,16 @@
+package com.aliyun.odps.spark.examples.myUtils;
+
+import lombok.Data;
+
+@Data
+public class VideoAttrSRBO { // per-attribute share/return counters; UserShareReturnProfile parses arrays of these and keys them by getNa()
+    // cate1 entries:  JSON_OBJECT("na",cate1,"sp",share_pv,"rp",return_n_pv_noself,"ru",return_n_uv_noself,"mu",max_return_uv)
+    // cate2 entries:  JSON_OBJECT("na",cate2,"sp",share_pv,"rp",return_n_pv_noself,"ru",return_n_uv_noself,"mu",max_return_uv)
+    // label1 entries: JSON_OBJECT("na",label1,"sp",share_pv,"rp",return_n_pv_noself,"ru",return_n_uv_noself,"mu",max_return_uv)
+    // label2 entries: JSON_OBJECT("na",label2,"sp",share_pv,"rp",return_n_pv_noself,"ru",return_n_uv_noself,"mu",max_return_uv)
+    private String na;  // "na": attribute name (the cate1 / cate2 / label1 / label2 value in the shapes above)
+    private long sp;    // "sp": share_pv
+    private long rp;    // "rp": return_n_pv_noself
+    private long ru;    // "ru": return_n_uv_noself
+    private long mu;    // "mu": max_return_uv
+}