|
@@ -0,0 +1,174 @@
|
|
|
+package com.tzld.piaoquan.recommend.server.service.rank.strategy;
|
|
|
+
|
|
|
+import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
|
|
|
+import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
|
|
|
+import com.tzld.piaoquan.recommend.server.common.base.RankItem;
|
|
|
+import com.tzld.piaoquan.recommend.server.model.Video;
|
|
|
+import com.tzld.piaoquan.recommend.server.service.FeatureService;
|
|
|
+import com.tzld.piaoquan.recommend.server.service.rank.RankParam;
|
|
|
+import com.tzld.piaoquan.recommend.server.service.rank.tansform.FeatureV6;
|
|
|
+import com.tzld.piaoquan.recommend.server.service.recall.strategy.HotReturnUvRecallStrategy;
|
|
|
+import com.tzld.piaoquan.recommend.server.util.CommonCollectionUtils;
|
|
|
+import com.tzld.piaoquan.recommend.server.util.FeatureBucketUtils;
|
|
|
+import com.tzld.piaoquan.recommend.server.util.FeatureUtils;
|
|
|
+import com.tzld.piaoquan.recommend.server.util.RecallUtils;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.collections4.MapUtils;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.Future;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
+@Service
|
|
|
+@Slf4j
|
|
|
+public class RankStrategy4RelevantModelV1 extends RankStrategy4RegionMergeModelBasic {
|
|
|
+ @ApolloJsonValue("${relevant.params.v1:}")
|
|
|
+ private Map<String, Double> apolloParams;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private FeatureService featureService;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public List<Video> mergeAndRankRovRecall(RankParam param) {
|
|
|
+ Map<String, Double> apolloParams = this.apolloParams != null ? this.apolloParams : new HashMap<>(0);
|
|
|
+
|
|
|
+ // ------------------- 召回 -------------------
|
|
|
+ Set<Long> setVideo = new HashSet<>();
|
|
|
+ List<Video> rovRecallRank = new ArrayList<>();
|
|
|
+ // return uv
|
|
|
+ RecallUtils.extractRecall(apolloParams.getOrDefault("returnUv", 100d).intValue(), param, HotReturnUvRecallStrategy.PUSH_FROM, setVideo, rovRecallRank);
|
|
|
+
|
|
|
+ // ------------------- 排序 -------------------
|
|
|
+ Map<String, String> rtFeatureDumpsMap = dumpsRtFeature(param.getUserRTShareList());
|
|
|
+
|
|
|
+ // 1. 批量获取特征
|
|
|
+ List<String> vids = CommonCollectionUtils.toListDistinct(rovRecallRank, v -> String.valueOf(v.getVideoId()));
|
|
|
+ String requestId = String.valueOf(param.getRequestVideoId());
|
|
|
+ Map<String, Map<String, Map<String, String>>> videoBaseInfoMap = featureService.getVideoBaseInfo(requestId, vids);
|
|
|
+ Map<String, String> requestVideoInfo = videoBaseInfoMap.getOrDefault(requestId, new HashMap<>()).getOrDefault("alg_vid_feature_basic_info", new HashMap<>());
|
|
|
+ String requestTitle = FeatureUtils.extractContent(requestVideoInfo.get("title"));
|
|
|
+ if (requestTitle.isEmpty()) {
|
|
|
+ return new ArrayList<>();
|
|
|
+ }
|
|
|
+
|
|
|
+ // 2. 特征处理
|
|
|
+ int batchSize = apolloParams.getOrDefault("batchSize", 10d).intValue();
|
|
|
+ long timeout = apolloParams.getOrDefault("timeout", 1000d).longValue();
|
|
|
+ List<RankItem> rankItems = CommonCollectionUtils.toList(rovRecallRank, RankItem::new);
|
|
|
+ parallelGetVideoFeature(requestVideoInfo, videoBaseInfoMap, rankItems, batchSize, timeout);
|
|
|
+ Map<String, Map<String, String>> vid2MapFeature = this.getVideoRedisFeature(vids, "feature_video_relevant:");
|
|
|
+
|
|
|
+ // 3. 排序公式特征
|
|
|
+ double titleThreshold = apolloParams.getOrDefault("titleThreshold", 0.9);
|
|
|
+ double titleWeight = apolloParams.getOrDefault("titleWeight", 0.5);
|
|
|
+ double kwWeight = apolloParams.getOrDefault("kwWeight", 0.2);
|
|
|
+ double cate1Weight = apolloParams.getOrDefault("cate1Weight", 0.1);
|
|
|
+ double cate2Weight = apolloParams.getOrDefault("cate2Weight", 0.1);
|
|
|
+ double rovnWeight = apolloParams.getOrDefault("rovnWeight", 0.1);
|
|
|
+ List<Video> result = new ArrayList<>();
|
|
|
+ for (RankItem item : rankItems) {
|
|
|
+ String title = item.getTitle();
|
|
|
+ if (null != title && title.equals(requestTitle)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ double score;
|
|
|
+ Map<String, Float> featureMap = item.getFeatureMap() != null ? item.getFeatureMap() : new HashMap<>(0);
|
|
|
+ double titleSim = featureMap.getOrDefault("sim@title", 0f);
|
|
|
+ double kwSim = featureMap.getOrDefault("sim@keywords", 0f);
|
|
|
+ double cate1Sim = featureMap.getOrDefault("sim@merge_first_level_cate", 0f);
|
|
|
+ double cate2Sim = featureMap.getOrDefault("sim@merge_second_level_cate", 0f);
|
|
|
+ double rovn24h = Double.parseDouble(vid2MapFeature.getOrDefault(item.getVideoId() + "", new HashMap<>()).getOrDefault("rovn", "0"));
|
|
|
+ if (titleSim > titleThreshold) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ item.getScoresMap().put("titleSim", titleSim);
|
|
|
+ item.getScoresMap().put("kwSim", kwSim);
|
|
|
+ item.getScoresMap().put("cate1Sim", cate1Sim);
|
|
|
+ item.getScoresMap().put("cate2Sim", cate2Sim);
|
|
|
+ item.getScoresMap().put("rovn", rovn24h);
|
|
|
+
|
|
|
+ score = titleWeight * titleSim
|
|
|
+ + kwWeight * (kwSim + 0.05)
|
|
|
+ + cate1Weight * (cate1Sim + 0.05)
|
|
|
+ + cate2Weight * (cate2Sim + 0.05)
|
|
|
+ + rovnWeight * rovn24h * 5;
|
|
|
+
|
|
|
+ Video video = item.getVideo();
|
|
|
+ video.setScore(score);
|
|
|
+ video.setSortScore(score);
|
|
|
+ video.setScoresMap(item.getScoresMap());
|
|
|
+ if (MapUtils.isNotEmpty(videoBaseInfoMap) && MapUtils.isNotEmpty(videoBaseInfoMap.get(item.getVideoId() + ""))) {
|
|
|
+ video.getMetaFeatureMap().putAll(videoBaseInfoMap.get(item.getVideoId() + ""));
|
|
|
+ }
|
|
|
+ if (MapUtils.isNotEmpty(requestVideoInfo)) {
|
|
|
+ video.getMetaFeatureMap().put("req_video", requestVideoInfo);
|
|
|
+ }
|
|
|
+ if (null != rtFeatureDumpsMap && !rtFeatureDumpsMap.isEmpty()) {
|
|
|
+ video.getMetaFeatureMap().put("rt", rtFeatureDumpsMap);
|
|
|
+ }
|
|
|
+ result.add(video);
|
|
|
+ }
|
|
|
+ result.sort(Comparator.comparingDouble(o -> -o.getSortScore()));
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<String, Float> getVideoFeature(Map<String, String> requestInfo, Map<String, String> rankInfo) {
|
|
|
+ Map<String, Double> featMap = new HashMap<>();
|
|
|
+ FeatureV6.getRequestRankVideoCrossFeature(requestInfo, rankInfo, featMap);
|
|
|
+ return FeatureBucketUtils.noBucketFeature(featMap);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void batchGetVideoFeature(Map<String, String> requestInfo,
|
|
|
+ Map<String, Map<String, Map<String, String>>> videoBaseInfoMap,
|
|
|
+ List<RankItem> rankItems) {
|
|
|
+ if (null == rankItems || rankItems.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ for (RankItem item : rankItems) {
|
|
|
+ String vid = item.getVideoId() + "";
|
|
|
+ Map<String, String> rankInfo = videoBaseInfoMap.getOrDefault(vid, new HashMap<>()).getOrDefault("alg_vid_feature_basic_info", new HashMap<>());
|
|
|
+ item.featureMap = getVideoFeature(requestInfo, rankInfo);
|
|
|
+ item.setTitle(FeatureUtils.extractContent(rankInfo.get("title")));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void parallelGetVideoFeature(Map<String, String> requestInfo,
|
|
|
+ Map<String, Map<String, Map<String, String>>> videoBaseInfoMap,
|
|
|
+ List<RankItem> rankItems,
|
|
|
+ int batchSize,
|
|
|
+ long timeout) {
|
|
|
+ if (null == rankItems || rankItems.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ List<Future<Integer>> futures = new ArrayList<>();
|
|
|
+ LinkedList<List<RankItem>> batchList = CommonCollectionUtils.splitListToBatch(rankItems, batchSize);
|
|
|
+ for (List<RankItem> batch : batchList) {
|
|
|
+ Future<Integer> future = ThreadPoolFactory.defaultPool().submit(() -> {
|
|
|
+ batchGetVideoFeature(requestInfo, videoBaseInfoMap, batch);
|
|
|
+ return 1;
|
|
|
+ });
|
|
|
+ futures.add(future);
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ for (Future<Integer> future : futures) {
|
|
|
+ future.get(timeout, TimeUnit.MILLISECONDS);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("get feature error", e);
|
|
|
+ }
|
|
|
+ // 超时后取消
|
|
|
+ for (Future<Integer> future : futures) {
|
|
|
+ try {
|
|
|
+ if (!future.isDone()) {
|
|
|
+ future.cancel(true);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("cancel feature error", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|