package examples.sparksql; import com.aliyun.odps.TableSchema; import com.aliyun.odps.data.Record; import com.tzld.piaoquan.recommend.feature.domain.ad.base.*; import examples.dataloader.AdRedisFeatureConstructor; import org.apache.spark.SparkConf; import org.apache.spark.aliyun.odps.OdpsOps; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import org.springframework.data.redis.connection.RedisStandaloneConfiguration; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.StringRedisSerializer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class SparkAdFeaToRedisLoader { private static final String userKeyFormat = "user:ad:%s"; private static final String adKeyFormat = "ad:%s"; public static RedisTemplate buildRedisTemplate() { RedisStandaloneConfiguration rsc = new RedisStandaloneConfiguration(); rsc.setPort(6379); rsc.setPassword("Wqsd@2019"); rsc.setHostName("r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com"); RedisTemplate template = new RedisTemplate<>(); JedisConnectionFactory fac = new JedisConnectionFactory(rsc); fac.afterPropertiesSet(); template.setDefaultSerializer(new StringRedisSerializer()); template.setConnectionFactory(fac); template.afterPropertiesSet(); return template; } public static void loadFeatureToRedis(RedisTemplate redisTemplate, List line) { Map redisFormat = new HashMap(); String key = line.get(0); String value = line.get(1); redisFormat.put(key, value); redisTemplate.opsForValue().multiSet(redisFormat); } static class RecordsToAdRedisKV implements Function2> { @Override public List call(Record record, TableSchema schema) throws Exception { AdItemFeature adItemFeature = AdRedisFeatureConstructor.constructItemFeature(record); // ad feature 中的key以creativeID拼接 String key = String.format(adKeyFormat, adItemFeature.getCreativeId()); String value = adItemFeature.getValue(); List kv = new ArrayList(); kv.add(key); kv.add(value); return kv; } } static class RecordsToUserRedisKV implements Function2> { @Override public List call(Record record, TableSchema schema) throws Exception { UserAdFeature userFeature = AdRedisFeatureConstructor.constructUserFeature(record); List kv = new ArrayList(); String key = String.format(userKeyFormat, userFeature.getKey()); String value = userFeature.getValue(); kv.add(key); kv.add(value); return kv; } } public static void main(String[] args) { String partition = args[0]; String accessId = "LTAIWYUujJAm7CbH"; String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"; String odpsUrl = "http://service.odps.aliyun.com/api"; String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com"; String project = "loghubods"; String tableAdInfo = "alg_ad_item_info"; String tableUserInfo = "alg_ad_user_info"; SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl); System.out.println("Read odps table..."); // load Ad features JavaRDD> readAdData = odpsOps.readTableWithJava(project, tableAdInfo, partition, new RecordsToAdRedisKV(), Integer.valueOf(10)); readAdData.foreachPartition( rowIterator -> { RedisTemplate redisTemplate = buildRedisTemplate(); rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line)); } ); // load user features JavaRDD> readUserData = odpsOps.readTableWithJava(project, tableUserInfo, partition, new RecordsToUserRedisKV(), Integer.valueOf(50)); readUserData.repartition(50).foreachPartition( rowIterator -> { RedisTemplate redisTemplate = buildRedisTemplate(); rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line)); } ); } }