瀏覽代碼

样本重新制作: 小时级别特征。

zhangbo 1 年之前
父節點
當前提交
bf2c8392ae

+ 125 - 125
src/main/java/examples/sparksql/SparkAdFeaToRedisLoader.java

@@ -1,125 +1,125 @@
-package examples.sparksql;
-
-import com.aliyun.odps.TableSchema;
-import com.aliyun.odps.data.Record;
-import com.tzld.piaoquan.ad.engine.commons.base.AdItemFeature;
-import com.tzld.piaoquan.ad.engine.commons.base.UserAdFeature;
-import examples.dataloader.AdRedisFeatureConstructor;
-import org.apache.spark.SparkConf;
-import org.apache.spark.aliyun.odps.OdpsOps;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function2;
-import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
-import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.serializer.StringRedisSerializer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
-public class SparkAdFeaToRedisLoader {
-
-    private static final String userKeyFormat = "user:ad:%s";
-
-    private static final String adKeyFormat = "ad:%s";
-
-
-    public static RedisTemplate<String, String> buildRedisTemplate() {
-        RedisStandaloneConfiguration rsc = new RedisStandaloneConfiguration();
-        rsc.setPort(6379);
-        rsc.setPassword("Wqsd@2019");
-        rsc.setHostName("r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com");
-        RedisTemplate<String, String> template = new RedisTemplate<>();
-        JedisConnectionFactory fac = new JedisConnectionFactory(rsc);
-        fac.afterPropertiesSet();
-        template.setDefaultSerializer(new StringRedisSerializer());
-        template.setConnectionFactory(fac);
-        template.afterPropertiesSet();
-        return template;
-    }
-
-
-    public static void loadFeatureToRedis(RedisTemplate<String, String> redisTemplate, List<String> line) {
-        Map<String, String> redisFormat = new HashMap<String, String>();
-        String key = line.get(0);
-        String value = line.get(1);
-        redisFormat.put(key, value);
-        redisTemplate.opsForValue().multiSet(redisFormat);
-    }
-
-
-    static class RecordsToAdRedisKV implements Function2<Record, TableSchema, List<String>> {
-        @Override
-        public List<String> call(Record record, TableSchema schema) throws Exception {
-            AdItemFeature adItemFeature = AdRedisFeatureConstructor.constructItemFeature(record);
-            // ad feature 中的key以creativeID拼接
-            String key = String.format(adKeyFormat, adItemFeature.getCreativeId());
-            String value = adItemFeature.getValue();
-            List<String> kv = new ArrayList<String>();
-            kv.add(key);
-            kv.add(value);
-            return kv;
-        }
-    }
-
-
-    static class RecordsToUserRedisKV implements Function2<Record, TableSchema, List<String>> {
-        @Override
-        public List<String> call(Record record, TableSchema schema) throws Exception {
-            UserAdFeature userFeature = AdRedisFeatureConstructor.constructUserFeature(record);
-            List<String> kv = new ArrayList<String>();
-            String key = String.format(userKeyFormat, userFeature.getKey());
-
-            String value = userFeature.getValue();
-            kv.add(key);
-            kv.add(value);
-            return kv;
-        }
-    }
-
-
-    public static void main(String[] args) {
-
-        String partition = args[0];
-        String accessId = "LTAIWYUujJAm7CbH";
-        String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
-        String odpsUrl = "http://service.odps.aliyun.com/api";
-        String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
-        String project = "loghubods";
-        String tableAdInfo = "alg_ad_item_info";
-        String tableUserInfo = "alg_ad_user_info";
-
-
-        SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
-        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-        OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
-        System.out.println("Read odps table...");
-
-
-        // load Ad features
-        JavaRDD<List<String>> readAdData = odpsOps.readTableWithJava(project, tableAdInfo, partition, new RecordsToAdRedisKV(), Integer.valueOf(10));
-        readAdData.foreachPartition(
-                rowIterator -> {
-                    RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
-                    rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
-                }
-        );
-
-
-        // load user features
-        JavaRDD<List<String>> readUserData = odpsOps.readTableWithJava(project, tableUserInfo, partition, new RecordsToUserRedisKV(), Integer.valueOf(50));
-        readUserData.repartition(50).foreachPartition(
-                rowIterator -> {
-                    RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
-                    rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
-                }
-        );
-    }
-
-
-}
+//package examples.sparksql;
+//
+//import com.aliyun.odps.TableSchema;
+//import com.aliyun.odps.data.Record;
+//import com.tzld.piaoquan.ad.engine.commons.base.AdItemFeature;
+//import com.tzld.piaoquan.ad.engine.commons.base.UserAdFeature;
+//import examples.dataloader.AdRedisFeatureConstructor;
+//import org.apache.spark.SparkConf;
+//import org.apache.spark.aliyun.odps.OdpsOps;
+//import org.apache.spark.api.java.JavaRDD;
+//import org.apache.spark.api.java.JavaSparkContext;
+//import org.apache.spark.api.java.function.Function2;
+//import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
+//import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
+//import org.springframework.data.redis.core.RedisTemplate;
+//import org.springframework.data.redis.serializer.StringRedisSerializer;
+//
+//import java.io.IOException;
+//import java.util.ArrayList;
+//import java.util.HashMap;
+//import java.util.List;
+//import java.util.Map;
+//
+//
+//public class SparkAdFeaToRedisLoader {
+//
+//    private static final String userKeyFormat = "user:ad:%s";
+//
+//    private static final String adKeyFormat = "ad:%s";
+//
+//
+//    public static RedisTemplate<String, String> buildRedisTemplate() {
+//        RedisStandaloneConfiguration rsc = new RedisStandaloneConfiguration();
+//        rsc.setPort(6379);
+//        rsc.setPassword("Wqsd@2019");
+//        rsc.setHostName("r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com");
+//        RedisTemplate<String, String> template = new RedisTemplate<>();
+//        JedisConnectionFactory fac = new JedisConnectionFactory(rsc);
+//        fac.afterPropertiesSet();
+//        template.setDefaultSerializer(new StringRedisSerializer());
+//        template.setConnectionFactory(fac);
+//        template.afterPropertiesSet();
+//        return template;
+//    }
+//
+//
+//    public static void loadFeatureToRedis(RedisTemplate<String, String> redisTemplate, List<String> line) {
+//        Map<String, String> redisFormat = new HashMap<String, String>();
+//        String key = line.get(0);
+//        String value = line.get(1);
+//        redisFormat.put(key, value);
+//        redisTemplate.opsForValue().multiSet(redisFormat);
+//    }
+//
+//
+//    static class RecordsToAdRedisKV implements Function2<Record, TableSchema, List<String>> {
+//        @Override
+//        public List<String> call(Record record, TableSchema schema) throws Exception {
+//            AdItemFeature adItemFeature = AdRedisFeatureConstructor.constructItemFeature(record);
+//            // ad feature 中的key以creativeID拼接
+//            String key = String.format(adKeyFormat, adItemFeature.getCreativeId());
+//            String value = adItemFeature.getValue();
+//            List<String> kv = new ArrayList<String>();
+//            kv.add(key);
+//            kv.add(value);
+//            return kv;
+//        }
+//    }
+//
+//
+//    static class RecordsToUserRedisKV implements Function2<Record, TableSchema, List<String>> {
+//        @Override
+//        public List<String> call(Record record, TableSchema schema) throws Exception {
+//            UserAdFeature userFeature = AdRedisFeatureConstructor.constructUserFeature(record);
+//            List<String> kv = new ArrayList<String>();
+//            String key = String.format(userKeyFormat, userFeature.getKey());
+//
+//            String value = userFeature.getValue();
+//            kv.add(key);
+//            kv.add(value);
+//            return kv;
+//        }
+//    }
+//
+//
+//    public static void main(String[] args) {
+//
+//        String partition = args[0];
+//        String accessId = "LTAIWYUujJAm7CbH";
+//        String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
+//        String odpsUrl = "http://service.odps.aliyun.com/api";
+//        String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
+//        String project = "loghubods";
+//        String tableAdInfo = "alg_ad_item_info";
+//        String tableUserInfo = "alg_ad_user_info";
+//
+//
+//        SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
+//        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+//        OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
+//        System.out.println("Read odps table...");
+//
+//
+//        // load Ad features
+//        JavaRDD<List<String>> readAdData = odpsOps.readTableWithJava(project, tableAdInfo, partition, new RecordsToAdRedisKV(), Integer.valueOf(10));
+//        readAdData.foreachPartition(
+//                rowIterator -> {
+//                    RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
+//                    rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
+//                }
+//        );
+//
+//
+//        // load user features
+//        JavaRDD<List<String>> readUserData = odpsOps.readTableWithJava(project, tableUserInfo, partition, new RecordsToUserRedisKV(), Integer.valueOf(50));
+//        readUserData.repartition(50).foreachPartition(
+//                rowIterator -> {
+//                    RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
+//                    rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
+//                }
+//        );
+//    }
+//
+//
+//}

+ 123 - 123
src/main/java/examples/sparksql/SparkVideoFeaToRedisLoader.java

@@ -1,123 +1,123 @@
-package examples.sparksql;
-
-import com.aliyun.odps.TableSchema;
-import com.aliyun.odps.data.Record;
-
-import com.tzld.piaoquan.recommend.feature.domain.video.base.ItemFeature;
-import com.tzld.piaoquan.recommend.feature.domain.video.base.UserFeature;
-import examples.dataloader.AdRedisFeatureConstructor;
-import examples.dataloader.RecommRedisFeatureConstructor;
-import org.apache.spark.SparkConf;
-import org.apache.spark.aliyun.odps.OdpsOps;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function2;
-import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
-import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.serializer.StringRedisSerializer;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
-public class SparkVideoFeaToRedisLoader {
-
-    private static final String userKeyFormat = "user:video:%s";
-
-    private static final String adKeyFormat = "video:%s";
-
-
-    public static RedisTemplate<String, String> buildRedisTemplate() {
-        RedisStandaloneConfiguration rsc = new RedisStandaloneConfiguration();
-        rsc.setPort(6379);
-        rsc.setPassword("Wqsd@2019");
-        rsc.setHostName("r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com");
-        RedisTemplate<String, String> template = new RedisTemplate<>();
-        JedisConnectionFactory fac = new JedisConnectionFactory(rsc);
-        fac.afterPropertiesSet();
-        template.setDefaultSerializer(new StringRedisSerializer());
-        template.setConnectionFactory(fac);
-        template.afterPropertiesSet();
-        return template;
-    }
-
-
-    public static void loadFeatureToRedis(RedisTemplate<String, String> redisTemplate, List<String> line) {
-        Map<String, String> redisFormat = new HashMap<String, String>();
-        String key = line.get(0);
-        String value = line.get(1);
-        redisFormat.put(key, value);
-        redisTemplate.opsForValue().multiSet(redisFormat);
-    }
-
-
-    static class RecordsToVideoRedisKV implements Function2<Record, TableSchema, List<String>> {
-        @Override
-        public List<String> call(Record record, TableSchema schema) throws Exception {
-            ItemFeature itemFeature = RecommRedisFeatureConstructor.constructItemFeature(record);
-            String key = String.format(adKeyFormat, itemFeature.getKey());
-            String value = itemFeature.getValue();
-            List<String> kv = new ArrayList<String>();
-            kv.add(key);
-            kv.add(value);
-            return kv;
-        }
-    }
-
-
-    static class RecordsToUserRedisKV implements Function2<Record, TableSchema, List<String>> {
-        @Override
-        public List<String> call(Record record, TableSchema schema) throws Exception {
-            UserFeature userFeature = RecommRedisFeatureConstructor.constructUserFeature(record);
-            String key = String.format(userKeyFormat, userFeature.getKey());
-            String value = userFeature.getValue();
-            List<String> kv = new ArrayList<String>();
-            kv.add(key);
-            kv.add(value);
-            return kv;
-        }
-    }
-
-
-    public static void main(String[] args) {
-
-        String partition = args[0];
-        String accessId = "LTAIWYUujJAm7CbH";
-        String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
-        String odpsUrl = "http://service.odps.aliyun.com/api";
-        String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
-        String project = "loghubods";
-        String tableItemInfo = "alg_recsys_video_info";
-        String tableUserInfo = "alg_recsys_user_info";
-
-        SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
-        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
-        OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
-        System.out.println("Read odps table...");
-
-
-        // load Ad features
-        JavaRDD<List<String>> readAdData = odpsOps.readTableWithJava(project, tableItemInfo, partition, new RecordsToVideoRedisKV(), Integer.valueOf(10));
-        readAdData.sample(false, 0.0001).foreachPartition(
-                rowIterator -> {
-                    RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
-                    rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
-                }
-        );
-
-
-        // load user features
-        JavaRDD<List<String>> readUserData = odpsOps.readTableWithJava(project, tableUserInfo, partition, new RecordsToUserRedisKV(), Integer.valueOf(50));
-        readUserData.repartition(50).sample(false, 0.00001).foreachPartition(
-                rowIterator -> {
-                    RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
-                    rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
-                }
-        );
-    }
-
-
-}
+//package examples.sparksql;
+//
+//import com.aliyun.odps.TableSchema;
+//import com.aliyun.odps.data.Record;
+//
+//import com.tzld.piaoquan.recommend.feature.domain.video.base.ItemFeature;
+//import com.tzld.piaoquan.recommend.feature.domain.video.base.UserFeature;
+//import examples.dataloader.AdRedisFeatureConstructor;
+//import examples.dataloader.RecommRedisFeatureConstructor;
+//import org.apache.spark.SparkConf;
+//import org.apache.spark.aliyun.odps.OdpsOps;
+//import org.apache.spark.api.java.JavaRDD;
+//import org.apache.spark.api.java.JavaSparkContext;
+//import org.apache.spark.api.java.function.Function2;
+//import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
+//import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
+//import org.springframework.data.redis.core.RedisTemplate;
+//import org.springframework.data.redis.serializer.StringRedisSerializer;
+//
+//import java.util.ArrayList;
+//import java.util.HashMap;
+//import java.util.List;
+//import java.util.Map;
+//
+//
+//public class SparkVideoFeaToRedisLoader {
+//
+//    private static final String userKeyFormat = "user:video:%s";
+//
+//    private static final String adKeyFormat = "video:%s";
+//
+//
+//    public static RedisTemplate<String, String> buildRedisTemplate() {
+//        RedisStandaloneConfiguration rsc = new RedisStandaloneConfiguration();
+//        rsc.setPort(6379);
+//        rsc.setPassword("Wqsd@2019");
+//        rsc.setHostName("r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com");
+//        RedisTemplate<String, String> template = new RedisTemplate<>();
+//        JedisConnectionFactory fac = new JedisConnectionFactory(rsc);
+//        fac.afterPropertiesSet();
+//        template.setDefaultSerializer(new StringRedisSerializer());
+//        template.setConnectionFactory(fac);
+//        template.afterPropertiesSet();
+//        return template;
+//    }
+//
+//
+//    public static void loadFeatureToRedis(RedisTemplate<String, String> redisTemplate, List<String> line) {
+//        Map<String, String> redisFormat = new HashMap<String, String>();
+//        String key = line.get(0);
+//        String value = line.get(1);
+//        redisFormat.put(key, value);
+//        redisTemplate.opsForValue().multiSet(redisFormat);
+//    }
+//
+//
+//    static class RecordsToVideoRedisKV implements Function2<Record, TableSchema, List<String>> {
+//        @Override
+//        public List<String> call(Record record, TableSchema schema) throws Exception {
+//            ItemFeature itemFeature = RecommRedisFeatureConstructor.constructItemFeature(record);
+//            String key = String.format(adKeyFormat, itemFeature.getKey());
+//            String value = itemFeature.getValue();
+//            List<String> kv = new ArrayList<String>();
+//            kv.add(key);
+//            kv.add(value);
+//            return kv;
+//        }
+//    }
+//
+//
+//    static class RecordsToUserRedisKV implements Function2<Record, TableSchema, List<String>> {
+//        @Override
+//        public List<String> call(Record record, TableSchema schema) throws Exception {
+//            UserFeature userFeature = RecommRedisFeatureConstructor.constructUserFeature(record);
+//            String key = String.format(userKeyFormat, userFeature.getKey());
+//            String value = userFeature.getValue();
+//            List<String> kv = new ArrayList<String>();
+//            kv.add(key);
+//            kv.add(value);
+//            return kv;
+//        }
+//    }
+//
+//
+//    public static void main(String[] args) {
+//
+//        String partition = args[0];
+//        String accessId = "LTAIWYUujJAm7CbH";
+//        String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
+//        String odpsUrl = "http://service.odps.aliyun.com/api";
+//        String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
+//        String project = "loghubods";
+//        String tableItemInfo = "alg_recsys_video_info";
+//        String tableUserInfo = "alg_recsys_user_info";
+//
+//        SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
+//        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+//        OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
+//        System.out.println("Read odps table...");
+//
+//
+//        // load Ad features
+//        JavaRDD<List<String>> readAdData = odpsOps.readTableWithJava(project, tableItemInfo, partition, new RecordsToVideoRedisKV(), Integer.valueOf(10));
+//        readAdData.sample(false, 0.0001).foreachPartition(
+//                rowIterator -> {
+//                    RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
+//                    rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
+//                }
+//        );
+//
+//
+//        // load user features
+//        JavaRDD<List<String>> readUserData = odpsOps.readTableWithJava(project, tableUserInfo, partition, new RecordsToUserRedisKV(), Integer.valueOf(50));
+//        readUserData.repartition(50).sample(false, 0.00001).foreachPartition(
+//                rowIterator -> {
+//                    RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
+//                    rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
+//                }
+//        );
+//    }
+//
+//
+//}

+ 9 - 9
src/main/scala/com/aliyun/odps/spark/examples/makedata/makedata_06_strData.scala

@@ -12,7 +12,7 @@ import org.apache.spark.sql.SparkSession
 import java.util
 import scala.collection.JavaConversions._
 import scala.collection.mutable
-import java.util.{Arrays, HashMap, HashSet, Map, Set}
+import java.util.{Arrays, HashMap, HashSet, Map}
 import com.alibaba.fastjson.JSONObject
 
 
 /*
@@ -144,14 +144,14 @@ object makedata_06_strData {
 
 
           // 1:特征聚合到map中
           val result = new util.HashMap[String, String]()
-          result += f1
-          result += f2
-          result += f3
-          result += f4
-          result += f5
-          result += f6
-          result += f7
-          result += f8
+          result ++= f1
+          result ++= f2
+          result ++= f3
+          result ++= f4
+          result ++= f5
+          result ++= f6
+          result ++= f7
+          result ++= f8
           val names = Set(
             "ctx_week", "ctx_hour", "ctx_region", "ctx_city",
             "machineinfo_brand", "machineinfo_model", "machineinfo_platform", "machineinfo_system",