sunmingze 1 рік тому
батько
коміт
b64bdc1c92

+ 35 - 22
src/main/java/examples/sparksql/SparkAdFeaToRedisLoader.java

@@ -1,20 +1,21 @@
 package examples.sparksql;
 
+import com.aliyun.odps.TableSchema;
 import com.aliyun.odps.data.Record;
 import com.tzld.piaoquan.ad.engine.commons.base.AdItemFeature;
 import com.tzld.piaoquan.ad.engine.commons.base.UserAdFeature;
-import com.tzld.piaoquan.data.base.UserFeature;
 import examples.dataloader.AdRedisFeatureConstructor;
-import examples.dataloader.RecommRedisFeatureConstructor;
 import org.apache.spark.SparkConf;
 import org.apache.spark.aliyun.odps.OdpsOps;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function2;
 import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
 import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.serializer.StringRedisSerializer;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -40,23 +41,36 @@ public class SparkAdFeaToRedisLoader {
         return template;
     }
 
-    public static void loadUserAdFeatureToRedis(RedisTemplate<String, String> redisTemplate, Record line) {
-        Map<String, String> userFeaRedisFormat = new HashMap<String, String>();
-        UserAdFeature userFeature = AdRedisFeatureConstructor.constructUserFeature(line);
-        String key = String.format(userKeyFormat, userFeature.getKey());
-        String value = userFeature.getValue();
-        userFeaRedisFormat.put(key, value);
-        redisTemplate.opsForValue().multiSet(userFeaRedisFormat);
+
+    public static void loadFeatureToRedis(RedisTemplate<String, String> redisTemplate, String line) {
+        Map<String, String> redisFormat = new HashMap<String, String>();
+        String key = line.split("\t")[0];
+        String value = line.split("\t")[1];
+        redisFormat.put(key, value);
+        redisTemplate.opsForValue().multiSet(redisFormat);
+
+    }
+
+
+    static class RecordsToAdRedisKV implements Function2<Record, TableSchema, String> {
+        @Override
+        public String call(Record record, TableSchema schema) throws Exception {
+            AdItemFeature adItemFeature = AdRedisFeatureConstructor.constructItemFeature(record);
+            String key = String.format(adKeyFormat, adItemFeature.getKey());
+            String value = adItemFeature.getValue();
+            return key + "\t" + value;
+        }
     }
 
 
-    public static void loadAdFeatureToRedis(RedisTemplate<String, String> redisTemplate, Record line) {
-        Map<String, String> userFeaRedisFormat = new HashMap<String, String>();
-        AdItemFeature adItemFeature = AdRedisFeatureConstructor.constructItemFeature(line);
-        String key = String.format(userKeyFormat, adItemFeature.getKey());
-        String value = adItemFeature.getValue();
-        userFeaRedisFormat.put(key, value);
-        redisTemplate.opsForValue().multiSet(userFeaRedisFormat);
+    static class RecordsToUserRedisKV implements Function2<Record, TableSchema, String> {
+        @Override
+        public String call(Record record, TableSchema schema) throws Exception {
+            UserAdFeature userFeature = AdRedisFeatureConstructor.constructUserFeature(record);
+            String key = String.format(userKeyFormat, userFeature.getKey());
+            String value = userFeature.getValue();
+            return key + "\t" + value;
+        }
     }
 
 
@@ -79,25 +93,24 @@ public class SparkAdFeaToRedisLoader {
 
 
         // load Ad features
-        JavaRDD<Record> readAdData = odpsOps.readTableWithJava(project, tableAdInfo, partition, null, Integer.valueOf(10));
+        JavaRDD<String> readAdData = odpsOps.readTableWithJava(project, tableAdInfo, partition, new RecordsToAdRedisKV(), Integer.valueOf(10));
         readAdData.foreachPartition(
                 rowIterator -> {
                     RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
-                    rowIterator.forEachRemaining(line -> loadAdFeatureToRedis(redisTemplate, line));
+                    rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
                 }
         );
 
 
         // load user features
-        JavaRDD<Record> readUserData = odpsOps.readTableWithJava(project, tableUserInfo, partition, null, Integer.valueOf(10));
+        JavaRDD<String> readUserData = odpsOps.readTableWithJava(project, tableUserInfo, partition, new RecordsToUserRedisKV(), Integer.valueOf(30));
         readUserData.foreachPartition(
                 rowIterator -> {
                     RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
-                    rowIterator.forEachRemaining(line -> loadUserAdFeatureToRedis(redisTemplate, line));
+                    rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
                 }
         );
-
-
     }
 
+
 }