sunmingze 1 year ago
parent
commit
8ff1a0ff2c

+ 10 - 7
src/main/java/com/aliyun/odps/spark/examples/sparksql/SparkEMRReadMaxCompute.java → src/main/java/com/aliyun/odps/spark/examples/sparksql/SparkEMRLoaderDemo.java

@@ -12,11 +12,11 @@ import java.util.ArrayList;
 import java.util.List;
 
 
-public class SparkEMRReadMaxCompute {
+public class SparkEMRLoaderDemo {
 
     public static void main(String[] args) {
 
-        String partition = "20231210";
+        String partition = "dt=20231123";
         String accessId = "LTAIWYUujJAm7CbH";
         String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
         String odpsUrl = "http://service.cn.maxcompute.aliyun.com/api";
@@ -30,18 +30,21 @@ public class SparkEMRReadMaxCompute {
         OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
 
         System.out.println("Read odps table...");
-        JavaRDD<List<Long>> readData = odpsOps.readTableWithJava(project, partition, table, new RecordToLongs(), Integer.valueOf(10));
+        JavaRDD<List<String>> readData = odpsOps.readTableWithJava(project, table, partition, new RecordToLongs(), Integer.valueOf(10));
 
         System.out.println("counts: ");
         System.out.println(readData.count());
     }
 
-    static class RecordToLongs implements Function2<Record, TableSchema, List<Long>> {
+    static class RecordToLongs implements Function2<Record, TableSchema, List<String>> {
         @Override
-        public List<Long> call(Record record, TableSchema schema) throws Exception {
-            List<Long> ret = new ArrayList<Long>();
+        public List<String> call(Record record, TableSchema schema) throws Exception {
+            List<String> ret = new ArrayList<String>();
             for (int i = 0; i < schema.getColumns().size(); i++) {
-                ret.add(record.getBigint(i));
+                if (record.get(i) != null)
+                    ret.add(record.get(i).toString());
+                else
+                    ret.add("0.0");
             }
             return ret;
         }

+ 4 - 12
src/main/java/com/aliyun/odps/spark/examples/sparksql/JavaSparkSQL.java → src/main/java/com/aliyun/odps/spark/examples/sparksql/SparkLoadDataRedisMaxCompute.java

@@ -2,31 +2,23 @@
 
 package com.aliyun.odps.spark.examples.sparksql;
 
-import com.aliyun.odps.Odps;
-import com.aliyun.odps.cupid.CupidSession;
+import com.aliyun.odps.data.Record;
 import com.tzld.piaoquan.data.base.UserFeature;
 import com.tzld.piaoquan.data.dataloader.FeatureConstructor;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.types.*;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import org.apache.spark.sql.types.StructField;
 import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
 import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.serializer.StringRedisSerializer;
 
-public class JavaSparkSQL {
+public class SparkLoadDataRedisMaxCompute {
 
     private static final String userKeyFormat = "user:%s";
 
@@ -44,7 +36,7 @@ public class JavaSparkSQL {
         return template;
     }
 
-    public static void loadFeatureToRedis(RedisTemplate<String, String> redisTemplate, Row line) {
+    public static void loadFeatureToRedis(RedisTemplate<String, String> redisTemplate, Record line) {
         Map<String, String> userFeaRedisFormat = new HashMap<String, String>();
         UserFeature userFeature = FeatureConstructor.constructUserFeature(line);
         String key = String.format(userKeyFormat, userFeature.getKey());
@@ -78,7 +70,7 @@ public class JavaSparkSQL {
         rptdf.toJavaRDD().foreachPartition(
                 rowIterator -> {
                     RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
-                    rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
+                 //   rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
                 }
         );
 

+ 90 - 89
src/main/java/com/tzld/piaoquan/data/dataloader/FeatureConstructor.java

@@ -3,6 +3,7 @@ package com.tzld.piaoquan.data.dataloader;
 
 import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.data.Record;
 import com.tzld.piaoquan.data.base.ItemFeature;
 import com.tzld.piaoquan.data.base.RequestContext;
 import com.tzld.piaoquan.data.base.UserActionFeature;
@@ -28,135 +29,135 @@ public class FeatureConstructor {
     private static final Account account = new AliyunAccount(ODPS_CONFIG.get("ACCESSID"), ODPS_CONFIG.get("ACCESSKEY"));
 
 
-    public static RequestContext constructRequestContext(Row record) {
+    public static RequestContext constructRequestContext(Record record) {
         RequestContext requestContext = new RequestContext();
-        requestContext.setApptype(record.getAs("apptype"));
-        requestContext.setMachineinfo_brand(record.getAs("machineinfo_brand"));
-        requestContext.setMachineinfo_model(record.getAs("machineinfo_model"));
-        requestContext.setMachineinfo_platform(record.getAs("machineinfo_platform"));
-        requestContext.setMachineinfo_sdkversion(record.getAs("machineinfo_sdkversion"));
-        requestContext.setMachineinfo_system(record.getAs("machineinfo_system"));
-        requestContext.setMachineinfo_wechatversion(record.getAs("machineinfo_wechatversion"));
-        requestContext.setDay(record.getAs("ctx_day"));
-        requestContext.setWeek(record.getAs("ctx_week"));
-        requestContext.setHour(record.getAs("ctx_hour"));
-        requestContext.setRegion(record.getAs("ctx_region"));
-        requestContext.setCity(record.getAs("ctx_city"));
+        requestContext.setApptype(record.getString("apptype"));
+        requestContext.setMachineinfo_brand(record.getString("machineinfo_brand"));
+        requestContext.setMachineinfo_model(record.getString("machineinfo_model"));
+        requestContext.setMachineinfo_platform(record.getString("machineinfo_platform"));
+        requestContext.setMachineinfo_sdkversion(record.getString("machineinfo_sdkversion"));
+        requestContext.setMachineinfo_system(record.getString("machineinfo_system"));
+        requestContext.setMachineinfo_wechatversion(record.getString("machineinfo_wechatversion"));
+        requestContext.setDay(record.getString("ctx_day"));
+        requestContext.setWeek(record.getString("ctx_week"));
+        requestContext.setHour(record.getString("ctx_hour"));
+        requestContext.setRegion(record.getString("ctx_region"));
+        requestContext.setCity(record.getString("ctx_city"));
         return requestContext;
     }
 
 
-    public static UserFeature constructUserFeature(Row record) {
+    public static UserFeature constructUserFeature(Record record) {
         UserFeature userFeature = new UserFeature();
-        userFeature.setUid(record.getAs("uid").toString());
-        userFeature.setUser_cycle_bucket_7days(record.getAs("u_cycle_bucket_7days"));
-        userFeature.setUser_cycle_bucket_30days(record.getAs("u_cycle_bucket_30days"));
-        userFeature.setUser_share_bucket_30days(record.getAs("u_share_bucket_30days"));
+        userFeature.setUid(record.get("uid").toString());
+        userFeature.setUser_cycle_bucket_7days(record.getString("u_cycle_bucket_7days"));
+        userFeature.setUser_cycle_bucket_30days(record.getString("u_cycle_bucket_30days"));
+        userFeature.setUser_share_bucket_30days(record.getString("u_share_bucket_30days"));
 
 
         // 1day features
         UserActionFeature user1dayActionFeature = new UserActionFeature();
-        user1dayActionFeature.setExp_cnt(record.getAs("u_1day_exp_cnt"));
-        user1dayActionFeature.setClick_cnt(record.getAs("u_1day_click_cnt"));
-        user1dayActionFeature.setShare_cnt(record.getAs("u_1day_share_cnt"));
-        user1dayActionFeature.setReturn_cnt(record.getAs("u_1day_return_cnt"));
-        user1dayActionFeature.setCtr(record.getAs("u_ctr_1day"));
-        user1dayActionFeature.setStr(record.getAs("u_str_1day"));
-        user1dayActionFeature.setRov(record.getAs("u_rov_1day"));
-        user1dayActionFeature.setRos(record.getAs("u_ros_1day"));
+        user1dayActionFeature.setExp_cnt(record.getString("u_1day_exp_cnt"));
+        user1dayActionFeature.setClick_cnt(record.getString("u_1day_click_cnt"));
+        user1dayActionFeature.setShare_cnt(record.getString("u_1day_share_cnt"));
+        user1dayActionFeature.setReturn_cnt(record.getString("u_1day_return_cnt"));
+        user1dayActionFeature.setCtr(record.getString("u_ctr_1day"));
+        user1dayActionFeature.setStr(record.getString("u_str_1day"));
+        user1dayActionFeature.setRov(record.getString("u_rov_1day"));
+        user1dayActionFeature.setRos(record.getString("u_ros_1day"));
         userFeature.setDay1_cnt_features(user1dayActionFeature);
 
         // 3day features
         UserActionFeature user3dayActionFeature = new UserActionFeature();
-        user3dayActionFeature.setExp_cnt(record.getAs("u_3day_exp_cnt"));
-        user3dayActionFeature.setClick_cnt(record.getAs("u_3day_click_cnt"));
-        user3dayActionFeature.setShare_cnt(record.getAs("u_3day_share_cnt"));
-        user3dayActionFeature.setReturn_cnt(record.getAs("u_3day_return_cnt"));
-        user3dayActionFeature.setCtr(record.getAs("u_ctr_3day"));
-        user3dayActionFeature.setStr(record.getAs("u_str_3day"));
-        user3dayActionFeature.setRov(record.getAs("u_rov_3day"));
-        user3dayActionFeature.setRos(record.getAs("u_ros_3day"));
+        user3dayActionFeature.setExp_cnt(record.getString("u_3day_exp_cnt"));
+        user3dayActionFeature.setClick_cnt(record.getString("u_3day_click_cnt"));
+        user3dayActionFeature.setShare_cnt(record.getString("u_3day_share_cnt"));
+        user3dayActionFeature.setReturn_cnt(record.getString("u_3day_return_cnt"));
+        user3dayActionFeature.setCtr(record.getString("u_ctr_3day"));
+        user3dayActionFeature.setStr(record.getString("u_str_3day"));
+        user3dayActionFeature.setRov(record.getString("u_rov_3day"));
+        user3dayActionFeature.setRos(record.getString("u_ros_3day"));
         userFeature.setDay3_cnt_features(user3dayActionFeature);
 
         // 7day features
         UserActionFeature user7dayActionFeature = new UserActionFeature();
-        user7dayActionFeature.setExp_cnt(record.getAs("u_7day_exp_cnt"));
-        user7dayActionFeature.setClick_cnt(record.getAs("u_7day_click_cnt"));
-        user7dayActionFeature.setShare_cnt(record.getAs("u_7day_share_cnt"));
-        user7dayActionFeature.setReturn_cnt(record.getAs("u_7day_return_cnt"));
-        user7dayActionFeature.setCtr(record.getAs("u_ctr_7day"));
-        user7dayActionFeature.setStr(record.getAs("u_str_7day"));
-        user7dayActionFeature.setRov(record.getAs("u_rov_7day"));
-        user7dayActionFeature.setRos(record.getAs("u_ros_7day"));
+        user7dayActionFeature.setExp_cnt(record.getString("u_7day_exp_cnt"));
+        user7dayActionFeature.setClick_cnt(record.getString("u_7day_click_cnt"));
+        user7dayActionFeature.setShare_cnt(record.getString("u_7day_share_cnt"));
+        user7dayActionFeature.setReturn_cnt(record.getString("u_7day_return_cnt"));
+        user7dayActionFeature.setCtr(record.getString("u_ctr_7day"));
+        user7dayActionFeature.setStr(record.getString("u_str_7day"));
+        user7dayActionFeature.setRov(record.getString("u_rov_7day"));
+        user7dayActionFeature.setRos(record.getString("u_ros_7day"));
         userFeature.setDay7_cnt_features(user7dayActionFeature);
 
         // 3month features
         UserActionFeature user3monthActionFeature = new UserActionFeature();
-        user3monthActionFeature.setExp_cnt(record.getAs("u_3month_exp_cnt"));
-        user3monthActionFeature.setClick_cnt(record.getAs("u_3month_click_cnt"));
-        user3monthActionFeature.setShare_cnt(record.getAs("u_3month_share_cnt"));
-        user3monthActionFeature.setReturn_cnt(record.getAs("u_3month_return_cnt"));
-        user3monthActionFeature.setCtr(record.getAs("u_ctr_3month"));
-        user3monthActionFeature.setStr(record.getAs("u_str_3month"));
-        user3monthActionFeature.setRov(record.getAs("u_rov_3month"));
-        user3monthActionFeature.setRos(record.getAs("u_ros_3month"));
+        user3monthActionFeature.setExp_cnt(record.getString("u_3month_exp_cnt"));
+        user3monthActionFeature.setClick_cnt(record.getString("u_3month_click_cnt"));
+        user3monthActionFeature.setShare_cnt(record.getString("u_3month_share_cnt"));
+        user3monthActionFeature.setReturn_cnt(record.getString("u_3month_return_cnt"));
+        user3monthActionFeature.setCtr(record.getString("u_ctr_3month"));
+        user3monthActionFeature.setStr(record.getString("u_str_3month"));
+        user3monthActionFeature.setRov(record.getString("u_rov_3month"));
+        user3monthActionFeature.setRos(record.getString("u_ros_3month"));
         userFeature.setMonth3_cnt_features(user3monthActionFeature);
 
         return userFeature;
     }
 
 
-    public static ItemFeature constructItemFeature(Row record) {
+    public static ItemFeature constructItemFeature(Record record) {
         ItemFeature itemFeature = new ItemFeature();
-        itemFeature.setVideoId(record.getAs("videoid").toString());
-        itemFeature.setUpId(record.getAs("uid").toString());
-        itemFeature.setTitleLength(record.getAs("play_count").toString());
-        itemFeature.setPlayLength(record.getAs("total_time").toString());
-        itemFeature.setTotalTime(record.getAs("total_time").toString());
-        itemFeature.setDaysSinceUpload(record.getAs("existence_days").toString());
+        itemFeature.setVideoId(record.getString("videoid").toString());
+        itemFeature.setUpId(record.getString("uid").toString());
+        itemFeature.setTitleLength(record.getString("play_count").toString());
+        itemFeature.setPlayLength(record.getString("total_time").toString());
+        itemFeature.setTotalTime(record.getString("total_time").toString());
+        itemFeature.setDaysSinceUpload(record.getString("existence_days").toString());
 
         UserActionFeature user1dayActionFeature = new UserActionFeature();
-        user1dayActionFeature.setExp_cnt(record.getAs("i_1day_exp_cnt"));
-        user1dayActionFeature.setClick_cnt(record.getAs("i_1day_click_cnt"));
-        user1dayActionFeature.setShare_cnt(record.getAs("i_1day_share_cnt"));
-        user1dayActionFeature.setReturn_cnt(record.getAs("i_1day_return_cnt"));
-        user1dayActionFeature.setCtr(record.getAs("i_ctr_1day"));
-        user1dayActionFeature.setStr(record.getAs("i_str_1day"));
-        user1dayActionFeature.setRov(record.getAs("i_rov_1day"));
-        user1dayActionFeature.setRos(record.getAs("i_ros_1day"));
+        user1dayActionFeature.setExp_cnt(record.getString("i_1day_exp_cnt"));
+        user1dayActionFeature.setClick_cnt(record.getString("i_1day_click_cnt"));
+        user1dayActionFeature.setShare_cnt(record.getString("i_1day_share_cnt"));
+        user1dayActionFeature.setReturn_cnt(record.getString("i_1day_return_cnt"));
+        user1dayActionFeature.setCtr(record.getString("i_ctr_1day"));
+        user1dayActionFeature.setStr(record.getString("i_str_1day"));
+        user1dayActionFeature.setRov(record.getString("i_rov_1day"));
+        user1dayActionFeature.setRos(record.getString("i_ros_1day"));
         itemFeature.setDay1_cnt_features(user1dayActionFeature);
 
         UserActionFeature user3dayActionFeature = new UserActionFeature();
-        user3dayActionFeature.setExp_cnt(record.getAs("i_3day_exp_cnt"));
-        user3dayActionFeature.setClick_cnt(record.getAs("i_3day_click_cnt"));
-        user3dayActionFeature.setShare_cnt(record.getAs("i_3day_share_cnt"));
-        user3dayActionFeature.setReturn_cnt(record.getAs("i_3day_return_cnt"));
-        user3dayActionFeature.setCtr(record.getAs("i_ctr_3day"));
-        user3dayActionFeature.setStr(record.getAs("i_str_3day"));
-        user3dayActionFeature.setRov(record.getAs("i_rov_3day"));
-        user3dayActionFeature.setRos(record.getAs("i_ros_3day"));
+        user3dayActionFeature.setExp_cnt(record.getString("i_3day_exp_cnt"));
+        user3dayActionFeature.setClick_cnt(record.getString("i_3day_click_cnt"));
+        user3dayActionFeature.setShare_cnt(record.getString("i_3day_share_cnt"));
+        user3dayActionFeature.setReturn_cnt(record.getString("i_3day_return_cnt"));
+        user3dayActionFeature.setCtr(record.getString("i_ctr_3day"));
+        user3dayActionFeature.setStr(record.getString("i_str_3day"));
+        user3dayActionFeature.setRov(record.getString("i_rov_3day"));
+        user3dayActionFeature.setRos(record.getString("i_ros_3day"));
         itemFeature.setDay3_cnt_features(user1dayActionFeature);
 
         UserActionFeature user7dayActionFeature = new UserActionFeature();
-        user7dayActionFeature.setExp_cnt(record.getAs("i_7day_exp_cnt"));
-        user7dayActionFeature.setClick_cnt(record.getAs("i_7day_click_cnt"));
-        user7dayActionFeature.setShare_cnt(record.getAs("i_7day_share_cnt"));
-        user7dayActionFeature.setReturn_cnt(record.getAs("i_7day_return_cnt"));
-        user7dayActionFeature.setCtr(record.getAs("i_ctr_7day"));
-        user7dayActionFeature.setStr(record.getAs("i_str_7day"));
-        user7dayActionFeature.setRov(record.getAs("i_rov_7day"));
-        user7dayActionFeature.setRos(record.getAs("i_ros_7day"));
+        user7dayActionFeature.setExp_cnt(record.getString("i_7day_exp_cnt"));
+        user7dayActionFeature.setClick_cnt(record.getString("i_7day_click_cnt"));
+        user7dayActionFeature.setShare_cnt(record.getString("i_7day_share_cnt"));
+        user7dayActionFeature.setReturn_cnt(record.getString("i_7day_return_cnt"));
+        user7dayActionFeature.setCtr(record.getString("i_ctr_7day"));
+        user7dayActionFeature.setStr(record.getString("i_str_7day"));
+        user7dayActionFeature.setRov(record.getString("i_rov_7day"));
+        user7dayActionFeature.setRos(record.getString("i_ros_7day"));
         itemFeature.setDay7_cnt_features(user1dayActionFeature);
 
         UserActionFeature user3monthActionFeature = new UserActionFeature();
-        user3monthActionFeature.setExp_cnt(record.getAs("i_3month_exp_cnt"));
-        user3monthActionFeature.setClick_cnt(record.getAs("i_3month_click_cnt"));
-        user3monthActionFeature.setShare_cnt(record.getAs("i_3month_share_cnt"));
-        user3monthActionFeature.setReturn_cnt(record.getAs("i_3month_return_cnt"));
-        user3monthActionFeature.setCtr(record.getAs("i_ctr_3month"));
-        user3monthActionFeature.setStr(record.getAs("i_str_3month"));
-        user3monthActionFeature.setRov(record.getAs("i_rov_3month"));
-        user3monthActionFeature.setRos(record.getAs("i_ros_3month"));
+        user3monthActionFeature.setExp_cnt(record.getString("i_3month_exp_cnt"));
+        user3monthActionFeature.setClick_cnt(record.getString("i_3month_click_cnt"));
+        user3monthActionFeature.setShare_cnt(record.getString("i_3month_share_cnt"));
+        user3monthActionFeature.setReturn_cnt(record.getString("i_3month_return_cnt"));
+        user3monthActionFeature.setCtr(record.getString("i_ctr_3month"));
+        user3monthActionFeature.setStr(record.getString("i_str_3month"));
+        user3monthActionFeature.setRov(record.getString("i_rov_3month"));
+        user3monthActionFeature.setRos(record.getString("i_ros_3month"));
         itemFeature.setMonth3_cnt_features(user3monthActionFeature);
         return itemFeature;
     }

+ 0 - 78
src/main/java/com/tzld/piaoquan/data/dataloader/UserFeatureSparkLoaderMaxcompute.java

@@ -1,78 +0,0 @@
-package com.tzld.piaoquan.data.dataloader;
-
-
-import com.tzld.piaoquan.data.base.UserFeature;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
-import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
-import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.serializer.StringRedisSerializer;
-
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class UserFeatureSparkLoaderMaxcompute {
-
-    private static final String userKeyFormat = "user:%s";
-
-    public static RedisTemplate<String, String> buildRedisTemplate() {
-        RedisStandaloneConfiguration rsc = new RedisStandaloneConfiguration();
-        rsc.setPort(6379);
-        rsc.setPassword("Wqsd@2019");
-        rsc.setHostName("r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com");
-        RedisTemplate<String, String> template = new RedisTemplate<>();
-        JedisConnectionFactory fac = new JedisConnectionFactory(rsc);
-        fac.afterPropertiesSet();
-        template.setDefaultSerializer(new StringRedisSerializer());
-        template.setConnectionFactory(fac);
-        template.afterPropertiesSet();
-        return template;
-    }
-
-
-    public static void loadFeatureToRedis(RedisTemplate<String, String> redisTemplate, Row line) {
-        Map<String, String> userFeaRedisFormat = new HashMap<String, String>();
-        UserFeature userFeature = FeatureConstructor.constructUserFeature(line);
-        String key = String.format(userKeyFormat, userFeature.getKey());
-        String value = userFeature.getValue();
-        userFeaRedisFormat.put(key, value);
-        redisTemplate.opsForValue().multiSet(userFeaRedisFormat);
-    }
-
-
-    public static Dataset<Row> readTable(SparkSession spark,
-                                         String ptTableName,
-                                         String dt) {
-        // 读 分区表
-        Dataset<Row> rptdf = spark.sql("select * from " + ptTableName + " where dt = " + dt + " limit 1000;");
-        System.out.println("rptdf count: " + rptdf.count());
-        rptdf.printSchema();
-        return rptdf;
-    }
-
-
-    public static void main(String[] args) {
-        SparkSession spark = SparkSession
-                .builder()
-                .appName("SparkSQL-on-MaxCompute")
-                .config("spark.sql.broadcastTimeout", 20 * 60)
-                .config("spark.sql.crossJoin.enabled", true)
-                .config("odps.exec.dynamic.partition.mode", "nonstrict")
-                .getOrCreate();
-        JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
-
-        Dataset<Row> rptdf = readTable(spark, "loghubods.alg_recsys_video_info", "20231207");
-
-        rptdf.toJavaRDD().foreachPartition(
-                rowIterator -> {
-                    RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
-                    rowIterator.forEachRemaining(line -> loadFeatureToRedis(redisTemplate, line));
-                }
-        );
-    }
-
-}