|
@@ -16,7 +16,9 @@ import org.springframework.data.redis.core.RedisTemplate;
|
|
|
import org.springframework.data.redis.serializer.StringRedisSerializer;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
import java.util.Map;
|
|
|
|
|
|
|
|
@@ -42,34 +44,40 @@ public class SparkAdFeaToRedisLoader {
|
|
|
}
|
|
|
|
|
|
|
|
|
- public static void loadFeatureToRedis(RedisTemplate<String, String> redisTemplate, String line) {
|
|
|
+ public static void loadFeatureToRedis(RedisTemplate<String, String> redisTemplate, List<String> line) {
|
|
|
Map<String, String> redisFormat = new HashMap<String, String>();
|
|
|
- String key = line.split("\t")[0];
|
|
|
- String value = line.split("\t")[1];
|
|
|
+ String key = line.get(0);
|
|
|
+ String value = line.get(1);
|
|
|
redisFormat.put(key, value);
|
|
|
redisTemplate.opsForValue().multiSet(redisFormat);
|
|
|
-
|
|
|
}
|
|
|
|
|
|
|
|
|
- static class RecordsToAdRedisKV implements Function2<Record, TableSchema, String> {
|
|
|
+
|
|
|
+ static class RecordsToAdRedisKV implements Function2<Record, TableSchema, List<String>> {
|
|
|
@Override
|
|
|
- public String call(Record record, TableSchema schema) throws Exception {
|
|
|
+ public List<String> call(Record record, TableSchema schema) throws Exception {
|
|
|
AdItemFeature adItemFeature = AdRedisFeatureConstructor.constructItemFeature(record);
|
|
|
String key = String.format(adKeyFormat, adItemFeature.getKey());
|
|
|
String value = adItemFeature.getValue();
|
|
|
- return key + "\t" + value;
|
|
|
+ List<String> kv = new ArrayList<String>();
|
|
|
+ kv.add(key);
|
|
|
+ kv.add(value);
|
|
|
+ return kv;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
- static class RecordsToUserRedisKV implements Function2<Record, TableSchema, String> {
|
|
|
+ static class RecordsToUserRedisKV implements Function2<Record, TableSchema, List<String>> {
|
|
|
@Override
|
|
|
- public String call(Record record, TableSchema schema) throws Exception {
|
|
|
+ public List<String> call(Record record, TableSchema schema) throws Exception {
|
|
|
UserAdFeature userFeature = AdRedisFeatureConstructor.constructUserFeature(record);
|
|
|
String key = String.format(userKeyFormat, userFeature.getKey());
|
|
|
String value = userFeature.getValue();
|
|
|
- return key + "\t" + value;
|
|
|
+ List<String> kv = new ArrayList<String>();
|
|
|
+ kv.add(key);
|
|
|
+ kv.add(value);
|
|
|
+ return kv;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -93,7 +101,7 @@ public class SparkAdFeaToRedisLoader {
|
|
|
|
|
|
|
|
|
// load Ad features
|
|
|
- JavaRDD<String> readAdData = odpsOps.readTableWithJava(project, tableAdInfo, partition, new RecordsToAdRedisKV(), Integer.valueOf(10));
|
|
|
+ JavaRDD<List<String>> readAdData = odpsOps.readTableWithJava(project, tableAdInfo, partition, new RecordsToAdRedisKV(), Integer.valueOf(10));
|
|
|
readAdData.foreachPartition(
|
|
|
rowIterator -> {
|
|
|
RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
|
|
@@ -103,8 +111,8 @@ public class SparkAdFeaToRedisLoader {
|
|
|
|
|
|
|
|
|
// load user features
|
|
|
- JavaRDD<String> readUserData = odpsOps.readTableWithJava(project, tableUserInfo, partition, new RecordsToUserRedisKV(), Integer.valueOf(30));
|
|
|
- readUserData.foreachPartition(
|
|
|
+ JavaRDD<List<String>> readUserData = odpsOps.readTableWithJava(project, tableUserInfo, partition, new RecordsToUserRedisKV(), Integer.valueOf(50));
|
|
|
+ readUserData.repartition(50).foreachPartition(
|
|
|
rowIterator -> {
|
|
|
RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
|
|
|
rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
|