| 
					
				 | 
			
			
				@@ -0,0 +1,99 @@ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+package com.tzld.piaoquan.recommend.server.xxl; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.aliyun.odps.data.Record; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.tzld.piaoquan.recommend.server.common.RedisKeyConstants; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.tzld.piaoquan.recommend.server.service.odps.ODPSManager; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.tzld.piaoquan.recommend.server.util.DateUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.xxl.job.core.biz.model.ReturnT; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.xxl.job.core.handler.annotation.XxlJob; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import com.xxl.job.core.log.XxlJobLogger; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import lombok.extern.slf4j.Slf4j; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.apache.commons.collections4.CollectionUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.apache.commons.lang3.StringUtils; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.apache.commons.lang3.tuple.Pair; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.beans.factory.annotation.Autowired; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.beans.factory.annotation.Value; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.dao.DataAccessException; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.data.redis.core.RedisOperations; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.data.redis.core.RedisTemplate; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.data.redis.core.SessionCallback; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.data.redis.core.SetOperations; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import org.springframework.stereotype.Component; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.text.SimpleDateFormat; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+import java.util.*; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+/** 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ * 高风险内容 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ * 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ * @author supeng 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ */ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+@Component 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+@Slf4j 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+public class RickVideoCacheJob { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private static final String PROJECT = "loghubods"; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Value("${wechat.risk.video.table:wechat_feed_block_videolist}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private String riskVideoTable; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Autowired 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private ODPSManager odpsManager; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @Autowired 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private RedisTemplate<String, String> redisTemplate; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private static final String TEMP = ":temp:"; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    @XxlJob("updateRiskVideoCacheHandler") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    public ReturnT<String> updateRiskVideoCacheHandler(String params) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        XxlJobLogger.log("updateRiskVideoCacheHandler start..."); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            //1. 从大数据读取数据 最大不超过10万条 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+//            String dt = "MAX_PT('" + PROJECT + "." + riskVideoTable + "')"; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            String dt = new SimpleDateFormat("yyyyMMddHH").format(new Date()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            //支持手动指定分区 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if (Objects.nonNull(params) && !Objects.equals("", params.trim())) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                dt = params; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            XxlJobLogger.log("dt = {}", dt); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            List<Record> records = odpsManager.tableTunnelQuery(PROJECT, riskVideoTable, "dt=" + dt); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if (Objects.isNull(records) || records.isEmpty()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                XxlJobLogger.log("records is empty"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                //无数据 暂不清空缓存 避免大数据产出数据失败 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+//                redisTemplate.delete(RedisKeyConstants.Recommend.WECHAT_RISK_VIDEO_CACHE_KEY); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                return ReturnT.SUCCESS; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            XxlJobLogger.log("records size = {}", records.size()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            //2. 更新到redis缓存 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Set<String> videoSet = new HashSet<>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            for (Record record : records) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                try { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    Long videoId = record.getBigint("videoid"); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    if (Objects.isNull(videoId) || videoId <= 0) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                        continue; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    videoSet.add(videoId.toString()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                } catch (Exception e) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            XxlJobLogger.log("videoSet size = {}", videoSet.size()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            if (videoSet.isEmpty()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                //暂不清空缓存 避免大数据产出数据失败 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+//                redisTemplate.delete(RedisKeyConstants.Recommend.WECHAT_RISK_VIDEO_CACHE_KEY); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                return ReturnT.SUCCESS; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            //需要保证原子性;由于数据量可能较大,不使用lua,采用临时key重命名方式;使用hashtag,保证rename在一个slot 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            String tempKey = RedisKeyConstants.Recommend.WECHAT_RISK_VIDEO_CACHE_KEY + TEMP + System.currentTimeMillis(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            redisTemplate.opsForSet().add(tempKey, videoSet.toArray(new String[0])); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            redisTemplate.rename(tempKey, RedisKeyConstants.Recommend.WECHAT_RISK_VIDEO_CACHE_KEY); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            XxlJobLogger.log("rename tempKey = {} newKey = {} finish", tempKey, RedisKeyConstants.Recommend.WECHAT_RISK_VIDEO_CACHE_KEY); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } catch (Exception e) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            XxlJobLogger.log(e); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            return ReturnT.FAIL; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } finally { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            XxlJobLogger.log("updateRiskVideoCacheHandler finish."); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        return ReturnT.SUCCESS; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+} 
			 |