Переглянути джерело

Merge branch 'wyp/0226-videoAuditRedisExclude' of Server/long-article-recommend into master

wangyunpeng 3 місяців тому
батько
коміт
50ab0e5efd
11 змінених файлів з 465 додано та 121 видалено
  1. 7 5
      long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/common/enums/recommend/ContentPoolEnum.java
  2. 9 0
      long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/mapper/longArticle/ArticleAuditMapper.java
  3. 7 2
      long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/mapper/longArticle/VideoPoolAuditMapper.java
  4. 1 0
      long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/model/param/videoAudit/ArticleVideoAuditListParam.java
  5. 167 57
      long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/recommend/ArticleVideoAuditService.java
  6. 159 54
      long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/recommend/VideoPoolAuditService.java
  7. 31 0
      long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/util/RedisUtil.java
  8. 6 0
      long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/web/recommend/ArticleVideoAuditController.java
  9. 6 0
      long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/web/recommend/VideoPoolAuditController.java
  10. 42 0
      long-article-recommend-service/src/main/resources/mapper/longArticle/ArticleAuditMapper.xml
  11. 30 3
      long-article-recommend-service/src/main/resources/mapper/longArticle/VideoPoolAuditMapper.xml

+ 7 - 5
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/common/enums/recommend/ContentPoolEnum.java

@@ -8,21 +8,23 @@ import java.util.Objects;
 
 @Getter
 public enum ContentPoolEnum {
-    autoArticlePoolLevel1("autoArticlePoolLevel1", 1, "头条"),
-    autoArticlePoolLevel2("autoArticlePoolLevel2", 2, "头条"),
-    autoArticlePoolLevel3("autoArticlePoolLevel3", 3, "次条"),
-    autoArticlePoolLevel4("autoArticlePoolLevel4", 4, "冷启层"),
+    autoArticlePoolLevel1("autoArticlePoolLevel1", 1, "头条", 100),
+    autoArticlePoolLevel2("autoArticlePoolLevel2", 2, "头条", 50),
+    autoArticlePoolLevel3("autoArticlePoolLevel3", 3, "次条", 25),
+    autoArticlePoolLevel4("autoArticlePoolLevel4", 4, "冷启层", 1),
 
     ;
 
     private final String contentPool;
     private final Integer value;
     private final String description;
+    private final Integer weight;
 
-    ContentPoolEnum(String contentPool, Integer value, String description) {
+    ContentPoolEnum(String contentPool, Integer value, String description, Integer weight) {
         this.contentPool = contentPool;
         this.value = value;
         this.description = description;
+        this.weight = weight;
     }
 
     public static ContentPoolEnum from(String contentPool) {

+ 9 - 0
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/mapper/longArticle/ArticleAuditMapper.java

@@ -13,11 +13,13 @@ public interface ArticleAuditMapper {
 
     int articleVideoAuditListCount(List<String> contentId, List<Integer> status,
                                    List<String> title, List<String> auditAccount, List<String> producePlanIds,
+                                   List<String> flowPoolLevel,
                                    ListItemFilterOrderParam auditTimestamp);
 
     List<ArticleVideoAuditListVO> articleVideoAuditList(List<String> contentId, List<Integer> status,
                                                         List<String> title, List<String> auditAccount,
                                                         List<String> producePlanIds,
+                                                        List<String> flowPoolLevel,
                                                         ListItemFilterOrderParam auditTimestamp,
                                                         int offset, Integer pageSize, String poolLevelDesc);
 
@@ -26,6 +28,12 @@ public interface ArticleAuditMapper {
                                                   List<String> producePlanIds, String poolLevel,
                                                   List<String> excludeContentIds);
 
+    List<ArticleVideoAuditListVO> articleVideoWatingAuditList(List<Integer> status,
+                                                              List<String> flowPoolLevels,
+                                                              List<String> excludeContentIds,
+                                                              Integer size);
+
+
     void updateCrawlerVideoIsIllegal(List<Integer> illegalVideoIds, Integer isIllegal);
 
     List<String> searchFilterValueByItemName(String itemName, String searchKeyword);
@@ -35,4 +43,5 @@ public interface ArticleAuditMapper {
     void updateVideoTitle(VideoTitleUpdateParam param, Long updateTime);
 
     void updateTitleAuditFlowPoolLevel();
+
 }

+ 7 - 2
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/mapper/longArticle/VideoPoolAuditMapper.java

@@ -22,8 +22,13 @@ public interface VideoPoolAuditMapper {
                                                          int offset, Integer pageSize, String poolLevelDesc);
 
     PublishSingleVideoSource articleVideoAuditNext(List<String> contentId, List<Integer> status,
-                                                         List<String> title, List<String> auditAccount,
-                                                         Integer flowPoolLevel, List<String> excludeContentIds);
+                                                   List<String> title, List<String> auditAccount,
+                                                   Integer flowPoolLevel, List<String> excludeContentIds);
+
+    List<PublishSingleVideoSource> articleVideoWatingAuditList(List<Integer> status,
+                                                               List<Integer> flowPoolLevels,
+                                                               List<String> excludeContentIds,
+                                                               Integer size);
 
     List<String> searchFilterValueByItemName(String itemName, String searchKeyword);
 

+ 1 - 0
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/model/param/videoAudit/ArticleVideoAuditListParam.java

@@ -12,6 +12,7 @@ public class ArticleVideoAuditListParam {
     private List<Integer> status;
     private List<String> auditAccount;
     private List<String> sourceProducePlan;
+    private List<String> flowPoolLevel;
     private ListItemFilterOrderParam auditTimestamp;
 
     private Integer pageNum = 1;

+ 167 - 57
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/recommend/ArticleVideoAuditService.java

@@ -1,6 +1,7 @@
 package com.tzld.longarticle.recommend.server.service.recommend;
 
 import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
 import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 import com.tzld.longarticle.recommend.server.common.enums.aigc.PublishContentStatusEnum;
 import com.tzld.longarticle.recommend.server.common.enums.longArticle.ArticleVideoAuditStatusEnum;
@@ -16,12 +17,12 @@ import com.tzld.longarticle.recommend.server.model.vo.ArticleVideoAuditListVO;
 import com.tzld.longarticle.recommend.server.repository.aigc.ProducePlanExeRecordRepository;
 import com.tzld.longarticle.recommend.server.repository.longArticle.*;
 import com.tzld.longarticle.recommend.server.util.DateUtils;
+import com.tzld.longarticle.recommend.server.util.RedisUtil;
 import com.tzld.longarticle.recommend.server.util.page.Page;
 import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.handler.annotation.XxlJob;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.collections4.MapUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.RedisTemplate;
@@ -56,6 +57,8 @@ public class ArticleVideoAuditService {
     private ProducePlanExeRecordRepository producePlanExeRecordRepository;
     @Autowired
     private ArticlePoolPromotionSourceRepository articlePoolPromotionSourceRepository;
+    @Autowired
+    private RedisUtil redisUtil;
 
     @Autowired
     private RedisTemplate<String, String> redisTemplate;
@@ -72,10 +75,10 @@ public class ArticleVideoAuditService {
     public Page<ArticleVideoAuditListVO> list(ArticleVideoAuditListParam param) {
         int offset = (param.getPageNum() - 1) * param.getPageSize();
         int count = articleAuditMapper.articleVideoAuditListCount(param.getContentId(), param.getStatus(),
-                param.getTitle(), param.getAuditAccount(), param.getSourceProducePlan(), param.getAuditTimestamp());
+                param.getTitle(), param.getAuditAccount(), param.getSourceProducePlan(), param.getFlowPoolLevel(), param.getAuditTimestamp());
         List<ArticleVideoAuditListVO> list = articleAuditMapper.articleVideoAuditList(param.getContentId(),
                 param.getStatus(), param.getTitle(), param.getAuditAccount(), param.getSourceProducePlan()
-                , param.getAuditTimestamp(), offset, param.getPageSize(), poolLevelDesc);
+                , param.getFlowPoolLevel(), param.getAuditTimestamp(), offset, param.getPageSize(), poolLevelDesc);
         buildArticleVideoAuditListVO(list);
         Page<ArticleVideoAuditListVO> page = new Page<>(param.getPageNum(), param.getPageSize());
         page.setTotalSize(count);
@@ -119,74 +122,89 @@ public class ArticleVideoAuditService {
     }
 
     public Page<ArticleVideoAuditListVO> next(ArticleVideoAuditListParam param) {
-        if (Objects.nonNull(param.getSourceProducePlan())) {
-            param.setPageSize(1);
-            return list(param);
-        }
         Page<ArticleVideoAuditListVO> result = new Page<>();
-        Long now = System.currentTimeMillis();
-        String redisKey = "article-pool-audit-next-list";
-        Map<Object, Object> entries = redisTemplate.opsForHash().entries(redisKey);
-        List<String> excludeContentIds = new ArrayList<>();
-        entries.forEach((k, v) -> {
-            long timestamp = Long.parseLong((String) v);
-            if (now > timestamp) {
-                redisTemplate.opsForHash().delete(redisKey, k);
-            } else {
-                excludeContentIds.add((String) k);
+
+        String lockKey = "article-pool-audit-lock";
+        String requestId = UUID.randomUUID().toString();
+        int retryCount = 0;
+        boolean lockAcquired = false;
+
+        // 尝试获取分布式锁,最多重试10次,每次间隔200ms
+        while (retryCount < 10) {
+            lockAcquired = redisUtil.tryAcquireLock(lockKey, requestId);
+            if (lockAcquired) {
+                break;
             }
-        });
-        // 根据配置判断当日是否审核完成 并 选择内容池返回
+            retryCount++;
+            try {
+                Thread.sleep(200); // 等待200ms后重试
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return result; // 返回空结果
+            }
+        }
+
+        if (!lockAcquired) {
+            return result; // 返回空结果
+        }
+
+        long now = System.currentTimeMillis();
+        String id;
+        String inAuditListRedisKey = "article-pool-in-audit-list";
         ArticleVideoAuditListVO item = null;
-        List<String> excludePoolLevel = new ArrayList<>();
-        String poolLevel = getAuditPoolLevel(excludePoolLevel);
-        if (Objects.isNull(poolLevel)) {
-            item = articleAuditMapper.articleVideoAuditNext(param.getContentId(),
-                    param.getStatus(), param.getTitle(), param.getAuditAccount(), param.getSourceProducePlan(),
-                    poolLevel, excludeContentIds);
-        } else {
-            do {
+        try {
+            String dateStr = DateUtils.getCurrentDateStr("yyyyMMdd");
+            String auditQueueRedisKey = "article-pool-audit-queue-" + dateStr;
+            // 从待审核队列中获取数据,如未获取到则从数据库查询一条
+            Long size = redisTemplate.opsForZSet().size(auditQueueRedisKey);
+            if (Objects.isNull(size) || size == 0) {
+                List<String> entries = redisTemplate.opsForList().range(inAuditListRedisKey, 0, -1);
+                List<String> excludeContentIds = new ArrayList<>();
+                if (CollectionUtils.isNotEmpty(entries)) {
+                    excludeContentIds = entries.stream().map(o -> {
+                        JSONObject json = JSONObject.parseObject(o);
+                        return json.getString("id");
+                    }).collect(Collectors.toList());
+                }
                 item = articleAuditMapper.articleVideoAuditNext(param.getContentId(),
                         param.getStatus(), param.getTitle(), param.getAuditAccount(), param.getSourceProducePlan(),
-                        poolLevel, excludeContentIds);
-                if (Objects.nonNull(item)) {
-                    break;
+                        null, excludeContentIds);
+            } else {
+                // 从待审核队列中获取数据, 如获取到的内容已审核,则重新获取
+                while (true) {
+                    Set<String> ids = redisTemplate.opsForZSet().reverseRangeByScore(auditQueueRedisKey, 0, 100, 0, 1);
+                    if (CollectionUtils.isNotEmpty(ids)) {
+                        id = ids.iterator().next();
+                        redisTemplate.opsForZSet().remove(auditQueueRedisKey, id);
+                        item = articleAuditMapper.articleVideoAuditNext(Arrays.asList(id), null, null,
+                                null, null, null, null);
+                        if (item.getStatus() == ArticleVideoAuditStatusEnum.WAITING.getCode()) {
+                            break;
+                        }
+                    } else {
+                        break;
+                    }
                 }
-                excludePoolLevel.add(poolLevel);
-                poolLevel = getAuditPoolLevel(excludePoolLevel);
-            } while (Objects.nonNull(poolLevel));
+            }
+        } finally {
+            // 释放锁
+            redisUtil.releaseLock(lockKey, requestId);
         }
         if (Objects.isNull(item)) {
             return result;
         }
-        redisTemplate.opsForHash().put(redisKey, item.getContentId(), String.valueOf(now + 600000));
+
+        // 添加到超时队列
+        JSONObject json = new JSONObject();
+        json.put("id", item.getContentId());
+        json.put("timestamp", now + (15 * 60 * 1000));
+        redisTemplate.opsForList().rightPush(inAuditListRedisKey, json.toJSONString());
         List<ArticleVideoAuditListVO> list = Collections.singletonList(item);
         buildArticleVideoAuditListVO(list);
         result.setObjs(list);
         return result;
     }
 
-    private String getAuditPoolLevel(List<String> excludePoolLevel) {
-        if (MapUtils.isEmpty(dailyAuditPoolCount)) {
-            return null;
-        }
-        String dateStr = DateUtils.getCurrentDateStr("yyyyMMdd");
-        Set<String> keySet = dailyAuditPoolCount.keySet();
-        keySet = keySet.stream().sorted().collect(Collectors.toCollection(LinkedHashSet::new));
-        for (String poolLevel : keySet) {
-            if (excludePoolLevel.contains(poolLevel)) {
-                continue;
-            }
-            int target = dailyAuditPoolCount.get(poolLevel);
-            String key = "article_audit_count_" + dateStr + "_" + poolLevel;
-            int totalCount = Integer.parseInt(Optional.ofNullable(redisTemplate.opsForValue().get(key)).orElse("0"));
-            if (target > totalCount) {
-                return poolLevel;
-            }
-        }
-        return null;
-    }
-
     public void auditArticle(ArticleAuditParam param) {
         LongArticleTitleAudit titleAudit = titleAuditRepository.getByContentId(param.getContentId());
         Long now = System.currentTimeMillis();
@@ -359,7 +377,7 @@ public class ArticleVideoAuditService {
 
     @XxlJob("shuffleAuditGroup")
     public ReturnT<String> shuffleAuditGroup(String param) {
-        List<String> auditUser = Arrays.asList("a","b","c","d","e","f","g","h","i","j");
+        List<String> auditUser = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
         List<LongArticleTitleAudit> contentIds = titleAuditRepository.getByStatus(ArticleVideoAuditStatusEnum.WAITING.getCode());
         for (int i = 0; i < contentIds.size(); i++) {
             int per = i % auditUser.size();
@@ -372,7 +390,7 @@ public class ArticleVideoAuditService {
 
     public void addAudit(ArticleVideoAuditAddAuditParam param) {
         ProducePlanExeRecord exeRecord = producePlanExeRecordRepository.getByPlanExeId(param.getContentId());
-        List<String> auditUser = Arrays.asList("a","b","c","d","e","f","g","h","i","j");
+        List<String> auditUser = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
         Random random = new Random();
         int index = random.nextInt(auditUser.size());
         String auditAccount = auditUser.get(index);
@@ -398,4 +416,96 @@ public class ArticleVideoAuditService {
         item.setAuditAccount(auditAccount);
         titleAuditRepository.save(item);
     }
+
+    @XxlJob("articlePoolAuditQueueJob")
+    public ReturnT<String> articlePoolAuditQueueJob(String param) {
+        String lockKey = "article-pool-audit-queue-job-lock";
+        String requestId = UUID.randomUUID().toString();
+        Boolean lockAcquired = redisUtil.tryAcquireLock(lockKey, requestId);
+        if (!lockAcquired) {
+            return ReturnT.SUCCESS;
+        }
+        try {
+            Long now = System.currentTimeMillis();
+            String dateStr = DateUtils.getCurrentDateStr("yyyyMMdd");
+            String inAuditListRedisKey = "article-pool-in-audit-list";
+            String auditQueueRedisKey = "article-pool-audit-queue-" + dateStr;
+            // 判断是否审核超时,重新加入待审核队列
+            while (true) {
+                List<String> inAuditList = redisTemplate.opsForList().range(inAuditListRedisKey, 0, 0);
+                if (CollectionUtils.isEmpty(inAuditList)) {
+                    break;
+                }
+                JSONObject firstObj = JSONObject.parseObject(inAuditList.get(0));
+                if (now > firstObj.getLong("timestamp")) {
+                    redisTemplate.opsForList().leftPop(inAuditListRedisKey);
+                    String id = firstObj.getString("id");
+                    LongArticleTitleAudit videoAudit = titleAuditRepository.getByContentId(id);
+                    if (videoAudit.getStatus() == ArticleVideoAuditStatusEnum.WAITING.getCode()) {
+                        ContentPoolEnum poolEnum = ContentPoolEnum.from(videoAudit.getFlowPoolLevel());
+                        redisTemplate.opsForZSet().add(auditQueueRedisKey, id, poolEnum.getWeight());
+                    }
+                } else {
+                    break;
+                }
+            }
+            List<String> excludeContentIds = new ArrayList<>();
+            // 审核中列表获取,按内容池分组
+            List<String> inAuditList = redisTemplate.opsForList().range(inAuditListRedisKey, 0, -1);
+            Map<String, List<LongArticleTitleAudit>> inAuditListPoolCountMap = new HashMap<>();
+            if (CollectionUtils.isNotEmpty(inAuditList)) {
+                List<String> inAuditListIds = inAuditList.stream().map(item -> JSONObject.parseObject(item).getString("id")).collect(Collectors.toList());
+                List<LongArticleTitleAudit> auditQueueList = titleAuditRepository.getByContentIdIn(new ArrayList<>(inAuditListIds));
+                inAuditListPoolCountMap = auditQueueList.stream().collect(Collectors.groupingBy(item -> ContentPoolEnum.from(item.getFlowPoolLevel()).getContentPool()));
+                excludeContentIds.addAll(inAuditListIds);
+            }
+            // 待审核列表获取,按内容池分组
+            Set<String> auditQueueIds = redisTemplate.opsForZSet().rangeByScore(auditQueueRedisKey, 0, 100);
+            Map<String, List<LongArticleTitleAudit>> auditQueuePoolCountMap = new HashMap<>();
+            if (CollectionUtils.isNotEmpty(auditQueueIds)) {
+                List<LongArticleTitleAudit> auditQueueList = titleAuditRepository.getByContentIdIn(new ArrayList<>(auditQueueIds));
+                auditQueuePoolCountMap = auditQueueList.stream().collect(Collectors.groupingBy(item -> ContentPoolEnum.from(item.getFlowPoolLevel()).getContentPool()));
+                excludeContentIds.addAll(auditQueueIds);
+            }
+            // 每日配置发送量不足添加
+            for (Map.Entry<String, Integer> entry : dailyAuditPoolCount.entrySet()) {
+                ContentPoolEnum poolEnum = ContentPoolEnum.from(entry.getKey());
+                String key = "article_audit_count_" + dateStr + "_" + poolEnum.getContentPool();
+                int totalCount = Integer.parseInt(Optional.ofNullable(redisTemplate.opsForValue().get(key)).orElse("0"));
+                if (totalCount < entry.getValue()) {
+                    List<LongArticleTitleAudit> auditQueueList = auditQueuePoolCountMap.getOrDefault(poolEnum.getContentPool(), new ArrayList<>());
+                    List<LongArticleTitleAudit> inAuditListPool = inAuditListPoolCountMap.getOrDefault(poolEnum.getContentPool(), new ArrayList<>());
+                    int needCount = entry.getValue() - (totalCount + auditQueueList.size() + inAuditListPool.size());
+                    if (needCount > 0) {
+                        List<ArticleVideoAuditListVO> addList = articleAuditMapper.articleVideoWatingAuditList(
+                                Arrays.asList(ArticleVideoAuditStatusEnum.WAITING.getCode()),
+                                Arrays.asList(poolEnum.getContentPool()), excludeContentIds, needCount);
+                        if (CollectionUtils.isNotEmpty(addList)) {
+                            for (ArticleVideoAuditListVO item : addList) {
+                                redisTemplate.opsForZSet().add(auditQueueRedisKey, item.getContentId(), poolEnum.getWeight());
+                                excludeContentIds.add(item.getContentId());
+                            }
+                        }
+                    }
+                }
+            }
+            // 待发布内容不足添加
+            Long auditQueueSize = redisTemplate.opsForZSet().size(auditQueueRedisKey);
+            if (Objects.isNull(auditQueueSize) || auditQueueSize < 20) {
+                List<ArticleVideoAuditListVO> list = articleAuditMapper.articleVideoWatingAuditList(
+                        Arrays.asList(ArticleVideoAuditStatusEnum.WAITING.getCode()), null, excludeContentIds, 40);
+                if (CollectionUtils.isNotEmpty(list)) {
+                    for (ArticleVideoAuditListVO item : list) {
+                        ContentPoolEnum poolEnum = ContentPoolEnum.from(item.getFlowPoolLevel());
+                        redisTemplate.opsForZSet().add(auditQueueRedisKey, item.getContentId(), poolEnum.getWeight());
+                    }
+                }
+            }
+            redisTemplate.expire(auditQueueRedisKey, 12, TimeUnit.HOURS);
+
+        } finally {
+            redisUtil.releaseLock(lockKey, requestId);
+        }
+        return ReturnT.SUCCESS;
+    }
 }

+ 159 - 54
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/recommend/VideoPoolAuditService.java

@@ -1,5 +1,6 @@
 package com.tzld.longarticle.recommend.server.service.recommend;
 
+import com.alibaba.fastjson.JSONObject;
 import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 import com.tzld.longarticle.recommend.server.common.enums.aigc.PublishContentStatusEnum;
 import com.tzld.longarticle.recommend.server.common.enums.longArticle.ArticleVideoAuditStatusEnum;
@@ -12,12 +13,12 @@ import com.tzld.longarticle.recommend.server.model.param.videoAudit.*;
 import com.tzld.longarticle.recommend.server.model.vo.VideoPoolAuditListVO;
 import com.tzld.longarticle.recommend.server.repository.longArticle.PublishSingleVideoSourceRepository;
 import com.tzld.longarticle.recommend.server.util.DateUtils;
+import com.tzld.longarticle.recommend.server.util.RedisUtil;
 import com.tzld.longarticle.recommend.server.util.page.Page;
 import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.handler.annotation.XxlJob;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.collections4.MapUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.RedisTemplate;
@@ -37,6 +38,8 @@ public class VideoPoolAuditService {
     private PublishContentMapper publishContentMapper;
     @Autowired
     private PublishSingleVideoSourceRepository videoSourceRepository;
+    @Autowired
+    private RedisUtil redisUtil;
 
     @Autowired
     private RedisTemplate<String, String> redisTemplate;
@@ -93,73 +96,84 @@ public class VideoPoolAuditService {
     }
 
     public Page<VideoPoolAuditListVO> next(VideoPoolAuditListParam param) {
-        if (Objects.nonNull(param.getFlowPoolLevel())) {
-            param.setPageSize(1);
-            return list(param);
-        }
         Page<VideoPoolAuditListVO> result = new Page<>();
-        // 排除已分配内容
-        Long now = System.currentTimeMillis();
-        String redisKey = "video-pool-audit-next-list";
-        Map<Object, Object> entries = redisTemplate.opsForHash().entries(redisKey);
-        List<String> excludeContentIds = new ArrayList<>();
-        entries.forEach((k, v) -> {
-            long timestamp = Long.parseLong((String) v);
-            if (now > timestamp) {
-                redisTemplate.opsForHash().delete(redisKey, k);
-            } else {
-                excludeContentIds.add((String) k);
+        String lockKey = "video-pool-audit-lock";
+        String requestId = UUID.randomUUID().toString();
+        int retryCount = 0;
+        boolean lockAcquired = false;
+
+        // 尝试获取分布式锁,最多重试10次,每次间隔200ms
+        while (retryCount < 10) {
+            lockAcquired = redisUtil.tryAcquireLock(lockKey, requestId);
+            if (lockAcquired) {
+                break;
             }
-        });
-        // 根据配置判断当日是否审核完成 并 选择内容池返回
+            retryCount++;
+            try {
+                Thread.sleep(200); // 等待200ms后重试
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return result; // 返回空结果
+            }
+        }
+
+        if (!lockAcquired) {
+            return result; // 返回空结果
+        }
+        long now = System.currentTimeMillis();
+        String id;
         PublishSingleVideoSource obj = null;
-        List<String> excludePoolLevel = new ArrayList<>();
-        Integer poolLevel = getAuditPoolLevel(excludePoolLevel);
-        if (Objects.isNull(poolLevel)) {
-            obj = videoPoolAuditMapper.articleVideoAuditNext(param.getContentId(),
-                    param.getStatus(), param.getTitle(), param.getAuditAccount(), poolLevel, excludeContentIds);
-        } else {
-            do {
+        String inAuditListRedisKey = "video-pool-in-audit-list";
+        try {
+            String dateStr = DateUtils.getCurrentDateStr("yyyyMMdd");
+            String auditQueueRedisKey = "video-pool-audit-queue-" + dateStr;
+            // 从待审核队列中获取数据,如未获取到则从数据库查询一条
+            Long size = redisTemplate.opsForZSet().size(auditQueueRedisKey);
+            if (Objects.isNull(size) || size == 0) {
+                List<String> entries = redisTemplate.opsForList().range(inAuditListRedisKey, 0, -1);
+                List<String> excludeContentIds = new ArrayList<>();
+                if (CollectionUtils.isNotEmpty(entries)) {
+                    excludeContentIds = entries.stream().map(o -> {
+                        JSONObject json = JSONObject.parseObject(o);
+                        return json.getString("id");
+                    }).collect(Collectors.toList());
+                }
                 obj = videoPoolAuditMapper.articleVideoAuditNext(param.getContentId(),
-                        param.getStatus(), param.getTitle(), param.getAuditAccount(), poolLevel, excludeContentIds);
-                if (Objects.nonNull(obj)) {
-                    break;
+                        param.getStatus(), param.getTitle(), param.getAuditAccount(), null, excludeContentIds);
+            } else {
+                // 从待审核队列中获取数据, 如获取到的内容已审核,则重新获取
+                while (true) {
+                    Set<String> ids = redisTemplate.opsForZSet().reverseRangeByScore(auditQueueRedisKey, 0, 100, 0, 1);
+                    if (CollectionUtils.isNotEmpty(ids)) {
+                        id = ids.iterator().next();
+                        redisTemplate.opsForZSet().remove(auditQueueRedisKey, id);
+                        obj = videoSourceRepository.getByContentTraceId(id);
+                        if (obj.getVideoPoolAuditStatus() == ArticleVideoAuditStatusEnum.WAITING.getCode()) {
+                            break;
+                        }
+                    } else {
+                        break;
+                    }
                 }
-                excludePoolLevel.add(ContentPoolEnum.from(poolLevel).getContentPool());
-                poolLevel = getAuditPoolLevel(excludePoolLevel);
-            } while (Objects.nonNull(poolLevel));
+            }
+        } finally {
+            // 释放锁
+            redisUtil.releaseLock(lockKey, requestId);
         }
         if (Objects.isNull(obj)) {
             return result;
         }
-        redisTemplate.opsForHash().put(redisKey, obj.getContentTraceId(), String.valueOf(now + 600000));
+        // 添加到超时队列
+        JSONObject json = new JSONObject();
+        json.put("id", obj.getContentTraceId());
+        json.put("timestamp", now + (10 * 60 * 1000));
+        redisTemplate.opsForList().rightPush(inAuditListRedisKey, json.toJSONString());
+        // 填充数据
         List<VideoPoolAuditListVO> list = buildVideoPoolAuditListVO(Collections.singletonList(obj));
         result.setObjs(list);
         return result;
     }
 
-    private Integer getAuditPoolLevel(List<String> excludePoolLevel) {
-        if (MapUtils.isEmpty(dailyAuditPoolCount)) {
-            return null;
-        }
-        String dateStr = DateUtils.getCurrentDateStr("yyyyMMdd");
-        Set<String> keySet = dailyAuditPoolCount.keySet();
-        keySet = keySet.stream().sorted().collect(Collectors.toCollection(LinkedHashSet::new));
-        for (String poolLevel : keySet) {
-            if (excludePoolLevel.contains(poolLevel)) {
-                continue;
-            }
-            int target = dailyAuditPoolCount.get(poolLevel);
-            String key = "video_audit_count_" + dateStr + "_" + poolLevel;
-            int totalCount = Integer.parseInt(Optional.ofNullable(redisTemplate.opsForValue().get(key)).orElse("0"));
-            if (target > totalCount) {
-                ContentPoolEnum poolEnum = ContentPoolEnum.from(poolLevel);
-                return poolEnum.getValue();
-            }
-        }
-        return null;
-    }
-
     public void auditArticle(ArticleAuditParam param) {
         PublishSingleVideoSource videoAudit = videoSourceRepository.getByContentTraceId(param.getContentId());
         Long now = System.currentTimeMillis();
@@ -236,4 +250,95 @@ public class VideoPoolAuditService {
         }
         return ReturnT.SUCCESS;
     }
+
+    @XxlJob("videoPoolAuditQueueJob")
+    public ReturnT<String> videoPoolAuditQueueJob(String param) {
+        String lockKey = "video-pool-audit-queue-job-lock";
+        String requestId = UUID.randomUUID().toString();
+        Boolean lockAcquired = redisUtil.tryAcquireLock(lockKey, requestId);
+        if (!lockAcquired) {
+            return ReturnT.SUCCESS;
+        }
+        try {
+            Long now = System.currentTimeMillis();
+            String dateStr = DateUtils.getCurrentDateStr("yyyyMMdd");
+            String inAuditListRedisKey = "video-pool-in-audit-list";
+            String auditQueueRedisKey = "video-pool-audit-queue-" + dateStr;
+            // 判断是否审核超时,重新加入待审核队列
+            while (true) {
+                List<String> inAuditList = redisTemplate.opsForList().range(inAuditListRedisKey, 0, 0);
+                if (CollectionUtils.isEmpty(inAuditList)) {
+                    break;
+                }
+                JSONObject firstObj = JSONObject.parseObject(inAuditList.get(0));
+                if (now > firstObj.getLong("timestamp")) {
+                    redisTemplate.opsForList().leftPop(inAuditListRedisKey);
+                    String id = firstObj.getString("id");
+                    PublishSingleVideoSource videoAudit = videoSourceRepository.getByContentTraceId(id);
+                    if (videoAudit.getVideoPoolAuditStatus() == ArticleVideoAuditStatusEnum.WAITING.getCode()) {
+                        ContentPoolEnum poolEnum = ContentPoolEnum.from(videoAudit.getFlowPoolLevel());
+                        redisTemplate.opsForZSet().add(auditQueueRedisKey, id, poolEnum.getWeight());
+                    }
+                } else {
+                    break;
+                }
+            }
+            List<String> excludeContentIds = new ArrayList<>();
+            // 审核中列表获取,按内容池分组
+            List<String> inAuditList = redisTemplate.opsForList().range(inAuditListRedisKey, 0, -1);
+            Map<String, List<PublishSingleVideoSource>> inAuditListPoolCountMap = new HashMap<>();
+            if (CollectionUtils.isNotEmpty(inAuditList)) {
+                List<String> inAuditListIds = inAuditList.stream().map(item -> JSONObject.parseObject(item).getString("id")).collect(Collectors.toList());
+                List<PublishSingleVideoSource> auditQueueList = videoSourceRepository.getByContentTraceIdIn(new ArrayList<>(inAuditListIds));
+                inAuditListPoolCountMap = auditQueueList.stream().collect(Collectors.groupingBy(item -> ContentPoolEnum.from(item.getFlowPoolLevel()).getContentPool()));
+                excludeContentIds.addAll(inAuditListIds);
+            }
+            // 待审核列表获取,按内容池分组
+            Set<String> auditQueueIds = redisTemplate.opsForZSet().rangeByScore(auditQueueRedisKey, 0, 100);
+            Map<String, List<PublishSingleVideoSource>> auditQueuePoolCountMap = new HashMap<>();
+            if (CollectionUtils.isNotEmpty(auditQueueIds)) {
+                List<PublishSingleVideoSource> auditQueueList = videoSourceRepository.getByContentTraceIdIn(new ArrayList<>(auditQueueIds));
+                auditQueuePoolCountMap = auditQueueList.stream().collect(Collectors.groupingBy(item -> ContentPoolEnum.from(item.getFlowPoolLevel()).getContentPool()));
+                excludeContentIds.addAll(auditQueueIds);
+            }
+            // 每日配置发送量不足添加
+            for (Map.Entry<String, Integer> entry : dailyAuditPoolCount.entrySet()) {
+                ContentPoolEnum poolEnum = ContentPoolEnum.from(entry.getKey());
+                String key = "video_audit_count_" + dateStr + "_" + poolEnum.getContentPool();
+                int totalCount = Integer.parseInt(Optional.ofNullable(redisTemplate.opsForValue().get(key)).orElse("0"));
+                if (totalCount < entry.getValue()) {
+                    List<PublishSingleVideoSource> auditQueueList = auditQueuePoolCountMap.getOrDefault(poolEnum.getContentPool(), new ArrayList<>());
+                    List<PublishSingleVideoSource> inAuditListPool = inAuditListPoolCountMap.getOrDefault(poolEnum.getContentPool(), new ArrayList<>());
+                    int needCount = entry.getValue() - (totalCount + auditQueueList.size() + inAuditListPool.size());
+                    if (needCount > 0) {
+                        List<PublishSingleVideoSource> addList = videoPoolAuditMapper.articleVideoWatingAuditList(
+                                Arrays.asList(ArticleVideoAuditStatusEnum.WAITING.getCode()),
+                                Arrays.asList(poolEnum.getValue()), excludeContentIds, needCount);
+                        if (CollectionUtils.isNotEmpty(addList)) {
+                            for (PublishSingleVideoSource item : addList) {
+                                redisTemplate.opsForZSet().add(auditQueueRedisKey, item.getContentTraceId(), poolEnum.getWeight());
+                                excludeContentIds.add(item.getContentTraceId());
+                            }
+                        }
+                    }
+                }
+            }
+            // 待发布内容不足添加
+            Long auditQueueSize = redisTemplate.opsForZSet().size(auditQueueRedisKey);
+            if (Objects.isNull(auditQueueSize) || auditQueueSize < 20) {
+                List<PublishSingleVideoSource> list = videoPoolAuditMapper.articleVideoWatingAuditList(
+                        Arrays.asList(ArticleVideoAuditStatusEnum.WAITING.getCode()), null, excludeContentIds, 40);
+                if (CollectionUtils.isNotEmpty(list)) {
+                    for (PublishSingleVideoSource item : list) {
+                        ContentPoolEnum poolEnum = ContentPoolEnum.from(item.getFlowPoolLevel());
+                        redisTemplate.opsForZSet().add(auditQueueRedisKey, item.getContentTraceId(), poolEnum.getWeight());
+                    }
+                }
+            }
+            redisTemplate.expire(auditQueueRedisKey, 12, TimeUnit.HOURS);
+        } finally {
+            redisUtil.releaseLock(lockKey, requestId);
+        }
+        return ReturnT.SUCCESS;
+    }
 }

+ 31 - 0
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/util/RedisUtil.java

@@ -0,0 +1,31 @@
+package com.tzld.longarticle.recommend.server.util;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.ReturnType;
+import org.springframework.data.redis.core.RedisCallback;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class RedisUtil {
+
+    @Autowired
+    private RedisTemplate<String, String> redisTemplate;
+
+    public boolean tryAcquireLock(String lockKey, String requestId) {
+        // 尝试获取锁
+        Boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, 10, TimeUnit.SECONDS);
+        return Boolean.TRUE.equals(lockAcquired);
+    }
+
+    public void releaseLock(String lockKey, String requestId) {
+        // 使用 Lua 脚本确保释放锁的原子性
+        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
+        redisTemplate.execute((RedisCallback<Object>) connection -> {
+            connection.eval(script.getBytes(), ReturnType.INTEGER, 1, lockKey.getBytes(), requestId.getBytes());
+            return null;
+        });
+    }
+}

+ 6 - 0
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/web/recommend/ArticleVideoAuditController.java

@@ -80,4 +80,10 @@ public class ArticleVideoAuditController {
         return CommonResponse.success();
     }
 
+    @GetMapping("/articlePoolAuditQueueJob")
+    public CommonResponse<Void> articlePoolAuditQueueJob() {
+        service.articlePoolAuditQueueJob(null);
+        return CommonResponse.success();
+    }
+
 }

+ 6 - 0
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/web/recommend/VideoPoolAuditController.java

@@ -58,4 +58,10 @@ public class VideoPoolAuditController {
         return CommonResponse.success();
     }
 
+    @GetMapping("/videoPoolAuditQueueJob")
+    public CommonResponse<Void> videoPoolAuditQueueJob() {
+        service.videoPoolAuditQueueJob(null);
+        return CommonResponse.success();
+    }
+
 }

+ 42 - 0
long-article-recommend-service/src/main/resources/mapper/longArticle/ArticleAuditMapper.xml

@@ -37,6 +37,12 @@
                     #{item}
                 </foreach>
             </if>
+            <if test="flowPoolLevel!= null and flowPoolLevel.size() > 0">
+                and lata.flow_pool_level in
+                <foreach collection="flowPoolLevel" item="item" separator="," open="(" close=")">
+                    #{item}
+                </foreach>
+            </if>
             <if test="auditTimestamp!= null">
                 <if test="auditTimestamp.conditionOperator != null and auditTimestamp.conditionOperator != ''">
                     <choose>
@@ -95,6 +101,12 @@
                     #{item}
                 </foreach>
             </if>
+            <if test="flowPoolLevel!= null and flowPoolLevel.size() > 0">
+                and lata.flow_pool_level in
+                <foreach collection="flowPoolLevel" item="item" separator="," open="(" close=")">
+                    #{item}
+                </foreach>
+            </if>
             <if test="auditTimestamp!= null">
                 <if test="auditTimestamp.conditionOperator != null and auditTimestamp.conditionOperator != ''">
                     <choose>
@@ -231,6 +243,36 @@
         limit 1
     </select>
 
+    <select id="articleVideoWatingAuditList"
+            resultType="com.tzld.longarticle.recommend.server.model.vo.ArticleVideoAuditListVO">
+        select lata.content_id, lata.status, lat.article_title as title, lat.kimi_title,
+        lata.audit_account, lata.audit_timestamp, lata.flow_pool_level
+        from long_articles_title_audit lata
+        left join long_articles_text lat on lata.content_id = lat.content_id
+        <where>
+            <if test="status!= null and status.size() > 0">
+                and lata.status in
+                <foreach collection="status" item="item" separator="," open="(" close=")">
+                    #{item}
+                </foreach>
+            </if>
+            <if test="flowPoolLevels!= null and flowPoolLevels.size() > 0">
+                and lata.flow_pool_level in
+                <foreach collection="flowPoolLevels" item="item" separator="," open="(" close=")">
+                    #{item}
+                </foreach>
+            </if>
+            <if test="excludeContentIds!= null and excludeContentIds.size() > 0">
+                and lata.content_id not in
+                <foreach collection="excludeContentIds" item="item" separator="," open="(" close=")">
+                    #{item}
+                </foreach>
+            </if>
+        </where>
+        order by lata.content_id desc
+        limit #{size}
+    </select>
+
     <update id="updateArticleTitle">
         update long_articles_text
         set old_article_title = article_title,

+ 30 - 3
long-article-recommend-service/src/main/resources/mapper/longArticle/VideoPoolAuditMapper.xml

@@ -111,13 +111,13 @@
         </if>
         <choose>
             <when test="poolLevelDesc != null and poolLevelDesc != ''">
-                order by flow_pool_level ${poolLevelDesc}, content_trace_id desc
+                order by flow_pool_level ${poolLevelDesc}, crawler_timestamp desc
             </when>
             <when test="auditTimestamp != null and auditTimestamp.orderType != null and auditTimestamp.orderType != ''">
                 order by video_pool_audit_timestamp ${auditTimestamp.orderType}
             </when>
             <otherwise>
-                order by content_trace_id desc
+                order by crawler_timestamp desc
             </otherwise>
         </choose>
         limit #{offset}, #{pageSize}
@@ -161,7 +161,7 @@
         <if test="flowPoolLevel != null">
             and flow_pool_level = #{flowPoolLevel}
         </if>
-        order by content_trace_id desc
+        order by crawler_timestamp desc
         limit 1
     </select>
 
@@ -203,6 +203,33 @@
         </choose>
     </select>
 
+    <select id="articleVideoWatingAuditList"
+            resultType="com.tzld.longarticle.recommend.server.model.entity.longArticle.PublishSingleVideoSource">
+        select *
+        from publish_single_video_source
+        where bad_status in (0, 5) and audit_status = 1
+        <if test="status!= null and status.size() > 0">
+            and `video_pool_audit_status` in
+            <foreach collection="status" item="item" separator="," open="(" close=")">
+                #{item}
+            </foreach>
+        </if>
+        <if test="flowPoolLevels!= null and flowPoolLevels.size() > 0">
+            and flow_pool_level in
+            <foreach collection="flowPoolLevels" item="item" separator="," open="(" close=")">
+                #{item}
+            </foreach>
+        </if>
+        <if test="excludeContentIds!= null and excludeContentIds.size() > 0">
+            and content_trace_id not in
+            <foreach collection="excludeContentIds" item="item" separator="," open="(" close=")">
+                #{item}
+            </foreach>
+        </if>
+        order by crawler_timestamp desc
+        limit #{size}
+    </select>
+
     <update id="updateArticleTitle">
         update publish_single_video_source
         set old_article_title = article_title,