Преглед на файлове

视频审核分发 消费修改

wangyunpeng преди 4 месеца
родител
ревизия
81bc142255

+ 7 - 5
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/common/enums/recommend/ContentPoolEnum.java

@@ -8,21 +8,23 @@ import java.util.Objects;
 
 @Getter
 public enum ContentPoolEnum {
-    autoArticlePoolLevel1("autoArticlePoolLevel1", 1, "头条"),
-    autoArticlePoolLevel2("autoArticlePoolLevel2", 2, "头条"),
-    autoArticlePoolLevel3("autoArticlePoolLevel3", 3, "次条"),
-    autoArticlePoolLevel4("autoArticlePoolLevel4", 4, "冷启层"),
+    autoArticlePoolLevel1("autoArticlePoolLevel1", 1, "头条", 100),
+    autoArticlePoolLevel2("autoArticlePoolLevel2", 2, "头条", 50),
+    autoArticlePoolLevel3("autoArticlePoolLevel3", 3, "次条", 25),
+    autoArticlePoolLevel4("autoArticlePoolLevel4", 4, "冷启层", 1),
 
     ;
 
     private final String contentPool;
     private final Integer value;
     private final String description;
+    private final Integer weight;
 
-    ContentPoolEnum(String contentPool, Integer value, String description) {
+    ContentPoolEnum(String contentPool, Integer value, String description, Integer weight) {
         this.contentPool = contentPool;
         this.value = value;
         this.description = description;
+        this.weight = weight;
     }
 
     public static ContentPoolEnum from(String contentPool) {

+ 2 - 0
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/mapper/longArticle/ArticleAuditMapper.java

@@ -13,11 +13,13 @@ public interface ArticleAuditMapper {
 
     int articleVideoAuditListCount(List<String> contentId, List<Integer> status,
                                    List<String> title, List<String> auditAccount, List<String> producePlanIds,
+                                   List<String> flowPoolLevel,
                                    ListItemFilterOrderParam auditTimestamp);
 
     List<ArticleVideoAuditListVO> articleVideoAuditList(List<String> contentId, List<Integer> status,
                                                         List<String> title, List<String> auditAccount,
                                                         List<String> producePlanIds,
+                                                        List<String> flowPoolLevel,
                                                         ListItemFilterOrderParam auditTimestamp,
                                                         int offset, Integer pageSize, String poolLevelDesc);
 

+ 1 - 0
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/model/param/videoAudit/ArticleVideoAuditListParam.java

@@ -12,6 +12,7 @@ public class ArticleVideoAuditListParam {
     private List<Integer> status;
     private List<String> auditAccount;
     private List<String> sourceProducePlan;
+    private List<String> flowPoolLevel;
     private ListItemFilterOrderParam auditTimestamp;
 
     private Integer pageNum = 1;

+ 114 - 52
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/recommend/ArticleVideoAuditService.java

@@ -1,6 +1,7 @@
 package com.tzld.longarticle.recommend.server.service.recommend;
 
 import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
 import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 import com.tzld.longarticle.recommend.server.common.enums.aigc.PublishContentStatusEnum;
 import com.tzld.longarticle.recommend.server.common.enums.longArticle.ArticleVideoAuditStatusEnum;
@@ -22,7 +23,6 @@ import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.handler.annotation.XxlJob;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.collections4.MapUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.RedisTemplate;
@@ -75,10 +75,10 @@ public class ArticleVideoAuditService {
     public Page<ArticleVideoAuditListVO> list(ArticleVideoAuditListParam param) {
         int offset = (param.getPageNum() - 1) * param.getPageSize();
         int count = articleAuditMapper.articleVideoAuditListCount(param.getContentId(), param.getStatus(),
-                param.getTitle(), param.getAuditAccount(), param.getSourceProducePlan(), param.getAuditTimestamp());
+                param.getTitle(), param.getAuditAccount(), param.getSourceProducePlan(), param.getFlowPoolLevel(), param.getAuditTimestamp());
         List<ArticleVideoAuditListVO> list = articleAuditMapper.articleVideoAuditList(param.getContentId(),
                 param.getStatus(), param.getTitle(), param.getAuditAccount(), param.getSourceProducePlan()
-                , param.getAuditTimestamp(), offset, param.getPageSize(), poolLevelDesc);
+                , param.getFlowPoolLevel(), param.getAuditTimestamp(), offset, param.getPageSize(), poolLevelDesc);
         buildArticleVideoAuditListVO(list);
         Page<ArticleVideoAuditListVO> page = new Page<>(param.getPageNum(), param.getPageSize());
         page.setTotalSize(count);
@@ -152,74 +152,60 @@ public class ArticleVideoAuditService {
             return result; // 返回空结果
         }
 
+        long now = System.currentTimeMillis();
+        String id;
+        String nextListRedisKey = "article-pool-audit-next-list";
         ArticleVideoAuditListVO item = null;
         try {
-            Long now = System.currentTimeMillis();
-            String redisKey = "article-pool-audit-next-list";
-            Map<Object, Object> entries = redisTemplate.opsForHash().entries(redisKey);
-            List<String> excludeContentIds = new ArrayList<>();
-            entries.forEach((k, v) -> {
-                long timestamp = Long.parseLong((String) v);
-                if (now > timestamp) {
-                    redisTemplate.opsForHash().delete(redisKey, k);
-                } else {
+            String dateStr = DateUtils.getCurrentDateStr("yyyyMMdd");
+            String auditQueueRedisKey = "article-pool-audit-queue-" + dateStr;
+            // 从待审核队列中获取数据,如未获取到则从数据库查询一条
+            Long size = redisTemplate.opsForZSet().size(auditQueueRedisKey);
+            if (Objects.isNull(size) || size == 0) {
+                Map<Object, Object> entries = redisTemplate.opsForHash().entries(nextListRedisKey);
+                List<String> excludeContentIds = new ArrayList<>();
+                entries.forEach((k, v) -> {
                     excludeContentIds.add((String) k);
-                }
-            });
-            // 根据配置判断当日是否审核完成 并 选择内容池返回
-            List<String> excludePoolLevel = new ArrayList<>();
-            String poolLevel = getAuditPoolLevel(excludePoolLevel);
-            if (Objects.isNull(poolLevel)) {
+                });
                 item = articleAuditMapper.articleVideoAuditNext(param.getContentId(),
                         param.getStatus(), param.getTitle(), param.getAuditAccount(), param.getSourceProducePlan(),
-                        poolLevel, excludeContentIds);
+                        null, excludeContentIds);
             } else {
-                do {
-                    item = articleAuditMapper.articleVideoAuditNext(param.getContentId(),
-                            param.getStatus(), param.getTitle(), param.getAuditAccount(), param.getSourceProducePlan(),
-                            poolLevel, excludeContentIds);
-                    if (Objects.nonNull(item)) {
+                // 从待审核队列中获取数据, 如获取到的内容已审核,则重新获取
+                while (true) {
+                    Set<String> ids = redisTemplate.opsForZSet().reverseRangeByScore(auditQueueRedisKey, 0, 100, 0, 1);
+                    if (CollectionUtils.isNotEmpty(ids)) {
+                        id = ids.iterator().next();
+                        redisTemplate.opsForZSet().remove(auditQueueRedisKey, id);
+                        item = articleAuditMapper.articleVideoAuditNext(Arrays.asList(id), null, null,
+                                null, null, null, null);
+                        if (item.getStatus() == ArticleVideoAuditStatusEnum.WAITING.getCode()) {
+                            break;
+                        }
+                    } else {
                         break;
                     }
-                    excludePoolLevel.add(poolLevel);
-                    poolLevel = getAuditPoolLevel(excludePoolLevel);
-                } while (Objects.nonNull(poolLevel));
-            }
-            if (Objects.isNull(item)) {
-                return result;
+                }
             }
-            redisTemplate.opsForHash().put(redisKey, item.getContentId(), String.valueOf(now + 600000));
         } finally {
             // 释放锁
             redisUtil.releaseLock(lockKey, requestId);
         }
+        if (Objects.isNull(item)) {
+            return result;
+        }
+
+        // 添加到超时队列
+        JSONObject json = new JSONObject();
+        json.put("id", item.getContentId());
+        json.put("timestamp", now + 900000);
+        redisTemplate.opsForList().rightPush(nextListRedisKey, json.toJSONString());
         List<ArticleVideoAuditListVO> list = Collections.singletonList(item);
         buildArticleVideoAuditListVO(list);
         result.setObjs(list);
         return result;
     }
 
-    private String getAuditPoolLevel(List<String> excludePoolLevel) {
-        if (MapUtils.isEmpty(dailyAuditPoolCount)) {
-            return null;
-        }
-        String dateStr = DateUtils.getCurrentDateStr("yyyyMMdd");
-        Set<String> keySet = dailyAuditPoolCount.keySet();
-        keySet = keySet.stream().sorted().collect(Collectors.toCollection(LinkedHashSet::new));
-        for (String poolLevel : keySet) {
-            if (excludePoolLevel.contains(poolLevel)) {
-                continue;
-            }
-            int target = dailyAuditPoolCount.get(poolLevel);
-            String key = "article_audit_count_" + dateStr + "_" + poolLevel;
-            int totalCount = Integer.parseInt(Optional.ofNullable(redisTemplate.opsForValue().get(key)).orElse("0"));
-            if (target > totalCount) {
-                return poolLevel;
-            }
-        }
-        return null;
-    }
-
     public void auditArticle(ArticleAuditParam param) {
         LongArticleTitleAudit titleAudit = titleAuditRepository.getByContentId(param.getContentId());
         Long now = System.currentTimeMillis();
@@ -236,6 +222,7 @@ public class ArticleVideoAuditService {
         }
         // 当日审核数+1
         addAuditCount(titleAudit.getFlowPoolLevel());
+        redisTemplate.opsForHash().delete("article-pool-audit-next-list", titleAudit.getContentId());
     }
 
     private void addAuditCount(String poolLevel) {
@@ -431,4 +418,79 @@ public class ArticleVideoAuditService {
         item.setAuditAccount(auditAccount);
         titleAuditRepository.save(item);
     }
+
+    @XxlJob("articlePoolAuditQueueJob")
+    public ReturnT<String> articlePoolAuditQueueJob(String param) {
+        Long now = System.currentTimeMillis();
+        String dateStr = DateUtils.getCurrentDateStr("yyyyMMdd");
+        String nextListRedisKey = "article-pool-audit-next-list";
+        String auditQueueRedisKey = "article-pool-audit-queue-" + dateStr;
+        // 判断是否审核超时,重新加入待审核队列
+        while (true) {
+            List<String> nextList = redisTemplate.opsForList().range(nextListRedisKey, 0, 0);
+            if (CollectionUtils.isEmpty(nextList)) {
+                break;
+            }
+            JSONObject firstObj = JSONObject.parseObject(nextList.get(0));
+            if (now > firstObj.getLong("timestamp")) {
+                redisTemplate.opsForList().leftPop(nextListRedisKey);
+                String id = firstObj.getString("id");
+                LongArticleTitleAudit videoAudit = titleAuditRepository.getByContentId(id);
+                ContentPoolEnum poolEnum = ContentPoolEnum.from(videoAudit.getFlowPoolLevel());
+                redisTemplate.opsForZSet().add(auditQueueRedisKey, id, poolEnum.getWeight());
+            } else {
+                break;
+            }
+        }
+        List<String> nextList = redisTemplate.opsForList().range(nextListRedisKey, 0, -1);
+        Map<String, List<LongArticleTitleAudit>> nextListPoolCountMap = new HashMap<>();
+        if (CollectionUtils.isNotEmpty(nextList)) {
+            List<String> nextListIds = nextList.stream().map(item -> JSONObject.parseObject(item).getString("id")).collect(Collectors.toList());
+            List<LongArticleTitleAudit> auditQueueList = titleAuditRepository.getByContentIdIn(new ArrayList<>(nextListIds));
+            nextListPoolCountMap = auditQueueList.stream().collect(Collectors.groupingBy(item -> ContentPoolEnum.from(item.getFlowPoolLevel()).getContentPool()));
+        }
+        Set<String> auditQueueIds = redisTemplate.opsForZSet().rangeByScore(auditQueueRedisKey, 0, 100);
+        Map<String, List<LongArticleTitleAudit>> auditQueuePoolCountMap = new HashMap<>();
+        if (CollectionUtils.isNotEmpty(auditQueueIds)) {
+            List<LongArticleTitleAudit> auditQueueList = titleAuditRepository.getByContentIdIn(new ArrayList<>(auditQueueIds));
+            auditQueuePoolCountMap = auditQueueList.stream().collect(Collectors.groupingBy(item -> ContentPoolEnum.from(item.getFlowPoolLevel()).getContentPool()));
+        }
+        // 每日配置发送量不足添加
+        for (Map.Entry<String, Integer> entry : dailyAuditPoolCount.entrySet()) {
+            ContentPoolEnum poolEnum = ContentPoolEnum.from(entry.getKey());
+            String key = "article_audit_count_" + dateStr + "_" + poolEnum.getContentPool();
+            int totalCount = Integer.parseInt(Optional.ofNullable(redisTemplate.opsForValue().get(key)).orElse("0"));
+            if (totalCount < entry.getValue()) {
+                List<LongArticleTitleAudit> auditQueueList = auditQueuePoolCountMap.getOrDefault(poolEnum.getContentPool(), new ArrayList<>());
+                List<LongArticleTitleAudit> nextListPool = nextListPoolCountMap.getOrDefault(poolEnum.getContentPool(), new ArrayList<>());
+                int needCount = entry.getValue() - (totalCount + auditQueueList.size() + nextListPool.size());
+                if (needCount > 0) {
+                    List<ArticleVideoAuditListVO> addList = articleAuditMapper.articleVideoAuditList(null,
+                            Arrays.asList(ArticleVideoAuditStatusEnum.WAITING.getCode()), null, null,
+                            null, Arrays.asList(poolEnum.getContentPool()), null,
+                            0, needCount, poolLevelDesc);
+                    if (CollectionUtils.isNotEmpty(addList)) {
+                        for (ArticleVideoAuditListVO item : addList) {
+                            redisTemplate.opsForZSet().add(auditQueueRedisKey, item.getContentId(), poolEnum.getWeight());
+                        }
+                    }
+                }
+            }
+        }
+        // 待发布内容不足添加
+        Long auditQueueSize = redisTemplate.opsForZSet().size(auditQueueRedisKey);
+        if (Objects.isNull(auditQueueSize) || auditQueueSize < 20) {
+            List<ArticleVideoAuditListVO> list = articleAuditMapper.articleVideoAuditList(null,
+                    Arrays.asList(ArticleVideoAuditStatusEnum.WAITING.getCode()), null, null, null
+                    , null, null, 0, 40, poolLevelDesc);
+            if (CollectionUtils.isNotEmpty(list)) {
+                for (ArticleVideoAuditListVO item : list) {
+                    ContentPoolEnum poolEnum = ContentPoolEnum.from(item.getFlowPoolLevel());
+                    redisTemplate.opsForZSet().add(auditQueueRedisKey, item.getContentId(), poolEnum.getWeight());
+                }
+            }
+        }
+        redisTemplate.expire(auditQueueRedisKey, 12, TimeUnit.HOURS);
+        return ReturnT.SUCCESS;
+    }
 }

+ 110 - 52
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/recommend/VideoPoolAuditService.java

@@ -1,5 +1,6 @@
 package com.tzld.longarticle.recommend.server.service.recommend;
 
+import com.alibaba.fastjson.JSONObject;
 import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 import com.tzld.longarticle.recommend.server.common.enums.aigc.PublishContentStatusEnum;
 import com.tzld.longarticle.recommend.server.common.enums.longArticle.ArticleVideoAuditStatusEnum;
@@ -18,7 +19,6 @@ import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.handler.annotation.XxlJob;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.collections4.MapUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.RedisTemplate;
@@ -101,7 +101,6 @@ public class VideoPoolAuditService {
             return list(param);
         }
         Page<VideoPoolAuditListVO> result = new Page<>();
-        // 排除已分配内容
         String lockKey = "video-pool-audit-lock";
         String requestId = UUID.randomUUID().toString();
         int retryCount = 0;
@@ -125,73 +124,57 @@ public class VideoPoolAuditService {
         if (!lockAcquired) {
             return result; // 返回空结果
         }
-
+        long now = System.currentTimeMillis();
+        String id;
         PublishSingleVideoSource obj = null;
+        String nextListRedisKey = "video-pool-audit-next-list";
         try {
-            Long now = System.currentTimeMillis();
-            String redisKey = "video-pool-audit-next-list";
-            Map<Object, Object> entries = redisTemplate.opsForHash().entries(redisKey);
-            List<String> excludeContentIds = new ArrayList<>();
-            entries.forEach((k, v) -> {
-                long timestamp = Long.parseLong((String) v);
-                if (now > timestamp) {
-                    redisTemplate.opsForHash().delete(redisKey, k);
-                } else {
+            String dateStr = DateUtils.getCurrentDateStr("yyyyMMdd");
+            String auditQueueRedisKey = "video-pool-audit-queue-" + dateStr;
+            // 从待审核队列中获取数据,如未获取到则从数据库查询一条
+            Long size = redisTemplate.opsForZSet().size(auditQueueRedisKey);
+            if (Objects.isNull(size) || size == 0) {
+                Map<Object, Object> entries = redisTemplate.opsForHash().entries(nextListRedisKey);
+                List<String> excludeContentIds = new ArrayList<>();
+                entries.forEach((k, v) -> {
                     excludeContentIds.add((String) k);
-                }
-            });
-            // 根据配置判断当日是否审核完成 并 选择内容池返回
-            List<String> excludePoolLevel = new ArrayList<>();
-            Integer poolLevel = getAuditPoolLevel(excludePoolLevel);
-            if (Objects.isNull(poolLevel)) {
+                });
                 obj = videoPoolAuditMapper.articleVideoAuditNext(param.getContentId(),
-                        param.getStatus(), param.getTitle(), param.getAuditAccount(), poolLevel, excludeContentIds);
+                        param.getStatus(), param.getTitle(), param.getAuditAccount(), null, excludeContentIds);
             } else {
-                do {
-                    obj = videoPoolAuditMapper.articleVideoAuditNext(param.getContentId(),
-                            param.getStatus(), param.getTitle(), param.getAuditAccount(), poolLevel, excludeContentIds);
-                    if (Objects.nonNull(obj)) {
+                // 从待审核队列中获取数据, 如获取到的内容已审核,则重新获取
+                while (true) {
+                    Set<String> ids = redisTemplate.opsForZSet().reverseRangeByScore(auditQueueRedisKey, 0, 100, 0, 1);
+                    if (CollectionUtils.isNotEmpty(ids)) {
+                        id = ids.iterator().next();
+                        redisTemplate.opsForZSet().remove(auditQueueRedisKey, id);
+                        obj = videoSourceRepository.getByContentTraceId(id);
+                        if (obj.getVideoPoolAuditStatus() == ArticleVideoAuditStatusEnum.WAITING.getCode()) {
+                            break;
+                        }
+                    } else {
                         break;
                     }
-                    excludePoolLevel.add(ContentPoolEnum.from(poolLevel).getContentPool());
-                    poolLevel = getAuditPoolLevel(excludePoolLevel);
-                } while (Objects.nonNull(poolLevel));
-            }
-            if (Objects.isNull(obj)) {
-                return result;
+                }
             }
-            redisTemplate.opsForHash().put(redisKey, obj.getContentTraceId(), String.valueOf(now + 600000));
         } finally {
             // 释放锁
             redisUtil.releaseLock(lockKey, requestId);
         }
+        if (Objects.isNull(obj)) {
+            return result;
+        }
+        // 添加到超时队列
+        JSONObject json = new JSONObject();
+        json.put("id", obj.getContentTraceId());
+        json.put("timestamp", now + 600000);
+        redisTemplate.opsForList().rightPush(nextListRedisKey, json.toJSONString());
+        // 填充数据
         List<VideoPoolAuditListVO> list = buildVideoPoolAuditListVO(Collections.singletonList(obj));
         result.setObjs(list);
         return result;
     }
 
-    private Integer getAuditPoolLevel(List<String> excludePoolLevel) {
-        if (MapUtils.isEmpty(dailyAuditPoolCount)) {
-            return null;
-        }
-        String dateStr = DateUtils.getCurrentDateStr("yyyyMMdd");
-        Set<String> keySet = dailyAuditPoolCount.keySet();
-        keySet = keySet.stream().sorted().collect(Collectors.toCollection(LinkedHashSet::new));
-        for (String poolLevel : keySet) {
-            if (excludePoolLevel.contains(poolLevel)) {
-                continue;
-            }
-            int target = dailyAuditPoolCount.get(poolLevel);
-            String key = "video_audit_count_" + dateStr + "_" + poolLevel;
-            int totalCount = Integer.parseInt(Optional.ofNullable(redisTemplate.opsForValue().get(key)).orElse("0"));
-            if (target > totalCount) {
-                ContentPoolEnum poolEnum = ContentPoolEnum.from(poolLevel);
-                return poolEnum.getValue();
-            }
-        }
-        return null;
-    }
-
     public void auditArticle(ArticleAuditParam param) {
         PublishSingleVideoSource videoAudit = videoSourceRepository.getByContentTraceId(param.getContentId());
         Long now = System.currentTimeMillis();
@@ -213,6 +196,7 @@ public class VideoPoolAuditService {
         }
         // 当日审核数+1
         addAuditCount(ContentPoolEnum.from(videoAudit.getFlowPoolLevel()).getContentPool());
+        redisTemplate.opsForHash().delete("video-pool-audit-next-list", videoAudit.getContentTraceId());
     }
 
     private void addAuditCount(String poolLevel) {
@@ -268,4 +252,78 @@ public class VideoPoolAuditService {
         }
         return ReturnT.SUCCESS;
     }
+
+    @XxlJob("videoPoolAuditQueueJob")
+    public ReturnT<String> videoPoolAuditQueueJob(String param) {
+        Long now = System.currentTimeMillis();
+        String dateStr = DateUtils.getCurrentDateStr("yyyyMMdd");
+        String nextListRedisKey = "video-pool-audit-next-list";
+        String auditQueueRedisKey = "video-pool-audit-queue-" + dateStr;
+        // 判断是否审核超时,重新加入待审核队列
+        while (true) {
+            List<String> nextList = redisTemplate.opsForList().range(nextListRedisKey, 0, 0);
+            if (CollectionUtils.isEmpty(nextList)) {
+                break;
+            }
+            JSONObject firstObj = JSONObject.parseObject(nextList.get(0));
+            if (now > firstObj.getLong("timestamp")) {
+                redisTemplate.opsForList().leftPop(nextListRedisKey);
+                String id = firstObj.getString("id");
+                PublishSingleVideoSource videoAudit = videoSourceRepository.getByContentTraceId(id);
+                ContentPoolEnum poolEnum = ContentPoolEnum.from(videoAudit.getFlowPoolLevel());
+                redisTemplate.opsForZSet().add(auditQueueRedisKey, id, poolEnum.getWeight());
+            } else {
+                break;
+            }
+        }
+        List<String> nextList = redisTemplate.opsForList().range(nextListRedisKey, 0, -1);
+        Map<String, List<PublishSingleVideoSource>> nextListPoolCountMap = new HashMap<>();
+        if (CollectionUtils.isNotEmpty(nextList)) {
+            List<String> nextListIds = nextList.stream().map(item -> JSONObject.parseObject(item).getString("id")).collect(Collectors.toList());
+            List<PublishSingleVideoSource> auditQueueList = videoSourceRepository.getByContentTraceIdIn(new ArrayList<>(nextListIds));
+            nextListPoolCountMap = auditQueueList.stream().collect(Collectors.groupingBy(item -> ContentPoolEnum.from(item.getFlowPoolLevel()).getContentPool()));
+        }
+        Set<String> auditQueueIds = redisTemplate.opsForZSet().rangeByScore(auditQueueRedisKey, 0, 100);
+        Map<String, List<PublishSingleVideoSource>> auditQueuePoolCountMap = new HashMap<>();
+        if (CollectionUtils.isNotEmpty(auditQueueIds)) {
+            List<PublishSingleVideoSource> auditQueueList = videoSourceRepository.getByContentTraceIdIn(new ArrayList<>(auditQueueIds));
+            auditQueuePoolCountMap = auditQueueList.stream().collect(Collectors.groupingBy(item -> ContentPoolEnum.from(item.getFlowPoolLevel()).getContentPool()));
+        }
+        // 每日配置发送量不足添加
+        for (Map.Entry<String, Integer> entry : dailyAuditPoolCount.entrySet()) {
+            ContentPoolEnum poolEnum = ContentPoolEnum.from(entry.getKey());
+            String key = "video_audit_count_" + dateStr + "_" + poolEnum.getContentPool();
+            int totalCount = Integer.parseInt(Optional.ofNullable(redisTemplate.opsForValue().get(key)).orElse("0"));
+            if (totalCount < entry.getValue()) {
+                List<PublishSingleVideoSource> auditQueueList = auditQueuePoolCountMap.getOrDefault(poolEnum.getContentPool(), new ArrayList<>());
+                List<PublishSingleVideoSource> nextListPool = nextListPoolCountMap.getOrDefault(poolEnum.getContentPool(), new ArrayList<>());
+                int needCount = entry.getValue() - (totalCount + auditQueueList.size() + nextListPool.size());
+                if (needCount > 0) {
+                    List<PublishSingleVideoSource> addList = videoPoolAuditMapper.articleVideoAuditList(null,
+                            Arrays.asList(ArticleVideoAuditStatusEnum.WAITING.getCode()), null, null,
+                            Arrays.asList(poolEnum.getValue()), null, 0, needCount, poolLevelDesc);
+                    if (CollectionUtils.isNotEmpty(addList)) {
+                        for (PublishSingleVideoSource item : addList) {
+                            redisTemplate.opsForZSet().add(auditQueueRedisKey, item.getContentTraceId(), poolEnum.getWeight());
+                        }
+                    }
+                }
+            }
+        }
+        // 待发布内容不足添加
+        Long auditQueueSize = redisTemplate.opsForZSet().size(auditQueueRedisKey);
+        if (Objects.isNull(auditQueueSize) || auditQueueSize < 20) {
+            List<PublishSingleVideoSource> list = videoPoolAuditMapper.articleVideoAuditList(null,
+                    Arrays.asList(ArticleVideoAuditStatusEnum.WAITING.getCode()), null, null, null
+                    , null, 0, 40, poolLevelDesc);
+            if (CollectionUtils.isNotEmpty(list)) {
+                for (PublishSingleVideoSource item : list) {
+                    ContentPoolEnum poolEnum = ContentPoolEnum.from(item.getFlowPoolLevel());
+                    redisTemplate.opsForZSet().add(auditQueueRedisKey, item.getContentTraceId(), poolEnum.getWeight());
+                }
+            }
+        }
+        redisTemplate.expire(auditQueueRedisKey, 12, TimeUnit.HOURS);
+        return ReturnT.SUCCESS;
+    }
 }

+ 6 - 0
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/web/recommend/ArticleVideoAuditController.java

@@ -80,4 +80,10 @@ public class ArticleVideoAuditController {
         return CommonResponse.success();
     }
 
+    @GetMapping("/articlePoolAuditQueueJob")
+    public CommonResponse<Void> articlePoolAuditQueueJob() {
+        service.articlePoolAuditQueueJob(null);
+        return CommonResponse.success();
+    }
+
 }

+ 6 - 0
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/web/recommend/VideoPoolAuditController.java

@@ -58,4 +58,10 @@ public class VideoPoolAuditController {
         return CommonResponse.success();
     }
 
+    @GetMapping("/videoPoolAuditQueueJob")
+    public CommonResponse<Void> videoPoolAuditQueueJob() {
+        service.videoPoolAuditQueueJob(null);
+        return CommonResponse.success();
+    }
+
 }

+ 12 - 0
long-article-recommend-service/src/main/resources/mapper/longArticle/ArticleAuditMapper.xml

@@ -37,6 +37,12 @@
                     #{item}
                 </foreach>
             </if>
+            <if test="flowPoolLevel!= null and flowPoolLevel.size() > 0">
+                and lata.flow_pool_level in
+                <foreach collection="flowPoolLevel" item="item" separator="," open="(" close=")">
+                    #{item}
+                </foreach>
+            </if>
             <if test="auditTimestamp!= null">
                 <if test="auditTimestamp.conditionOperator != null and auditTimestamp.conditionOperator != ''">
                     <choose>
@@ -95,6 +101,12 @@
                     #{item}
                 </foreach>
             </if>
+            <if test="flowPoolLevel!= null and flowPoolLevel.size() > 0">
+                and lata.flow_pool_level in
+                <foreach collection="flowPoolLevel" item="item" separator="," open="(" close=")">
+                    #{item}
+                </foreach>
+            </if>
             <if test="auditTimestamp!= null">
                 <if test="auditTimestamp.conditionOperator != null and auditTimestamp.conditionOperator != ''">
                     <choose>