Browse Source

repair redis pipeline 1207

sunmingze 1 year ago
parent
commit
6dd50e7d3c

+ 5 - 1
recommend-server-service/pom.xml

@@ -48,7 +48,11 @@
             <artifactId>odps-sdk-core</artifactId>
             <version>0.45.6-public</version>
         </dependency>
-
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+            <version>3.3.0</version>
+        </dependency>
 <!--        <dependency>-->
 <!--            <groupId>org.apache.hadoop</groupId>-->
 <!--            <artifactId>hadoop-hdfs</artifactId>-->

+ 7 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/Application.java

@@ -2,6 +2,9 @@ package com.tzld.piaoquan.recommend.server;
 
 // import com.tzld.piaoquan.recommend.feature.client.FeatureClient;
 import com.tzld.piaoquan.recommend.feature.client.FeatureClient;
+import com.tzld.piaoquan.recommend.server.dataloader.ItemFeatureToRedisLoader;
+import com.tzld.piaoquan.recommend.server.dataloader.OfflineAdOutSamplesLoader;
+import com.tzld.piaoquan.recommend.server.dataloader.UserFeatureToRedisLoader;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
@@ -26,6 +29,10 @@ import org.springframework.context.annotation.EnableAspectJAutoProxy;
 public class Application {
     public static void main(String[] args) {
         SpringApplication.run(Application.class, args);
+        // UserFeatureToRedisLoader userFeatureToRedisLoader = new UserFeatureToRedisLoader();
+        // userFeatureToRedisLoader.loadFeatureToRedis("user_redis_tmp_smz","20231206");
+        OfflineAdOutSamplesLoader.mutiplyParser("user_video_features_data_final", "20231206");
+
     }
 
 

+ 4 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/common/base/ItemFeature.java

@@ -1,5 +1,7 @@
 package com.tzld.piaoquan.recommend.server.common.base;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import com.tzld.piaoquan.recommend.server.util.JSONUtils;
 import lombok.Data;
 import lombok.Getter;
@@ -130,7 +132,8 @@ public class ItemFeature {
     }
 
     public String getValue(){
-        return JSONUtils.toJson(this);
+        Gson gson = new GsonBuilder().serializeSpecialFloatingPointValues().create();
+        return gson.toJson(this);
     }
 
 

+ 4 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/common/base/UserFeature.java

@@ -1,5 +1,7 @@
 package com.tzld.piaoquan.recommend.server.common.base;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import com.tzld.piaoquan.recommend.server.util.JSONUtils;
 import lombok.Data;
 import lombok.Getter;
@@ -80,7 +82,8 @@ public class UserFeature {
     }
 
     public String getValue(){
-        return JSONUtils.toJson(this);
+        Gson gson = new GsonBuilder().serializeSpecialFloatingPointValues().create();
+        return gson.toJson(this);
     }
 
 }

+ 24 - 7
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/dataloader/ItemFeatureToRedisLoader.java

@@ -3,10 +3,12 @@ package com.tzld.piaoquan.recommend.server.dataloader;
 import com.aliyun.odps.data.Record;
 import com.aliyun.odps.tunnel.io.TunnelRecordReader;
 import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
-import com.tzld.piaoquan.recommend.server.common.base.ItemFeature;
-import com.tzld.piaoquan.recommend.server.common.base.UserFeature;
+import com.tzld.piaoquan.recommend.server.common.base.*;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
+import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
 import org.springframework.stereotype.Component;
 
 import java.io.IOException;
@@ -15,15 +17,28 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
 
-@Component
 public class ItemFeatureToRedisLoader {
 
-    @Autowired
-    private RedisTemplate<String, String> redisTemplate;
     private final String videoKeyFormat = "video:%s";
     private ExecutorService pool = ThreadPoolFactory.defaultPool();
 
+    public static RedisTemplate<String, String> buildRedisTemplate() {
+        RedisStandaloneConfiguration rsc = new RedisStandaloneConfiguration();
+        rsc.setPort(6379);
+        rsc.setPassword("Wqsd@2019");
+        rsc.setHostName("r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com");
+        RedisTemplate<String, String> template = new RedisTemplate<>();
+        JedisConnectionFactory fac = new JedisConnectionFactory(rsc);
+        fac.afterPropertiesSet();
+        template.setDefaultSerializer(new StringRedisSerializer());
+        template.setConnectionFactory(fac);
+        template.afterPropertiesSet();
+        return template;
+    }
+
+
     public void loadFeatureToRedis(String table, String dt) {
+        RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
         String sql = String.format("select * from %s where dt ='%s';", table, dt);
         TunnelRecordReader reader = FeatureConstructor.loadDataFromOSSSession(sql, table, dt);
         Record record;
@@ -35,12 +50,14 @@ public class ItemFeatureToRedisLoader {
                 String key = String.format(videoKeyFormat, itemFeature.getKey());
                 String value = itemFeature.getValue();
                 userFeaRedisFormat.put(key, value);
-                if (count < 10000) {
+                if (count < 200) {
                     count++;
-                } else if (count == 10000) {
+                } else if (count == 200) {
                     redisTemplate.opsForValue().multiSet(userFeaRedisFormat);
+                    System.out.println("------succes add 2000-----");
                     userFeaRedisFormat = new HashMap<String, String>();
                     count = 0;
+                    break;
                 }
             }
         } catch (IOException e) {

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/dataloader/OfflineAdOutSamplesLoader.java

@@ -68,7 +68,7 @@ public class OfflineAdOutSamplesLoader {
 
     //  主处理逻辑
     public static void mutiplyParser(String table, String dt) {
-        String sql = String.format("select * from %s where dt ='%s' and ad_ornot = '0' and apptype != '13';", table, dt);
+        String sql = String.format("select * from %s where and ad_ornot = '0' and apptype != '13' dt ='%s';", table, dt);
         TunnelRecordReader reader = FeatureConstructor.loadDataFromOSSSession(sql, table, dt);
         Record record;
         try {

+ 23 - 5
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/dataloader/UserFeatureToRedisLoader.java

@@ -5,7 +5,10 @@ import com.aliyun.odps.tunnel.io.TunnelRecordReader;
 import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
 import com.tzld.piaoquan.recommend.server.common.base.*;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
+import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
 import org.springframework.stereotype.Component;
 
 import java.io.IOException;
@@ -15,16 +18,29 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
 
-@Component
 public class UserFeatureToRedisLoader {
 
-    @Autowired
-    private RedisTemplate<String, String> redisTemplate;
     private final String userKeyFormat = "user:%s";
     private ExecutorService pool = ThreadPoolFactory.defaultPool();
 
+    public static RedisTemplate<String, String> buildRedisTemplate() {
+        RedisStandaloneConfiguration rsc = new RedisStandaloneConfiguration();
+        rsc.setPort(6379);
+        rsc.setPassword("Wqsd@2019");
+        rsc.setHostName("r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com");
+        RedisTemplate<String, String> template = new RedisTemplate<>();
+        JedisConnectionFactory fac = new JedisConnectionFactory(rsc);
+        fac.afterPropertiesSet();
+        template.setDefaultSerializer(new StringRedisSerializer());
+        template.setConnectionFactory(fac);
+        template.afterPropertiesSet();
+        return template;
+    }
+
+
 
     public void loadFeatureToRedis(String userTable, String dt) {
+        RedisTemplate<String, String> redisTemplate = buildRedisTemplate();
         String sql = String.format("select * from %s where dt ='%s';", userTable, dt);
         TunnelRecordReader reader = FeatureConstructor.loadDataFromOSSSession(sql, userTable, dt);
         Record record;
@@ -36,12 +52,14 @@ public class UserFeatureToRedisLoader {
                 String key = String.format(userKeyFormat, userFeature.getKey());
                 String value = userFeature.getValue();
                 userFeaRedisFormat.put(key, value);
-                if(count < 10000) {
+                if(count < 200) {
                     count++;
-                } else if (count == 10000) {
+                } else if (count == 200) {
                     redisTemplate.opsForValue().multiSet(userFeaRedisFormat);
+                    System.out.println("------succes add 2000-----");
                     userFeaRedisFormat = new HashMap<String, String>();
                     count = 0;
+                    break;
                 }
             }
         } catch (IOException e) {