sunmingze 1 年之前
父節點
當前提交
83837fb109

+ 96 - 0
src/main/java/com/aliyun/odps/spark/examples/sparksql/SparkEMRShareRatioSampleLoader.java

@@ -0,0 +1,96 @@
+package com.aliyun.odps.spark.examples.sparksql;
+
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.data.Record;
+import com.google.common.collect.ListMultimap;
+import com.tzld.piaoquan.data.base.*;
+import com.tzld.piaoquan.data.dataloader.FeatureConstructor;
+import com.tzld.piaoquan.data.score.feature.VlogShareLRFeatureExtractor;
+import com.tzld.piaoquan.recommend.server.gen.recommend.BaseFeature;
+import com.tzld.piaoquan.recommend.server.gen.recommend.FeatureGroup;
+import org.apache.spark.SparkConf;
+import org.apache.spark.aliyun.odps.OdpsOps;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function2;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+
+public class SparkEMRShareRatioSampleLoader {
+
+    public static void main(String[] args) {
+
+        String partition = "dt=20231123";
+        String accessId = "LTAIWYUujJAm7CbH";
+        String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
+        String odpsUrl = "http://service.odps.aliyun.com/api";
+        String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
+        String project = "loghubods";
+        String table = "alg_recsys_view_sample";
+        String hdfsPath = "/dw/recommend/model/share_ratio_samples/" + partition;
+
+        SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
+        System.out.println("Read odps table...");
+
+        JavaRDD<String> readData = odpsOps.readTableWithJava(project, table, partition, new RecordsToSamples(), Integer.valueOf(50));
+        readData.saveAsTextFile(hdfsPath);
+    }
+
+
+    static class RecordsToSamples implements Function2<Record, TableSchema, String> {
+        @Override
+        public String call(Record record, TableSchema schema) throws Exception {
+            String labelName = "share_ornot";
+            String ret = singleParse(record, labelName);
+            return ret;
+        }
+    }
+
+
+    // 单条日志处理逻辑
+    public static String singleParse(Record record, String labelName) {
+        // 数据解析
+        String label = record.getString(labelName);
+        if (label == null) {
+            label = "0";
+        }
+
+        // 从sql的 record中 初始化对象内容
+        RequestContext requestContext = FeatureConstructor.constructRequestContext(record);
+        UserFeature userFeature = FeatureConstructor.constructUserFeature(record);
+        ItemFeature itemFeature = FeatureConstructor.constructItemFeature(record);
+
+        // 转化成bytes
+        RequestContextBytesFeature requestContextBytesFeature = new RequestContextBytesFeature(requestContext);
+        UserBytesFeature userBytesFeature = new UserBytesFeature(userFeature);
+        VideoBytesFeature videoBytesFeature = new VideoBytesFeature(itemFeature);
+
+        // 特征抽取
+        VlogShareLRFeatureExtractor bytesFeatureExtractor;
+        bytesFeatureExtractor = new VlogShareLRFeatureExtractor();
+
+        bytesFeatureExtractor.getUserFeatures(userBytesFeature);
+        bytesFeatureExtractor.getItemFeature(videoBytesFeature);
+        bytesFeatureExtractor.getContextFeatures(requestContextBytesFeature);
+
+        ListMultimap<FeatureGroup, BaseFeature> featureMap = bytesFeatureExtractor.getFeatures();
+        return parseSamplesToString(label, featureMap);
+    }
+
+    // 构建样本的字符串
+    public static String parseSamplesToString(String label, ListMultimap<FeatureGroup, BaseFeature> featureMap) {
+        ArrayList<String> featureList = new ArrayList<String>();
+        for (Map.Entry<FeatureGroup, BaseFeature> entry : featureMap.entries()) {
+            FeatureGroup groupedFeature = entry.getKey();
+            BaseFeature baseFeature = entry.getValue();
+            Long featureIdentifier = baseFeature.getIdentifier();
+            featureList.add(String.valueOf(featureIdentifier) + ":1");
+        }
+        return label + "\t" + String.join("\t", featureList);
+    }
+
+}