SparkAdFeaToRedisLoader.java 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package examples.sparksql;
  import com.aliyun.odps.TableSchema;
  import com.aliyun.odps.data.Record;
  import com.tzld.piaoquan.recommend.feature.domain.ad.base.*;
  import examples.dataloader.AdRedisFeatureConstructor;
  import org.apache.spark.SparkConf;
  import org.apache.spark.aliyun.odps.OdpsOps;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.api.java.function.Function2;
  import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
  import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
  import org.springframework.data.redis.core.RedisTemplate;
  import org.springframework.data.redis.serializer.StringRedisSerializer;
  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.List;
  import java.util.Map;
  19. public class SparkAdFeaToRedisLoader {
  20. private static final String userKeyFormat = "user:ad:%s";
  21. private static final String adKeyFormat = "ad:%s";
  22. public static RedisTemplate<String, String> buildRedisTemplate() {
  23. RedisStandaloneConfiguration rsc = new RedisStandaloneConfiguration();
  24. rsc.setPort(6379);
  25. rsc.setPassword("Wqsd@2019");
  26. rsc.setHostName("r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com");
  27. RedisTemplate<String, String> template = new RedisTemplate<>();
  28. JedisConnectionFactory fac = new JedisConnectionFactory(rsc);
  29. fac.afterPropertiesSet();
  30. template.setDefaultSerializer(new StringRedisSerializer());
  31. template.setConnectionFactory(fac);
  32. template.afterPropertiesSet();
  33. return template;
  34. }
  35. public static void loadFeatureToRedis(RedisTemplate<String, String> redisTemplate, List<String> line) {
  36. Map<String, String> redisFormat = new HashMap<String, String>();
  37. String key = line.get(0);
  38. String value = line.get(1);
  39. redisFormat.put(key, value);
  40. redisTemplate.opsForValue().multiSet(redisFormat);
  41. }
  42. static class RecordsToAdRedisKV implements Function2<Record, TableSchema, List<String>> {
  43. @Override
  44. public List<String> call(Record record, TableSchema schema) throws Exception {
  45. AdItemFeature adItemFeature = AdRedisFeatureConstructor.constructItemFeature(record);
  46. // ad feature 中的key以creativeID拼接
  47. String key = String.format(adKeyFormat, adItemFeature.getCreativeId());
  48. String value = adItemFeature.getValue();
  49. List<String> kv = new ArrayList<String>();
  50. kv.add(key);
  51. kv.add(value);
  52. return kv;
  53. }
  54. }
  55. static class RecordsToUserRedisKV implements Function2<Record, TableSchema, List<String>> {
  56. @Override
  57. public List<String> call(Record record, TableSchema schema) throws Exception {
  58. UserAdFeature userFeature = AdRedisFeatureConstructor.constructUserFeature(record);
  59. List<String> kv = new ArrayList<String>();
  60. String key = String.format(userKeyFormat, userFeature.getKey());
  61. String value = userFeature.getValue();
  62. kv.add(key);
  63. kv.add(value);
  64. return kv;
  65. }
  66. }
  67. public static void main(String[] args) {
  68. String partition = args[0];
  69. String accessId = "LTAIWYUujJAm7CbH";
  70. String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
  71. String odpsUrl = "http://service.odps.aliyun.com/api";
  72. String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun-inc.com";
  73. String project = "loghubods";
  74. String tableAdInfo = "alg_ad_item_info";
  75. String tableUserInfo = "alg_ad_user_info";
  76. SparkConf sparkConf = new SparkConf().setAppName("E-MapReduce Demo 3-2: Spark MaxCompute Demo (Java)");
  77. JavaSparkContext jsc = new JavaSparkContext(sparkConf);
  78. OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
  79. System.out.println("Read odps table...");
  80. // load Ad features
  81. JavaRDD<List<String>> readAdData = odpsOps.readTableWithJava(project, tableAdInfo, partition, new RecordsToAdRedisKV(), Integer.valueOf(10));
  82. readAdData.foreachPartition(
  83. rowIterator -> {
  84. RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
  85. rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
  86. }
  87. );
  88. // load user features
  89. JavaRDD<List<String>> readUserData = odpsOps.readTableWithJava(project, tableUserInfo, partition, new RecordsToUserRedisKV(), Integer.valueOf(50));
  90. readUserData.repartition(50).foreachPartition(
  91. rowIterator -> {
  92. RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
  93. rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
  94. }
  95. );
  96. }
  97. }