Pārlūkot izejas kodu

视频审核分发 消费修改 addLock

wangyunpeng 4 mēneši atpakaļ
vecāks
revīzija
196ff7a2d3

+ 74 - 63
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/recommend/ArticleVideoAuditService.java

@@ -417,78 +417,89 @@ public class ArticleVideoAuditService {
 
     @XxlJob("articlePoolAuditQueueJob")
     public ReturnT<String> articlePoolAuditQueueJob(String param) {
-        Long now = System.currentTimeMillis();
-        String dateStr = DateUtils.getCurrentDateStr("yyyyMMdd");
-        String nextListRedisKey = "article-pool-audit-next-list";
-        String auditQueueRedisKey = "article-pool-audit-queue-" + dateStr;
-        // 判断是否审核超时,重新加入待审核队列
-        while (true) {
-            List<String> nextList = redisTemplate.opsForList().range(nextListRedisKey, 0, 0);
-            if (CollectionUtils.isEmpty(nextList)) {
-                break;
+        String lockKey = "article-pool-audit-queue-job-lock";
+        String requestId = UUID.randomUUID().toString();
+        Boolean lockAcquired = redisUtil.tryAcquireLock(lockKey, requestId);
+        if (!lockAcquired) {
+            return ReturnT.SUCCESS;
+        }
+        try {
+            Long now = System.currentTimeMillis();
+            String dateStr = DateUtils.getCurrentDateStr("yyyyMMdd");
+            String nextListRedisKey = "article-pool-audit-next-list";
+            String auditQueueRedisKey = "article-pool-audit-queue-" + dateStr;
+            // 判断是否审核超时,重新加入待审核队列
+            while (true) {
+                List<String> nextList = redisTemplate.opsForList().range(nextListRedisKey, 0, 0);
+                if (CollectionUtils.isEmpty(nextList)) {
+                    break;
+                }
+                JSONObject firstObj = JSONObject.parseObject(nextList.get(0));
+                if (now > firstObj.getLong("timestamp")) {
+                    redisTemplate.opsForList().leftPop(nextListRedisKey);
+                    String id = firstObj.getString("id");
+                    LongArticleTitleAudit videoAudit = titleAuditRepository.getByContentId(id);
+                    ContentPoolEnum poolEnum = ContentPoolEnum.from(videoAudit.getFlowPoolLevel());
+                    redisTemplate.opsForZSet().add(auditQueueRedisKey, id, poolEnum.getWeight());
+                } else {
+                    break;
+                }
             }
-            JSONObject firstObj = JSONObject.parseObject(nextList.get(0));
-            if (now > firstObj.getLong("timestamp")) {
-                redisTemplate.opsForList().leftPop(nextListRedisKey);
-                String id = firstObj.getString("id");
-                LongArticleTitleAudit videoAudit = titleAuditRepository.getByContentId(id);
-                ContentPoolEnum poolEnum = ContentPoolEnum.from(videoAudit.getFlowPoolLevel());
-                redisTemplate.opsForZSet().add(auditQueueRedisKey, id, poolEnum.getWeight());
-            } else {
-                break;
+            List<String> excludeContentIds = new ArrayList<>();
+            List<String> nextList = redisTemplate.opsForList().range(nextListRedisKey, 0, -1);
+            Map<String, List<LongArticleTitleAudit>> nextListPoolCountMap = new HashMap<>();
+            if (CollectionUtils.isNotEmpty(nextList)) {
+                List<String> nextListIds = nextList.stream().map(item -> JSONObject.parseObject(item).getString("id")).collect(Collectors.toList());
+                List<LongArticleTitleAudit> auditQueueList = titleAuditRepository.getByContentIdIn(new ArrayList<>(nextListIds));
+                nextListPoolCountMap = auditQueueList.stream().collect(Collectors.groupingBy(item -> ContentPoolEnum.from(item.getFlowPoolLevel()).getContentPool()));
+                excludeContentIds.addAll(nextListIds);
             }
-        }
-        List<String> excludeContentIds = new ArrayList<>();
-        List<String> nextList = redisTemplate.opsForList().range(nextListRedisKey, 0, -1);
-        Map<String, List<LongArticleTitleAudit>> nextListPoolCountMap = new HashMap<>();
-        if (CollectionUtils.isNotEmpty(nextList)) {
-            List<String> nextListIds = nextList.stream().map(item -> JSONObject.parseObject(item).getString("id")).collect(Collectors.toList());
-            List<LongArticleTitleAudit> auditQueueList = titleAuditRepository.getByContentIdIn(new ArrayList<>(nextListIds));
-            nextListPoolCountMap = auditQueueList.stream().collect(Collectors.groupingBy(item -> ContentPoolEnum.from(item.getFlowPoolLevel()).getContentPool()));
-            excludeContentIds.addAll(nextListIds);
-        }
-        Set<String> auditQueueIds = redisTemplate.opsForZSet().rangeByScore(auditQueueRedisKey, 0, 100);
-        Map<String, List<LongArticleTitleAudit>> auditQueuePoolCountMap = new HashMap<>();
-        if (CollectionUtils.isNotEmpty(auditQueueIds)) {
-            List<LongArticleTitleAudit> auditQueueList = titleAuditRepository.getByContentIdIn(new ArrayList<>(auditQueueIds));
-            auditQueuePoolCountMap = auditQueueList.stream().collect(Collectors.groupingBy(item -> ContentPoolEnum.from(item.getFlowPoolLevel()).getContentPool()));
-            excludeContentIds.addAll(auditQueueIds);
-        }
-        // 每日配置发送量不足添加
-        for (Map.Entry<String, Integer> entry : dailyAuditPoolCount.entrySet()) {
-            ContentPoolEnum poolEnum = ContentPoolEnum.from(entry.getKey());
-            String key = "article_audit_count_" + dateStr + "_" + poolEnum.getContentPool();
-            int totalCount = Integer.parseInt(Optional.ofNullable(redisTemplate.opsForValue().get(key)).orElse("0"));
-            if (totalCount < entry.getValue()) {
-                List<LongArticleTitleAudit> auditQueueList = auditQueuePoolCountMap.getOrDefault(poolEnum.getContentPool(), new ArrayList<>());
-                List<LongArticleTitleAudit> nextListPool = nextListPoolCountMap.getOrDefault(poolEnum.getContentPool(), new ArrayList<>());
-                int needCount = entry.getValue() - (totalCount + auditQueueList.size() + nextListPool.size());
-                if (needCount > 0) {
-                    List<ArticleVideoAuditListVO> addList = articleAuditMapper.articleVideoWatingAuditList(
-                            Arrays.asList(ArticleVideoAuditStatusEnum.WAITING.getCode()),
-                            Arrays.asList(poolEnum.getContentPool()), excludeContentIds, needCount);
-                    if (CollectionUtils.isNotEmpty(addList)) {
-                        for (ArticleVideoAuditListVO item : addList) {
-                            redisTemplate.opsForZSet().add(auditQueueRedisKey, item.getContentId(), poolEnum.getWeight());
-                            excludeContentIds.add(item.getContentId());
+            Set<String> auditQueueIds = redisTemplate.opsForZSet().rangeByScore(auditQueueRedisKey, 0, 100);
+            Map<String, List<LongArticleTitleAudit>> auditQueuePoolCountMap = new HashMap<>();
+            if (CollectionUtils.isNotEmpty(auditQueueIds)) {
+                List<LongArticleTitleAudit> auditQueueList = titleAuditRepository.getByContentIdIn(new ArrayList<>(auditQueueIds));
+                auditQueuePoolCountMap = auditQueueList.stream().collect(Collectors.groupingBy(item -> ContentPoolEnum.from(item.getFlowPoolLevel()).getContentPool()));
+                excludeContentIds.addAll(auditQueueIds);
+            }
+            // 每日配置发送量不足添加
+            for (Map.Entry<String, Integer> entry : dailyAuditPoolCount.entrySet()) {
+                ContentPoolEnum poolEnum = ContentPoolEnum.from(entry.getKey());
+                String key = "article_audit_count_" + dateStr + "_" + poolEnum.getContentPool();
+                int totalCount = Integer.parseInt(Optional.ofNullable(redisTemplate.opsForValue().get(key)).orElse("0"));
+                if (totalCount < entry.getValue()) {
+                    List<LongArticleTitleAudit> auditQueueList = auditQueuePoolCountMap.getOrDefault(poolEnum.getContentPool(), new ArrayList<>());
+                    List<LongArticleTitleAudit> nextListPool = nextListPoolCountMap.getOrDefault(poolEnum.getContentPool(), new ArrayList<>());
+                    int needCount = entry.getValue() - (totalCount + auditQueueList.size() + nextListPool.size());
+                    if (needCount > 0) {
+                        List<ArticleVideoAuditListVO> addList = articleAuditMapper.articleVideoWatingAuditList(
+                                Arrays.asList(ArticleVideoAuditStatusEnum.WAITING.getCode()),
+                                Arrays.asList(poolEnum.getContentPool()), excludeContentIds, needCount);
+                        if (CollectionUtils.isNotEmpty(addList)) {
+                            for (ArticleVideoAuditListVO item : addList) {
+                                redisTemplate.opsForZSet().add(auditQueueRedisKey, item.getContentId(), poolEnum.getWeight());
+                                excludeContentIds.add(item.getContentId());
+                            }
                         }
                     }
                 }
             }
-        }
-        // 待发布内容不足添加
-        Long auditQueueSize = redisTemplate.opsForZSet().size(auditQueueRedisKey);
-        if (Objects.isNull(auditQueueSize) || auditQueueSize < 20) {
-            List<ArticleVideoAuditListVO> list = articleAuditMapper.articleVideoWatingAuditList(
-                    Arrays.asList(ArticleVideoAuditStatusEnum.WAITING.getCode()), null, excludeContentIds, 40);
-            if (CollectionUtils.isNotEmpty(list)) {
-                for (ArticleVideoAuditListVO item : list) {
-                    ContentPoolEnum poolEnum = ContentPoolEnum.from(item.getFlowPoolLevel());
-                    redisTemplate.opsForZSet().add(auditQueueRedisKey, item.getContentId(), poolEnum.getWeight());
+            // 待发布内容不足添加
+            Long auditQueueSize = redisTemplate.opsForZSet().size(auditQueueRedisKey);
+            if (Objects.isNull(auditQueueSize) || auditQueueSize < 20) {
+                List<ArticleVideoAuditListVO> list = articleAuditMapper.articleVideoWatingAuditList(
+                        Arrays.asList(ArticleVideoAuditStatusEnum.WAITING.getCode()), null, excludeContentIds, 40);
+                if (CollectionUtils.isNotEmpty(list)) {
+                    for (ArticleVideoAuditListVO item : list) {
+                        ContentPoolEnum poolEnum = ContentPoolEnum.from(item.getFlowPoolLevel());
+                        redisTemplate.opsForZSet().add(auditQueueRedisKey, item.getContentId(), poolEnum.getWeight());
+                    }
                 }
             }
+            redisTemplate.expire(auditQueueRedisKey, 12, TimeUnit.HOURS);
+
+        } finally {
+            redisUtil.releaseLock(lockKey, requestId);
         }
-        redisTemplate.expire(auditQueueRedisKey, 12, TimeUnit.HOURS);
         return ReturnT.SUCCESS;
     }
 }

+ 73 - 63
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/recommend/VideoPoolAuditService.java

@@ -251,78 +251,88 @@ public class VideoPoolAuditService {
 
     @XxlJob("videoPoolAuditQueueJob")
     public ReturnT<String> videoPoolAuditQueueJob(String param) {
-        Long now = System.currentTimeMillis();
-        String dateStr = DateUtils.getCurrentDateStr("yyyyMMdd");
-        String nextListRedisKey = "video-pool-audit-next-list";
-        String auditQueueRedisKey = "video-pool-audit-queue-" + dateStr;
-        // 判断是否审核超时,重新加入待审核队列
-        while (true) {
-            List<String> nextList = redisTemplate.opsForList().range(nextListRedisKey, 0, 0);
-            if (CollectionUtils.isEmpty(nextList)) {
-                break;
+        String lockKey = "video-pool-audit-queue-job-lock";
+        String requestId = UUID.randomUUID().toString();
+        Boolean lockAcquired = redisUtil.tryAcquireLock(lockKey, requestId);
+        if (!lockAcquired) {
+            return ReturnT.SUCCESS;
+        }
+        try {
+            Long now = System.currentTimeMillis();
+            String dateStr = DateUtils.getCurrentDateStr("yyyyMMdd");
+            String nextListRedisKey = "video-pool-audit-next-list";
+            String auditQueueRedisKey = "video-pool-audit-queue-" + dateStr;
+            // 判断是否审核超时,重新加入待审核队列
+            while (true) {
+                List<String> nextList = redisTemplate.opsForList().range(nextListRedisKey, 0, 0);
+                if (CollectionUtils.isEmpty(nextList)) {
+                    break;
+                }
+                JSONObject firstObj = JSONObject.parseObject(nextList.get(0));
+                if (now > firstObj.getLong("timestamp")) {
+                    redisTemplate.opsForList().leftPop(nextListRedisKey);
+                    String id = firstObj.getString("id");
+                    PublishSingleVideoSource videoAudit = videoSourceRepository.getByContentTraceId(id);
+                    ContentPoolEnum poolEnum = ContentPoolEnum.from(videoAudit.getFlowPoolLevel());
+                    redisTemplate.opsForZSet().add(auditQueueRedisKey, id, poolEnum.getWeight());
+                } else {
+                    break;
+                }
             }
-            JSONObject firstObj = JSONObject.parseObject(nextList.get(0));
-            if (now > firstObj.getLong("timestamp")) {
-                redisTemplate.opsForList().leftPop(nextListRedisKey);
-                String id = firstObj.getString("id");
-                PublishSingleVideoSource videoAudit = videoSourceRepository.getByContentTraceId(id);
-                ContentPoolEnum poolEnum = ContentPoolEnum.from(videoAudit.getFlowPoolLevel());
-                redisTemplate.opsForZSet().add(auditQueueRedisKey, id, poolEnum.getWeight());
-            } else {
-                break;
+            List<String> excludeContentIds = new ArrayList<>();
+            List<String> nextList = redisTemplate.opsForList().range(nextListRedisKey, 0, -1);
+            Map<String, List<PublishSingleVideoSource>> nextListPoolCountMap = new HashMap<>();
+            if (CollectionUtils.isNotEmpty(nextList)) {
+                List<String> nextListIds = nextList.stream().map(item -> JSONObject.parseObject(item).getString("id")).collect(Collectors.toList());
+                List<PublishSingleVideoSource> auditQueueList = videoSourceRepository.getByContentTraceIdIn(new ArrayList<>(nextListIds));
+                nextListPoolCountMap = auditQueueList.stream().collect(Collectors.groupingBy(item -> ContentPoolEnum.from(item.getFlowPoolLevel()).getContentPool()));
+                excludeContentIds.addAll(nextListIds);
             }
-        }
-        List<String> excludeContentIds = new ArrayList<>();
-        List<String> nextList = redisTemplate.opsForList().range(nextListRedisKey, 0, -1);
-        Map<String, List<PublishSingleVideoSource>> nextListPoolCountMap = new HashMap<>();
-        if (CollectionUtils.isNotEmpty(nextList)) {
-            List<String> nextListIds = nextList.stream().map(item -> JSONObject.parseObject(item).getString("id")).collect(Collectors.toList());
-            List<PublishSingleVideoSource> auditQueueList = videoSourceRepository.getByContentTraceIdIn(new ArrayList<>(nextListIds));
-            nextListPoolCountMap = auditQueueList.stream().collect(Collectors.groupingBy(item -> ContentPoolEnum.from(item.getFlowPoolLevel()).getContentPool()));
-            excludeContentIds.addAll(nextListIds);
-        }
-        Set<String> auditQueueIds = redisTemplate.opsForZSet().rangeByScore(auditQueueRedisKey, 0, 100);
-        Map<String, List<PublishSingleVideoSource>> auditQueuePoolCountMap = new HashMap<>();
-        if (CollectionUtils.isNotEmpty(auditQueueIds)) {
-            List<PublishSingleVideoSource> auditQueueList = videoSourceRepository.getByContentTraceIdIn(new ArrayList<>(auditQueueIds));
-            auditQueuePoolCountMap = auditQueueList.stream().collect(Collectors.groupingBy(item -> ContentPoolEnum.from(item.getFlowPoolLevel()).getContentPool()));
-            excludeContentIds.addAll(auditQueueIds);
-        }
-        // 每日配置发送量不足添加
-        for (Map.Entry<String, Integer> entry : dailyAuditPoolCount.entrySet()) {
-            ContentPoolEnum poolEnum = ContentPoolEnum.from(entry.getKey());
-            String key = "video_audit_count_" + dateStr + "_" + poolEnum.getContentPool();
-            int totalCount = Integer.parseInt(Optional.ofNullable(redisTemplate.opsForValue().get(key)).orElse("0"));
-            if (totalCount < entry.getValue()) {
-                List<PublishSingleVideoSource> auditQueueList = auditQueuePoolCountMap.getOrDefault(poolEnum.getContentPool(), new ArrayList<>());
-                List<PublishSingleVideoSource> nextListPool = nextListPoolCountMap.getOrDefault(poolEnum.getContentPool(), new ArrayList<>());
-                int needCount = entry.getValue() - (totalCount + auditQueueList.size() + nextListPool.size());
-                if (needCount > 0) {
-                    List<PublishSingleVideoSource> addList = videoPoolAuditMapper.articleVideoWatingAuditList(
-                            Arrays.asList(ArticleVideoAuditStatusEnum.WAITING.getCode()),
-                            Arrays.asList(poolEnum.getValue()), excludeContentIds, needCount);
-                    if (CollectionUtils.isNotEmpty(addList)) {
-                        for (PublishSingleVideoSource item : addList) {
-                            redisTemplate.opsForZSet().add(auditQueueRedisKey, item.getContentTraceId(), poolEnum.getWeight());
-                            excludeContentIds.add(item.getContentTraceId());
+            Set<String> auditQueueIds = redisTemplate.opsForZSet().rangeByScore(auditQueueRedisKey, 0, 100);
+            Map<String, List<PublishSingleVideoSource>> auditQueuePoolCountMap = new HashMap<>();
+            if (CollectionUtils.isNotEmpty(auditQueueIds)) {
+                List<PublishSingleVideoSource> auditQueueList = videoSourceRepository.getByContentTraceIdIn(new ArrayList<>(auditQueueIds));
+                auditQueuePoolCountMap = auditQueueList.stream().collect(Collectors.groupingBy(item -> ContentPoolEnum.from(item.getFlowPoolLevel()).getContentPool()));
+                excludeContentIds.addAll(auditQueueIds);
+            }
+            // 每日配置发送量不足添加
+            for (Map.Entry<String, Integer> entry : dailyAuditPoolCount.entrySet()) {
+                ContentPoolEnum poolEnum = ContentPoolEnum.from(entry.getKey());
+                String key = "video_audit_count_" + dateStr + "_" + poolEnum.getContentPool();
+                int totalCount = Integer.parseInt(Optional.ofNullable(redisTemplate.opsForValue().get(key)).orElse("0"));
+                if (totalCount < entry.getValue()) {
+                    List<PublishSingleVideoSource> auditQueueList = auditQueuePoolCountMap.getOrDefault(poolEnum.getContentPool(), new ArrayList<>());
+                    List<PublishSingleVideoSource> nextListPool = nextListPoolCountMap.getOrDefault(poolEnum.getContentPool(), new ArrayList<>());
+                    int needCount = entry.getValue() - (totalCount + auditQueueList.size() + nextListPool.size());
+                    if (needCount > 0) {
+                        List<PublishSingleVideoSource> addList = videoPoolAuditMapper.articleVideoWatingAuditList(
+                                Arrays.asList(ArticleVideoAuditStatusEnum.WAITING.getCode()),
+                                Arrays.asList(poolEnum.getValue()), excludeContentIds, needCount);
+                        if (CollectionUtils.isNotEmpty(addList)) {
+                            for (PublishSingleVideoSource item : addList) {
+                                redisTemplate.opsForZSet().add(auditQueueRedisKey, item.getContentTraceId(), poolEnum.getWeight());
+                                excludeContentIds.add(item.getContentTraceId());
+                            }
                         }
                     }
                 }
             }
-        }
-        // 待发布内容不足添加
-        Long auditQueueSize = redisTemplate.opsForZSet().size(auditQueueRedisKey);
-        if (Objects.isNull(auditQueueSize) || auditQueueSize < 20) {
-            List<PublishSingleVideoSource> list = videoPoolAuditMapper.articleVideoWatingAuditList(
-                    Arrays.asList(ArticleVideoAuditStatusEnum.WAITING.getCode()), null, excludeContentIds, 40);
-            if (CollectionUtils.isNotEmpty(list)) {
-                for (PublishSingleVideoSource item : list) {
-                    ContentPoolEnum poolEnum = ContentPoolEnum.from(item.getFlowPoolLevel());
-                    redisTemplate.opsForZSet().add(auditQueueRedisKey, item.getContentTraceId(), poolEnum.getWeight());
+            // 待发布内容不足添加
+            Long auditQueueSize = redisTemplate.opsForZSet().size(auditQueueRedisKey);
+            if (Objects.isNull(auditQueueSize) || auditQueueSize < 20) {
+                List<PublishSingleVideoSource> list = videoPoolAuditMapper.articleVideoWatingAuditList(
+                        Arrays.asList(ArticleVideoAuditStatusEnum.WAITING.getCode()), null, excludeContentIds, 40);
+                if (CollectionUtils.isNotEmpty(list)) {
+                    for (PublishSingleVideoSource item : list) {
+                        ContentPoolEnum poolEnum = ContentPoolEnum.from(item.getFlowPoolLevel());
+                        redisTemplate.opsForZSet().add(auditQueueRedisKey, item.getContentTraceId(), poolEnum.getWeight());
+                    }
                 }
             }
+            redisTemplate.expire(auditQueueRedisKey, 12, TimeUnit.HOURS);
+        } finally {
+            redisUtil.releaseLock(lockKey, requestId);
         }
-        redisTemplate.expire(auditQueueRedisKey, 12, TimeUnit.HOURS);
         return ReturnT.SUCCESS;
     }
 }