123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- package examples.sparksql;
- import com.aliyun.odps.TableSchema;
- import com.aliyun.odps.data.Record;
- import com.tzld.piaoquan.recommend.feature.domain.ad.base.*;
- import examples.dataloader.AdRedisFeatureConstructor;
- import org.apache.spark.SparkConf;
- import org.apache.spark.aliyun.odps.OdpsOps;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.Function2;
- import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
- import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.data.redis.serializer.StringRedisSerializer;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- public class SparkAdFeaToRedisLoader {
- private static final String userKeyFormat = "user:ad:%s";
- private static final String adKeyFormat = "ad:%s";
- public static RedisTemplate<String, String> buildRedisTemplate() {
- RedisStandaloneConfiguration rsc = new RedisStandaloneConfiguration();
- rsc.setPort(6379);
- rsc.setPassword("Wqsd@2019");
- rsc.setHostName("r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com");
- RedisTemplate<String, String> template = new RedisTemplate<>();
- JedisConnectionFactory fac = new JedisConnectionFactory(rsc);
- fac.afterPropertiesSet();
- template.setDefaultSerializer(new StringRedisSerializer());
- template.setConnectionFactory(fac);
- template.afterPropertiesSet();
- return template;
- }
- public static void loadFeatureToRedis(RedisTemplate<String, String> redisTemplate, List<String> line) {
- Map<String, String> redisFormat = new HashMap<String, String>();
- String key = line.get(0);
- String value = line.get(1);
- redisFormat.put(key, value);
- redisTemplate.opsForValue().multiSet(redisFormat);
- }
- static class RecordsToAdRedisKV implements Function2<Record, TableSchema, List<String>> {
- @Override
- public List<String> call(Record record, TableSchema schema) throws Exception {
- AdItemFeature adItemFeature = AdRedisFeatureConstructor.constructItemFeature(record);
- // ad feature 中的key以creativeID拼接
- String key = String.format(adKeyFormat, adItemFeature.getCreativeId());
- String value = adItemFeature.getValue();
- List<String> kv = new ArrayList<String>();
- kv.add(key);
- kv.add(value);
- return kv;
- }
- }
- static class RecordsToUserRedisKV implements Function2<Record, TableSchema, List<String>> {
- @Override
- public List<String> call(Record record, TableSchema schema) throws Exception {
- UserAdFeature userFeature = AdRedisFeatureConstructor.constructUserFeature(record);
- List<String> kv = new ArrayList<String>();
- String key = String.format(userKeyFormat, userFeature.getKey());
- String value = userFeature.getValue();
- kv.add(key);
- kv.add(value);
- return kv;
- }
- }
- public static void main(String[] args) {
- String partition = args[0];
- String accessId = "LTAIWYUujJAm7CbH";
- String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
- String odpsUrl = "http://service.odps.aliyun.com/api";
- String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
- String project = "loghubods";
- String tableAdInfo = "alg_ad_item_info";
- String tableUserInfo = "alg_ad_user_info";
- SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
- JavaSparkContext jsc = new JavaSparkContext(sparkConf);
- OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
- System.out.println("Read odps table...");
- // load Ad features
- JavaRDD<List<String>> readAdData = odpsOps.readTableWithJava(project, tableAdInfo, partition, new RecordsToAdRedisKV(), Integer.valueOf(10));
- readAdData.foreachPartition(
- rowIterator -> {
- RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
- rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
- }
- );
- // load user features
- JavaRDD<List<String>> readUserData = odpsOps.readTableWithJava(project, tableUserInfo, partition, new RecordsToUserRedisKV(), Integer.valueOf(50));
- readUserData.repartition(50).foreachPartition(
- rowIterator -> {
- RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
- rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
- }
- );
- }
- }
|