|  | @@ -0,0 +1,95 @@
 | 
											
												
													
														|  | 
 |  | +package examples.sparksql;
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +import com.aliyun.odps.TableSchema;
 | 
											
												
													
														|  | 
 |  | +import com.aliyun.odps.data.Record;
 | 
											
												
													
														|  | 
 |  | +import com.tzld.piaoquan.recommend.feature.domain.ad.base.AdItemFeature;
 | 
											
												
													
														|  | 
 |  | +import com.tzld.piaoquan.recommend.feature.domain.ad.base.UserAdFeature;
 | 
											
												
													
														|  | 
 |  | +import examples.dataloader.AdRedisFeatureConstructor;
 | 
											
												
													
														|  | 
 |  | +import org.apache.spark.SparkConf;
 | 
											
												
													
														|  | 
 |  | +import org.apache.spark.aliyun.odps.OdpsOps;
 | 
											
												
													
														|  | 
 |  | +import org.apache.spark.api.java.JavaRDD;
 | 
											
												
													
														|  | 
 |  | +import org.apache.spark.api.java.JavaSparkContext;
 | 
											
												
													
														|  | 
 |  | +import org.apache.spark.api.java.function.Function2;
 | 
											
												
													
														|  | 
 |  | +import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
 | 
											
												
													
														|  | 
 |  | +import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
 | 
											
												
													
														|  | 
 |  | +import org.springframework.data.redis.core.RedisTemplate;
 | 
											
												
													
														|  | 
 |  | +import org.springframework.data.redis.serializer.StringRedisSerializer;
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +import java.util.ArrayList;
 | 
											
												
													
														|  | 
 |  | +import java.util.HashMap;
 | 
											
												
													
														|  | 
 |  | +import java.util.List;
 | 
											
												
													
														|  | 
 |  | +import java.util.Map;
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +public class SparkAdFeaToRedisHourLoader {
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +    private static final String adKeyFormat = "ad:%s";
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +    public static RedisTemplate<String, String> buildRedisTemplate() {
 | 
											
												
													
														|  | 
 |  | +        RedisStandaloneConfiguration rsc = new RedisStandaloneConfiguration();
 | 
											
												
													
														|  | 
 |  | +        rsc.setPort(6379);
 | 
											
												
													
														|  | 
 |  | +        rsc.setPassword("Wqsd@2019");
 | 
											
												
													
														|  | 
 |  | +        rsc.setHostName("r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com");
 | 
											
												
													
														|  | 
 |  | +        RedisTemplate<String, String> template = new RedisTemplate<>();
 | 
											
												
													
														|  | 
 |  | +        JedisConnectionFactory fac = new JedisConnectionFactory(rsc);
 | 
											
												
													
														|  | 
 |  | +        fac.afterPropertiesSet();
 | 
											
												
													
														|  | 
 |  | +        template.setDefaultSerializer(new StringRedisSerializer());
 | 
											
												
													
														|  | 
 |  | +        template.setConnectionFactory(fac);
 | 
											
												
													
														|  | 
 |  | +        template.afterPropertiesSet();
 | 
											
												
													
														|  | 
 |  | +        return template;
 | 
											
												
													
														|  | 
 |  | +    }
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +    public static void loadFeatureToRedis(RedisTemplate<String, String> redisTemplate, List<String> line) {
 | 
											
												
													
														|  | 
 |  | +        Map<String, String> redisFormat = new HashMap<String, String>();
 | 
											
												
													
														|  | 
 |  | +        String key = line.get(0);
 | 
											
												
													
														|  | 
 |  | +        String value = line.get(1);
 | 
											
												
													
														|  | 
 |  | +        redisFormat.put(key, value);
 | 
											
												
													
														|  | 
 |  | +        redisTemplate.opsForValue().multiSet(redisFormat);
 | 
											
												
													
														|  | 
 |  | +    }
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +    static class RecordsToAdRedisKV implements Function2<Record, TableSchema, List<String>> {
 | 
											
												
													
														|  | 
 |  | +        @Override
 | 
											
												
													
														|  | 
 |  | +        public List<String> call(Record record, TableSchema schema) throws Exception {
 | 
											
												
													
														|  | 
 |  | +            AdItemFeature adItemFeature = AdRedisFeatureConstructor.constructItemFeature(record);
 | 
											
												
													
														|  | 
 |  | +            // ad feature 中的key以creativeID拼接
 | 
											
												
													
														|  | 
 |  | +            String key = String.format(adKeyFormat, adItemFeature.getCreativeId());
 | 
											
												
													
														|  | 
 |  | +            String value = adItemFeature.getValue();
 | 
											
												
													
														|  | 
 |  | +            List<String> kv = new ArrayList<String>();
 | 
											
												
													
														|  | 
 |  | +            kv.add(key);
 | 
											
												
													
														|  | 
 |  | +            kv.add(value);
 | 
											
												
													
														|  | 
 |  | +            return kv;
 | 
											
												
													
														|  | 
 |  | +        }
 | 
											
												
													
														|  | 
 |  | +    }
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +    public static void main(String[] args) {
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +        String partition = args[0];
 | 
											
												
													
														|  | 
 |  | +        String accessId = "LTAIWYUujJAm7CbH";
 | 
											
												
													
														|  | 
 |  | +        String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
 | 
											
												
													
														|  | 
 |  | +        String odpsUrl = "http://service.odps.aliyun.com/api";
 | 
											
												
													
														|  | 
 |  | +        String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
 | 
											
												
													
														|  | 
 |  | +        String project = "loghubods";
 | 
											
												
													
														|  | 
 |  | +        String tableAdInfo = "alg_ad_item_info";
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +        SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
 | 
											
												
													
														|  | 
 |  | +        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
 | 
											
												
													
														|  | 
 |  | +        OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
 | 
											
												
													
														|  | 
 |  | +        System.out.println("Read odps table...");
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +        // load Ad features
 | 
											
												
													
														|  | 
 |  | +        JavaRDD<List<String>> readAdData = odpsOps.readTableWithJava(project, tableAdInfo, partition, new RecordsToAdRedisKV(), Integer.valueOf(10));
 | 
											
												
													
														|  | 
 |  | +        readAdData.foreachPartition(
 | 
											
												
													
														|  | 
 |  | +                rowIterator -> {
 | 
											
												
													
														|  | 
 |  | +                    RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
 | 
											
												
													
														|  | 
 |  | +                    rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
 | 
											
												
													
														|  | 
 |  | +                }
 | 
											
												
													
														|  | 
 |  | +        );
 | 
											
												
													
														|  | 
 |  | +    }
 | 
											
												
													
														|  | 
 |  | +
 | 
											
												
													
														|  | 
 |  | +}
 |