jch 21 órája
szülő
commit
91513563bc

+ 11 - 6
src/main/scala/com/aliyun/odps/spark/examples/makedata_recsys_r_rate/makedata_recsys_83_originData_20250317.scala

@@ -22,6 +22,12 @@ object makedata_recsys_83_originData_20250317 {
     "is_return_noself", "return_1_uv_noself",
     "is_return_n_noself", "return_n_uv_noself"
   )
+  private val o2oMap = Map( // output key (a record column with this name is skipped) -> key read from the metafeaturemap JSON by OnlineLogUtils.log2Map
+    "d1_feature" -> "scene_type_vid_cf_feature_20250212",
+    "d2_feature" -> "vid_click_cf_feature_20250212",
+    "d3_feature" -> "alg_recsys_feature_cf_i2i_v2",
+    "v2_feature" -> "head_video"
+  )
 
   private def parseVideoRdd(videoRdd: RDD[Record]): RDD[(String, java.util.Map[String, String])] = {
     videoRdd
@@ -161,15 +167,14 @@ object makedata_recsys_83_originData_20250317 {
 
       // 2.5 样本重采样
       val filterColumns = Set("allfeaturemap", "metafeaturemap")
-      val onlineKeys = Set("head_video")
       val resampleData = DataUtils.resampleWithoutInterception(whatLabel, fuSampleRate, odpsData)
         .map(record => {
-          val map = ConvertUtils.record2Map(record, filterColumns, onlineKeys)
-          val page = map.getOrElse("page", "")
-          if (page.equals("详情页") && map.containsKey("head_video")) {
-            map.put("v2_feature", map.get("head_video"))
+          val page = getStringValue(record, "page")
+          if (page.equals("详情页")) {
+            OnlineLogUtils.log2Map(record, o2oMap)
+          } else {
+            ConvertUtils.record2Map(record, filterColumns)
           }
-          map
         })
 
       // 2.6 join video stat

+ 0 - 35
src/main/scala/com/aliyun/odps/spark/examples/myUtils/ConvertUtils.java

@@ -87,41 +87,6 @@ public class ConvertUtils {
         return map;
     }
 
-    public static Map<String, String> record2Map(Record record, Set<String> filterColumns, Set<String> onlineKeys) {
-        Map<String, String> map = new HashMap<>();
-        if (null != record) {
-            Column[] columns = record.getColumns();
-            if (null != columns) {
-                for (Column column : columns) {
-                    String name = column.getName();
-                    if (filterColumns.contains(name)) {
-                        if (name.equals("metafeaturemap")) {
-                            String data = record.getString(name);
-                            if (null != data && data.length() > 4) {
-                                JSONObject json = JSON.parseObject(data);
-                                if (null != json) {
-                                    for (String key : onlineKeys) {
-                                        if (json.containsKey(key)) {
-                                            String value = json.getString(key);
-                                            if (null != value && value.length() > 4) {
-                                                map.put(key, value);
-                                            }
-                                        }
-                                    }
-                                }
-                            }
-                        }
-                        continue;
-                    }
-                    if (!record.isNull(name)) {
-                        map.put(name, record.getString(name));
-                    }
-                }
-            }
-        }
-        return map;
-    }
-
     public static Map<String, Map<String, String>> list2Map(List<Map<String, String>> videSeq) {
         Map<String, Map<String, String>> map = new HashMap<>();
         if (null != videSeq && !videSeq.isEmpty()) {

+ 49 - 0
src/main/scala/com/aliyun/odps/spark/examples/myUtils/OnlineLogUtils.java

@@ -0,0 +1,49 @@
+package com.aliyun.odps.spark.examples.myUtils;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.aliyun.odps.Column;
+import com.aliyun.odps.data.Record;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class OnlineLogUtils {
+    public static Map<String, String> log2Map(Record record, Map<String, String> o2oMap) {
+        Map<String, String> map = new HashMap<>();
+        if (null != record) {
+            Column[] columns = record.getColumns();
+            if (null != columns) {
+                for (Column column : columns) {
+                    String name = column.getName();
+                    if (o2oMap.containsKey(name)) {
+                        continue;
+                    }
+                    if (!record.isNull(name)) {
+                        if (!name.equals("metafeaturemap")) {
+                            map.put(name, record.getString(name));
+                        } else {
+                            String data = record.getString(name);
+                            if (null != data && data.length() > 4) {
+                                JSONObject json = JSON.parseObject(data);
+                                if (null != json) {
+                                    for (Map.Entry<String, String> entry : o2oMap.entrySet()) {
+                                        String offKey = entry.getKey();
+                                        String onlineKey = entry.getValue();
+                                        if (json.containsKey(onlineKey)) {
+                                            String value = json.getString(onlineKey);
+                                            if (null != value && value.length() > 4) {
+                                                map.put(offKey, value);
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+        return map;
+    }
+}