Browse Source

Merge branch 'feature/20240402/sunxy/addRegionFilter' of algorithm/recommend-server into master

sunxiaoyi 1 year ago
parent
commit
2d45ec5e66

+ 12 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/common/ThreadPoolFactory.java

@@ -33,6 +33,14 @@ public final class ThreadPoolFactory {
             new ThreadFactoryBuilder().setNameFormat("FilterService-%d").build(),
             new ThreadPoolExecutor.AbortPolicy());
 
+    private final static ExecutorService LOG = new CommonThreadPoolExecutor(
+            128,
+            128,
+            0L, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(1000),
+            new ThreadFactoryBuilder().setNameFormat("LogService-%d").build(),
+            new ThreadPoolExecutor.AbortPolicy());
+
     public static ExecutorService defaultPool() {
         return DEFAULT;
     }
@@ -45,4 +53,8 @@ public final class ThreadPoolFactory {
         return FILTER;
     }
 
+    public static ExecutorService logPool() {
+        return LOG;
+    }
+
 }

+ 1 - 9
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/merger/StrategyQueue.java

@@ -135,22 +135,14 @@ public abstract class StrategyQueue {
     public final int doDeDup() {
         List<RankItem> pureItems = new ArrayList<RankItem>();
         Set<String> deDupItems = new HashSet<String>();
-        int deDupCount = 0;
         for (RankItem item : items) {
-            if (deDupItems.contains(item.getVideoId())) {
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("merge queue dedup item, queue [{}] itemid [{}]",
-                            strategyQueueInfo.getQueueName(), item.getVideoId());
-                }
-                deDupCount++;
+            if (deDupItems.contains(item.getId())) {
                 continue;
             }
             deDupItems.add(item.getId());
             pureItems.add(item);
         }
         items = pureItems;
-        LOGGER.debug("dedup items, queue [{}], dedup number [{}], current number [{}]",
-                new Object[]{getStrategyQueueInfo().getQueueName(), deDupCount, items.size()});
         return items.size();
     }
 

+ 2 - 3
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/BaseRecaller.java

@@ -1,7 +1,6 @@
 package com.tzld.piaoquan.recommend.server.framework.recaller;
 
 
-import com.alibaba.fastjson.JSONObject;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Lists;
@@ -28,8 +27,8 @@ import java.util.stream.Collectors;
 public class BaseRecaller<Video> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(BaseRecaller.class);
-    private static final long DEFAULT_QUEUE_LOAD_TIMEOUT = 150; // ms
-    private static final long DEFAULT_PARALLEL_FILTER_TIMEOUT = 200; // ms
+    private static final long DEFAULT_QUEUE_LOAD_TIMEOUT = 1500; // ms
+    private static final long DEFAULT_PARALLEL_FILTER_TIMEOUT = 1500; // ms
 
     private static final String FILTER_CONF = "filter_config.conf"; // ms
     private static final ExecutorService filterExecutorService = new ThreadPoolExecutor(128, 128,

+ 22 - 6
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/TopRecommendPipeline.java

@@ -62,11 +62,17 @@ public class TopRecommendPipeline {
 
     public List<Video> feeds(final RecommendRequest requestData,
                              final int requestIndex,
-                             final User userInfo, Boolean logPrint) {
+                             final User userInfo, Boolean logPrint,
+                             Map<String, String> timeLogMap) {
         // Step 1: Attention extraction
         Stopwatch stopwatch = Stopwatch.createStarted();
         stopwatch.reset().start();
-        List<RankItem> rankItems = feedByRec(requestData, requestIndex, userInfo, logPrint);
+        timeLogMap.put("uid", userInfo.getUid());
+        timeLogMap.put("mid", userInfo.getMid());
+        timeLogMap.put("requestId", requestData.getRequestId());
+
+        List<RankItem> rankItems = feedByRec(requestData, requestIndex, userInfo, logPrint, timeLogMap);
+        timeLogMap.put("feedByRec", stopwatch.elapsed().toMillis() + "");
         if (rankItems == null || rankItems.isEmpty()) {
             return new ArrayList<>();
         }
@@ -76,6 +82,7 @@ public class TopRecommendPipeline {
         }
         stopwatch.reset().start();
         List<Video> videos = rankItem2Video(rankItems);
+        timeLogMap.put("rankItem2Video", stopwatch.elapsed().toMillis() + "");
         if (logPrint) {
             log.info("traceId = {}, cost = {}, videos = {}", requestData.getRequestId(),
                     stopwatch.elapsed().toMillis(), JSONUtils.toJson(videos));
@@ -116,9 +123,9 @@ public class TopRecommendPipeline {
 
     public List<RankItem> feedByRec(final RecommendRequest requestData,
                                     final int requestIndex,
-                                    final User userInfo, Boolean logPrint) {
+                                    final User userInfo, Boolean logPrint,
+                                    Map<String, String> timeLogMap) {
         Stopwatch stopwatch = Stopwatch.createStarted();
-
         // Step 2: create top queue
         stopwatch.reset().start();
         StrategyQueue topQueue = MergeUtils.createTopQueue(MERGE_CONF, "top-queue");
@@ -126,6 +133,7 @@ public class TopRecommendPipeline {
             log.info("traceId = {}, cost = {}, topQueue = {}", requestData.getRequestId(),
                     stopwatch.elapsed().toMillis(), JSONUtils.toJson(topQueue));
         }
+        timeLogMap.put("createTopQueue", stopwatch.elapsed().toMillis() + "");
 
         // Step 3: Candidate
         stopwatch.reset().start();
@@ -135,6 +143,7 @@ public class TopRecommendPipeline {
             log.info("traceId = {}, cost = {}, candidates = {}", requestData.getRequestId(),
                     stopwatch.elapsed().toMillis(), JSONUtils.toJson(candidates));
         }
+        timeLogMap.put("topQueue-candidate-cost", stopwatch.elapsed().toMillis() + "");
 
 
         // Step 4: Recalling & Basic Scoring
@@ -145,7 +154,8 @@ public class TopRecommendPipeline {
             log.info("traceId = {}, cost = {}, items = {}", requestData.getRequestId(),
                     stopwatch.elapsed().toMillis(), JSONUtils.toJson(items));
         }
-
+        timeLogMap.put("recalling-cost", stopwatch.elapsed().toMillis() + "");
+        timeLogMap.put("recalling-size", items == null ? "0" : items.size() + "");
 
         // Step 4: Advance Scoring
 //        timestamp = System.currentTimeMillis();
@@ -167,6 +177,9 @@ public class TopRecommendPipeline {
         }
         duplicate(mergeItems);
 
+        timeLogMap.put("mergeItems-cost", stopwatch.elapsed().toMillis() + "");
+        timeLogMap.put("mergeItems-size", mergeItems.size() + "");
+
         if (logPrint) {
             log.info("traceId = {}, cost = {}, mergeItems = {}", requestData.getRequestId(),
                     stopwatch.elapsed().toMillis(), JSONUtils.toJson(mergeItems));
@@ -177,11 +190,14 @@ public class TopRecommendPipeline {
         // TODO 前置和后置处理逻辑 hardcode,后续优化
         stopwatch.reset().start();
         List<RankItem> rovRecallRankNewScore = rankByScore(mergeItems, requestData);
+
+        timeLogMap.put("rankByScore-cost", stopwatch.elapsed().toMillis() + "");
+        timeLogMap.put("rankByScore-size", rovRecallRankNewScore.size() + "");
+
         if (logPrint) {
             log.info("traceId = {}, cost = {}, rovRecallRankNewScore = {}", requestData.getRequestId(),
                     stopwatch.elapsed().toMillis(), JSONUtils.toJson(rovRecallRankNewScore));
         }
-
         return rovRecallRankNewScore;
     }
 

+ 11 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/recall/ViewedHistoryFilter.java

@@ -9,6 +9,7 @@ import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
 import com.tzld.piaoquan.recommend.server.model.Video;
 import com.tzld.piaoquan.recommend.server.service.ServiceBeanFactory;
 import com.tzld.piaoquan.recommend.server.service.SpringContextHolder;
+import com.tzld.piaoquan.recommend.server.service.filter.VideoCityFilterService;
 import com.tzld.piaoquan.recommend.server.service.filter.strategy.VideoView;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -27,6 +28,7 @@ import java.util.stream.Collectors;
 
 public class ViewedHistoryFilter extends AbstractFilter<Video> {
 
+    private final VideoCityFilterService videoCityFilterService;
 
     protected Set<String> historySet;
 
@@ -46,6 +48,8 @@ public class ViewedHistoryFilter extends AbstractFilter<Video> {
         if (historySet == null) {
             historySet = new HashSet<>();
         }
+        videoCityFilterService = SpringContextHolder.getBean(VideoCityFilterService.class);
+
     }
 
     @Override
@@ -54,6 +58,13 @@ public class ViewedHistoryFilter extends AbstractFilter<Video> {
             return;
         }
         videoList.removeIf(video -> this.historySet.contains(String.valueOf(video.getVideoId())));
+        if (CollectionUtils.isEmpty(videoList)) {
+            return;
+        }
+        List<Long> filterVideosByCity = videoCityFilterService.filterVideosByCity(videoList.stream().map(Video::getVideoId).collect(Collectors.toList()),
+                new HashSet<>(requestContext.getAbExpCodeList()), requestContext.getHotSceneType(), requestContext.getCityCode());
+        Set<Long> filterVideosSet = new HashSet<>(filterVideosByCity);
+        videoList.removeIf(video -> !filterVideosSet.contains(video.getVideoId()));
     }
 
 

+ 82 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/TimerLogService.java

@@ -0,0 +1,82 @@
+package com.tzld.piaoquan.recommend.server.service;
+
+import com.aliyun.openservices.aliyun.log.producer.LogProducer;
+import com.aliyun.openservices.aliyun.log.producer.Producer;
+import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
+import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
+import com.aliyun.openservices.log.common.LogItem;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@Service
+@Slf4j
+public class TimerLogService {
+
+    @Value("${aliyun.timer.log.project}")
+    private String project;
+    @Value("${aliyun.log.endpoint}")
+    private String endpoint;
+    @Value("${aliyun.log.accessKeyId}")
+    private String accessKeyId;
+    @Value("${aliyun.log.accessKeySecret}")
+    private String accessKeySecret;
+
+    @Value("${aliyun.timer.log.logStore}")
+    private String logStore;
+
+    private Producer producer;
+
+
+    @PostConstruct
+    public void init() {
+        ProducerConfig producerConfig = new ProducerConfig();
+        producer = new LogProducer(producerConfig);
+        producer.putProjectConfig(new ProjectConfig(project, endpoint, accessKeyId, accessKeySecret));
+    }
+
+    public void log(Map<String, String> data) {
+        if (MapUtils.isEmpty(data)) {
+            return;
+        }
+        try {
+            LogItem logItem = new LogItem();
+            data.entrySet().stream().forEach(e -> {
+                logItem.PushBack(e.getKey(), e.getValue());
+            });
+            producer.send(project, logStore, logItem);
+        } catch (InterruptedException e) {
+            log.warn("The current thread has been interrupted during send logs.");
+        } catch (Exception e) {
+            log.error("Failed to send logs", e);
+        }
+    }
+
+    public void log(List<Map<String, String>> data) {
+        if (CollectionUtils.isEmpty(data)) {
+            return;
+        }
+        try {
+            List<LogItem> items = data.stream().map(d -> {
+                LogItem logItem = new LogItem();
+                d.entrySet().stream().forEach(e -> {
+                    logItem.PushBack(e.getKey(), e.getValue());
+                });
+                return logItem;
+            }).collect(Collectors.toList());
+            producer.send(project, logStore, items);
+        } catch (InterruptedException e) {
+            log.warn("The current thread has been interrupted during send logs.");
+        } catch (Exception e) {
+            log.error("Failed to send logs", e);
+        }
+    }
+
+}

+ 9 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/VideoRecommendService.java

@@ -3,6 +3,7 @@ package com.tzld.piaoquan.recommend.server.service;
 import com.alibaba.fastjson.JSONObject;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Strings;
+import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
 import com.tzld.piaoquan.recommend.server.framework.common.User;
 import com.tzld.piaoquan.recommend.server.gen.common.Result;
 import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
@@ -26,7 +27,9 @@ import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -46,6 +49,8 @@ public class VideoRecommendService {
     private FlowPoolRecommendPipeline flowPoolRecommendPipeline;
     @Resource
     private RankStrategy4RegionMergeModelV547 rankStrategy4RegionMergeModelV547;
+    @Resource
+    private TimerLogService timerLogService;
     @Autowired
     @Qualifier("redisTemplate")
     private RedisTemplate<String, String> redisTemplate;
@@ -116,8 +121,9 @@ public class VideoRecommendService {
         User user = getUser(request);
         List<Video> topRecommendVideoList = new ArrayList<>();
         List<Video> flowPoolRecommendVideoList = new ArrayList<>();
+        Map<String, String> timeLogMap = new HashMap<>(32);
         try {
-            topRecommendVideoList = topRecommendPipeline.feeds(request, 0, user, newLogPrint);
+            topRecommendVideoList = topRecommendPipeline.feeds(request, 0, user, newLogPrint, timeLogMap);
             if (newLogPrint) {
                 log.info("traceId = {}, cost = {}, topRecommendVideoList [{}]", request.getRequestId(),
                         stopwatch.elapsed().toMillis(), JSONObject.toJSONString(topRecommendVideoList));
@@ -125,6 +131,8 @@ public class VideoRecommendService {
 
         } catch (Exception e) {
             log.error("traceId = {}, topRecommendVideoList error", request.getRequestId(), e);
+        } finally {
+            ThreadPoolFactory.logPool().submit(() -> timerLogService.log(timeLogMap));
         }
         try {
             stopwatch.reset().start();

+ 110 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/VideoCityFilterService.java

@@ -0,0 +1,110 @@
+package com.tzld.piaoquan.recommend.server.service.filter;
+
+import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
+import com.google.common.base.Stopwatch;
+import com.tzld.piaoquan.recommend.server.repository.WxVideoTagRel;
+import com.tzld.piaoquan.recommend.server.repository.WxVideoTagRelRepository;
+import com.tzld.piaoquan.recommend.server.util.CommonCollectionUtils;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * @author sunxy
+ */
+@Service
+public class VideoCityFilterService {
+
+//    private final Logger logger = Logger.getLogger(VideoCityFilterService.class.getName());
+
+    private final Logger logger = LoggerFactory.getLogger(VideoCityFilterService.class);
+
+//    @Value("${video.filter.city.tagid.json:}")
+//    private String videoFilterTagIdJson;
+
+    @ApolloJsonValue("${video.filter.city.tagid.json:{}}")
+    private Map<String, List<Long>> videoFilterCityTagIdMap;
+    @Value("#{'${block.hotscenetype.list:}'.split(',')}")
+    private Set<Long> excludeScenes;
+    @Value("${securityAbExpCode:625}")
+    private String securityAbExpCode;
+    @Resource
+    private WxVideoTagRelRepository wxVideoTagRelRepository;
+    private Map<String, Set<Long>> videoTagCache = new ConcurrentHashMap<>();
+
+    @PostConstruct
+    public void init() {
+        initCacheByValue();
+    }
+
+    @Scheduled(cron = "0 0 0/2 * * ? ")
+    public void cornInit() {
+        initCacheByValue();
+    }
+
+    private void initCacheByValue() {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        if (Objects.isNull(videoFilterCityTagIdMap) || videoFilterCityTagIdMap.isEmpty()) {
+            videoTagCache = new ConcurrentHashMap<>();
+            return;
+        }
+        Map<String, Set<Long>> tmp = new ConcurrentHashMap<>();
+        for (Map.Entry<String, List<Long>> entry : videoFilterCityTagIdMap.entrySet()) {
+            if (Objects.isNull(entry)) {
+                continue;
+            }
+            String cityCode = entry.getKey();
+            List<Long> tagList = entry.getValue();
+            if (Objects.isNull(cityCode) || Objects.isNull(tagList) || tagList.isEmpty()) {
+                continue;
+            }
+            Set<Long> videosByTag = wxVideoTagRelRepository.findAllByTagIdIn(tagList).stream()
+                    .map(WxVideoTagRel::getVideoId).collect(Collectors.toSet());
+            tmp.put(cityCode, videosByTag);
+        }
+        videoTagCache = tmp;
+        logger.info("initCacheByValue videoTagCache.size = {} execute time = {}", videoTagCache.size(), stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
+
+    }
+
+    public List<Long> filterVideosByCity(List<Long> videoIds, Set<String> abExpCodes,
+                                         Long hotSceneType, String cityCode) {
+        if (CollectionUtils.isEmpty(videoIds) || StringUtils.isBlank(cityCode) || hotSceneType == null) {
+            return videoIds;
+        }
+        if (CollectionUtils.isNotEmpty(excludeScenes) && excludeScenes.contains(hotSceneType)) {
+            return videoIds;
+        }
+        if (!CommonCollectionUtils.contains(abExpCodes, securityAbExpCode)) {
+            return videoIds;
+        }
+        List<Long> tagIdList = videoFilterCityTagIdMap.get(cityCode);
+        if (tagIdList == null || tagIdList.isEmpty()) {
+            return videoIds;
+        }
+
+        Set<Long> riskCityVideoIdSet = videoTagCache.get(cityCode);
+        if (riskCityVideoIdSet == null || riskCityVideoIdSet.isEmpty()) {
+            return videoIds;
+        }
+
+        videoIds.removeIf(riskCityVideoIdSet::contains);
+
+        return videoIds;
+    }
+
+}

+ 4 - 0
recommend-server-service/src/main/resources/application-dev.yml

@@ -110,6 +110,10 @@ aliyun:
     accessKeyId: LTAIP6x1l3DXfSxm
     accessKeySecret: KbTaM9ars4OX3PMS6Xm7rtxGr1FLon
     project: recommend-server-test
+  timer:
+    log:
+      project: recommend-server-test
+      logStore: timer
 
 logging:
   file:

+ 5 - 1
recommend-server-service/src/main/resources/application-pre.yml

@@ -108,4 +108,8 @@ aliyun:
     endpoint: cn-hangzhou-intranet.log.aliyuncs.com
     accessKeyId: LTAIP6x1l3DXfSxm
     accessKeySecret: KbTaM9ars4OX3PMS6Xm7rtxGr1FLon
-    project: recommend-server
+    project: recommend-server
+  timer:
+    log:
+      project: recommend-server
+      logStore: timer

+ 5 - 1
recommend-server-service/src/main/resources/application-prod.yml

@@ -108,4 +108,8 @@ aliyun:
     endpoint: cn-hangzhou-intranet.log.aliyuncs.com
     accessKeyId: LTAIP6x1l3DXfSxm
     accessKeySecret: KbTaM9ars4OX3PMS6Xm7rtxGr1FLon
-    project: recommend-server
+    project: recommend-server
+  timer:
+    log:
+      project: recommend-server
+      logStore: timer

+ 5 - 1
recommend-server-service/src/main/resources/application-test.yml

@@ -109,4 +109,8 @@ aliyun:
     endpoint: cn-hangzhou-intranet.log.aliyuncs.com
     accessKeyId: LTAIP6x1l3DXfSxm
     accessKeySecret: KbTaM9ars4OX3PMS6Xm7rtxGr1FLon
-    project: recommend-server-test
+    project: recommend-server-test
+  timer:
+    log:
+      project: recommend-server-test
+      logStore: timer

+ 20 - 12
recommend-server-service/src/main/resources/merge_config.conf

@@ -34,20 +34,20 @@ queue-config = {
   }
 }
 
-
-
 rule-config = {
   // 顶层队列
   top-queue = {
     merge-rule = {
       global-hot-queue = {
-        recall-percentage = 0.3
-        min-merge-num = 1
+        recall-percentage = 0.33
+        min-merge-num = 80
+        max-merge-num = 160
         priority = 1
       }
       region-hot-queue = {
-        recall-percentage = 0.6
-        min-merge-num = 1
+        recall-percentage = 0.66
+        min-merge-num = 120
+        max-merge-num = 240
         priority = 1
       }
     }
@@ -58,15 +58,18 @@ rule-config = {
     merge-rule = {
       global1h-index = {
         recall-percentage = 0.3
-        min-merge-num = 1
+        min-merge-num = 60
+        max-merge-num = 120
       }
       global3h-index = {
         recall-percentage = 0.3
-        min-merge-num = 1
+        min-merge-num = 60
+        max-merge-num = 120
       }
       global24h-index = {
         recall-percentage = 0.3
-        min-merge-num = 1
+        min-merge-num = 40
+        max-merge-num = 80
       }
     }
   }
@@ -76,17 +79,22 @@ rule-config = {
     merge-rule = {
       region1h-index = {
         recall-percentage = 0.3
-        min-merge-num = 1
+        min-merge-num = 100
+        max-merge-num = 200
+
       }
       region3h-index = {
         recall-percentage = 0.3
-        min-merge-num = 1
+        min-merge-num = 100
+        max-merge-num = 200
       }
       region24h-index = {
         recall-percentage = 0.3
-        min-merge-num = 1
+        min-merge-num = 40
+        max-merge-num = 80
       }
     }
   }
 
 }
+