jch hai 2 meses
pai
achega
1a60d1b31f

+ 12 - 9
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/makedata_recsys_82_originData_20250221.scala

@@ -33,11 +33,11 @@ object makedata_recsys_82_originData_20250221 {
       .reduceByKey((a, b) => if (a.size() > b.size()) a else b)
   }
 
-  private def getVidMidRdd(logRdd: RDD[Record]): RDD[(String, String)] = {
+  private def getVidMidRdd(logRdd: RDD[java.util.Map[String, String]]): RDD[(String, String)] = {
     logRdd
-      .map(record => {
-        val mid = record.getString("mid")
-        val c9 = record.getString("c9_feature")
+      .map(raw => {
+        val mid = raw.getOrElse("mid", "")
+        val c9 = raw.getOrElse("c9_feature", "")
         (mid, c9)
       })
       .filter(_._1.nonEmpty)
@@ -63,11 +63,11 @@ object makedata_recsys_82_originData_20250221 {
       })
   }
 
-  private def joinMidSeq(logRdd: RDD[Record], midSeqRdd: RDD[(String, List[java.util.Map[String, String]])]): RDD[(Record, List[java.util.Map[String, String]])] = {
+  private def joinMidSeq(logRdd: RDD[java.util.Map[String, String]], midSeqRdd: RDD[(String, List[java.util.Map[String, String]])]): RDD[(java.util.Map[String, String], List[java.util.Map[String, String]])] = {
     logRdd
-      .map(record => {
-        val mid = record.getString("mid")
-        (mid, record)
+      .map(raw => {
+        val mid = raw.getOrElse("mid", "")
+        (mid, raw)
       })
       .leftOuterJoin(midSeqRdd)
       .map(raw => {
@@ -75,7 +75,7 @@ object makedata_recsys_82_originData_20250221 {
       })
   }
 
-  private def getFeature(rdd: RDD[(Record, List[java.util.Map[String, String]])]): RDD[String] = {
+  private def getFeature(rdd: RDD[(java.util.Map[String, String], List[java.util.Map[String, String]])]): RDD[String] = {
     rdd.mapPartitions(partition => {
       SimilarityUtils.init()
       partition.map(raw => {
@@ -127,6 +127,9 @@ object makedata_recsys_82_originData_20250221 {
 
       // d. 样本重采样
       val resampleData = DataUtils.resample(whatLabel, fuSampleRate, odpsData)
+        .map(record => {
+          ConvertUtils.record2Map(record)
+        })
 
       // e. get vid mid rdd
       val vidMidRdd = getVidMidRdd(resampleData)

+ 42 - 6
src/main/scala/com/aliyun/odps/spark/examples/myUtils/ConvertUtils.java

@@ -2,6 +2,7 @@ package com.aliyun.odps.spark.examples.myUtils;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
+import com.aliyun.odps.Column;
 import com.aliyun.odps.data.Record;
 import com.google.gson.Gson;
 
@@ -10,13 +11,13 @@ import java.math.RoundingMode;
 import java.util.*;
 
 public class ConvertUtils {
-    public static JSONObject getFeature(Record record, List<Map<String, String>> videoSeq, int scale) {
+    public static JSONObject getFeature(Map<String, String> record, List<Map<String, String>> videoSeq, int scale) {
         Map<String, Double> featMap = new HashMap<>();
 
         // origin info
-        String ts = record.getString("ts");
+        String ts = record.get("ts");
         long currentMs = Long.parseLong(ts) * 1000;
-        String vid = record.getString("vid");
+        String vid = record.get("vid");
         Map<String, String> headInfo = getRecordCol(record, "v2_feature");
         Map<String, String> rankInfo = getRecordCol(record, "v1_feature");
         Map<String, Map<String, String>> userOriginInfo = getUserOriginInfo(record);
@@ -60,7 +61,7 @@ public class ConvertUtils {
         return filterAndTruncate(featMap, scale);
     }
 
-    private static Map<String, Map<String, String>> getUserOriginInfo(Record record) {
+    private static Map<String, Map<String, String>> getUserOriginInfo(Map<String, String> record) {
         Map<String, Map<String, String>> map = new HashMap<>();
         map.put("mid_global_feature_20250212", getRecordCol(record, "c1_feature"));
         map.put("mid_merge_cate1_feature_20250212", getRecordCol(record, "c2_feature"));
@@ -74,7 +75,7 @@ public class ConvertUtils {
         return map;
     }
 
-    private static Map<String, Map<String, Map<String, String>>> getVideoOriginInfo(Record record) {
+    private static Map<String, Map<String, Map<String, String>>> getVideoOriginInfo(Map<String, String> record) {
         Map<String, Map<String, String>> map = new HashMap<>();
         map.put("alg_vid_global_feature_20250212", getRecordCol(record, "b1_feature"));
         map.put("alg_vid_recommend_exp_feature_20250212", getRecordCol(record, "b2_feature"));
@@ -94,7 +95,7 @@ public class ConvertUtils {
         map.put("alg_recsys_feature_cf_i2i_v2", getRecordCol(record, "d3_feature"));
 
         Map<String, Map<String, Map<String, String>>> allMap = new HashMap<>();
-        String vid = record.getString("vid");
+        String vid = record.get("vid");
         allMap.put(vid, map);
         return allMap;
     }
@@ -137,6 +138,41 @@ public class ConvertUtils {
         return colMap;
     }
 
+    public static Map<String, String> getRecordCol(Map<String, String> record, String col) {
+        Map<String, String> colMap = new HashMap<>();
+        if (null != record && record.containsKey(col)) {
+            try {
+                JSONObject json = JSON.parseObject(record.get(col));
+                if (null != json) {
+                    for (Map.Entry<String, Object> entry : json.entrySet()) {
+                        Object obj = entry.getValue();
+                        if (null != obj) {
+                            colMap.put(entry.getKey(), obj.toString());
+                        }
+                    }
+                }
+            } catch (Exception ignored) {
+            }
+        }
+        return colMap;
+    }
+
+    public static Map<String, String> record2Map(Record record) {
+        Map<String, String> map = new HashMap<>();
+        if (null != record) {
+            Column[] columns = record.getColumns();
+            if (null != columns) {
+                for (Column column : columns) {
+                    String name = column.getName();
+                    if (!record.isNull(name)) {
+                        map.put(name, record.getString(name));
+                    }
+                }
+            }
+        }
+        return map;
+    }
+
     public static Map<String, Map<String, String>> list2Map(List<Map<String, String>> videSeq) {
         Map<String, Map<String, String>> map = new HashMap<>();
         if (null != videSeq && !videSeq.isEmpty()) {

+ 17 - 17
src/main/scala/com/aliyun/odps/spark/examples/myUtils/DataUtils.scala

@@ -27,25 +27,25 @@ object DataUtils {
     })
   }
 
-  def getLogKey(record: Record): String = {
-    val apptype = record.getString("apptype")
-    val page = getStringValue(record, "page")
-    val pagesource = getStringValue(record, "pagesource")
-    val recommendpagetype = getStringValue(record, "recommendpagetype")
-    val flowpool = getStringValue(record, "flowpool")
-    val abcode = record.getString("abcode")
-    val mid = record.getString("mid")
-    val vid = getStringValue(record, "vid")
-    val level = getStringValue(record, "level", "0")
-    val ts = record.getString("ts")
+  def getLogKey(record: java.util.Map[String, String]): String = {
+    val apptype = record.getOrElse("apptype", "")
+    val page = record.getOrElse("page", "")
+    val pagesource = record.getOrElse("pagesource", "")
+    val recommendpagetype = record.getOrElse("recommendpagetype", "")
+    val flowpool = record.getOrElse("flowpool", "")
+    val abcode = record.getOrElse("abcode", "")
+    val mid = record.getOrElse("mid", "")
+    val vid = record.getOrElse("vid", "")
+    val level = record.getOrElse("level", "0")
+    val ts = record.getOrElse("ts", "0")
     (apptype, page, pagesource, recommendpagetype, flowpool, abcode, mid, vid, level, ts).productIterator.mkString(",")
   }
 
-  def getLabels(names: List[String], record: Record): JSONObject = {
+  def getLabels(names: List[String], record: java.util.Map[String, String]): JSONObject = {
     val labels = new JSONObject
     for (name <- names) {
-      if (!record.isNull(name)) {
-        labels.put(name, record.getString(name))
+      if (record.containsKey(name)) {
+        labels.put(name, record.get(name))
       }
     }
     labels
@@ -73,9 +73,9 @@ object DataUtils {
     default
   }
 
-  def getSubJson(record: Record, key1: String, key2: String): JSONObject = {
-    if (!record.isNull(key1)) {
-      val obj = JSON.parseObject(record.getString(key1))
+  def getSubJson(record: java.util.Map[String, String], key1: String, key2: String): JSONObject = {
+    if (record.containsKey(key1)) {
+      val obj = JSON.parseObject(record.get(key1))
       if (obj.nonEmpty && obj.containsKey(key2)) {
         val data = obj.getString(key2)
         return JSON.parseObject(data.replace("\\", ""))