소스 검색

write to redis 1204

sunmingze 1 년 전
부모
커밋
0b565ae76c

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/Application.java

@@ -27,7 +27,7 @@ import org.springframework.context.annotation.EnableAspectJAutoProxy;
 public class Application {
     public static void main(String[] args) {
         SpringApplication.run(Application.class, args);
-        OfflineSamplesLoader.mutiplyParser("user_video_features_data_final", "20231203");
+        // After Spring starts, runs the offline sample parser for a hard-coded table name and dt value.
+        OfflineSamplesLoader.mutiplyParser("user_video_features_data_final", "20231206");
     }
 
 

+ 91 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/common/base/ItemFeature.java

@@ -1,8 +1,11 @@
 package com.tzld.piaoquan.recommend.server.common.base;
 
 import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
 
-@Data
+@Getter
+@NoArgsConstructor
 public class ItemFeature {
     private String videoId;
 
@@ -34,4 +37,91 @@ public class ItemFeature {
     // 3个月统计量
     private UserActionFeature month3_cnt_features;
 
+
+    /** Stores the video id, substituting "0" when the supplied value is null. */
+    public void setVideoId(String key){
+        this.videoId = (key != null) ? key : "0";
+    }
+
+    /** Stores the up id, substituting "0" when the supplied value is null. */
+    public void setUpId(String key){
+        this.upId = (key != null) ? key : "0";
+    }
+
+    /** Stores the tags string, substituting "0" when the supplied value is null. */
+    public void setTags(String key){
+        this.tags = (key != null) ? key : "0";
+    }
+
+    /** Stores the title, substituting "0" when the supplied value is null. */
+    public void setTitle(String key){
+        this.title = (key != null) ? key : "0";
+    }
+
+
+    /** Assigns day1_cnt_features directly; a null argument is stored unchanged. */
+    public void setDay1_cnt_features(UserActionFeature feature){
+        this.day1_cnt_features = feature;
+    }
+
+
+    /** Assigns day3_cnt_features directly; a null argument is stored unchanged. */
+    public void setDay3_cnt_features(UserActionFeature feature){
+        this.day3_cnt_features = feature;
+
+    }
+
+    /** Assigns day7_cnt_features directly; a null argument is stored unchanged. */
+    public void setDay7_cnt_features(UserActionFeature feature){
+        this.day7_cnt_features = feature;
+
+    }
+
+    /** Assigns month3_cnt_features directly; a null argument is stored unchanged. */
+    public void setMonth3_cnt_features(UserActionFeature feature){
+        this.month3_cnt_features= feature;
+
+    }
+
+    /** Stores the title length string, substituting "0" when the supplied value is null. */
+    public void setTitleLength(String key) {
+        this.titleLength = (key != null) ? key : "0";
+    }
+
+
+    /** Stores the days-since-upload string, substituting "0" when the supplied value is null. */
+    public void setDaysSinceUpload(String key) {
+        this.daysSinceUpload = (key != null) ? key : "0";
+    }
+
+    /** Stores the play length string, substituting "0" when the supplied value is null. */
+    public void setPlayLength(String key) {
+        this.playLength = (key != null) ? key : "0";
+    }
+
+    /** Stores the total time string, substituting "0" when the supplied value is null. */
+    public void setTotalTime(String key) {
+        this.totalTime = (key != null) ? key : "0";
+    }
+
 }

+ 50 - 16
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/common/base/UserActionFeature.java

@@ -30,39 +30,73 @@ public class UserActionFeature {
 
 
+    /**
+     * Sets exp_cnt. A null key yields 0.0; otherwise each literal \N in the key's string form is
+     * replaced by "-1", the result is parsed as a double and passed through ceilLog.
+     */
     public void setExp_cnt(Object key){
-        String formateKey = key.toString().replace("\\N", "-1");
-        this.exp_cnt = ceilLog(Double.valueOf(formateKey));
+        if(key == null ) {
+            this.exp_cnt = 0.0;
+        } else {
+            String formateKey = key.toString().replace("\\N", "-1");
+            this.exp_cnt = ceilLog(Double.valueOf(formateKey));
+        }
     }
 
+    /**
+     * Sets click_cnt. A null key yields 0.0; otherwise each literal \N in the key's string form is
+     * replaced by "-1", the result is parsed as a double and passed through ceilLog.
+     */
     public void setClick_cnt(Object key){
-        String formateKey = key.toString().replace("\\N", "-1");
-        this.click_cnt = ceilLog(Double.valueOf(formateKey));
+        if(key == null ){
+            this.click_cnt = 0.0 ;
+        } else {
+            String formateKey = key.toString().replace("\\N", "-1");
+            this.click_cnt = ceilLog(Double.valueOf(formateKey));
+        }
     }
+    /**
+     * Sets share_cnt. A null key yields 0.0; otherwise each literal \N in the key's string form is
+     * replaced by "-1", the result is parsed as a double and passed through ceilLog.
+     */
     public void setShare_cnt(Object key){
-        String formateKey = key.toString().replace("\\N", "-1");
-        this.share_cnt = ceilLog(Double.valueOf(formateKey));
+        if(key == null ){
+            this.share_cnt = 0.0 ;
+        } else {
+            String formateKey = key.toString().replace("\\N", "-1");
+            this.share_cnt = ceilLog(Double.valueOf(formateKey));
+        }
     }
+    /**
+     * Sets return_cnt. A null key yields 0.0; otherwise each literal \N in the key's string form is
+     * replaced by "-1", the result is parsed as a double and passed through ceilLog.
+     */
     public void setReturn_cnt(Object key){
-        String formateKey = key.toString().replace("\\N", "-1");
-        this.return_cnt = ceilLog(Double.valueOf(formateKey));
+        if(key == null ){
+            this.return_cnt = 0.0 ;
+        } else {
+            String formateKey = key.toString().replace("\\N", "-1");
+            this.return_cnt = ceilLog(Double.valueOf(formateKey));
+        }
     }
 
+    /**
+     * Sets ctr. A null key yields 0.0; otherwise each literal \N in the key's string form is
+     * replaced by "-1", the result is parsed as a double and passed through bucketRatioFeature.
+     */
     public void setCtr(Object key){
-        String formateKey = key.toString().replace("\\N", "-1");
-        this.ctr = bucketRatioFeature(Double.valueOf(formateKey));
+        if(key == null ){
+            this.ctr = 0.0 ;
+        } else {
+            String formateKey = key.toString().replace("\\N", "-1");
+            this.ctr = bucketRatioFeature(Double.valueOf(formateKey));
+        }
     }
+
+    /**
+     * Sets str. A null key yields 0.0; otherwise each literal \N in the key's string form is
+     * replaced by "-1", the result is parsed as a double and passed through bucketRatioFeature.
+     */
     public void setStr(Object key){
-        String formateKey = key.toString().replace("\\N", "-1");
-        this.str = bucketRatioFeature(Double.valueOf(formateKey));
+        if(key == null ){
+            this.str = 0.0 ;
+        } else {
+            String formateKey = key.toString().replace("\\N", "-1");
+            this.str = bucketRatioFeature(Double.valueOf(formateKey));
+        }
     }
+
+    /**
+     * Sets rov. A null key yields 0.0; otherwise each literal \N in the key's string form is
+     * replaced by "-1", the result is parsed as a double and passed through bucketRatioFeature.
+     */
     public void setRov(Object key){
-        String formateKey = key.toString().replace("\\N", "-1");
-        this.rov = bucketRatioFeature(Double.valueOf(formateKey));
+        if(key == null ){
+            this.rov = 0.0 ;
+        } else {
+            String formateKey = key.toString().replace("\\N", "-1");
+            this.rov = bucketRatioFeature(Double.valueOf(formateKey));
+        }
     }
 
+    /**
+     * Sets ros. A null key yields 0.0; otherwise each literal \N in the key's string form is
+     * replaced by "-1", the result is parsed as a double and passed through bucketRatioFeature.
+     */
     public void setRos(Object key){
-        String formateKey = key.toString().replace("\\N", "-1");
-        this.ros = bucketRatioFeature(Double.valueOf(formateKey));
+        if(key == null ){
+            this.ros = 0.0 ;
+        } else {
+            String formateKey = key.toString().replace("\\N", "-1");
+            this.ros = bucketRatioFeature(Double.valueOf(formateKey));
+        }
     }
 
 

+ 54 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/common/base/UserFeature.java

@@ -1,10 +1,13 @@
 package com.tzld.piaoquan.recommend.server.common.base;
 
 import lombok.Data;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
 
 import java.util.Map;
 
-@Data
+@Getter
+@NoArgsConstructor
 public class UserFeature {
     private String uid;
     // 当天统计量信息
@@ -20,6 +23,56 @@ public class UserFeature {
     private String user_cycle_bucket_30days;
     private String user_share_bucket_30days;
 
+    /** Stores the uid, substituting "0" when the supplied value is null. */
+    public void setUid(String key){
+        this.uid = (key != null) ? key : "0";
+    }
+
+
+    /** Stores day1_cnt_features, substituting a fresh UserActionFeature when the argument is null. */
+    public void setDay1_cnt_features(UserActionFeature key){
+        this.day1_cnt_features = (key != null) ? key : new UserActionFeature();
+    }
+
+    /** Stores day3_cnt_features, substituting a fresh UserActionFeature when the argument is null. */
+    public void setDay3_cnt_features(UserActionFeature key){
+        this.day3_cnt_features = (key != null) ? key : new UserActionFeature();
+    }
+
+    /** Stores day7_cnt_features, substituting a fresh UserActionFeature when the argument is null. */
+    public void setDay7_cnt_features(UserActionFeature key){
+        this.day7_cnt_features = (key != null) ? key : new UserActionFeature();
+    }
+
+    /** Stores month3_cnt_features, substituting a fresh UserActionFeature when the argument is null. */
+    public void setMonth3_cnt_features(UserActionFeature key) {
+        this.month3_cnt_features = (key != null) ? key : new UserActionFeature();
+    }
+
+
+    /** Stores user_cycle_bucket_7days, substituting "0" when the supplied value is null. */
+    public void setUser_cycle_bucket_7days(String key){
+        this.user_cycle_bucket_7days = (key != null) ? key : "0";
+    }
+
+    /** Stores user_cycle_bucket_30days, substituting "0" when the supplied value is null. */
+    public void setUser_cycle_bucket_30days(String key){
+        this.user_cycle_bucket_30days = (key != null) ? key : "0";
+    }
+
+    /** Stores user_share_bucket_30days, substituting "0" when the supplied value is null. */
+    public void setUser_share_bucket_30days(String key){
+        this.user_share_bucket_30days = (key != null) ? key : "0";
+    }
+
 
     public String parsetoRedisString() {
 

+ 49 - 21
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/dataloader/FeatureConstructor.java

@@ -1,6 +1,9 @@
 package com.tzld.piaoquan.recommend.server.dataloader;
 
 
+import com.aliyun.odps.data.ResultSet;
+import com.aliyun.odps.tunnel.InstanceTunnel;
+import com.aliyun.odps.tunnel.io.TunnelRecordReader;
 import com.fasterxml.jackson.databind.ser.Serializers;
 import com.google.common.collect.ListMultimap;
 import com.tzld.piaoquan.recommend.server.common.base.*;
@@ -16,9 +19,11 @@ import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.Record;
 import com.aliyun.odps.task.SQLTask;
+
+import java.io.IOException;
 import java.util.*;
 
-public class FeatureConstructor {
+public class  FeatureConstructor {
 
     private static final String BUCKET_NAME = "ali-recommend";
     private static  final Map<String, String> ODPS_CONFIG =  new HashMap<String, String>();
@@ -28,12 +33,14 @@ public class FeatureConstructor {
         ODPS_CONFIG.put("ACCESSKEY", "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P");
     };
 
+    private static final Account account =new AliyunAccount(ODPS_CONFIG.get("ACCESSID"), ODPS_CONFIG.get("ACCESSKEY"));
+
+
     public static List<Record> loadStreamDataFromOSS(String table, String dt) {
-        Account account = new AliyunAccount(ODPS_CONFIG.get("ACCESSID"), ODPS_CONFIG.get("ACCESSKEY"));
         Odps odps = new Odps(account);
         odps.setEndpoint(ODPS_CONFIG.get("ENDPOINT"));
         odps.setDefaultProject("loghubods");
-        String sql = String.format("select * from %s where dt ='%s' limit 100;", table, dt);
+        String sql = String.format("select * from %s where dt ='%s' limit 100000;", table, dt);
         Instance instance;
         List<Record> records = new ArrayList<Record>();
         try {
@@ -47,21 +54,42 @@ public class FeatureConstructor {
     }
 
 
+    /**
+     * Runs {@code select * from <table> where dt ='<dt>'} in the "loghubods" project, waits for the
+     * instance to succeed, and opens a tunnel reader covering every record of the result.
+     *
+     * @param table table name formatted directly into the SQL text
+     * @param dt    dt value formatted directly into the SQL text
+     * @return a reader over records 0..recordCount, or {@code null} when an OdpsException or
+     *         IOException occurred (the stack trace is printed)
+     */
+    public static TunnelRecordReader loadDataFromOSSSession(String table, String dt) {
+        Odps odps = new Odps(account);
+        odps.setEndpoint(ODPS_CONFIG.get("ENDPOINT"));
+        odps.setDefaultProject("loghubods");
+        String sql = String.format("select * from %s where dt ='%s';", table, dt);
+        try {
+            Instance instance = SQLTask.run(odps, sql);
+            instance.waitForSuccess();
+            InstanceTunnel.DownloadSession session =
+                    new InstanceTunnel(odps).createDownloadSession(odps.getDefaultProject(), instance.getId());
+            return session.openRecordReader(0, session.getRecordCount());
+        } catch (OdpsException | IOException e) {
+            e.printStackTrace();
+            return null;
+        }
+    }
+
 
+    /**
+     * Builds a RequestContext from one record, copying the apptype, machineinfo_* and ctx_* columns
+     * through the matching setters.
+     * NOTE(review): presumably record.getString returns null for a NULL column, where the previous
+     * record.get(...).toString() would throw - confirm against the ODPS Record API.
+     */
     public static RequestContext constructRequestContext(Record record) {
         RequestContext requestContext = new RequestContext();
-        requestContext.setApptype(record.get("apptype").toString());
-        requestContext.setMachineinfo_brand(record.get("machineinfo_brand").toString());
-        requestContext.setMachineinfo_model(record.get("machineinfo_model").toString());
-        requestContext.setMachineinfo_platform(record.get("machineinfo_platform").toString());
-        requestContext.setMachineinfo_sdkversion(record.get("machineinfo_sdkversion").toString());
-        requestContext.setMachineinfo_system(record.get("machineinfo_system").toString());
-        requestContext.setMachineinfo_wechatversion(record.get("machineinfo_wechatversion").toString());
-        requestContext.setDay(record.get("ctx_day").toString());
-        requestContext.setWeek(record.get("ctx_week").toString());
-        requestContext.setHour(record.get("ctx_hour").toString());
-        requestContext.setRegion(record.get("ctx_region").toString());
-        requestContext.setCity(record.get("ctx_city").toString());
+        requestContext.setApptype(record.getString("apptype"));
+        requestContext.setMachineinfo_brand(record.getString("machineinfo_brand"));
+        requestContext.setMachineinfo_model(record.getString("machineinfo_model"));
+        requestContext.setMachineinfo_platform(record.getString("machineinfo_platform"));
+        requestContext.setMachineinfo_sdkversion(record.getString("machineinfo_sdkversion"));
+        requestContext.setMachineinfo_system(record.getString("machineinfo_system"));
+        requestContext.setMachineinfo_wechatversion(record.getString("machineinfo_wechatversion"));
+        requestContext.setDay(record.getString("ctx_day"));
+        requestContext.setWeek(record.getString("ctx_week"));
+        requestContext.setHour(record.getString("ctx_hour"));
+        requestContext.setRegion(record.getString("ctx_region"));
+        requestContext.setCity(record.getString("ctx_city"));
         return requestContext;
     }
 
@@ -129,12 +157,12 @@ public class FeatureConstructor {
 
     public static ItemFeature constructItemFeature(Record record){
         ItemFeature itemFeature = new ItemFeature();
-        itemFeature.setVideoId(record.get("videoid").toString());
-        itemFeature.setUpId(record.get("i_up_id").toString());
-        itemFeature.setTitleLength(record.get("i_title_len").toString());
-        itemFeature.setPlayLength(record.get("i_play_len").toString());
-        itemFeature.setTotalTime(record.get("total_time").toString());
-        itemFeature.setDaysSinceUpload(record.get("i_days_since_upload").toString());
+        itemFeature.setVideoId(record.getString("videoid"));
+        itemFeature.setUpId(record.getString("i_up_id"));
+        itemFeature.setTitleLength(record.getString("i_title_len"));
+        itemFeature.setPlayLength(record.getString("i_play_len"));
+        itemFeature.setTotalTime(record.getString("total_time"));
+        itemFeature.setDaysSinceUpload(record.getString("i_days_since_upload"));
 
         UserActionFeature user1dayActionFeature = new UserActionFeature();
         user1dayActionFeature.setExp_cnt(record.getString("i_1day_exp_cnt"));

+ 39 - 22
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/dataloader/FeatureToRedisLoader.java

@@ -7,49 +7,66 @@ import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.Record;
 import com.aliyun.odps.task.SQLTask;
+import com.aliyun.odps.tunnel.io.TunnelRecordReader;
 import com.google.common.collect.ListMultimap;
 import com.tzld.piaoquan.recommend.server.common.base.*;
-import com.tzld.piaoquan.recommend.server.gen.recommend.BaseFeature;
-import com.tzld.piaoquan.recommend.server.gen.recommend.FeatureGroup;
-import com.tzld.piaoquan.recommend.server.service.score.feature.VlogShareLRFeatureExtractor;
+import com.tzld.piaoquan.recommend.server.dataloader.OfflineSamplesLoader;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class FeatureToRedisLoader {
-
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
-    public static void loadFeatureToRedis(String userTable, String itemTable, String dt) {
-        // 1. 数据读取
-        List<Record> userOriginData = FeatureConstructor.loadStreamDataFromOSS(userTable, dt);
-        List<Record> itemOriginData = FeatureConstructor.loadStreamDataFromOSS(itemTable, dt);
 
-        // 2. user item初始化
-        for(int i=0; i < userOriginData.size(); i++) {
-            Record userRecord = userOriginData.get(i);
-            UserFeature userFeature = FeatureConstructor.constructUserFeature(userRecord);
-        }
 
-        for(int i=0; i < itemOriginData.size(); i++) {
-            Record itemRecord = itemOriginData.get(i);
-            ItemFeature itemFeature = FeatureConstructor.constructItemFeature(itemRecord);
-        }
+/**
+ * Runnable that holds one batch of strings. run() is still a TODO stub and writes nothing.
+ */
+class Task implements Runnable {
+    // A list is used so the thread queue does not grow without bound: entries are submitted to a thread once per 100,000 items.
+    private List<String> keys;
+    public Task(List<String> keys){
+        this.keys = keys;
     }
-
-    public static void writetoRedis(UserFeature userFeature) {
-
+    public void run() {
+     //TODO write List<String> to Redis
+    }
+}
 
 
+public class FeatureToRedisLoader {
 
+    private static final ExecutorService persistOnlineFeaturesExecutor = Executors.newFixedThreadPool(10);
+
+    /**
+     * Streams every row returned for {@code userTable} and {@code dt}, converts each row to its
+     * Redis string form, and hands the strings to the executor in batches of 100000.
+     *
+     * @param userTable table name passed to FeatureConstructor.loadDataFromOSSSession
+     * @param dt        dt value passed to FeatureConstructor.loadDataFromOSSSession
+     */
+    public static void loadFeatureToRedis(String userTable, String dt) {
+        final int batchSize = 100000;
+        TunnelRecordReader reader = FeatureConstructor.loadDataFromOSSSession(userTable, dt);
+        if (reader == null) {
+            // loadDataFromOSSSession returns null after a failed query or download session.
+            System.err.println("loadFeatureToRedis: no reader for table " + userTable + ", dt " + dt);
+            return;
+        }
+        List<String> keyList = new ArrayList<>();
+        try {
+            try {
+                Record record;
+                while ((record = reader.read()) != null) {
+                    UserFeature userFeature = FeatureConstructor.constructUserFeature(record);
+                    // TODO add string format for redis
+                    keyList.add(userFeature.parsetoRedisString());
+                    if (keyList.size() >= batchSize) {
+                        persistOnlineFeaturesExecutor.execute(new Task(keyList));
+                        // Start a new list; the submitted one now belongs to the task.
+                        keyList = new ArrayList<>();
+                    }
+                }
+                // Submit the final partial batch, which would otherwise be dropped.
+                if (!keyList.isEmpty()) {
+                    persistOnlineFeaturesExecutor.execute(new Task(keyList));
+                }
+            } finally {
+                // Release the tunnel reader whether or not reading succeeded.
+                reader.close();
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
     }
 
 
+    // Placeholder entry point: declares an unused empty string and does nothing else.
     public static void main(String[] args) {
         String Path = "";
 
-
     }
 
 }

+ 17 - 6
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/dataloader/OfflineSamplesLoader.java

@@ -1,5 +1,6 @@
 package com.tzld.piaoquan.recommend.server.dataloader;
 
+import com.aliyun.odps.tunnel.io.TunnelRecordReader;
 import com.fasterxml.jackson.databind.ser.Serializers;
 import com.google.common.collect.ListMultimap;
 import com.tzld.piaoquan.recommend.server.common.base.*;
@@ -15,6 +16,8 @@ import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.Record;
 import com.aliyun.odps.task.SQLTask;
+
+import java.io.IOException;
 import java.util.*;
 
 
@@ -32,7 +35,10 @@ public class OfflineSamplesLoader {
     // 单条日志处理逻辑
     public static String singleParse(Record record) {
         // 数据解析
-        String label = record.get("ui_is_out").toString();
+        String label = record.getString("ui_is_out");
+        if(label == null){
+            label = "0";
+        }
 
         // 从sql的 record中 初始化对象内容
         RequestContext requestContext = FeatureConstructor.constructRequestContext(record);
@@ -57,7 +63,7 @@ public class OfflineSamplesLoader {
     }
 
     // 构建样本的字符串
-    public static String parseSamplesToString(String label, ListMultimap<FeatureGroup, BaseFeature>  featureMap) {
+    public static String parseSamplesToString(String label, ListMultimap<FeatureGroup, BaseFeature> featureMap) {
         ArrayList<String> featureList = new ArrayList<String>();
         for (Map.Entry<FeatureGroup, BaseFeature> entry : featureMap.entries()) {
             FeatureGroup groupedFeature = entry.getKey();
@@ -71,10 +77,15 @@ public class OfflineSamplesLoader {
 
     //  主处理逻辑
     public static void mutiplyParser(String table, String dt) {
-        List<Record> dataFrame = FeatureConstructor.loadStreamDataFromOSS(table, dt);
-        for(int i=0; i< dataFrame.size(); i++){
-            String samples = singleParse(dataFrame.get(i));
-            System.out.println(samples);
+        TunnelRecordReader reader = FeatureConstructor.loadDataFromOSSSession(table, dt);
+        Record record;
+        try {
+            while ((record = reader.read()) != null) {
+                String samples = singleParse(record);
+                System.out.println(samples);
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
         }
     }