浏览代码

ADD: 接入历史Rank逻辑

sunxy 1 年之前
父节点
当前提交
63e1929f69

+ 164 - 48
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/FlowPoolRecommendPipeline.java

@@ -1,84 +1,200 @@
 package com.tzld.piaoquan.recommend.server.implement;
 
 
+import com.tzld.piaoquan.recommend.feature.domain.video.base.RequestContext;
+import com.tzld.piaoquan.recommend.feature.domain.video.base.UserFeature;
+import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
 import com.tzld.piaoquan.recommend.server.common.base.RankItem;
-import com.tzld.piaoquan.recommend.server.framework.candidiate.Candidate;
 import com.tzld.piaoquan.recommend.server.framework.common.User;
-import com.tzld.piaoquan.recommend.server.framework.merger.MergeUtils;
-import com.tzld.piaoquan.recommend.server.framework.merger.SimilarityUtils;
-import com.tzld.piaoquan.recommend.server.framework.merger.StrategyQueue;
-import com.tzld.piaoquan.recommend.server.framework.recaller.BaseRecaller;
-import com.tzld.piaoquan.recommend.server.framework.recaller.provider.RedisBackedQueue;
-import com.tzld.piaoquan.recommend.server.framework.userattention.UserAttentionExtractorPipeline;
-import com.tzld.piaoquan.recommend.server.framework.userattention.UserAttentionExtractorUtils;
-import com.tzld.piaoquan.recommend.server.framework.utils.RedisSmartClient;
 import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
+import com.tzld.piaoquan.recommend.server.model.MachineInfo;
+import com.tzld.piaoquan.recommend.server.model.RecommendParam;
+import com.tzld.piaoquan.recommend.server.model.Video;
+import com.tzld.piaoquan.recommend.server.service.RecommendService;
+import com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConstants;
+import com.tzld.piaoquan.recommend.server.service.rank.RankParam;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallResult;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallStrategy;
+import com.tzld.piaoquan.recommend.server.service.recall.strategy.FlowPoolWithLevelRecallStrategy;
+import com.tzld.piaoquan.recommend.server.service.recall.strategy.FlowPoolWithLevelScoreRecallStrategy;
+import com.tzld.piaoquan.recommend.server.service.recall.strategy.FlowPoolWithScoreRecallStrategy;
+import com.tzld.piaoquan.recommend.server.service.score.ScoreParam;
+import com.tzld.piaoquan.recommend.server.service.score.ScorerUtils;
+import com.tzld.piaoquan.recommend.server.util.CommonCollectionUtils;
+import org.apache.commons.collections4.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 @Service
-public class FlowPoolRecommendPipeline {
+public class FlowPoolRecommendPipeline implements ApplicationContextAware {
 
-    public static final String MERGE_CONF = "flow_merge_config.conf";
+    private final Logger log = LoggerFactory.getLogger(FlowPoolRecommendPipeline.class);
 
-    public static final String PREFIX = "";
+    private final static Map<String, RecallStrategy> strategyMap = new HashMap<>();
 
     @Resource
-    private RedisSmartClient client;
+    private RecommendService recommendService;
 
-    private List<RankItem> feedByRec(final RecommendRequest requestData,
-                                     final int requestIndex,
-                                     final User userInfo) {
-        int recallNum = 200;
+    private ApplicationContext applicationContext;
 
-        // Step 1: Attention extraction
-        long timestamp = System.currentTimeMillis();
-        UserAttentionExtractorPipeline attentionExtractorPipeline = UserAttentionExtractorUtils.getAtttentionPipeline(UserAttentionExtractorUtils.BASE_CONF);
-        attentionExtractorPipeline.extractAttention(requestData, userInfo);
-
-        // Step 2: create top queue
-        StrategyQueue topQueue = MergeUtils.createTopQueue(MERGE_CONF, "top-queue");
+    private ExecutorService pool = ThreadPoolFactory.recallPool();
 
+    @PostConstruct
+    public void init() {
+        Map<String, RecallStrategy> type = applicationContext.getBeansOfType(RecallStrategy.class);
+        for (Map.Entry<String, RecallStrategy> entry : type.entrySet()) {
+            RecallStrategy value = entry.getValue();
+            strategyMap.put(value.getClass().getSimpleName(), value);
+        }
+    }
 
-        // Step 3: Candidate
-        Map<String, Candidate> candidates = new HashMap<String, Candidate>();
-        topQueue.candidate(candidates, recallNum, userInfo, requestData, 0, 0);
 
+    private List<Video> feedByRec(final RecommendRequest requestData,
+                                     final int recommendType) {
+        List<RecallStrategy> strategies = new ArrayList<>();
 
-        // Step 4: Recalling & Basic Scoring
-        RedisBackedQueue queueProvider = new RedisBackedQueue(client, 1000L);
+        RecommendParam param = recommendService.genRecommendParam(requestData, recommendType);
+        RecallParam recallParam = recommendService.convertToRecallParam(param);
+        RecallResult recallResult = getRecallResult(strategies, recallParam);
 
+        RankParam rankParam = recommendService.convertToRankParam(param, recallResult);
+        return mergeAndRankFlowPoolRecall(rankParam);
+    }
 
-        BaseRecaller recaller = new BaseRecaller(queueProvider);
-        List<RankItem> items = recaller.recalling(requestData, userInfo, requestIndex, new ArrayList<>(candidates.values()));
+    public List<Video> mergeAndRankFlowPoolRecall(RankParam param) {
+        List<Video> quickFlowPoolVideos = sortFlowPoolByThompson(param, FlowPoolConstants.QUICK_PUSH_FORM);
+        if (CollectionUtils.isNotEmpty(quickFlowPoolVideos)) {
+            return quickFlowPoolVideos;
+        } else {
+            return sortFlowPoolByThompson(param, FlowPoolConstants.PUSH_FORM);
+        }
+    }
 
+    public List<Video> sortFlowPoolByThompson(RankParam param, String pushFrom) {
+
+        //初始化 userid
+        UserFeature userFeature = new UserFeature();
+        userFeature.setMid(param.getMid());
+
+        // 初始化RankItem
+        Optional<RecallResult.RecallData> data = param.getRecallResult().getData().stream()
+                .filter(d -> d.getPushFrom().equals(pushFrom))
+                .findFirst();
+        List<Video> videoList = data.get().getVideos();
+        if (videoList == null) {
+            return Collections.emptyList();
+        }
+        List<RankItem> rankItems = new ArrayList<>();
+        for (int i = 0; i < videoList.size(); i++) {
+            RankItem rankItem = new RankItem(videoList.get(i));
+            rankItems.add(rankItem);
+        }
+
+        // 初始化上下文参数
+        ScoreParam scoreParam = convert(param);
+        List<RankItem> rovRecallScore = ScorerUtils.getScorerPipeline(ScorerUtils.FLOWPOOL_CONF)
+                .scoring(scoreParam, userFeature, rankItems);
+
+        if (rovRecallScore == null) {
+            return Collections.emptyList();
+        }
+
+        return CommonCollectionUtils.toList(rovRecallScore, i -> {
+            // hard code 将排序分数 赋值给video的sortScore
+            Video v = i.getVideo();
+            v.setSortScore(i.getScore());
+            return v;
+        });
+    }
 
-        // Step 4: Advance Scoring
-        timestamp = System.currentTimeMillis();
-//        ScorerPipeline scorerPipeline = getScorerPipeline(requestData);
-//        items = scorerPipeline.scoring(requestData, userInfo, requestIndex, items);
+    protected ScoreParam convert(RankParam param) {
+        ScoreParam scoreParam = new ScoreParam();
 
+        scoreParam.setMid(param.getMid());
 
-        // Step 5: Merger
-        MergeUtils.distributeItemsToMultiQueues(topQueue, items);
-        topQueue.merge(recallNum * 3, userInfo, requestData, requestIndex, 0);
+        // TODO hardcode 为了兼容写入逻辑
+        RequestContext context = new RequestContext();
+        context.setApptype(param.getAppType() + "");
 
-        // step 6:多样性融合
-        List<RankItem> mergeItems = topQueue.getItems();
-        MergeUtils.diversityRerank(mergeItems, SimilarityUtils.getIsSameUserTagOrCategoryFunc(), recallNum, 6, 2);
+        // TODO 地域转换
+        context.setRegion(param.getProvince());
+        context.setCity(param.getCity());
 
+        Calendar calendar = Calendar.getInstance();
+        context.setWeek((calendar.get(Calendar.DAY_OF_WEEK) + 6) % 7 + "");
+        context.setDay(new SimpleDateFormat("yyyyMMdd").format(calendar.getTime()));
+        context.setHour(new SimpleDateFormat("HH").format(calendar.getTime()));
 
-        // Step 6: Global Rank & subList
-        // TODO: Global Rank
+        MachineInfo machineInfo = param.getMachineInfo();
+        if (machineInfo != null) {
+            context.setMachineinfo_brand(machineInfo.getBrand());
+            context.setMachineinfo_model(machineInfo.getModel());
+            context.setMachineinfo_platform(machineInfo.getPlatform());
+            context.setMachineinfo_sdkversion(machineInfo.getSdkVersion());
+            context.setMachineinfo_system(machineInfo.getSystem());
+            context.setMachineinfo_wechatversion(machineInfo.getWechatVersion());
+        }
 
+        scoreParam.setRequestContext(context);
+        return scoreParam;
+    }
 
-        return items;
+    private RecallResult getRecallResult(List<RecallStrategy> strategies, RecallParam param) {
+        if (param.getFlowPoolAbtestGroup().equals(FlowPoolConstants.EXPERIMENTAL_FLOW_SET_LEVEL)) {
+            strategies.add(strategyMap.get(FlowPoolWithLevelRecallStrategy.class.getSimpleName()));
+        } else if (param.getFlowPoolAbtestGroup().equals(FlowPoolConstants.EXPERIMENTAL_FLOW_SET_LEVEL_SCORE)) {
+            strategies.add(strategyMap.get(FlowPoolWithLevelScoreRecallStrategy.class.getSimpleName()));
+        } else {
+            strategies.add(strategyMap.get(FlowPoolWithScoreRecallStrategy.class.getSimpleName()));
+        }
+
+        CountDownLatch cdl = new CountDownLatch(strategies.size());
+        List<Future<RecallResult.RecallData>> recallResultFutures = new ArrayList<>();
+        for (final RecallStrategy strategy : strategies) {
+            Future<RecallResult.RecallData> future = pool.submit(() -> {
+                List<Video> result = strategy.recall(param);
+                cdl.countDown();
+                return new RecallResult.RecallData(strategy.pushFrom(), result);
+            });
+            recallResultFutures.add(future);
+        }
+        try {
+            cdl.await(3000, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            log.error("recall error", e);
+            return null;
+        }
+
+        List<RecallResult.RecallData> results = new ArrayList<>();
+        for (Future<RecallResult.RecallData> f : recallResultFutures) {
+            try {
+                RecallResult.RecallData data = f.get();
+                results.add(data);
+            } catch (Exception e) {
+                log.error("future get error ", e);
+            }
+        }
+
+        return new RecallResult(results);
     }
 
 
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        this.applicationContext = applicationContext;
+    }
 }

+ 23 - 25
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/TopRecommendPipeline.java

@@ -13,16 +13,12 @@ import com.tzld.piaoquan.recommend.server.framework.recaller.provider.RedisBacke
 import com.tzld.piaoquan.recommend.server.framework.score.ScorerUtils;
 import com.tzld.piaoquan.recommend.server.framework.utils.RedisSmartClient;
 import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
-import com.tzld.piaoquan.recommend.server.model.Video;
-import com.tzld.piaoquan.recommend.server.service.rank.RankParam;
 import com.tzld.piaoquan.recommend.server.service.rank.extractor.RankExtractorItemFeature;
 import com.tzld.piaoquan.recommend.server.service.rank.extractor.RankExtractorUserFeature;
 import com.tzld.piaoquan.recommend.server.util.CommonCollectionUtils;
 import com.tzld.piaoquan.recommend.server.util.JSONUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Service;
 
@@ -216,24 +212,25 @@ public class TopRecommendPipeline {
         if (videoRtFeatures != null){
             int j = 0;
             for (RankItem item: rankItems){
-                String vF = videoRtFeatures.get(j);
                 ++j;
-                if (vF == null){
-                    continue;
-                }
                 Map<String, String> vfMap = new HashMap<>();
                 Map<String, Map<String, Double>> vfMapNew = new HashMap<>();
-                try{
-                    vfMap = JSONUtils.fromJson(vF, new TypeToken<Map<String, String>>() {}, vfMap);
-                    for (Map.Entry<String, String> entry : vfMap.entrySet()){
+                try {
+                    String vF = videoRtFeatures.get(j);
+                    if (vF == null) {
+                        continue;
+                    }
+                    vfMap = JSONUtils.fromJson(vF, new TypeToken<Map<String, String>>() {
+                    }, vfMap);
+                    for (Map.Entry<String, String> entry : vfMap.entrySet()) {
                         String value = entry.getValue();
-                        if (value == null){
+                        if (value == null) {
                             continue;
                         }
-                        String [] var1 = value.split(",");
+                        String[] var1 = value.split(",");
                         Map<String, Double> tmp = new HashMap<>();
-                        for (String var2 : var1){
-                            String [] var3 = var2.split(":");
+                        for (String var2 : var1) {
+                            String[] var3 = var2.split(":");
                             tmp.put(var3[0], Double.valueOf(var3[1]));
                         }
                         vfMapNew.put(entry.getKey(), tmp);
@@ -246,24 +243,25 @@ public class TopRecommendPipeline {
                 item.getFeatureMap().putAll(f8);
             }
             for (RankItem item: rankItems){
-                String vF = videoRtFeatures.get(j);
                 ++j;
-                if (vF == null){
-                    continue;
-                }
                 Map<String, String> vfMap = new HashMap<>();
                 Map<String, Map<String, Double>> vfMapNew = new HashMap<>();
-                try{
-                    vfMap = JSONUtils.fromJson(vF, new TypeToken<Map<String, String>>() {}, vfMap);
+                try {
+                    String vF = videoRtFeatures.get(j);
+                    if (vF == null) {
+                        continue;
+                    }
+                    vfMap = JSONUtils.fromJson(vF, new TypeToken<Map<String, String>>() {
+                    }, vfMap);
 
-                    for (Map.Entry<String, String> entry : vfMap.entrySet()){
+                    for (Map.Entry<String, String> entry : vfMap.entrySet()) {
                         String value = entry.getValue();
-                        if (value == null){
+                        if (value == null) {
                             continue;
                         }
-                        String [] var1 = value.split(",");
+                        String[] var1 = value.split(",");
                         Map<String, Double> tmp = new HashMap<>();
-                        for (String var2 : var1){
+                        for (String var2 : var1) {
                             String [] var3 = var2.split(":");
                             tmp.put(var3[0], Double.valueOf(var3[1]));
                         }

+ 3 - 3
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/RecommendService.java

@@ -208,7 +208,7 @@ public class RecommendService {
                 .build();
     }
 
-    private RecommendParam genRecommendParam(RecommendRequest request, int recommendType) {
+    public RecommendParam genRecommendParam(RecommendRequest request, int recommendType) {
         RecommendParam param = new RecommendParam();
         param.setTopK(3);
         param.setFlowPoolP(0.3);
@@ -418,7 +418,7 @@ public class RecommendService {
         return videos;
     }
 
-    private RecallParam convertToRecallParam(RecommendParam param) {
+    public RecallParam convertToRecallParam(RecommendParam param) {
         RecallParam recallParam = new RecallParam();
         recallParam.setAppType(param.getAppType());
         // hard code 算法实验配置化之前,复用abcode做AB验证
@@ -470,7 +470,7 @@ public class RecommendService {
     }
 
 
-    private RankParam convertToRankParam(RecommendParam param, RecallResult recallResult) {
+    public RankParam convertToRankParam(RecommendParam param, RecallResult recallResult) {
         RankParam rankParam = new RankParam();
         rankParam.setRecallResult(recallResult);
         // hard code 算法实验配置化之前,复用abcode做AB验证

+ 249 - 5
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/util/ParserUtils.java

@@ -1,13 +1,14 @@
 package com.tzld.piaoquan.recommend.server.util;
+import com.alibaba.fastjson.JSONObject;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.List;
+import java.util.*;
+import java.util.stream.Collectors;
+
 public class ParserUtils {
 
     public static Map<Integer, List<String>> parseJsonForRiskRule(String s) {
@@ -46,7 +47,250 @@ public class ParserUtils {
     }
 
     public static void main(String[] args) {
-        String json = "{\"VLOG\": [\"北京\"]}";
+        List<Long> videoIdList = new ArrayList<>(Arrays.asList(9041871L,
+                9041870L,
+                9041869L,
+                9041868L,
+                9041867L,
+                9041866L,
+                9041865L,
+                9041864L,
+                9041863L,
+                9041862L,
+                9041861L,
+                9041860L,
+                9041859L,
+                9041858L,
+                9041857L,
+                9041856L,
+                9041855L,
+                9041854L,
+                9041853L,
+                9041852L,
+                9041851L,
+                9041850L,
+                9041849L,
+                9041848L,
+                9041847L,
+                9041846L,
+                9041845L,
+                9041844L,
+                9041843L,
+                9041842L,
+                9041841L,
+                9041840L,
+                9041839L,
+                9041838L,
+                9041837L,
+                9041836L,
+                9041835L,
+                9041834L,
+                9041833L,
+                9041832L,
+                9041831L,
+                9041830L,
+                9041829L,
+                9041828L,
+                9041827L,
+                9041826L,
+                9041825L,
+                9041824L,
+                9041823L,
+                9041822L,
+                9041821L,
+                9041820L,
+                9041819L,
+                9041818L,
+                9041817L,
+                9041816L,
+                9041815L,
+                9041814L,
+                9041813L,
+                9041812L,
+                9041811L,
+                9041810L,
+                9041809L,
+                9041808L,
+                9041807L,
+                9041806L,
+                9041805L,
+                9041804L,
+                9041803L,
+                9041802L,
+                9041801L,
+                9041800L,
+                9041799L,
+                9041798L,
+                9041797L,
+                9041796L,
+                9041795L,
+                9041794L,
+                9041793L,
+                9041792L,
+                9041791L,
+                9041790L,
+                9041789L,
+                9041788L,
+                9041787L,
+                9041786L,
+                9041785L,
+                9041784L,
+                9041783L,
+                9041782L,
+                9041781L,
+                9041780L,
+                9041779L,
+                9041778L,
+                9041777L,
+                9041776L,
+                9041775L,
+                9041774L,
+                9041773L,
+                9041772L,
+                9041771L,
+                9041770L,
+                9041769L,
+                9041768L,
+                9041767L,
+                9041766L,
+                9041765L,
+                9041764L,
+                9041763L,
+                9041762L,
+                9041761L,
+                9041760L,
+                9041759L,
+                9041758L,
+                9041757L,
+                9041756L,
+                9041755L,
+                9041754L,
+                9041753L,
+                9041752L,
+                9041751L,
+                9041750L,
+                9041749L,
+                9041748L,
+                9041747L,
+                9041746L,
+                9041745L,
+                9041744L,
+                9041743L,
+                9041742L,
+                9041741L,
+                9041740L,
+                9041739L,
+                9041738L,
+                9041737L,
+                9041736L,
+                9041735L,
+                9041734L,
+                9041733L,
+                9041732L,
+                9041731L,
+                9041730L,
+                9041729L,
+                9041728L,
+                9041727L,
+                9041726L,
+                9041725L,
+                9041724L,
+                9041723L,
+                9041722L,
+                9041721L,
+                9041720L,
+                9041719L,
+                9041718L,
+                9041717L,
+                9041716L,
+                9041715L,
+                9041714L,
+                9041713L,
+                9041712L,
+                9041711L,
+                9041710L,
+                9041709L,
+                9041708L,
+                9041707L,
+                9041706L,
+                9041705L,
+                9041704L,
+                9041703L,
+                9041702L,
+                9041701L,
+                9041700L,
+                9041699L,
+                9041698L,
+                9041697L,
+                9041696L,
+                9041695L,
+                9041694L,
+                9041693L,
+                9041692L,
+                9041691L,
+                9041690L,
+                9041689L,
+                9041688L,
+                9041687L,
+                9041686L,
+                9041685L,
+                9041684L,
+                9041683L,
+                9041682L,
+                9041681L,
+                9041680L,
+                9041679L,
+                9041678L,
+                9041677L,
+                9041676L,
+                9041675L,
+                9041674L,
+                9041673L,
+                9041672L,
+                9041671L,
+                9041670L,
+                9041669L,
+                9041668L,
+                9041667L,
+                9041666L,
+                9041665L,
+                9041664L,
+                9041663L,
+                9041662L,
+                9041661L,
+                9041660L,
+                9041659L,
+                9041658L,
+                9041657L,
+                9041656L,
+                9041655L,
+                9041654L,
+                9041653L,
+                9041652L,
+                9041651L,
+                9041650L,
+                9041649L,
+                9041648L,
+                9041647L,
+                9041646L,
+                9041645L,
+                9041644L,
+                9041643L,
+                9041642L,
+                9041641L,
+                9041640L,
+                9041639L,
+                9041638L,
+                9041637L,
+                9041636L,
+                9041635L,
+                9041634L,
+                9041633L,
+                9041632L
+        ));
+        List<List<Long>> partition = Lists.partition(videoIdList, 20);
+        partition.stream().map(list -> list.stream().map(videoId -> Arrays.asList(videoId + "", "0.5")) .collect(Collectors.toList()))
+                .forEach(list -> System.out.println(JSONObject.toJSONString(list)));
 
     }
 }

+ 5 - 5
recommend-server-service/src/main/resources/merge_config.conf

@@ -35,12 +35,12 @@ rule-config = {
   top-queue = {
     merge-rule = {
       hot-queue = {
-        recall-percentage = 0.5
+        recall-percentage = 0.9
         min-merge-num = 1
         priority = 1
       }
       explore-queue = {
-        recall-percentage = 0.5
+        recall-percentage = 0.1
         min-merge-num = 1
         priority = 5
       }
@@ -51,15 +51,15 @@ rule-config = {
   hot-queue = {
     merge-rule = {
       global1h-index = {
-        recall-percentage = 0.5
+        recall-percentage = 0.3
         min-merge-num = 1
       }
       region1h-index = {
-        recall-percentage = 0.5
+        recall-percentage = 0.6
         min-merge-num = 1
       }
       region24h-index = {
-        recall-percentage = 0.5
+        recall-percentage = 0.1
         min-merge-num = 1
       }
     }