sunmingze 1 anno fa
parent
commit
cc6d87021d

+ 46 - 0
src/main/java/com/aliyun/odps/spark/examples/sparksql/SparkGrapDataLoader.java

@@ -0,0 +1,46 @@
+package com.aliyun.odps.spark.examples.sparksql;
+
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.data.Record;
+import org.apache.spark.SparkConf;
+import org.apache.spark.aliyun.odps.OdpsOps;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function2;
+
+
+/**
+ * Spark job that reads one partition of a MaxCompute (ODPS) table, turns every record into a
+ * tab-separated line of {@code machinecode_share}, {@code machinecode_click} and {@code apptype},
+ * and writes the lines whose apptype equals "21" to HDFS as text.
+ */
+public class SparkGrapDataLoader {
+
+    /** Separator placed between the three exported columns. */
+    private static final String FIELD_SEPARATOR = "\t";
+
+    /** Only lines whose third column equals this value are written out. */
+    private static final String TARGET_APP_TYPE = "21";
+
+    /**
+     * Runs the export for the hard-coded partition and writes the filtered lines below
+     * {@code /dw/recommend/model/graph_data/}.
+     *
+     * @param args command-line arguments (not used)
+     */
+    public static void main(String[] args) {
+
+        String partition = "dt=20231204";
+        // NOTE(review): the access id/key are hard-coded in source; consider rotating them and
+        // loading them from job configuration or the environment instead.
+        String accessId = "LTAIWYUujJAm7CbH";
+        String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
+        String odpsUrl = "http://service.odps.aliyun.com/api";
+        String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
+        String project = "loghubods";
+        String table = "share_machinecode";
+        String hdfsPath = "/dw/recommend/model/graph_data/" + partition;
+
+        SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        try {
+            OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
+            System.out.println("Read odps table...");
+
+            JavaRDD<String> readData = odpsOps.readTableWithJava(project, table, partition, new RecordsToSamples(), Integer.valueOf(50));
+            readData.filter(line -> isTargetAppType(line)).saveAsTextFile(hdfsPath);
+        } finally {
+            // Release the Spark context even when reading or writing fails.
+            jsc.stop();
+        }
+    }
+
+    /**
+     * Tells whether the third tab-separated column of {@code line} equals the target apptype.
+     *
+     * @param line a line produced by {@link RecordsToSamples}
+     * @return {@code true} when the line has at least three columns and the third one is "21"
+     */
+    static boolean isTargetAppType(String line) {
+        // A limit of -1 keeps trailing empty columns; without it an empty apptype would shorten
+        // the array and indexing column 2 would throw ArrayIndexOutOfBoundsException.
+        String[] fields = line.split(FIELD_SEPARATOR, -1);
+        return fields.length > 2 && TARGET_APP_TYPE.equals(fields[2]);
+    }
+
+
+    /**
+     * Converts an ODPS record into one tab-separated text line.
+     */
+    static class RecordsToSamples implements Function2<Record, TableSchema, String>  {
+        /**
+         * Builds {@code machinecode_share \t machinecode_click \t apptype} from the record.
+         *
+         * @param record the row being read
+         * @param schema the table schema (not used)
+         * @return the three column values joined by tabs
+         */
+        @Override
+        public String call(Record record, TableSchema schema) throws Exception {
+            String sourceUser = record.getString("machinecode_share");
+            String destUser = record.getString("machinecode_click");
+            String appType = record.getString("apptype");
+            return sourceUser + FIELD_SEPARATOR + destUser + FIELD_SEPARATOR + appType;
+        }
+    }
+
+
+}

+ 32 - 32
src/main/java/com/aliyun/odps/spark/examples/sparksql/SparkLoadDataRedisMaxCompute.java → src/main/java/com/aliyun/odps/spark/examples/sparksql/SparkUserFeaToRedisLoader.java

@@ -1,24 +1,22 @@
-
-
 package com.aliyun.odps.spark.examples.sparksql;
 
 import com.aliyun.odps.data.Record;
-import com.tzld.piaoquan.data.base.UserFeature;
-import com.tzld.piaoquan.data.dataloader.SampleFeatureConstructor;
-import org.apache.spark.sql.SparkSession;
+import com.tzld.piaoquan.data.base.*;
+import com.tzld.piaoquan.data.dataloader.RedisFeatureConstructor;
+import org.apache.spark.SparkConf;
+import org.apache.spark.aliyun.odps.OdpsOps;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.Dataset;
-
-import java.util.HashMap;
-import java.util.Map;
-
 import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
 import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.serializer.StringRedisSerializer;
 
-public class SparkLoadDataRedisMaxCompute {
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class SparkUserFeaToRedisLoader {
 
     private static final String userKeyFormat = "user:%s";
 
@@ -38,7 +36,7 @@ public class SparkLoadDataRedisMaxCompute {
 
     public static void loadFeatureToRedis(RedisTemplate<String, String> redisTemplate, Record line) {
         Map<String, String> userFeaRedisFormat = new HashMap<String, String>();
-        UserFeature userFeature = SampleFeatureConstructor.constructUserFeature(line);
+        UserFeature userFeature = RedisFeatureConstructor.constructUserFeature(line);
         String key = String.format(userKeyFormat, userFeature.getKey());
         String value = userFeature.getValue();
         userFeaRedisFormat.put(key, value);
@@ -46,34 +44,36 @@ public class SparkLoadDataRedisMaxCompute {
     }
 
 
+    /**
+     * Reads one partition of the configured MaxCompute (ODPS) table and, for every Spark
+     * partition, builds a Redis template and passes each record to
+     * {@code loadFeatureToRedis}.
+     *
+     * @param args command-line arguments (not used)
+     */
+    public static void main(String[] args) {
 
-    public static void main(String[] args) throws Exception {
-        SparkSession spark = SparkSession
-                .builder()
-                .appName("SparkSQL-on-MaxCompute")
-                .config("spark.sql.defaultCatalog","odps")
-                .config("spark.sql.catalog.odps", "org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog")
-                .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
-                .config("spark.sql.extensions", "org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions")
-                .config("spark.sql.catalogImplementation","hive")
-                .getOrCreate();
-        JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
-
-
+        String partition = "dt=20231211";
+        // NOTE(review): the access id/key are hard-coded in source; consider rotating them and
+        // loading them from job configuration or the environment instead.
+        String accessId = "LTAIWYUujJAm7CbH";
+        String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
+        String odpsUrl = "http://service.odps.aliyun.com/api";
+        String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
+        String project = "loghubods";
+        String table = "alg_recsys_view_sample";
 
-        // 读 分区表
-        Dataset<Row> rptdf = spark.sql("select * from loghubods.alg_recsys_user_info where dt='20231210' limit 1000");
-        System.out.println("rptdf count: " + rptdf.count());
-        rptdf.printSchema();
+        SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
+        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+        OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
+        System.out.println("Read odps table...");
 
+        // Pass the records through unchanged instead of handing over a null transfer function.
+        // NOTE(review): OdpsOps appears to call the transfer function for every row, so null
+        // would fail with a NullPointerException at read time - confirm against the SDK.
+        JavaRDD<Record> readData = odpsOps.readTableWithJava(project, table, partition, (record, schema) -> record, Integer.valueOf(50));
 
-        rptdf.toJavaRDD().foreachPartition(
+        readData.foreachPartition(
                 rowIterator -> {
                     RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
-                 //   rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
+                    rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
                 }
         );
 
 
+
+
+
     }
+
+
+
 }

+ 165 - 0
src/main/java/com/tzld/piaoquan/data/dataloader/RedisFeatureConstructor.java

@@ -0,0 +1,165 @@
+package com.tzld.piaoquan.data.dataloader;
+
+
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.data.Record;
+import com.tzld.piaoquan.data.base.ItemFeature;
+import com.tzld.piaoquan.data.base.RequestContext;
+import com.tzld.piaoquan.data.base.UserActionFeature;
+import com.tzld.piaoquan.data.base.UserFeature;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Static helpers that copy columns of an ODPS {@link Record} into the request-context,
+ * user-feature and item-feature objects.
+ */
+public class RedisFeatureConstructor {
+
+    /** Column-name prefix of the user-side action statistics. */
+    private static final String USER_PREFIX = "u";
+
+    /** Column-name prefix of the item-side action statistics. */
+    private static final String ITEM_PREFIX = "i";
+
+    /**
+     * Copies the app type, device information and context columns of the record into a new
+     * {@link RequestContext}.
+     *
+     * @param record the row to read from
+     * @return the populated request context
+     */
+    public static RequestContext constructRequestContext(Record record) {
+        RequestContext requestContext = new RequestContext();
+        requestContext.setApptype(record.getString("apptype"));
+        requestContext.setMachineinfo_brand(record.getString("machineinfo_brand"));
+        requestContext.setMachineinfo_model(record.getString("machineinfo_model"));
+        requestContext.setMachineinfo_platform(record.getString("machineinfo_platform"));
+        requestContext.setMachineinfo_sdkversion(record.getString("machineinfo_sdkversion"));
+        requestContext.setMachineinfo_system(record.getString("machineinfo_system"));
+        requestContext.setMachineinfo_wechatversion(record.getString("machineinfo_wechatversion"));
+        requestContext.setDay(record.getString("ctx_day"));
+        requestContext.setWeek(record.getString("ctx_week"));
+        requestContext.setHour(record.getString("ctx_hour"));
+        requestContext.setRegion(record.getString("ctx_region"));
+        requestContext.setCity(record.getString("ctx_city"));
+        return requestContext;
+    }
+
+
+    /**
+     * Builds a {@link UserFeature} from the {@code uid}, the {@code u_*_bucket_*} columns and
+     * the {@code u_*} action statistics for the 1day, 3day, 7day and 3month windows.
+     *
+     * @param record the row to read from; its {@code uid} column must not be null
+     * @return the populated user feature
+     * @throws NullPointerException if the {@code uid} column is null
+     */
+    public static UserFeature constructUserFeature(Record record) {
+        UserFeature userFeature = new UserFeature();
+        userFeature.setUid(record.get("uid").toString());
+        userFeature.setUser_cycle_bucket_7days(record.getString("u_cycle_bucket_7days"));
+        userFeature.setUser_cycle_bucket_30days(record.getString("u_cycle_bucket_30days"));
+        userFeature.setUser_share_bucket_30days(record.getString("u_share_bucket_30days"));
+
+        userFeature.setDay1_cnt_features(buildActionFeature(record, USER_PREFIX, "1day"));
+        userFeature.setDay3_cnt_features(buildActionFeature(record, USER_PREFIX, "3day"));
+        userFeature.setDay7_cnt_features(buildActionFeature(record, USER_PREFIX, "7day"));
+        userFeature.setMonth3_cnt_features(buildActionFeature(record, USER_PREFIX, "3month"));
+
+        return userFeature;
+    }
+
+
+    /**
+     * Builds an {@link ItemFeature} from the video columns and the {@code i_*} action
+     * statistics for the 1day, 3day, 7day and 3month windows.
+     *
+     * @param record the row to read from
+     * @return the populated item feature
+     */
+    public static ItemFeature constructItemFeature(Record record) {
+        ItemFeature itemFeature = new ItemFeature();
+        itemFeature.setVideoId(record.getString("videoid"));
+        itemFeature.setUpId(record.getString("uid"));
+        itemFeature.setTitleLength(record.getString("i_title_len"));
+        itemFeature.setPlayLength(record.getString("i_play_len"));
+        itemFeature.setTotalTime(record.getString("total_time"));
+        itemFeature.setDaysSinceUpload(record.getString("i_days_since_upload"));
+
+        // Each window gets its own statistics object; previously the 3day and 7day slots
+        // were filled with the 1day object by mistake.
+        itemFeature.setDay1_cnt_features(buildActionFeature(record, ITEM_PREFIX, "1day"));
+        itemFeature.setDay3_cnt_features(buildActionFeature(record, ITEM_PREFIX, "3day"));
+        itemFeature.setDay7_cnt_features(buildActionFeature(record, ITEM_PREFIX, "7day"));
+        itemFeature.setMonth3_cnt_features(buildActionFeature(record, ITEM_PREFIX, "3month"));
+        return itemFeature;
+    }
+
+
+    /**
+     * Reads the eight action statistics of one time window into a {@link UserActionFeature}.
+     * Count columns are named {@code <prefix>_<window>_<name>_cnt}; rate columns are named
+     * {@code <prefix>_<rate>_<window>}.
+     *
+     * @param record the row to read from
+     * @param prefix column prefix, {@code "u"} for user or {@code "i"} for item statistics
+     * @param window window name as used in the column names, e.g. {@code "1day"}
+     * @return the populated action feature
+     */
+    private static UserActionFeature buildActionFeature(Record record, String prefix, String window) {
+        String countPrefix = prefix + "_" + window + "_";
+        String rateSuffix = "_" + window;
+
+        UserActionFeature actionFeature = new UserActionFeature();
+        actionFeature.setExp_cnt(record.getString(countPrefix + "exp_cnt"));
+        actionFeature.setClick_cnt(record.getString(countPrefix + "click_cnt"));
+        actionFeature.setShare_cnt(record.getString(countPrefix + "share_cnt"));
+        actionFeature.setReturn_cnt(record.getString(countPrefix + "return_cnt"));
+        actionFeature.setCtr(record.getString(prefix + "_ctr" + rateSuffix));
+        actionFeature.setStr(record.getString(prefix + "_str" + rateSuffix));
+        actionFeature.setRov(record.getString(prefix + "_rov" + rateSuffix));
+        actionFeature.setRos(record.getString(prefix + "_ros" + rateSuffix));
+        return actionFeature;
+    }
+
+
+}