|
@@ -1,18 +1,31 @@
|
|
|
package com.tzld.piaoquan.recommend.server.service.recall.strategy;
|
|
|
|
|
|
+import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
|
|
|
+import com.google.common.collect.Lists;
|
|
|
+import com.tzld.piaoquan.recommend.server.model.Video;
|
|
|
+import com.tzld.piaoquan.recommend.server.service.filter.FilterParam;
|
|
|
+import com.tzld.piaoquan.recommend.server.service.filter.FilterResult;
|
|
|
import com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConfigService;
|
|
|
import com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConstants;
|
|
|
+import com.tzld.piaoquan.recommend.server.service.recall.FilterParamFactory;
|
|
|
import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
|
|
|
+import com.tzld.piaoquan.recommend.server.service.score.ScorerUtils;
|
|
|
+import com.tzld.piaoquan.recommend.server.service.score4recall.ScorerPipeline4Recall;
|
|
|
+import com.tzld.piaoquan.recommend.server.util.JSONUtils;
|
|
|
import lombok.Data;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.collections4.CollectionUtils;
|
|
|
import org.apache.commons.lang3.RandomUtils;
|
|
|
+import org.apache.commons.lang3.math.NumberUtils;
|
|
|
import org.apache.commons.lang3.tuple.Pair;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.data.redis.core.ZSetOperations;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import java.math.BigDecimal;
|
|
|
import java.math.RoundingMode;
|
|
|
import java.util.*;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
import static com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConstants.KEY_WITH_LEVEL_SCORE_FORMAT;
|
|
|
|
|
@@ -21,7 +34,8 @@ import static com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConsta
|
|
|
*/
|
|
|
@Service
|
|
|
public class FlowPoolWithLevelScoreRecallStrategy extends AbstractFlowPoolWithLevelScoreRecallStrategy {
|
|
|
-
|
|
|
+ @ApolloJsonValue("${ifOneLevelRandom:true}")
|
|
|
+ private boolean ifOneLevelRandom;
|
|
|
@Autowired
|
|
|
private FlowPoolConfigService flowPoolConfigService;
|
|
|
|
|
@@ -29,11 +43,12 @@ public class FlowPoolWithLevelScoreRecallStrategy extends AbstractFlowPoolWithLe
|
|
|
Pair<String, String> flowPoolKeyAndLevel(RecallParam param) {
|
|
|
//# 1. 获取流量池各层级分发概率权重
|
|
|
Map<String, Double> levelWeightMap = flowPoolConfigService.getLevelWeight();
|
|
|
+
|
|
|
// 2. 判断各层级是否有视频需分发
|
|
|
List<LevelWeight> availableLevels = new ArrayList<>();
|
|
|
for (Map.Entry<String, Double> entry : levelWeightMap.entrySet()) {
|
|
|
String levelKey = String.format(KEY_WITH_LEVEL_SCORE_FORMAT, param.getAppType(), entry.getKey());
|
|
|
- if (redisTemplate.hasKey(levelKey)) {
|
|
|
+ if (Boolean.TRUE.equals(redisTemplate.hasKey(levelKey))) {
|
|
|
LevelWeight lw = new LevelWeight();
|
|
|
lw.setLevel(entry.getKey());
|
|
|
lw.setLevelKey(levelKey);
|
|
@@ -41,6 +56,8 @@ public class FlowPoolWithLevelScoreRecallStrategy extends AbstractFlowPoolWithLe
|
|
|
availableLevels.add(lw);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ //log.info("availableLevels {}", JSONUtils.toJson(availableLevels));
|
|
|
if (CollectionUtils.isEmpty(availableLevels)) {
|
|
|
return Pair.of("", "");
|
|
|
}
|
|
@@ -52,7 +69,7 @@ public class FlowPoolWithLevelScoreRecallStrategy extends AbstractFlowPoolWithLe
|
|
|
BigDecimal weightSumBD = new BigDecimal(weightSum);
|
|
|
double level_p_low = 0;
|
|
|
double weight_temp = 0;
|
|
|
- double level_p_up;
|
|
|
+ double level_p_up = 0;
|
|
|
Map<String, LevelP> level_p_mapping = new HashMap<>();
|
|
|
for (LevelWeight lw : availableLevels) {
|
|
|
BigDecimal bd = new BigDecimal(weight_temp + lw.getWeight());
|
|
@@ -68,7 +85,6 @@ public class FlowPoolWithLevelScoreRecallStrategy extends AbstractFlowPoolWithLe
|
|
|
}
|
|
|
|
|
|
// 4. 随机生成[0,1)之间数,返回相应概率区间的key
|
|
|
-
|
|
|
double random_p = RandomUtils.nextDouble(0, 1);
|
|
|
for (Map.Entry<String, LevelP> entry : level_p_mapping.entrySet()) {
|
|
|
if (random_p >= entry.getValue().getMin()
|
|
@@ -97,4 +113,68 @@ public class FlowPoolWithLevelScoreRecallStrategy extends AbstractFlowPoolWithLe
|
|
|
public String pushFrom() {
|
|
|
return FlowPoolConstants.PUSH_FORM;
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public List<Video> recall(RecallParam param) {
|
|
|
+ Pair<String, String> flowPoolKeyAndLevel = flowPoolKeyAndLevel(param);
|
|
|
+ String flowPoolKey = flowPoolKeyAndLevel.getLeft();
|
|
|
+ String level = flowPoolKeyAndLevel.getRight();
|
|
|
+ Set<ZSetOperations.TypedTuple<String>> data = redisTemplate.opsForZSet().reverseRangeWithScores(flowPoolKey, 0, 1000);
|
|
|
+ if (CollectionUtils.isEmpty(data)) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ Map<String, String> videoFlowPoolMap = new LinkedHashMap<>();
|
|
|
+ Map<Long, String> videoFlowPoolMap_ = new LinkedHashMap<>();
|
|
|
+ for (ZSetOperations.TypedTuple<String> value : data) {
|
|
|
+ String[] values = Objects.requireNonNull(value.getValue()).split("-");
|
|
|
+ videoFlowPoolMap.put(values[0], values[1]);
|
|
|
+ videoFlowPoolMap_.put(NumberUtils.toLong(values[0], 0), values[1]);
|
|
|
+ }
|
|
|
+ Map<Long, Double> resultmap = null;
|
|
|
+ if ("1".equals(level) && ifOneLevelRandom) {
|
|
|
+ // 流量池一层改为全随机
|
|
|
+ int limitSize = 60;
|
|
|
+ List<Long> keyList = new ArrayList<>(videoFlowPoolMap_.keySet());
|
|
|
+ Collections.shuffle(keyList);
|
|
|
+ resultmap = keyList.stream().limit(limitSize).collect(Collectors.toMap(
|
|
|
+ key -> key,
|
|
|
+ key -> Math.random()
|
|
|
+ ));
|
|
|
+ } else {
|
|
|
+ ScorerPipeline4Recall pipeline = ScorerUtils.getScorerPipeline4Recall("feeds_recall_config_tomson.conf");
|
|
|
+ List<List<Pair<Long, Double>>> results = pipeline.recall(videoFlowPoolMap);
|
|
|
+ List<Pair<Long, Double>> result = results.get(0);
|
|
|
+ resultmap = result.stream()
|
|
|
+ .collect(Collectors.toMap(
|
|
|
+ Pair::getLeft, // 键是Pair的left值
|
|
|
+ Pair::getRight, // 值是Pair的right值
|
|
|
+ (existingValue, newValue) -> existingValue, // 如果键冲突,选择保留现有的值(或者你可以根据需要定义其他合并策略)
|
|
|
+ LinkedHashMap::new // 使用LinkedHashMap来保持插入顺序(如果需要的话)
|
|
|
+ ));
|
|
|
+ }
|
|
|
+
|
|
|
+ // 3 召回内部过滤
|
|
|
+ FilterParam filterParam = FilterParamFactory.create(param, new ArrayList<>(resultmap.keySet()));
|
|
|
+ filterParam.setForceTruncation(10000);
|
|
|
+ filterParam.setConcurrent(true);
|
|
|
+ filterParam.setNotUsePreView(false);
|
|
|
+ FilterResult filterResult = filterService.filter(filterParam);
|
|
|
+ List<Video> videosResult = new ArrayList<>();
|
|
|
+ if (filterResult != null && CollectionUtils.isNotEmpty(filterResult.getVideoIds())) {
|
|
|
+ Map<Long, Double> finalResultmap = resultmap;
|
|
|
+ filterResult.getVideoIds().forEach(vid -> {
|
|
|
+ Video recallData = new Video();
|
|
|
+ recallData.setVideoId(vid);
|
|
|
+ recallData.setAbCode(param.getAbCode());
|
|
|
+ recallData.setRovScore(finalResultmap.getOrDefault(vid, 0.0));
|
|
|
+ recallData.setPushFrom(pushFrom());
|
|
|
+ recallData.setFlowPool(videoFlowPoolMap_.get(vid));
|
|
|
+ recallData.setFlowPoolAbtestGroup(param.getFlowPoolAbtestGroup());
|
|
|
+ recallData.setLevel(level);
|
|
|
+ videosResult.add(recallData);
|
|
|
+ });
|
|
|
+ }
|
|
|
+ videosResult.sort(Comparator.comparingDouble(o -> -o.getRovScore()));
|
|
|
+ return videosResult;
|
|
|
+ }
|
|
|
}
|