Pārlūkot izejas kodu

repair redis pipeline 1207

sunmingze 1 gadu atpakaļ
vecāks
revīzija
214837a2c4

+ 0 - 1
recommend-server-service/pom.xml

@@ -183,7 +183,6 @@
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-maven-plugin</artifactId>
                 <configuration>
-                    <mainClass>com.tzld.piaoquan.recommend.server.dataloader.OfflineSamplesLoader</mainClass>
                     <layout>ZIP</layout>
                 </configuration>
                 <executions>

+ 0 - 2
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/Application.java

@@ -2,7 +2,6 @@ package com.tzld.piaoquan.recommend.server;
 
 // import com.tzld.piaoquan.recommend.feature.client.FeatureClient;
 import com.tzld.piaoquan.recommend.feature.client.FeatureClient;
-import com.tzld.piaoquan.recommend.server.dataloader.OfflineSamplesLoader;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
@@ -27,7 +26,6 @@ import org.springframework.context.annotation.EnableAspectJAutoProxy;
 public class Application {
     public static void main(String[] args) {
         SpringApplication.run(Application.class, args);
-        OfflineSamplesLoader.mutiplyParser("user_video_features_data_final", "20231206");
     }
 
 

+ 10 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/common/base/ItemFeature.java

@@ -1,5 +1,6 @@
 package com.tzld.piaoquan.recommend.server.common.base;
 
+import com.tzld.piaoquan.recommend.server.util.JSONUtils;
 import lombok.Data;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
@@ -124,4 +125,13 @@ public class ItemFeature {
         }
     }
 
+    public String getKey() {
+        return this.videoId;
+    }
+
+    public String getValue(){
+        return JSONUtils.toJson(this);
+    }
+
+
 }

+ 2 - 3
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/dataloader/FeatureConstructor.java

@@ -40,7 +40,7 @@ public class  FeatureConstructor {
         Odps odps = new Odps(account);
         odps.setEndpoint(ODPS_CONFIG.get("ENDPOINT"));
         odps.setDefaultProject("loghubods");
-        String sql = String.format("select * from %s where dt ='%s' limit 100000;", table, dt);
+        String sql = String.format("select * from %s where dt ='%s';", table, dt);
         Instance instance;
         List<Record> records = new ArrayList<Record>();
         try {
@@ -54,11 +54,10 @@ public class  FeatureConstructor {
     }
 
 
-    public static TunnelRecordReader loadDataFromOSSSession(String table, String dt) {
+    public static TunnelRecordReader loadDataFromOSSSession(String sql, String table, String dt) {
         Odps odps = new Odps(account);
         odps.setEndpoint(ODPS_CONFIG.get("ENDPOINT"));
         odps.setDefaultProject("loghubods");
-        String sql = String.format("select * from %s where dt ='%s';", table, dt);
         TunnelRecordReader reader = null;
         try {
             Instance instance = SQLTask.run(odps, sql);

+ 8 - 12
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/dataloader/OfflineSamplesLoader.java → recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/dataloader/OfflineAdOutSamplesLoader.java

@@ -1,27 +1,18 @@
 package com.tzld.piaoquan.recommend.server.dataloader;
 
 import com.aliyun.odps.tunnel.io.TunnelRecordReader;
-import com.fasterxml.jackson.databind.ser.Serializers;
 import com.google.common.collect.ListMultimap;
 import com.tzld.piaoquan.recommend.server.common.base.*;
-import com.tzld.piaoquan.recommend.server.common.enums.VlogFeatureGroup;
 import com.tzld.piaoquan.recommend.server.gen.recommend.BaseFeature;
 import com.tzld.piaoquan.recommend.server.gen.recommend.FeatureGroup;
-import com.tzld.piaoquan.recommend.server.gen.recommend.GroupedFeature;
 import com.tzld.piaoquan.recommend.server.service.score.feature.VlogShareLRFeatureExtractor;
-import com.aliyun.odps.Instance;
-import com.aliyun.odps.Odps;
-import com.aliyun.odps.OdpsException;
-import com.aliyun.odps.account.Account;
-import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.Record;
-import com.aliyun.odps.task.SQLTask;
 
 import java.io.IOException;
 import java.util.*;
 
 
-public class OfflineSamplesLoader {
+public class OfflineAdOutSamplesLoader {
 
     private static final String BUCKET_NAME = "ali-recommend";
     private static  final Map<String, String> ODPS_CONFIG =  new HashMap<String, String>();
@@ -77,7 +68,8 @@ public class OfflineSamplesLoader {
 
     //  主处理逻辑
     public static void mutiplyParser(String table, String dt) {
-        TunnelRecordReader reader = FeatureConstructor.loadDataFromOSSSession(table, dt);
+        String sql = String.format("select * from %s where dt ='%s' and ad_ornot = '0' and apptype != '13';", table, dt);
+        TunnelRecordReader reader = FeatureConstructor.loadDataFromOSSSession(sql, table, dt);
         Record record;
         try {
             while ((record = reader.read()) != null) {
@@ -91,7 +83,11 @@ public class OfflineSamplesLoader {
 
 
     public static void main(String[] args) {
-        OfflineSamplesLoader.mutiplyParser("user_video_features_data_final", "20231206");
+        if(args.length < 2){
+            System.out.println("--------args 缺失---------");
+            return;
+        }
+        OfflineAdOutSamplesLoader.mutiplyParser(args[0], args[1]);
     }
 
 }

+ 10 - 18
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/dataloader/FeatureToRedisLoader.java → recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/dataloader/UserFeatureToRedisLoader.java

@@ -1,46 +1,32 @@
 package com.tzld.piaoquan.recommend.server.dataloader;
 
-import com.aliyun.odps.Instance;
-import com.aliyun.odps.Odps;
-import com.aliyun.odps.OdpsException;
-import com.aliyun.odps.account.Account;
-import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.Record;
-import com.aliyun.odps.task.SQLTask;
 import com.aliyun.odps.tunnel.io.TunnelRecordReader;
-import com.google.common.collect.ListMultimap;
 import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
 import com.tzld.piaoquan.recommend.server.common.base.*;
-import com.tzld.piaoquan.recommend.server.dataloader.OfflineSamplesLoader;
-import com.tzld.piaoquan.recommend.server.util.JSONUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.stream.Collectors;
 
 
 @Component
-public class FeatureToRedisLoader {
+public class UserFeatureToRedisLoader {
 
     @Autowired
     private RedisTemplate<String, String> redisTemplate;
     private final String userKeyFormat = "user:%s";
-    private final String videoKeyFormat = "video:%s";
     private ExecutorService pool = ThreadPoolFactory.defaultPool();
 
 
-
     public void loadFeatureToRedis(String userTable, String dt) {
-        TunnelRecordReader reader = FeatureConstructor.loadDataFromOSSSession(userTable, dt);
+        String sql = String.format("select * from %s where dt ='%s';", userTable, dt);
+        TunnelRecordReader reader = FeatureConstructor.loadDataFromOSSSession(sql, userTable, dt);
         Record record;
         Map<String, String> userFeaRedisFormat = new HashMap<String, String>();
         int count = 0;
@@ -64,8 +50,14 @@ public class FeatureToRedisLoader {
     }
 
 
-    public static void main(String[] args) {
 
+    public static void main(String[] args) {
+        if(args.length < 2){
+            System.out.println("--------args 缺失---------");
+            return;
+        }
+        UserFeatureToRedisLoader userFeatureToRedisLoader = new UserFeatureToRedisLoader();
+        userFeatureToRedisLoader.loadFeatureToRedis(args[0], args[1]);
     }
 
 }

+ 2 - 8
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankService.java

@@ -68,12 +68,10 @@ public class RankService {
         ScorerPipeline scorerPipeline = ScorerUtils.getScorerPipeline(config);
         // TODO  merge 后返回待排序的video list
         List<Video> recallVideos = recallResult.mergeRecallVideos();
+
         // TODO 转化成rankitem
         // convert List<Video> to List<RankItem>
-        List<RankItem> rankItem = new ArrayList<RankItem>();
-
-        List<RankItem> rovRecallRank  =
-                scorerPipeline.scoring(requestContext, param, userFeature, rankItem);
+        List<RankItem> rovRecallRank  = scorerPipeline.scoring(requestContext, param, userFeature, videoRankFeatures);
 
         log.info("mergeAndRankRovRecall rovRecallRank={}", JSONUtils.toJson(rovRecallRank));
         List<Video> flowPoolRank = mergeAndRankFlowPoolRecall(param);
@@ -83,8 +81,6 @@ public class RankService {
         removeDuplicate(param, recallVideos, flowPoolRank);
         // convert List<RankItem> to List<Video>
 
-
-
         log.info("removeDuplicate rovRecallRank={}, flowPoolRank={}",
                 JSONUtils.toJson(rovRecallRank),
                 JSONUtils.toJson(flowPoolRank));
@@ -92,8 +88,6 @@ public class RankService {
         // 融合排序
         //TODO need to repair
         return mergeAndSort(param, recallVideos, flowPoolRank);
-
-
     }