Quellcode durchsuchen

视频审核分发加锁

wangyunpeng vor 4 Monaten
Ursprung
Commit
603446ee7e

+ 65 - 32
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/recommend/ArticleVideoAuditService.java

@@ -16,6 +16,7 @@ import com.tzld.longarticle.recommend.server.model.vo.ArticleVideoAuditListVO;
 import com.tzld.longarticle.recommend.server.repository.aigc.ProducePlanExeRecordRepository;
 import com.tzld.longarticle.recommend.server.repository.longArticle.*;
 import com.tzld.longarticle.recommend.server.util.DateUtils;
+import com.tzld.longarticle.recommend.server.util.RedisUtil;
 import com.tzld.longarticle.recommend.server.util.page.Page;
 import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.handler.annotation.XxlJob;
@@ -56,6 +57,8 @@ public class ArticleVideoAuditService {
     private ProducePlanExeRecordRepository producePlanExeRecordRepository;
     @Autowired
     private ArticlePoolPromotionSourceRepository articlePoolPromotionSourceRepository;
+    @Autowired
+    private RedisUtil redisUtil;
 
     @Autowired
     private RedisTemplate<String, String> redisTemplate;
@@ -124,42 +127,72 @@ public class ArticleVideoAuditService {
             return list(param);
         }
         Page<ArticleVideoAuditListVO> result = new Page<>();
-        Long now = System.currentTimeMillis();
-        String redisKey = "article-pool-audit-next-list";
-        Map<Object, Object> entries = redisTemplate.opsForHash().entries(redisKey);
-        List<String> excludeContentIds = new ArrayList<>();
-        entries.forEach((k, v) -> {
-            long timestamp = Long.parseLong((String) v);
-            if (now > timestamp) {
-                redisTemplate.opsForHash().delete(redisKey, k);
-            } else {
-                excludeContentIds.add((String) k);
+
+        String lockKey = "article-pool-audit-lock";
+        String requestId = UUID.randomUUID().toString();
+        int retryCount = 0;
+        boolean lockAcquired = false;
+
+        // 尝试获取分布式锁,最多重试10秒,每次间隔1秒
+        while (retryCount < 10) {
+            lockAcquired = redisUtil.tryAcquireLock(lockKey, requestId);
+            if (lockAcquired) {
+                break;
             }
-        });
-        // 根据配置判断当日是否审核完成 并 选择内容池返回
+            retryCount++;
+            try {
+                Thread.sleep(1000); // 等待1秒后重试
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return result; // 返回空结果
+            }
+        }
+
+        if (!lockAcquired) {
+            return result; // 返回空结果
+        }
+
         ArticleVideoAuditListVO item = null;
-        List<String> excludePoolLevel = new ArrayList<>();
-        String poolLevel = getAuditPoolLevel(excludePoolLevel);
-        if (Objects.isNull(poolLevel)) {
-            item = articleAuditMapper.articleVideoAuditNext(param.getContentId(),
-                    param.getStatus(), param.getTitle(), param.getAuditAccount(), param.getSourceProducePlan(),
-                    poolLevel, excludeContentIds);
-        } else {
-            do {
+        try {
+            Long now = System.currentTimeMillis();
+            String redisKey = "article-pool-audit-next-list";
+            Map<Object, Object> entries = redisTemplate.opsForHash().entries(redisKey);
+            List<String> excludeContentIds = new ArrayList<>();
+            entries.forEach((k, v) -> {
+                long timestamp = Long.parseLong((String) v);
+                if (now > timestamp) {
+                    redisTemplate.opsForHash().delete(redisKey, k);
+                } else {
+                    excludeContentIds.add((String) k);
+                }
+            });
+            // 根据配置判断当日是否审核完成 并 选择内容池返回
+            List<String> excludePoolLevel = new ArrayList<>();
+            String poolLevel = getAuditPoolLevel(excludePoolLevel);
+            if (Objects.isNull(poolLevel)) {
                 item = articleAuditMapper.articleVideoAuditNext(param.getContentId(),
                         param.getStatus(), param.getTitle(), param.getAuditAccount(), param.getSourceProducePlan(),
                         poolLevel, excludeContentIds);
-                if (Objects.nonNull(item)) {
-                    break;
-                }
-                excludePoolLevel.add(poolLevel);
-                poolLevel = getAuditPoolLevel(excludePoolLevel);
-            } while (Objects.nonNull(poolLevel));
-        }
-        if (Objects.isNull(item)) {
-            return result;
+            } else {
+                do {
+                    item = articleAuditMapper.articleVideoAuditNext(param.getContentId(),
+                            param.getStatus(), param.getTitle(), param.getAuditAccount(), param.getSourceProducePlan(),
+                            poolLevel, excludeContentIds);
+                    if (Objects.nonNull(item)) {
+                        break;
+                    }
+                    excludePoolLevel.add(poolLevel);
+                    poolLevel = getAuditPoolLevel(excludePoolLevel);
+                } while (Objects.nonNull(poolLevel));
+            }
+            if (Objects.isNull(item)) {
+                return result;
+            }
+            redisTemplate.opsForHash().put(redisKey, item.getContentId(), String.valueOf(now + 600000));
+        } finally {
+            // 释放锁
+            redisUtil.releaseLock(lockKey, requestId);
         }
-        redisTemplate.opsForHash().put(redisKey, item.getContentId(), String.valueOf(now + 600000));
         List<ArticleVideoAuditListVO> list = Collections.singletonList(item);
         buildArticleVideoAuditListVO(list);
         result.setObjs(list);
@@ -359,7 +392,7 @@ public class ArticleVideoAuditService {
 
     @XxlJob("shuffleAuditGroup")
     public ReturnT<String> shuffleAuditGroup(String param) {
-        List<String> auditUser = Arrays.asList("a","b","c","d","e","f","g","h","i","j");
+        List<String> auditUser = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
         List<LongArticleTitleAudit> contentIds = titleAuditRepository.getByStatus(ArticleVideoAuditStatusEnum.WAITING.getCode());
         for (int i = 0; i < contentIds.size(); i++) {
             int per = i % auditUser.size();
@@ -372,7 +405,7 @@ public class ArticleVideoAuditService {
 
     public void addAudit(ArticleVideoAuditAddAuditParam param) {
         ProducePlanExeRecord exeRecord = producePlanExeRecordRepository.getByPlanExeId(param.getContentId());
-        List<String> auditUser = Arrays.asList("a","b","c","d","e","f","g","h","i","j");
+        List<String> auditUser = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
         Random random = new Random();
         int index = random.nextInt(auditUser.size());
         String auditAccount = auditUser.get(index);

+ 61 - 29
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/recommend/VideoPoolAuditService.java

@@ -12,6 +12,7 @@ import com.tzld.longarticle.recommend.server.model.param.videoAudit.*;
 import com.tzld.longarticle.recommend.server.model.vo.VideoPoolAuditListVO;
 import com.tzld.longarticle.recommend.server.repository.longArticle.PublishSingleVideoSourceRepository;
 import com.tzld.longarticle.recommend.server.util.DateUtils;
+import com.tzld.longarticle.recommend.server.util.RedisUtil;
 import com.tzld.longarticle.recommend.server.util.page.Page;
 import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.handler.annotation.XxlJob;
@@ -37,6 +38,8 @@ public class VideoPoolAuditService {
     private PublishContentMapper publishContentMapper;
     @Autowired
     private PublishSingleVideoSourceRepository videoSourceRepository;
+    @Autowired
+    private RedisUtil redisUtil;
 
     @Autowired
     private RedisTemplate<String, String> redisTemplate;
@@ -99,40 +102,69 @@ public class VideoPoolAuditService {
         }
         Page<VideoPoolAuditListVO> result = new Page<>();
         // 排除已分配内容
-        Long now = System.currentTimeMillis();
-        String redisKey = "video-pool-audit-next-list";
-        Map<Object, Object> entries = redisTemplate.opsForHash().entries(redisKey);
-        List<String> excludeContentIds = new ArrayList<>();
-        entries.forEach((k, v) -> {
-            long timestamp = Long.parseLong((String) v);
-            if (now > timestamp) {
-                redisTemplate.opsForHash().delete(redisKey, k);
-            } else {
-                excludeContentIds.add((String) k);
+        String lockKey = "video-pool-audit-lock";
+        String requestId = UUID.randomUUID().toString();
+        int retryCount = 0;
+        boolean lockAcquired = false;
+
+        // 尝试获取分布式锁,最多重试10秒,每次间隔1秒
+        while (retryCount < 10) {
+            lockAcquired = redisUtil.tryAcquireLock(lockKey, requestId);
+            if (lockAcquired) {
+                break;
+            }
+            retryCount++;
+            try {
+                Thread.sleep(1000); // 等待1秒后重试
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return result; // 返回空结果
             }
-        });
-        // 根据配置判断当日是否审核完成 并 选择内容池返回
+        }
+
+        if (!lockAcquired) {
+            return result; // 返回空结果
+        }
+
         PublishSingleVideoSource obj = null;
-        List<String> excludePoolLevel = new ArrayList<>();
-        Integer poolLevel = getAuditPoolLevel(excludePoolLevel);
-        if (Objects.isNull(poolLevel)) {
-            obj = videoPoolAuditMapper.articleVideoAuditNext(param.getContentId(),
-                    param.getStatus(), param.getTitle(), param.getAuditAccount(), poolLevel, excludeContentIds);
-        } else {
-            do {
+        try {
+            Long now = System.currentTimeMillis();
+            String redisKey = "video-pool-audit-next-list";
+            Map<Object, Object> entries = redisTemplate.opsForHash().entries(redisKey);
+            List<String> excludeContentIds = new ArrayList<>();
+            entries.forEach((k, v) -> {
+                long timestamp = Long.parseLong((String) v);
+                if (now > timestamp) {
+                    redisTemplate.opsForHash().delete(redisKey, k);
+                } else {
+                    excludeContentIds.add((String) k);
+                }
+            });
+            // 根据配置判断当日是否审核完成 并 选择内容池返回
+            List<String> excludePoolLevel = new ArrayList<>();
+            Integer poolLevel = getAuditPoolLevel(excludePoolLevel);
+            if (Objects.isNull(poolLevel)) {
                 obj = videoPoolAuditMapper.articleVideoAuditNext(param.getContentId(),
                         param.getStatus(), param.getTitle(), param.getAuditAccount(), poolLevel, excludeContentIds);
-                if (Objects.nonNull(obj)) {
-                    break;
-                }
-                excludePoolLevel.add(ContentPoolEnum.from(poolLevel).getContentPool());
-                poolLevel = getAuditPoolLevel(excludePoolLevel);
-            } while (Objects.nonNull(poolLevel));
-        }
-        if (Objects.isNull(obj)) {
-            return result;
+            } else {
+                do {
+                    obj = videoPoolAuditMapper.articleVideoAuditNext(param.getContentId(),
+                            param.getStatus(), param.getTitle(), param.getAuditAccount(), poolLevel, excludeContentIds);
+                    if (Objects.nonNull(obj)) {
+                        break;
+                    }
+                    excludePoolLevel.add(ContentPoolEnum.from(poolLevel).getContentPool());
+                    poolLevel = getAuditPoolLevel(excludePoolLevel);
+                } while (Objects.nonNull(poolLevel));
+            }
+            if (Objects.isNull(obj)) {
+                return result;
+            }
+            redisTemplate.opsForHash().put(redisKey, obj.getContentTraceId(), String.valueOf(now + 600000));
+        } finally {
+            // 释放锁
+            redisUtil.releaseLock(lockKey, requestId);
         }
-        redisTemplate.opsForHash().put(redisKey, obj.getContentTraceId(), String.valueOf(now + 600000));
         List<VideoPoolAuditListVO> list = buildVideoPoolAuditListVO(Collections.singletonList(obj));
         result.setObjs(list);
         return result;

+ 31 - 0
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/util/RedisUtil.java

@@ -0,0 +1,31 @@
+package com.tzld.longarticle.recommend.server.util;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.ReturnType;
+import org.springframework.data.redis.core.RedisCallback;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class RedisUtil {
+
+    @Autowired
+    private RedisTemplate<String, String> redisTemplate;
+
+    public boolean tryAcquireLock(String lockKey, String requestId) {
+        // 尝试获取锁
+        Boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(lockKey, requestId, 10, TimeUnit.SECONDS);
+        return Boolean.TRUE.equals(lockAcquired);
+    }
+
+    public void releaseLock(String lockKey, String requestId) {
+        // 使用 Lua 脚本确保释放锁的原子性
+        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
+        redisTemplate.execute((RedisCallback<Object>) connection -> {
+            connection.eval(script.getBytes(), ReturnType.INTEGER, 1, lockKey.getBytes(), requestId.getBytes());
+            return null;
+        });
+    }
+}