Pārlūkot izejas kodu

Merge branch 'feature_20241107_vovh24_rank' of algorithm/recommend-server into master

zhaohaipeng 5 mēneši atpakaļ
vecāks
revīzija
c875ceebb0

+ 5 - 0
.gitignore

@@ -48,3 +48,8 @@ build/
 
 ### log ###
 logs/*
+
+.DS_Store
+xgboost
+recommend-server/logs
+LOG_PATH_IS_UNDEFINED

+ 1 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/common/base/RankItem.java

@@ -31,6 +31,7 @@ public class RankItem implements Comparable<RankItem> {
     private double scoreRos; // 记录ros的score
     private double scoreStr; // 记录str的score
     private double scoreRov; // 记录rov的score
+    private double vovScore; // 记录vov的score
 
     // 记录Item侧用到的特征
     private ItemFeature itemFeature;

+ 42 - 68
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RegionMergeModelV563.java

@@ -17,12 +17,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.util.*;
-import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -36,10 +31,7 @@ public class RankStrategy4RegionMergeModelV563 extends RankStrategy4RegionMergeM
     @Autowired
     private FeatureService featureService;
 
-    Map<String, double[]> bucketsMap = new HashMap<>();
-    Map<String, Double> bucketsLen = new HashMap<>();
-
-    @Value("${similarity.concurrent: false}")
+    @Value("${similarity.concurrent: true}")
     private boolean similarityConcurrent;
 
     @Override
@@ -67,13 +59,13 @@ public class RankStrategy4RegionMergeModelV563 extends RankStrategy4RegionMergeM
         List<Video> rovRecallRank = new ArrayList<>(v0);
         //-------------------return相似召回------------------
         List<Video> v6 = extractAndSort(param, ReturnVideoRecallStrategy.PUSH_FORM);
-        v6 = v6.stream().filter(r-> !setVideo.contains(r.getVideoId())).collect(Collectors.toList());
+        v6 = v6.stream().filter(r -> !setVideo.contains(r.getVideoId())).collect(Collectors.toList());
         v6 = v6.subList(0, Math.min(mergeWeight.getOrDefault("v6", 5.0).intValue(), v6.size()));
         rovRecallRank.addAll(v6);
         setVideo.addAll(v6.stream().map(Video::getVideoId).collect(Collectors.toSet()));
         //-------------------新地域召回------------------
-        List<Video> v1 = extractAndSort(param, RegionRealtimeRecallStrategyV1_sort.PUSH_FORM);
-        v1 = v1.stream().filter(r-> !setVideo.contains(r.getVideoId())).collect(Collectors.toList());
+        List<Video> v1 = extractAndSort(param, RegionRealtimeRecallStrategyV1.PUSH_FORM);
+        v1 = v1.stream().filter(r -> !setVideo.contains(r.getVideoId())).collect(Collectors.toList());
         v1 = v1.subList(0, Math.min(mergeWeight.getOrDefault("v1", 5.0).intValue(), v1.size()));
         rovRecallRank.addAll(v1);
         setVideo.addAll(v1.stream().map(Video::getVideoId).collect(Collectors.toSet()));
@@ -238,7 +230,12 @@ public class RankStrategy4RegionMergeModelV563 extends RankStrategy4RegionMergeM
                             String tags = c34567Map.getOrDefault(key, "");
                             if (!tags.isEmpty()) {
                                 Future<Pair<String, Double[]>> future = ThreadPoolFactory.defaultPool().submit(() -> {
-                                    Double[] doubles = ExtractorUtils.funcC34567ForTags(tags, title);
+                                    Double[] doubles = null;
+                                    if (param.getAbExpCodes().contains(word2vecExp)) {
+                                        doubles = ExtractorUtils.funcC34567ForTagsNew(tags, title);
+                                    } else {
+                                        doubles = ExtractorUtils.funcC34567ForTags(tags, title);
+                                    }
                                     return Pair.create(key, doubles);
                                 });
                                 futures.add(future);
@@ -260,7 +257,12 @@ public class RankStrategy4RegionMergeModelV563 extends RankStrategy4RegionMergeM
                         for (String key_time : Arrays.asList("tags_1d", "tags_3d", "tags_7d")) {
                             String tags = c34567Map.getOrDefault(name + "_" + key_time, "");
                             if (!tags.isEmpty()) {
-                                Double[] doubles = ExtractorUtils.funcC34567ForTags(tags, title);
+                                Double[] doubles = null;
+                                if (param.getAbExpCodes().contains(word2vecExp)) {
+                                    doubles = ExtractorUtils.funcC34567ForTagsNew(tags, title);
+                                } else {
+                                    doubles = ExtractorUtils.funcC34567ForTags(tags, title);
+                                }
                                 featureMap.put(name + "_" + key_time + "_matchnum", doubles[0]);
                                 featureMap.put(name + "_" + key_time + "_maxscore", doubles[1]);
                                 featureMap.put(name + "_" + key_time + "_avgscore", doubles[2]);
@@ -328,32 +330,40 @@ public class RankStrategy4RegionMergeModelV563 extends RankStrategy4RegionMergeM
             item.featureMap = featureMap;
         }
 
-        // 3 排序
-        Map<String, String> sceneFeatureMap = new HashMap<>(0);
 
-        List<RankItem> items = ScorerUtils.getScorerPipeline("feeds_score_config_20240807.conf")
-                .scoring(sceneFeatureMap, userFeatureMap, rankItems);
-        String redisScoreKey =  mergeWeight.getOrDefault("redisScoreKey", 0.0) < 0.5 ? "redis:vid_hasreturn_rov:" : "redis:vid_hasreturn_rov_7d:";
-        Map<String, Map<String, String>> vid2MapFeature = this.getVideoRedisFeature(vids, redisScoreKey);
-        List<Video> result = new ArrayList<>();
-        String hasReturnRovKey = mergeWeight.getOrDefault("hasReturnRovKey", 1.0) < 0.5 ? "rate_1" : "rate_n";
-        Double chooseFunction = mergeWeight.getOrDefault("chooseFunction", 0.0);
+        // vovh24特征
+        String partition = redisTemplate.opsForValue().get("redis:vid_vovh24pred_time:partition");
+        Map<String, Map<String, String>> vid2VovFeatureMap = this.getVideoRedisFeature(vids, "redis:vid_vovh24pred_time:" + partition + ":");
+        for (RankItem rankItem : rankItems) {
+            if (vid2VovFeatureMap.containsKey(String.valueOf(rankItem.getVideoId()))) {
+                rankItem.getFeatureMap().putAll(vid2VovFeatureMap.get(String.valueOf(rankItem.getVideoId())));
+            }
+        }
 
+        // 4 排序模型计算
+        Map<String, String> sceneFeatureMap = new HashMap<>(0);
+        sceneFeatureMap.put("weightKey", partition.substring(partition.length() - 2));
+        List<RankItem> items = ScorerUtils.getScorerPipeline("feeds_score_config_20241107.conf").scoring(sceneFeatureMap, userFeatureMap, rankItems);
+        // 5 排序公式特征
+        Map<String, Map<String, String>> vid2MapFeature = this.getVideoRedisFeature(vids, "redis:vid_hasreturn_rov:");
+        double alpha_vov = mergeWeight.getOrDefault("alpha_vov", 0.1);
+        double func = mergeWeight.getOrDefault("func", 1.0);
+        List<Video> result = new ArrayList<>();
         for (RankItem item : items) {
+            item.getScoresMap().put("alpha_vov", alpha_vov);
             double score = 0.0;
-            double hasReturnRovScore = Double.parseDouble(vid2MapFeature.getOrDefault(item.getVideoId() + "", new HashMap<>())
-                    .getOrDefault(hasReturnRovKey, "0"));
-            item.getScoresMap().put("hasReturnRovScore", hasReturnRovScore);
             double fmRovOrigin = item.getScoreRov();
             item.getScoresMap().put("fmRovOrigin", fmRovOrigin);
             double fmRov = restoreScore(fmRovOrigin);
             item.getScoresMap().put("fmRov", fmRov);
-            if (chooseFunction == 0){
-                score = fmRov * (1 + hasReturnRovScore);
-            }else if (chooseFunction == 1){
-                score = fmRov * (1 + Math.log(hasReturnRovScore + 1));
-            }else {
-                score = fmRov * ExtractorUtils.sigmoid(hasReturnRovScore);
+            double hasReturnRovScore = Double.parseDouble(vid2MapFeature.getOrDefault(item.getVideoId() + "", new HashMap<>()).getOrDefault("rate_n", "0"));
+            item.getScoresMap().put("hasReturnRovScore", hasReturnRovScore);
+            double vovScore = item.getVovScore();
+            item.getScoresMap().put("vovScore", vovScore);
+            if (func == 1) {
+                score = fmRov * (1 + hasReturnRovScore) + alpha_vov * vovScore;
+            } else {
+                score = fmRov * (1 + hasReturnRovScore) * (1.0 + alpha_vov * vovScore);
             }
 
             Video video = item.getVideo();
@@ -373,42 +383,6 @@ public class RankStrategy4RegionMergeModelV563 extends RankStrategy4RegionMergeM
             result.add(video);
         }
         result.sort(Comparator.comparingDouble(o -> -o.getSortScore()));
-
         return result;
     }
-
-    public void readBucketFile() {
-        InputStream resourceStream = RankStrategy4RegionMergeModelV552.class.getClassLoader().getResourceAsStream("20240609_bucket_274.txt");
-        if (resourceStream != null) {
-            try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceStream))) {
-                Map<String, double[]> bucketsMap = new HashMap<>();
-                Map<String, Double> bucketsLen = new HashMap<>();
-                String line;
-                while ((line = reader.readLine()) != null) {
-                    // 替换空格和换行符,过滤空行
-                    line = line.replace(" ", "").replaceAll("\n", "");
-                    if (!line.isEmpty()) {
-                        String[] rList = line.split("\t");
-                        if (rList.length == 3) {
-                            String key = rList[0];
-                            double value1 = Double.parseDouble(rList[1]);
-                            bucketsLen.put(key, value1);
-                            double[] value2 = Arrays.stream(rList[2].split(","))
-                                    .mapToDouble(Double::valueOf)
-                                    .toArray();
-                            bucketsMap.put(key, value2);
-                        }
-                    }
-                }
-                this.bucketsMap = bucketsMap;
-                this.bucketsLen = bucketsLen;
-            } catch (IOException e) {
-                log.error("something is wrong in parse bucket file:" + e);
-            }
-        } else {
-            log.error("no bucket file");
-        }
-    }
-
-
 }

+ 1 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/score/ScorerUtils.java

@@ -38,6 +38,7 @@ public final class ScorerUtils {
         ScorerUtils.init("feeds_score_config_20240711.conf");
         ScorerUtils.init("feeds_score_config_20240806.conf");
         ScorerUtils.init("feeds_score_config_20240807.conf");
+        ScorerUtils.init("feeds_score_config_20241107.conf");
         ScorerUtils.init("feeds_score_config_xgb_20240828.conf");
         ScorerUtils.init4Recall("feeds_recall_config_region_v1.conf");
         ScorerUtils.init4Recall("feeds_recall_config_region_v1_vov.conf");

+ 190 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/score/VovH24WeightScorer.java

@@ -0,0 +1,190 @@
+package com.tzld.piaoquan.recommend.server.service.score;
+
+import com.tzld.piaoquan.recommend.feature.domain.video.base.UserFeature;
+import com.tzld.piaoquan.recommend.server.common.base.RankItem;
+import com.tzld.piaoquan.recommend.server.service.score.model.VovH24WeightModel;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+public class VovH24WeightScorer extends AbstractScorer {
+
+    private static final int LOCAL_TIME_OUT = 150;
+    private final static Logger LOGGER = LoggerFactory.getLogger(VlogRovFMScorer.class);
+    private static final ExecutorService executorService = Executors.newFixedThreadPool(128);
+
+    private static final Map<String, String> numeratorAndDenominatorMap = new HashMap<String, String>(32) {{
+        put("1_vovh0分子", "1_vovh分母");
+        put("2_vovh0分子", "2_vovh分母");
+        put("2_vovh1分子", "2_vovh分母");
+        put("3_vovh0分子", "3_vovh分母");
+        put("3_vovh1分子", "3_vovh分母");
+        put("3_vovh2分子", "3_vovh分母");
+        put("4_vovh0分子", "4_vovh分母");
+        put("4_vovh1分子", "4_vovh分母");
+        put("4_vovh3分子", "4_vovh分母");
+        put("7_vovh0分子", "7_vovh分母");
+        put("7_vovh1分子", "7_vovh分母");
+        put("7_vovh6分子", "7_vovh分母");
+        put("13_vovh0分子", "13_vovh分母");
+        put("13_vovh1分子", "13_vovh分母");
+        put("13_vovh12分子", "13_vovh分母");
+        put("25_vovh0分子", "25_vovh分母");
+        put("25_vovh1分子", "25_vovh分母");
+        put("25_vovh24分子", "25_vovh分母");
+        put("1_vovd0分子", "1_vovd分母");
+        put("2_vovd0分子", "2_vovd分母");
+        put("2_vovd1分子", "2_vovd分母");
+        put("3_vovd0分子", "3_vovd分母");
+        put("3_vovd1分子", "3_vovd分母");
+        put("3_vovd2分子", "3_vovd分母");
+    }};
+
+    public VovH24WeightScorer(ScorerConfigInfo scorerConfigInfo) {
+        super(scorerConfigInfo);
+    }
+
+    @Override
+    public void loadModel() {
+        doLoadModel(VovH24WeightModel.class);
+    }
+
+    @Override
+    public List<RankItem> scoring(ScoreParam param, UserFeature userFeature, List<RankItem> rankItems) {
+        throw new NoSuchMethodError();
+    }
+
+    @Override
+    public List<RankItem> scoring(final Map<String, String> sceneFeatureMap,
+                                  final Map<String, String> userFeatureMap,
+                                  final List<RankItem> rankItems) {
+        if (CollectionUtils.isEmpty(rankItems)) {
+            return rankItems;
+        }
+
+        long startTime = System.currentTimeMillis();
+        VovH24WeightModel model = (VovH24WeightModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        List<RankItem> result = rankByJava(
+                sceneFeatureMap, userFeatureMap, rankItems
+        );
+
+        LOGGER.debug("vovh24 scorer time java items size={}, time={} ",
+                result.size(), System.currentTimeMillis() - startTime);
+
+        return result;
+    }
+
+    private List<RankItem> rankByJava(final Map<String, String> sceneFeatureMap,
+                                      final Map<String, String> userFeatureMap,
+                                      final List<RankItem> items) {
+        long startTime = System.currentTimeMillis();
+        VovH24WeightModel model = (VovH24WeightModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        // 所有都参与打分,按照ctr排序
+        multipleCtrScore(items, userFeatureMap, sceneFeatureMap, model);
+
+        // debug log
+        if (LOGGER.isDebugEnabled()) {
+            for (RankItem item : items) {
+                LOGGER.debug("before enter feeds model predict ctr score [{}] [{}]", item, item);
+            }
+        }
+
+        Collections.sort(items);
+
+        LOGGER.debug("[vovh24 scorer time java] items size={}, cost={} ",
+                items.size(), System.currentTimeMillis() - startTime);
+        return items;
+    }
+
+    private void multipleCtrScore(final List<RankItem> items,
+                                  final Map<String, String> userFeatureMap,
+                                  final Map<String, String> sceneFeatureMap,
+                                  final VovH24WeightModel model) {
+
+        List<Callable<Object>> calls = new ArrayList<Callable<Object>>();
+        for (int index = 0; index < items.size(); index++) {
+            final int fIndex = index;
+            calls.add(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+                    try {
+                        calcScore(model, items.get(fIndex), userFeatureMap, sceneFeatureMap);
+                    } catch (Exception e) {
+                        LOGGER.error("vovh24 scorer exception: [{}] [{}]", items.get(fIndex).videoId, ExceptionUtils.getFullStackTrace(e));
+                    }
+                    return new Object();
+                }
+            });
+        }
+
+        List<Future<Object>> futures = null;
+        try {
+            futures = executorService.invokeAll(calls, LOCAL_TIME_OUT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.error("execute invoke fail: {}", ExceptionUtils.getFullStackTrace(e));
+        }
+
+        // 等待所有请求的结果返回, 超时也返回
+        int cancel = 0;
+        if (futures != null) {
+            for (Future<Object> future : futures) {
+                try {
+                    if (!future.isDone() || future.isCancelled() || future.get() == null) {
+                        cancel++;
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.error("InterruptedException: ", e);
+                } catch (ExecutionException e) {
+                    LOGGER.error("ExecutionException {}, ", sceneFeatureMap.size(), e);
+                }
+            }
+        }
+    }
+
+    public double calcScore(final VovH24WeightModel model,
+                            final RankItem item,
+                            final Map<String, String> userFeatureMap,
+                            final Map<String, String> sceneFeatureMap) {
+
+        double vovScore = 0.0;
+        Map<String, String> featureMap = item.getFeatureMap();
+        String weightKey = sceneFeatureMap.get("weightKey");
+        Map<String, Double> weightMap = model.getWeight(weightKey);
+        if (MapUtils.isNotEmpty(featureMap) && MapUtils.isNotEmpty(weightMap)) {
+            try {
+                double numerator = 0d;
+                double denominator = 0d;
+                for (Map.Entry<String, String> entry : numeratorAndDenominatorMap.entrySet()) {
+                    String numeratorKey = entry.getKey();
+                    String denominatorKey = entry.getValue();
+
+                    double up = Double.parseDouble(featureMap.getOrDefault(numeratorKey, "0d"));
+                    double down = Double.parseDouble(featureMap.getOrDefault(denominatorKey, "0d"));
+                    double weight = weightMap.getOrDefault(numeratorKey, 0d);
+
+                    numerator += up * weight;
+                    denominator += down * weight;
+
+                }
+
+                item.getScoresMap().put("numerator", numerator);
+                item.getScoresMap().put("denominator", denominator);
+                vovScore = denominator != 0.0 ? numerator / denominator : 0.0;
+            } catch (Exception e) {
+                LOGGER.error("vovh24 scorer error for doc={} exception={}", item.getVideoId(), ExceptionUtils.getFullStackTrace(e));
+            }
+        }
+        item.getScoresMap().put("vovScore", vovScore);
+        item.setVovScore(vovScore);
+        return vovScore;
+    }
+}

+ 46 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/score/model/VovH24WeightModel.java

@@ -0,0 +1,46 @@
+package com.tzld.piaoquan.recommend.server.service.score.model;
+
+import com.google.common.reflect.TypeToken;
+import com.tzld.piaoquan.recommend.server.util.JSONUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+public class VovH24WeightModel extends Model {
+    private static final Logger LOGGER = LoggerFactory.getLogger(VovH24WeightModel.class);
+
+    // <小时, <Key, weight>>
+    private Map<String, Map<String, Double>> model = new HashMap<>();
+
+    @Override
+    public int getModelSize() {
+        return model.size();
+    }
+
+    @Override
+    public boolean loadFromStream(InputStreamReader in) throws Exception {
+        Map<String, Map<String, Double>> initModel = new HashMap<>();
+        try (BufferedReader reader = new BufferedReader(in)) {
+            StringBuilder buffer = new StringBuilder();
+            String line;
+            while ((line = reader.readLine()) != null) {
+                buffer.append(line).append("\n");
+            }
+            initModel = JSONUtils.fromJson(buffer.toString(), new TypeToken<Map<String, Map<String, Double>>>() {
+            }, initModel);
+            this.model = initModel;
+        } catch (Exception e) {
+            LOGGER.error("read vovh24 weight file error: ", e);
+            return false;
+        }
+        return true;
+    }
+
+    public Map<String, Double> getWeight(String key) {
+        return model.get(key);
+    }
+}

+ 13 - 0
recommend-server-service/src/main/resources/feeds_score_config_20241107.conf

@@ -0,0 +1,13 @@
+scorer-config = {
+  rov-score-config = {
+    scorer-name = "com.tzld.piaoquan.recommend.server.service.score.VlogRovFMScorer"
+    scorer-priority = 96
+    model-path = "zhangbo/model_aka8_new2.txt"
+  }
+
+  vov-score-config = {
+    scorer-name = "com.tzld.piaoquan.recommend.server.service.score.VovH24WeightScorer"
+    scorer-priority = 95
+    model-path = "zhangbo/vovh24_hour_weight.json"
+  }
+}