Pārlūkot izejas kodu

homepage recommend

丁云鹏 1 gadu atpakaļ
vecāks
revīzija
5515437f8b

+ 32 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/model/TripleConsumer.java

@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
+ * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ */
+package com.tzld.piaoquan.recommend.server.model;
+
+@FunctionalInterface
+public interface TripleConsumer<A, B, C> {
+
+    void accept(A a, B b, C c);
+
+}

+ 5 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/model/Video.java

@@ -12,9 +12,14 @@ public class Video {
     private double sortScore;
     private String pushFrom;
     private String abCode;
+
+    // 流量池相关 start
     private String flowPool;
     private String level;
     private String flowPoolAbtestGroup;
     private boolean inFlowPool;
+    // 流量池相关 end
+
     private double rand;
+    private String lastVideoKey;
 }

+ 52 - 82
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/RecommendService.java

@@ -22,20 +22,20 @@ import com.tzld.piaoquan.recommend.server.service.recall.strategy.Region24HRecal
 import com.tzld.piaoquan.recommend.server.service.recall.strategy.RegionHRecallStrategy;
 import com.tzld.piaoquan.recommend.server.service.recall.strategy.RegionRelative24HDupRecallStrategy;
 import com.tzld.piaoquan.recommend.server.service.recall.strategy.RegionRelative24HRecallStrategy;
-import com.tzld.piaoquan.recommend.server.util.DateUtils;
+import com.tzld.piaoquan.recommend.server.util.CommonCollectionUtils;
 import com.tzld.piaoquan.recommend.server.util.JSONUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.redis.core.RedisTemplate;
-import org.springframework.data.redis.core.ZSetOperations;
 import org.springframework.stereotype.Service;
 import org.springframework.util.CollectionUtils;
 
 import javax.annotation.PostConstruct;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 /**
@@ -113,7 +113,7 @@ public class RecommendService {
                     .setAbCode(Strings.nullToEmpty(videos.get(i).getAbCode()))
                     .setVideoId(videos.get(i).getVideoId())
                     .setRovScore(videos.get(i).getRovScore())
-                    .setSortScore(videos.get(i).getRovScore())
+                    .setSortScore(videos.get(i).getSortScore())
                     .setFlowPool(Strings.nullToEmpty(videos.get(i).getFlowPool()))
                     .setIsInFlowPool(videos.get(i).isInFlowPool() ? 1 : 0)
                     .setRand(videos.get(i).getRand())
@@ -128,54 +128,48 @@ public class RecommendService {
     }
 
     private RecommendResponse specialMidRecommend(RecommendRequest request) {
-        String keyNamePrefix = "special:videos:item:";
-        String dateStr = DateUtils.getCurrentDateStr("yyyyMMdd");
-        String specialKeyName = keyNamePrefix + dateStr;
-        if (!redisTemplate.hasKey(specialKeyName)) {
-            dateStr = DateUtils.getBeforeDaysDateStr("yyyyMMdd", 1);
-            specialKeyName = keyNamePrefix + dateStr;
-        }
+        RecallParam recallParam = new RecallParam();
+        recallParam.setAppType(request.getAppType());
+        recallParam.setMid(request.getMid());
+        recallParam.setSpecialRecommend(true);
+        recallParam.setSize(request.getSize());
 
-        String lastSpecialRecallKey = String.format("recall:last:special:%s:%s:%s", request.getAppType(), request.getMid(), dateStr);
-        String value = redisTemplate.opsForValue().get(lastSpecialRecallKey);
-        Long idx = 0L;
-        if (StringUtils.isNotBlank(value)) {
-            idx = redisTemplate.opsForZSet().reverseRank(specialKeyName, value);
-            if (idx == null) {
-                idx = 0L;
-            }
-        }
+        RecallResult recallResult = recallService.recall(recallParam);
+        log.info("recallResult {}", recallResult);
 
-        int getSize = request.getSize() * 5;
-        int freq = 0;
-        List<VideoProto> results = new ArrayList<>();
-        while (results.size() < request.getSize()) {
-            freq += 1;
-            if (freq > 2) {
-                break;
-            }
-            Set<ZSetOperations.TypedTuple<String>> data = redisTemplate.opsForZSet().reverseRangeWithScores(specialKeyName, idx, idx + getSize - 1);
-            if (CollectionUtils.isEmpty(data)) {
-                break;
-            }
-            idx += getSize;
-            data.stream().forEach(t ->
-                    results.add(VideoProto.newBuilder()
-                            .setVideoId(Long.getLong(t.getValue(), 0L))
-                            .setRovScore(t.getScore())
-                            .setAbCode("99999")
-                            .setPushFrom("special_mid_videos")
-                            .build())
-            );
+        RankParam rankParam = new RankParam();
+        rankParam.setRecallResult(recallResult);
+        rankParam.setSize(request.getSize());
+        rankParam.setSpecialRecommend(true);
+
+        RankResult rankResult = rankService.rank(rankParam);
+        log.info("rankResult {}", rankResult);
+
+        if (rankResult == null || CollectionUtils.isEmpty(rankResult.getVideos())) {
+            return RecommendResponse.newBuilder()
+                    .setResult(Result.newBuilder().setCode(1).setMessage("success"))
+                    .build();
         }
 
-        if (StringUtils.isNotBlank(request.getMid()) && !CollectionUtils.isEmpty(results)) {
-            redisTemplate.opsForValue().set(lastSpecialRecallKey, String.valueOf(results.get(results.size() - 1).getVideoId()), 1, TimeUnit.DAYS);
+        // 只返回size条数据
+        List<Video> videos = rankResult.getVideos();
+        if (recallParam.getSize() < rankResult.getVideos().size()) {
+            videos = rankResult.getVideos().subList(0, recallParam.getSize());
+        }
+        if (StringUtils.isNotBlank(request.getMid()) && !CollectionUtils.isEmpty(videos)) {
+            Video lastVideo = videos.get(videos.size() - 1);
+            redisTemplate.opsForValue().set(lastVideo.getLastVideoKey(), String.valueOf(lastVideo.getVideoId()), 1
+                    , TimeUnit.DAYS);
         }
 
         return RecommendResponse.newBuilder()
                 .setResult(Result.newBuilder().setCode(1).setMessage("success"))
-                .addAllVideo(results)
+                .addAllVideo(CommonCollectionUtils.toList(videos, v -> VideoProto.newBuilder()
+                        .setPushFrom(Strings.nullToEmpty(v.getPushFrom()))
+                        .setAbCode(Strings.nullToEmpty(v.getAbCode()))
+                        .setVideoId(v.getVideoId())
+                        .setRovScore(v.getRovScore())
+                        .build()))
                 .build();
     }
 
@@ -344,7 +338,8 @@ public class RecommendService {
         }
 
         preViewedService.updateCache(request.getAppType(), request.getMid(), videos);
-        updateLastVideoCache(request, param, videos);
+        updateLastVideoCache(videos);
+
         updateFlowPoolCache(request, param, videos);
 
     }
@@ -379,47 +374,22 @@ public class RecommendService {
         }
     }
 
-    private void updateLastVideoCache(RecommendRequest request, RecommendParam param,
-                                      List<Video> videos) {
-        // 2 地域小时最后一个视频
-        for (int i = videos.size() - 1; i >= 0; i--) {
-            if (videos.get(i).getPushFrom().equals(RegionHRecallStrategy.PUSH_FORM)) {
-                redisTemplate.opsForValue().set(String.format(RegionHRecallStrategy.LAST_VIDEO_KEY_FORMAT,
-                                request.getAppType(), request.getMid()), String.valueOf(videos.get(i).getVideoId()),
-                        24, TimeUnit.HOURS);
-                break;
-            }
-        }
-
-        // 3 地域24小时最后一个视频
-        for (int i = videos.size() - 1; i >= 0; i--) {
-            if (videos.get(i).getPushFrom().equals(Region24HRecallStrategy.PUSH_FORM)) {
-                redisTemplate.opsForValue().set(String.format(Region24HRecallStrategy.LAST_VIDEO_KEY_FORMAT,
-                                request.getAppType(), request.getMid()), String.valueOf(videos.get(i).getVideoId()),
-                        24, TimeUnit.HOURS);
-                break;
-            }
-        }
+    private void updateLastVideoCache(List<Video> videos) {
 
-        // 4 地域相对24小时最后一个视频
-        for (int i = videos.size() - 1; i >= 0; i--) {
-            if (videos.get(i).getPushFrom().equals(RegionRelative24HRecallStrategy.PUSH_FORM)) {
-                redisTemplate.opsForValue().set(String.format(RegionRelative24HRecallStrategy.LAST_VIDEO_KEY_FORMAT,
-                                request.getAppType(), request.getMid()), String.valueOf(videos.get(i).getVideoId()),
-                        24, TimeUnit.HOURS);
-                break;
+        Consumer<String> consumer = (p) -> {
+            for (int i = videos.size() - 1; i >= 0; i--) {
+                if (videos.get(i).getPushFrom().equals(p)) {
+                    redisTemplate.opsForValue().set(videos.get(i).getLastVideoKey(), String.valueOf(videos.get(i).getVideoId()),
+                            24, TimeUnit.HOURS);
+                    break;
+                }
             }
-        }
+        };
 
-        // 5 地域相对24小时最后一个视频
-        for (int i = videos.size() - 1; i >= 0; i--) {
-            if (videos.get(i).getPushFrom().equals(RegionRelative24HDupRecallStrategy.PUSH_FORM)) {
-                redisTemplate.opsForValue().set(String.format(RegionRelative24HDupRecallStrategy.LAST_VIDEO_KEY_FORMAT,
-                                request.getAppType(), request.getMid()), String.valueOf(videos.get(i).getVideoId()),
-                        24, TimeUnit.HOURS);
-                break;
-            }
-        }
+        consumer.accept(RegionHRecallStrategy.PUSH_FORM);
+        consumer.accept(Region24HRecallStrategy.PUSH_FORM);
+        consumer.accept(RegionRelative24HRecallStrategy.PUSH_FORM);
+        consumer.accept(RegionRelative24HDupRecallStrategy.PUSH_FORM);
     }
 
 }

+ 16 - 40
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/flowpool/FlowPoolService.java

@@ -1,5 +1,6 @@
 package com.tzld.piaoquan.recommend.server.service.flowpool;
 
+import com.tzld.piaoquan.recommend.server.model.TripleConsumer;
 import com.tzld.piaoquan.recommend.server.model.Video;
 import com.tzld.piaoquan.recommend.server.service.ThreadPoolFactory;
 import lombok.extern.slf4j.Slf4j;
@@ -70,6 +71,14 @@ public class FlowPoolService {
     }
 
     private void asyncDelDistributeCountWithLevel(Map<Long, String> videoFlowPoolMap) {
+        asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
+            redisTemplate.opsForSet().remove(String.format(KEY_WITH_LEVEL_FORMAT, appType, level), values);
+            redisTemplate.opsForSet().remove(String.format(KEY_QUICK_WITH_LEVEL_FORMAT, appType), values);
+        });
+    }
+
+    private void asyncDelDistributeCount(Map<Long, String> videoFlowPoolMap,
+                                         TripleConsumer<Integer, String, List<String>> flowPoolRemoveConsumer) {
         if (MapUtils.isEmpty(videoFlowPoolMap)) {
             return;
         }
@@ -85,8 +94,7 @@ public class FlowPoolService {
                     .collect(Collectors.toList());
             for (String level : levelWeight.keySet()) {
                 for (int appType : appTypes) {
-                    redisTemplate.opsForSet().remove(String.format(KEY_WITH_LEVEL_FORMAT, appType, level), values);
-                    redisTemplate.opsForSet().remove(String.format(KEY_QUICK_WITH_LEVEL_FORMAT, appType), values);
+                    flowPoolRemoveConsumer.accept(appType, level, values);
                 }
             }
         });
@@ -120,25 +128,9 @@ public class FlowPoolService {
     }
 
     private void asyncLocalDistributeCountWithLevelScore(Map<Long, String> videoFlowPoolMap) {
-        if (MapUtils.isEmpty(videoFlowPoolMap)) {
-            return;
-        }
-        pool.execute(() -> {
-            List<String> keys = videoFlowPoolMap.entrySet().stream()
-                    .map(v -> String.format(localDistributeCountFormat, v.getKey(), v.getValue()))
-                    .collect(Collectors.toList());
-            redisTemplate.delete(keys);
-
-            Map<String, Double> levelWeight = flowPoolConfigService.getLevelWeight();
-            List<String> values = videoFlowPoolMap.entrySet().stream()
-                    .map(v -> String.format(valueFormat, v.getKey(), v.getValue()))
-                    .collect(Collectors.toList());
-            for (String level : levelWeight.keySet()) {
-                for (int appType : appTypes) {
-                    redisTemplate.opsForZSet().remove(String.format(KEY_WITH_LEVEL_SCORE_FORMAT, appType, level), values);
-                    redisTemplate.opsForZSet().remove(String.format(KEY_QUICK_WITH_LEVEL_SCORE_FORMAT, appType), values);
-                }
-            }
+        asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
+            redisTemplate.opsForZSet().remove(String.format(KEY_WITH_LEVEL_SCORE_FORMAT, appType, level), values);
+            redisTemplate.opsForZSet().remove(String.format(KEY_QUICK_WITH_LEVEL_SCORE_FORMAT, appType), values);
         });
     }
 
@@ -167,25 +159,9 @@ public class FlowPoolService {
     }
 
     private void asyncDelDistributeCountWithScore(Map<Long, String> videoFlowPoolMap) {
-        if (MapUtils.isEmpty(videoFlowPoolMap)) {
-            return;
-        }
-        pool.execute(() -> {
-            List<String> keys = videoFlowPoolMap.entrySet().stream()
-                    .map(v -> String.format(localDistributeCountFormat, v.getKey(), v.getValue()))
-                    .collect(Collectors.toList());
-            redisTemplate.delete(keys);
-
-            Map<String, Double> levelWeight = flowPoolConfigService.getLevelWeight();
-            List<String> values = videoFlowPoolMap.entrySet().stream()
-                    .map(v -> String.format(valueFormat, v.getKey(), v.getValue()))
-                    .collect(Collectors.toList());
-            for (String level : levelWeight.keySet()) {
-                for (int appType : appTypes) {
-                    redisTemplate.opsForZSet().remove(String.format(KEY_WITH_SCORE_FORMAT, appType, level), values);
-                    redisTemplate.opsForZSet().remove(String.format(KEY_QUICK_WITH_SCORE_FORMAT, appType), values);
-                }
-            }
+        asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
+            redisTemplate.opsForZSet().remove(String.format(KEY_WITH_SCORE_FORMAT, appType, level), values);
+            redisTemplate.opsForZSet().remove(String.format(KEY_QUICK_WITH_SCORE_FORMAT, appType), values);
         });
     }
 

+ 1 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankParam.java

@@ -15,4 +15,5 @@ public class RankParam {
     private double flowPoolP;
     private String abCode;
     private int appType;
+    private boolean specialRecommend;
 }

+ 11 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankService.java

@@ -27,13 +27,23 @@ public class RankService {
     private RedisTemplate<String, String> redisTemplate;
 
     public RankResult rank(RankParam param) {
-
         if (param == null
                 || param.getRecallResult() == null
                 || CollectionUtils.isEmpty(param.getRecallResult().getData())) {
             return null;
         }
 
+        if (param.isSpecialRecommend()) {
+            Optional<RecallResult.RecallData> data = param.getRecallResult().getData().stream()
+                    .filter(d -> d.getPushForm().equals(SpecialRecallStrategy.PUSH_FROM))
+                    .findFirst();
+            if (data.isPresent()
+                    && data.get() != null) {
+                return new RankResult(data.get().getVideos());
+            }
+            return null;
+        }
+
         List<Video> rovRecallRank = mergeAndRankRovRecall(param);
         log.info("mergeAndRankRovRecall rovRecallRank={}", JSONUtils.toJson(rovRecallRank));
         List<Video> flowPoolRank = mergeAndRankFlowPoolRecall(param);

+ 1 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallParam.java

@@ -18,5 +18,6 @@ public class RecallParam {
     private String flowPoolAbtestGroup;
     private Long videoId;
     private String uid;
+    private boolean specialRecommend;
 
 }

+ 5 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallService.java

@@ -80,7 +80,11 @@ public class RecallService implements ApplicationContextAware {
 
     private List<RecallStrategy> getRecallStrategy(RecallParam param) {
         List<RecallStrategy> strategies = new ArrayList<>();
-
+        if (param.isSpecialRecommend()) {
+            strategies.add(strategyMap.get(SpecialRecallStrategy.class.getSimpleName()));
+            return strategies;
+        }
+        
         if (param.getAppType() == AppTypeEnum.LAO_HAO_KAN_VIDEO.getCode()
                 || param.getAppType() == AppTypeEnum.ZUI_JING_QI.getCode()) {
             strategies.addAll(getRegionRecallStrategy(param));

+ 7 - 9
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractRegionRecallStrategy.java

@@ -51,9 +51,6 @@ public abstract class AbstractRegionRecallStrategy implements RecallStrategy {
             Map<String, String> recordMap = JSONUtils.fromJson(record, new TypeToken<Map<String, String>>() {
             }, Collections.emptyMap());
             if (MapUtils.isNotEmpty(recordMap)) {
-//                redisTemplate.delete(lastVideoKey);
-//                poolKey = updateLastVideoRecord(recordKey, param);
-//            } else {
                 String record_dt = recordMap.get("date");
                 String record_h = recordMap.get("h");
                 String now_dt = DateUtils.getCurrentDateStr("yyyyMMdd");
@@ -114,12 +111,13 @@ public abstract class AbstractRegionRecallStrategy implements RecallStrategy {
 
             if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
                 filterResult.getVideoIds().stream().forEach(vid -> {
-                    Video recallData = new Video();
-                    recallData.setVideoId(vid);
-                    recallData.setAbCode(param.getAbCode());
-                    recallData.setRovScore(videoMap.get(vid));
-                    recallData.setPushFrom(pushFrom());
-                    results.add(recallData);
+                    Video video = new Video();
+                    video.setVideoId(vid);
+                    video.setAbCode(param.getAbCode());
+                    video.setRovScore(videoMap.get(vid));
+                    video.setPushFrom(pushFrom());
+                    video.setLastVideoKey(lastVideoKey);
+                    results.add(video);
                 });
             }
         }

+ 81 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/SpecialRecallStrategy.java

@@ -0,0 +1,81 @@
+package com.tzld.piaoquan.recommend.server.service.recall.strategy;
+
+import com.tzld.piaoquan.recommend.server.model.Video;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallStrategy;
+import com.tzld.piaoquan.recommend.server.util.DateUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.ZSetOperations;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * @author dyp
+ */
+@Service
+@Slf4j
+public class SpecialRecallStrategy implements RecallStrategy {
+    @Autowired
+    private RedisTemplate<String, String> redisTemplate;
+
+    public static final String LAST_VIDEO_KEY_FORMAT = "recall:last:special:%s:%s:%s";
+    public static final String PUSH_FROM = "special_mid_videos";
+
+    @Override
+    public List<Video> recall(RecallParam param) {
+        String keyNamePrefix = "special:videos:item:";
+        String dateStr = DateUtils.getCurrentDateStr("yyyyMMdd");
+        String specialKeyName = keyNamePrefix + dateStr;
+        if (!redisTemplate.hasKey(specialKeyName)) {
+            dateStr = DateUtils.getBeforeDaysDateStr("yyyyMMdd", 1);
+            specialKeyName = keyNamePrefix + dateStr;
+        }
+
+        String lastSpecialRecallKey = String.format(LAST_VIDEO_KEY_FORMAT, param.getAppType(), param.getMid(), dateStr);
+        String value = redisTemplate.opsForValue().get(lastSpecialRecallKey);
+        Long idx = 0L;
+        if (StringUtils.isNotBlank(value)) {
+            idx = redisTemplate.opsForZSet().reverseRank(specialKeyName, value);
+            if (idx == null) {
+                idx = 0L;
+            }
+        }
+
+        int getSize = param.getSize() * 5;
+        int freq = 0;
+        List<Video> results = new ArrayList<>();
+        while (results.size() < param.getSize()) {
+            freq += 1;
+            if (freq > 2) {
+                break;
+            }
+            Set<ZSetOperations.TypedTuple<String>> data = redisTemplate.opsForZSet().reverseRangeWithScores(specialKeyName, idx, idx + getSize - 1);
+            if (CollectionUtils.isEmpty(data)) {
+                break;
+            }
+            idx += getSize;
+            data.stream().forEach(t -> {
+                Video video = new Video();
+                video.setVideoId(Long.getLong(t.getValue(), 0L));
+                video.setRovScore(t.getScore());
+                video.setAbCode("99999");
+                video.setPushFrom(PUSH_FROM);
+                video.setLastVideoKey(lastSpecialRecallKey);
+                results.add(video);
+            });
+        }
+        return results;
+    }
+
+    @Override
+    public String pushFrom() {
+        return PUSH_FROM;
+    }
+}