|
@@ -0,0 +1,163 @@
|
|
|
+package com.tzld.piaoquan.recommend.server.service.filter.strategy;
|
|
|
+
|
|
|
+import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
|
|
|
+import com.google.common.cache.CacheBuilder;
|
|
|
+import com.google.common.cache.CacheLoader;
|
|
|
+import com.google.common.cache.LoadingCache;
|
|
|
+import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
|
|
|
+import com.tzld.piaoquan.recommend.server.repository.WxVideoStatus;
|
|
|
+import com.tzld.piaoquan.recommend.server.repository.WxVideoStatusRepository;
|
|
|
+import com.tzld.piaoquan.recommend.server.repository.WxVideoTagRel;
|
|
|
+import com.tzld.piaoquan.recommend.server.repository.WxVideoTagRelRepository;
|
|
|
+import com.tzld.piaoquan.recommend.server.service.filter.FilterParam;
|
|
|
+import com.tzld.piaoquan.recommend.server.service.filter.FilterStrategy;
|
|
|
+import com.tzld.piaoquan.recommend.server.util.CommonCollectionUtils;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.collections4.CollectionUtils;
|
|
|
+import org.apache.commons.lang3.RandomUtils;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
+import org.springframework.dao.DataAccessException;
|
|
|
+import org.springframework.data.redis.core.RedisOperations;
|
|
|
+import org.springframework.data.redis.core.RedisTemplate;
|
|
|
+import org.springframework.data.redis.core.SessionCallback;
|
|
|
+import org.springframework.data.redis.core.ValueOperations;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+
|
|
|
+@Component
|
|
|
+@Slf4j
|
|
|
+public class GeneralSpiderStrategy implements FilterStrategy {
|
|
|
+ @Autowired
|
|
|
+ @Qualifier("redisTemplate")
|
|
|
+ private RedisTemplate<String, String> redisTemplate;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private WxVideoStatusRepository wxVideoStatusRepository;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 存储视频的uid
|
|
|
+ * key: video:uid:{videoId}
|
|
|
+ * value: uid
|
|
|
+ */
|
|
|
+ private final String keyFormat = "video:uid:%s";
|
|
|
+ /**
|
|
|
+ * 非ugc用户 大数据同步
|
|
|
+ *
|
|
|
+ */
|
|
|
+ public static final String GENERAL_SPIDER_USER_KEY = "general:spider:user";
|
|
|
+ private LoadingCache<String, Set<String>> generalSpiderUserCache = CacheBuilder.newBuilder()
|
|
|
+ .maximumSize(10000)
|
|
|
+ .refreshAfterWrite(600, TimeUnit.SECONDS)
|
|
|
+ .expireAfterWrite(600, TimeUnit.SECONDS)
|
|
|
+ .expireAfterAccess(600, TimeUnit.SECONDS)
|
|
|
+ .build(new CacheLoader<String, Set<String>>() {
|
|
|
+ @Override
|
|
|
+ public Set<String> load(String key) {
|
|
|
+ Set<String> result = redisTemplate.opsForSet().members(key);
|
|
|
+ if (CollectionUtils.isEmpty(result)) {
|
|
|
+ return Collections.emptySet();
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private WxVideoTagRelRepository repository;
|
|
|
+
|
|
|
+ @ApolloJsonValue("${general.spider.exclude.tagids:[]}")
|
|
|
+ private List<Long> generalSpiderExcludeTagIds;
|
|
|
+
|
|
|
+ private String generalSpiderExcludeVideoCacheKey = "general.spider.exclude.video";
|
|
|
+ // 内存持久保存不淘汰
|
|
|
+ private LoadingCache<String, Set<Long>> generalSpiderExcludeVideoCache = CacheBuilder.newBuilder()
|
|
|
+ .maximumSize(10)
|
|
|
+ .refreshAfterWrite(60, TimeUnit.SECONDS)
|
|
|
+ .expireAfterWrite(60, TimeUnit.SECONDS)
|
|
|
+ .expireAfterAccess(60, TimeUnit.SECONDS)
|
|
|
+ .build(new CacheLoader<String, Set<Long>>() {
|
|
|
+ @Override
|
|
|
+ public Set<Long> load(String key) {
|
|
|
+ List<WxVideoTagRel> rels = repository.findAllByTagIdIn(generalSpiderExcludeTagIds);
|
|
|
+ return CommonCollectionUtils.toSet(rels, WxVideoTagRel::getVideoId);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ public void init() {
|
|
|
+ Set<Long> data = generalSpiderExcludeVideoCache.getUnchecked(generalSpiderExcludeVideoCacheKey);
|
|
|
+ log.info("generalSpiderExcludeVideoCache size {}", data.size());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public List<Long> filter(FilterParam param) {
|
|
|
+ if (param == null) {
|
|
|
+ return Collections.emptyList();
|
|
|
+ }
|
|
|
+ if (StringUtils.isBlank(param.getMid())
|
|
|
+ || CollectionUtils.isEmpty(param.getVideoIds())) {
|
|
|
+ return param.getVideoIds();
|
|
|
+ }
|
|
|
+
|
|
|
+ // vid -> uid
|
|
|
+ List<String> keys = param.getVideoIds().stream()
|
|
|
+ .map(id -> String.format(keyFormat, id))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ List<String> uids = redisTemplate.opsForValue().multiGet(keys);
|
|
|
+ List<Long> cacheMissVideoIds = new ArrayList<>();
|
|
|
+ Map<Long, String> vid2UidMap = new HashMap<>();
|
|
|
+ for (int i = 0; i < param.getVideoIds().size(); i++) {
|
|
|
+ String value = uids.get(i);
|
|
|
+ if (StringUtils.isBlank(value)) {
|
|
|
+ cacheMissVideoIds.add(param.getVideoIds().get(i));
|
|
|
+ } else {
|
|
|
+ vid2UidMap.put(param.getVideoIds().get(i), value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (CollectionUtils.isNotEmpty(cacheMissVideoIds)) {
|
|
|
+ List<WxVideoStatus> status = wxVideoStatusRepository.findAllByVideoIdIn(cacheMissVideoIds);
|
|
|
+ if (CollectionUtils.isNotEmpty(status)) {
|
|
|
+ for (WxVideoStatus v : status) {
|
|
|
+ vid2UidMap.put(v.getVideoId(), String.valueOf(v.getVideoUid()));
|
|
|
+
|
|
|
+ // TODO 异步更新缓存
|
|
|
+ ThreadPoolFactory.defaultPool().execute(() -> {
|
|
|
+ redisTemplate.executePipelined(new SessionCallback<String>() {
|
|
|
+ @Override
|
|
|
+ public <A, B> String execute(RedisOperations<A, B> redisOperations) throws DataAccessException {
|
|
|
+ ValueOperations<String, String> operations =
|
|
|
+ (ValueOperations<String, String>) redisOperations.opsForValue();
|
|
|
+ status.forEach(v -> {
|
|
|
+ operations.set(String.format(keyFormat, v.getVideoId()),
|
|
|
+ String.valueOf(v.getVideoUid()), RandomUtils.nextInt(30, 60),
|
|
|
+ TimeUnit.SECONDS);
|
|
|
+ });
|
|
|
+
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ });
|
|
|
+ });
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ //需要过滤的视频uid
|
|
|
+ Set<String> generalSpiderUserIds = generalSpiderUserCache.getUnchecked(GENERAL_SPIDER_USER_KEY);
|
|
|
+ //不过滤的视频
|
|
|
+ Set<Long> data = generalSpiderExcludeVideoCache.getUnchecked(generalSpiderExcludeVideoCacheKey);
|
|
|
+
|
|
|
+ List<Long> videoIds = param.getVideoIds().stream()
|
|
|
+ .filter(l -> data.contains(l)
|
|
|
+ || (vid2UidMap.containsKey(l) && !generalSpiderUserIds.contains(vid2UidMap.get(l))))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ return videoIds;
|
|
|
+ }
|
|
|
+}
|