Pārlūkot izejas kodu

Merge branch 'feature_filterUserUpload' of algorithm/recommend-server into master

dingyunpeng 5 mēneši atpakaļ
vecāks
revīzija
7822db2407

+ 4 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/AbstractFilterService.java

@@ -132,6 +132,10 @@ public abstract class AbstractFilterService {
             strategies.add(ServiceBeanFactory.getBean(VovLowerStrategy.class));
         }
 
+        if (CollectionUtils.isNotEmpty(param.getAbExpCodes()) && param.getAbExpCodes().contains("697")) {
+            strategies.add(ServiceBeanFactory.getBean(VideoSourceTypeStrategy.class));
+        }
+
         return strategies;
     }
 

+ 131 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/strategy/VideoSourceTypeStrategy.java

@@ -0,0 +1,131 @@
+package com.tzld.piaoquan.recommend.server.service.filter.strategy;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
+import com.tzld.piaoquan.recommend.server.repository.WxVideoStatus;
+import com.tzld.piaoquan.recommend.server.repository.WxVideoStatusRepository;
+import com.tzld.piaoquan.recommend.server.service.filter.FilterParam;
+import com.tzld.piaoquan.recommend.server.service.filter.FilterStrategy;
+import com.tzld.piaoquan.recommend.server.util.JSONUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.dao.DataAccessException;
+import org.springframework.data.redis.core.RedisOperations;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.SessionCallback;
+import org.springframework.data.redis.core.ValueOperations;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * @author dyp
+ */
+@Component
+@Slf4j
+public class VideoSourceTypeStrategy implements FilterStrategy {
+    @Autowired
+    @Qualifier("redisTemplate")
+    private RedisTemplate<String, String> redisTemplate;
+
+    @Autowired
+    private WxVideoStatusRepository wxVideoStatusRepository;
+
+    private final String keyFormat = "video:uid:%s";
+
+    public static final String NOT_USER_UPLOAD_USER_KEY = "not:user:upload:user";
+    private LoadingCache<String, Set<String>> notUserUploadUserCache = CacheBuilder.newBuilder()
+            .maximumSize(10000)
+            .refreshAfterWrite(600, TimeUnit.SECONDS)
+            .expireAfterWrite(600, TimeUnit.SECONDS)
+            .expireAfterAccess(600, TimeUnit.SECONDS)
+            .build(new CacheLoader<String, Set<String>>() {
+                @Override
+                public Set<String> load(String key) {
+                    Set<String> result = redisTemplate.opsForSet().members(key);
+                    if (CollectionUtils.isEmpty(result)) {
+                        return Collections.emptySet();
+                    }
+                    return result;
+                }
+            });
+
+    @Override
+    public List<Long> filter(FilterParam param) {
+
+        if (param == null) {
+            return Collections.emptyList();
+        }
+        if (StringUtils.isBlank(param.getMid())
+                || CollectionUtils.isEmpty(param.getVideoIds())) {
+            return param.getVideoIds();
+        }
+
+        // vid -> uid
+        List<String> keys = param.getVideoIds().stream()
+                .map(id -> String.format(keyFormat, id))
+                .collect(Collectors.toList());
+
+        List<String> uids = redisTemplate.opsForValue().multiGet(keys);
+        List<Long> cacheMissVideoIds = new ArrayList<>();
+        Map<Long, String> vid2UidMap = new HashMap<>();
+        for (int i = 0; i < param.getVideoIds().size(); i++) {
+            String value = uids.get(i);
+            if (StringUtils.isBlank(value)) {
+                cacheMissVideoIds.add(param.getVideoIds().get(i));
+            } else {
+                vid2UidMap.put(param.getVideoIds().get(i), value);
+            }
+        }
+
+        if (CollectionUtils.isNotEmpty(cacheMissVideoIds)) {
+            List<WxVideoStatus> status = wxVideoStatusRepository.findAllByVideoIdIn(cacheMissVideoIds);
+            if (CollectionUtils.isNotEmpty(status)) {
+                for (WxVideoStatus v : status) {
+                    vid2UidMap.put(v.getVideoId(), String.valueOf(v.getVideoUid()));
+
+                    // TODO 异步更新缓存
+                    ThreadPoolFactory.defaultPool().execute(() -> {
+                        redisTemplate.executePipelined(new SessionCallback<String>() {
+                            @Override
+                            public <A, B> String execute(RedisOperations<A, B> redisOperations) throws DataAccessException {
+                                ValueOperations<String, String> operations =
+                                        (ValueOperations<String, String>) redisOperations.opsForValue();
+                                status.forEach(v -> {
+                                    operations.set(String.format(keyFormat, v.getVideoId()),
+                                            String.valueOf(v.getRecommendStatus()), RandomUtils.nextInt(30, 60),
+                                            TimeUnit.SECONDS);
+                                });
+
+                                return null;
+                            }
+                        });
+                    });
+                }
+            }
+        }
+
+        Set<String> notUserUploadUserIds = notUserUploadUserCache.getUnchecked(NOT_USER_UPLOAD_USER_KEY);
+
+        List<Long> videoIds = param.getVideoIds().stream()
+                .filter(l -> !vid2UidMap.containsKey(l)
+                        || !notUserUploadUserIds.contains(vid2UidMap.get(l)))
+                .collect(Collectors.toList());
+//        log.info("VideoSourceTypeStrategy \t param={} \t vid2Uid={} \t before={} \t " +
+//                        "after={}",
+//                JSONUtils.toJson(param),
+//                JSONUtils.toJson(vid2UidMap),
+//                JSONUtils.toJson(param.getVideoIds()),
+//                JSONUtils.toJson(videoIds));
+
+        return videoIds;
+    }
+}

+ 65 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/xxl/NotUserUploadUserCacheJob.java

@@ -0,0 +1,65 @@
+package com.tzld.piaoquan.recommend.server.xxl;
+
+import com.alibaba.fastjson.JSONObject;
+import com.aliyun.odps.data.Record;
+import com.tzld.piaoquan.recommend.server.common.base.Constant;
+import com.tzld.piaoquan.recommend.server.service.filter.strategy.VideoSourceTypeStrategy;
+import com.tzld.piaoquan.recommend.server.service.odps.ODPSManager;
+import com.tzld.piaoquan.recommend.server.service.recall.strategy.FlowPoolLastDayTopRecallStrategy;
+import com.tzld.piaoquan.recommend.server.service.recall.strategy.TopGoodPerformanceVideoRecallStrategy;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import com.xxl.job.core.log.XxlJobLogger;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * @author sunxy
+ */
+@Component
+@Slf4j
+public class NotUserUploadUserCacheJob {
+
+    @Autowired
+    private ODPSManager odpsManager;
+
+    @Autowired
+    private RedisTemplate<String, String> redisTemplate;
+
+    @XxlJob("updateNotUserUploadUserCache")
+    public ReturnT<String> updateNotUserUploadUserCache(String params) {
+        // 执行 ODPS Sql,获取数据
+        List<String> uids = queryNotUserUploadUser();
+        if (CollectionUtils.isEmpty(uids)) {
+            XxlJobLogger.log("updateNotUserUploadUserCache, uids is empty");
+            return ReturnT.SUCCESS;
+        }
+        redisTemplate.delete(VideoSourceTypeStrategy.NOT_USER_UPLOAD_USER_KEY);
+        redisTemplate.opsForSet().add(VideoSourceTypeStrategy.NOT_USER_UPLOAD_USER_KEY,
+                uids.toArray(new String[uids.size()]));
+        redisTemplate.expire(VideoSourceTypeStrategy.NOT_USER_UPLOAD_USER_KEY, Duration.ofDays(2));
+        
+        XxlJobLogger.log("updateNotUserUploadUserCache, uids:{}", uids);
+        return ReturnT.SUCCESS;
+    }
+
+    private List<String> queryNotUserUploadUser() {
+        String sql = "SELECT distinct uid FROM loghubods.operators_channel_day where type != 'userupload' order by uid desc;";
+        List<Record> records = odpsManager.query(sql);
+        if (records == null || records.size() == 0) {
+            return Collections.emptyList();
+        }
+        return records.stream().map(record -> record.getString(0)).collect(Collectors.toList());
+    }
+
+
+}