Browse Source

推荐过滤风险视频

supeng 6 days ago
parent
commit
6cc6b9314d

+ 2 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/common/RedisKeyConstants.java

@@ -8,6 +8,8 @@ public class RedisKeyConstants {
     public static class Recommend {
         public static String riskUserMid = "risk:user:mid";
         public static String riskUserUid = "risk:user:uid";
+
+        public static final String WECHAT_RISK_VIDEO_CACHE_KEY = "wechat:risk:video:set"; // Redis set key holding risk video IDs as strings
     }
 
     public static class DouHot {

+ 14 - 5
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/strategy/RiskVideoStrategy.java

@@ -4,6 +4,7 @@ import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.tzld.piaoquan.recommend.server.common.RedisKeyConstants;
 import com.tzld.piaoquan.recommend.server.repository.WxVideoTagRel;
 import com.tzld.piaoquan.recommend.server.repository.WxVideoTagRelRepository;
 import com.tzld.piaoquan.recommend.server.service.filter.FilterParam;
@@ -13,11 +14,10 @@ import com.tzld.piaoquan.recommend.server.util.JSONUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -31,6 +31,9 @@ public class RiskVideoStrategy implements FilterStrategy {
     @ApolloJsonValue("${risk.video.tag:[]}")
     private List<Long> riskVideoTagIds;
 
+    @Autowired
+    private RedisTemplate<String, String> redisTemplate;
+
     @Autowired
     private WxVideoTagRelRepository repository;
     // 内存持久保存不淘汰
@@ -42,8 +45,14 @@ public class RiskVideoStrategy implements FilterStrategy {
             .build(new CacheLoader<String, Set<Long>>() {
                 @Override
                 public Set<Long> load(String key) {
-                    List<WxVideoTagRel> rels = repository.findAllByTagIdIn(riskVideoTagIds);
-                    return CommonCollectionUtils.toSet(rels, WxVideoTagRel::getVideoId);
+//                    List<WxVideoTagRel> rels = repository.findAllByTagIdIn(riskVideoTagIds);
+//                    return CommonCollectionUtils.toSet(rels, WxVideoTagRel::getVideoId);
+                    // Switched to a video ID list read from a Redis set; per the original author it stays under 100k entries.
+                    Set<String> videoIdSet = redisTemplate.opsForSet().members(RedisKeyConstants.Recommend.WECHAT_RISK_VIDEO_CACHE_KEY);
+                    if (Objects.isNull(videoIdSet) || videoIdSet.isEmpty()) { // no members returned: yield an empty mutable set
+                        return new HashSet<>();
+                    }
+                    return videoIdSet.stream().map(Long::parseLong).collect(Collectors.toSet()); // NOTE(review): one non-numeric member makes parseLong throw and fails the whole load
                 }
             });
 

+ 36 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/odps/ODPSManager.java

@@ -3,13 +3,18 @@ package com.tzld.piaoquan.recommend.server.service.odps;
 import com.aliyun.odps.Instance;
 import com.aliyun.odps.Odps;
 import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.PartitionSpec;
 import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.Record;
+import com.aliyun.odps.data.RecordReader;
 import com.aliyun.odps.task.SQLTask;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.TunnelException;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
+import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
@@ -24,6 +29,7 @@ public class ODPSManager {
     private final static String ACCESSID = "LTAIWYUujJAm7CbH";
     private final static String ACCESSKEY = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
     private final static String ENDPOINT = "http://service.cn.maxcompute.aliyun.com/api";
+    private final static String VPC_ENDPOINT = "http://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api";
 
     public List<Record> query(String sql) {
         Account account = new AliyunAccount(ACCESSID, ACCESSKEY);
@@ -48,6 +54,36 @@ public class ODPSManager {
         return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
     }
 
+    /**
+     * Downloads all records of one table partition through the ODPS table tunnel.
+     *
+     * <p>Failures are logged, not thrown. On failure the method returns whatever was read
+     * before the error, so the result may be empty or truncated; callers that need a
+     * complete snapshot must not treat the returned list as authoritative.
+     *
+     * @param project   ODPS project owning the table; also set as the client's default project
+     * @param table     name of the table to download
+     * @param partition partition spec string, handed to {@link PartitionSpec} as-is
+     * @return the records read so far, never {@code null}
+     */
+    public List<Record> tableTunnelQuery(String project, String table, String partition){
+        Account account = new AliyunAccount(ACCESSID, ACCESSKEY);
+        Odps odps = new Odps(account);
+        odps.setEndpoint(VPC_ENDPOINT);
+        odps.setDefaultProject(project);
+        TableTunnel tunnel = new TableTunnel(odps);
+        List<Record> records = new ArrayList<>();
+        RecordReader recordReader = null;
+        try {
+            TableTunnel.DownloadSession downloadSession =
+                    tunnel.createDownloadSession(project, table, new PartitionSpec(partition));
+            long count = downloadSession.getRecordCount();
+            if (count <= 0) {
+                // Nothing to download: skip opening a reader over an empty range.
+                return records;
+            }
+            recordReader = downloadSession.openRecordReader(0, count);
+            Record record;
+            while ((record = recordReader.read()) != null) {
+                records.add(record);
+            }
+        } catch (Exception e) {
+            // Include the target and progress so a truncated download can be diagnosed from the log.
+            log.error("tableTunnelQuery error, project={} table={} partition={} recordsRead={}",
+                    project, table, partition, records.size(), e);
+        } finally {
+            if (recordReader != null) {
+                try {
+                    recordReader.close();
+                } catch (Exception e) {
+                    log.error("tableTunnelQuery close reader error, project={} table={} partition={}",
+                            project, table, partition, e);
+                }
+            }
+        }
+        return records;
+    }
+
     public static void main(String[] args) {
         Calendar cal = Calendar.getInstance();
         cal.setTime(new Date());

+ 90 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/xxl/RickVideoCacheJob.java

@@ -0,0 +1,90 @@
+package com.tzld.piaoquan.recommend.server.xxl;
+
+import com.aliyun.odps.data.Record;
+import com.tzld.piaoquan.recommend.server.common.RedisKeyConstants;
+import com.tzld.piaoquan.recommend.server.service.odps.ODPSManager;
+import com.tzld.piaoquan.recommend.server.util.DateUtils;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import com.xxl.job.core.log.XxlJobLogger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.dao.DataAccessException;
+import org.springframework.data.redis.core.RedisOperations;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.SessionCallback;
+import org.springframework.data.redis.core.SetOperations;
+import org.springframework.stereotype.Component;
+
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ * Scheduled job that refreshes the Redis set of high-risk video IDs stored under
+ * {@link RedisKeyConstants.Recommend#WECHAT_RISK_VIDEO_CACHE_KEY}.
+ *
+ * <p>The IDs are read from the latest partition of the configured ODPS table, written to a
+ * temporary Redis key and then renamed onto the live key.
+ *
+ * @author supeng
+ */
+@Component
+@Slf4j
+public class RickVideoCacheJob {
+
+    @Value("${wechat.risk.video.table:loghubods.wechat_feed_block_videolist}")
+    private String riskVideoTable;
+
+    @Autowired
+    private ODPSManager odpsManager;
+
+    @Autowired
+    private RedisTemplate<String, String> redisTemplate;
+
+    private static final String TEMP = "temp:";
+
+    /**
+     * Reloads the risk video ID set from ODPS into Redis.
+     *
+     * <p>When the query yields no rows, or no row carries a positive video ID, the live key
+     * is deleted. NOTE(review): if {@code odpsManager.query} returns an empty list on
+     * failure, this would wipe the cache on a transient error; confirm against that method.
+     *
+     * @param params job parameter supplied by the scheduler; not used
+     * @return {@link ReturnT#SUCCESS} when the cache was refreshed or cleared,
+     *         {@link ReturnT#FAIL} when any step threw
+     */
+    @XxlJob("updateRiskVideoCacheHandler")
+    public ReturnT<String> updateRiskVideoCacheHandler(String params) {
+        XxlJobLogger.log("updateRiskVideoCacheHandler start...");
+        try {
+            // 1. Read from the data warehouse; per the original author, at most 100k rows.
+            String sql = "SELECT videoid FROM " + riskVideoTable + " WHERE dt = MAX_PT('" + riskVideoTable + "');";
+            List<Record> records = odpsManager.query(sql);
+            if (Objects.isNull(records) || records.isEmpty()) {
+                XxlJobLogger.log("records is empty");
+                // No data: clear the cache.
+                redisTemplate.delete(RedisKeyConstants.Recommend.WECHAT_RISK_VIDEO_CACHE_KEY);
+                return ReturnT.SUCCESS;
+            }
+            XxlJobLogger.log("records size = {}", records.size());
+            // 2. Push the IDs to the Redis cache.
+            Set<String> videoSet = collectVideoIds(records);
+            XxlJobLogger.log("videoSet size = {}", videoSet.size());
+            if (videoSet.isEmpty()) {
+                redisTemplate.delete(RedisKeyConstants.Recommend.WECHAT_RISK_VIDEO_CACHE_KEY);
+                return ReturnT.SUCCESS;
+            }
+            replaceCache(videoSet);
+        } catch (Exception e) {
+            // Mirror the failure into the application log as well as the job log.
+            log.error("updateRiskVideoCacheHandler error", e);
+            XxlJobLogger.log(e);
+            return ReturnT.FAIL;
+        } finally {
+            XxlJobLogger.log("updateRiskVideoCacheHandler finish.");
+        }
+        return ReturnT.SUCCESS;
+    }
+
+    /**
+     * Extracts the positive {@code videoid} values of the given records as strings.
+     * Records that cannot be read are skipped, counted and reported once.
+     */
+    private Set<String> collectVideoIds(List<Record> records) {
+        Set<String> videoSet = new HashSet<>();
+        int unreadable = 0;
+        Exception firstError = null;
+        for (Record record : records) {
+            try {
+                Long videoId = record.getBigint("videoid");
+                if (Objects.isNull(videoId) || videoId <= 0) {
+                    continue;
+                }
+                videoSet.add(videoId.toString());
+            } catch (Exception e) {
+                // Keep going, but remember the failure instead of dropping it silently.
+                unreadable++;
+                if (firstError == null) {
+                    firstError = e;
+                }
+            }
+        }
+        if (unreadable > 0) {
+            XxlJobLogger.log("skipped unreadable records = {}", unreadable);
+            log.warn("updateRiskVideoCacheHandler skipped {} unreadable records, first error:", unreadable, firstError);
+        }
+        return videoSet;
+    }
+
+    /**
+     * Swaps the live set for the given members by filling a temporary key and renaming it.
+     * The set may be large, so the swap uses RENAME rather than a Lua script. If any step
+     * fails, the temporary key is removed on a best-effort basis and the error is rethrown.
+     * NOTE(review): on a Redis Cluster the two keys may map to different slots, which RENAME
+     * does not allow; confirm the deployment mode.
+     */
+    private void replaceCache(Set<String> videoSet) {
+        String liveKey = RedisKeyConstants.Recommend.WECHAT_RISK_VIDEO_CACHE_KEY;
+        String tempKey = TEMP + liveKey + System.currentTimeMillis();
+        try {
+            redisTemplate.opsForSet().add(tempKey, videoSet.toArray(new String[0]));
+            redisTemplate.rename(tempKey, liveKey);
+        } catch (RuntimeException e) {
+            try {
+                // The temp key has no TTL, so it would otherwise linger forever.
+                redisTemplate.delete(tempKey);
+            } catch (RuntimeException cleanupError) {
+                log.warn("failed to delete temp key {}", tempKey, cleanupError);
+            }
+            throw e;
+        }
+        XxlJobLogger.log("rename tempKey = {} newKey = {} finish", tempKey, liveKey);
+    }
+}