Преглед на файлове

CgiReplyBucketData 加锁去重

wangyunpeng преди 1 месец
родител
ревизия
d90cf12dfc

+ 39 - 22
api-module/src/main/java/com/tzld/piaoquan/api/service/strategy/impl/BuckStrategyV1.java

@@ -94,15 +94,26 @@ public class BuckStrategyV1 implements ReplyStrategyService {
         // 0 获取策略key
         JSONObject bucketStrategyConfigJsonObject = getStrategyConfig(bucketDataParam);
         Set<String> keyedSet = bucketStrategyConfigJsonObject.keySet();
-        // 1 处理文章--算法引擎--排序文章数据
-//        getWenzhangData();
-        // 2 处理小程序--读取离线数据表--获取策略排序小程序数据
-        List<CgiReplyBucketData> smallDataCgiReplyList = readStrategyOrderSmallData(keyedSet, bucketDataParam);
-        // 2.1 获取小程序落地页地址 http调用
-        smallDataCgiReplyList = setSmallPageUrl(smallDataCgiReplyList);
-        log.info(JSON.toJSONString(smallDataCgiReplyList));
-        // 3 入库读表
-        insertSmallData(smallDataCgiReplyList, keyedSet, bucketDataParam);
+        // 加锁,防止并发请求重复触发数据生成(readStrategyOrderSmallData + setSmallPageUrl + insertSmallData)
+        String LOCK_KEY = "getResult:" + bucketDataParam.getGhId();
+        boolean locked = redisUtils.tryLockWithRetry(LOCK_KEY, "1", 60, 10, 1000);
+        if (!locked) {
+            log.warn("BuckStrategyV1 getResult 获取锁失败,直接读取已有数据,ghId:" + bucketDataParam.getGhId());
+            return getReplyBucketData(bucketStrategyConfigJsonObject, keyedSet, bucketDataParam);
+        }
+        try {
+            // 1 处理文章--算法引擎--排序文章数据
+//            getWenzhangData();
+            // 2 处理小程序--读取离线数据表--获取策略排序小程序数据
+            List<CgiReplyBucketData> smallDataCgiReplyList = readStrategyOrderSmallData(keyedSet, bucketDataParam);
+            // 2.1 获取小程序落地页地址 http调用
+            smallDataCgiReplyList = setSmallPageUrl(smallDataCgiReplyList);
+            log.info(JSON.toJSONString(smallDataCgiReplyList));
+            // 3 入库读表
+            insertSmallData(smallDataCgiReplyList, keyedSet, bucketDataParam);
+        } finally {
+            redisUtils.del(LOCK_KEY);
+        }
         // 4 组装分桶数据
         return getReplyBucketData(bucketStrategyConfigJsonObject, keyedSet, bucketDataParam);
     }
@@ -246,20 +257,13 @@ public class BuckStrategyV1 implements ReplyStrategyService {
         if (CollectionUtils.isEmpty(smallDataCgiReplyList)) {
             return;
         }
-        // 循环获取锁,锁5s,每获取一次锁,未获取到锁,等待1s,最多等待10次
-        int tryCount = 0;
+        // 循环获取锁,锁30s,每次重试间隔 1s,最多重试10次
         String LOCK_KEY = "insertSmallData:" + bucketDataParam.getGhId();
-        while (tryCount < 10) {
-            if (redisUtils.tryLock(LOCK_KEY, "1", 5)) {
-                break;
-            } else {
-                try {
-                    Thread.sleep(1000);
-                } catch (Exception e) {
-                    log.error("BuckStrategyV1 insertSmallData 尝试获取锁失败,ghId:" + bucketDataParam.getGhId() + ",tryCount:" + tryCount, e);
-                }
-                tryCount++;
-            }
+        boolean locked = redisUtils.tryLockWithRetry(LOCK_KEY, "1", 30, 10, 1000);
+        // 未获取到锁,直接返回,避免重复入库
+        if (!locked) {
+            log.error("BuckStrategyV1 insertSmallData 获取锁失败,放弃入库,ghId:" + bucketDataParam.getGhId());
+            return;
         }
         try {
             for (String key : keyedSet) {
@@ -280,6 +284,19 @@ public class BuckStrategyV1 implements ReplyStrategyService {
                 List<CgiReplyBucketData> cgiReplyBucketData1 = cgiReplyBucketDataMapper.selectByExample(cgiReplyBucketDataExample);
                 Map<Long, CgiReplyBucketData> cgiReplyBucketDataMap = cgiReplyBucketData1.stream()
                         .collect(Collectors.toMap(CgiReplyBucketData::getMiniVideoId, x -> x, (a, b) -> b));
+                // 锁内二次校验:若已存在相同视频ID的 is_delete=0 记录,说明并发请求已处理,跳过本次入库
+                if (!CollectionUtils.isEmpty(cgiReplyBucketData1) && cgiReplyBucketData1.size() == collect.size()) {
+                    Set<Long> existVideoIds = cgiReplyBucketDataMap.keySet();
+                    Set<Long> toInsertVideoIds = collect.stream()
+                            .map(x -> x.getMiniVideoId() != null ? x.getMiniVideoId() : MessageUtil.getVideoId(x.getMiniPagePath()))
+                            .filter(Objects::nonNull)
+                            .collect(Collectors.toSet());
+                    if (existVideoIds.equals(toInsertVideoIds)) {
+                        log.info("BuckStrategyV1 insertSmallData 锁内二次校验:数据已被并发请求处理,跳过,key:{},ghId:{}",
+                                key, bucketDataParam.getGhId());
+                        continue;
+                    }
+                }
                 // 清上个版本的策略数据
                 cgiReplyBucketDataMapperExt.updateDeleteStatus(bucketDataParam.getGhId(), key, 1, 1);
                 List<GhDetailExt> ghDetailExtList = ghDetailService.getGhDetailExtList(bucketDataParam.getGhId(), GhTypeEnum.GH.type);

+ 35 - 25
api-module/src/main/java/com/tzld/piaoquan/api/service/strategy/impl/ThirdPartyPushMessageStrategyV1.java

@@ -90,15 +90,26 @@ public class ThirdPartyPushMessageStrategyV1 implements ReplyStrategyService {
             configJsonObject = JSON.parseObject(bucketStrategyConfig);
         }
         Set<String> keyedSet = configJsonObject.keySet();
-        // 1 处理文章--算法引擎--排序文章数据
-//        getWenzhangData();
-        // 2 处理小程序--读取离线数据表--获取策略排序小程序数据
-        List<CgiReplyBucketData> smallDataCgiReplyList = readStrategyOrderSmallData(keyedSet, bucketDataParam);
-        // 2.1 获取小程序落地页地址 http调用
-        smallDataCgiReplyList = setSmallPageUrl(smallDataCgiReplyList, bucketDataParam.getChannel());
-        log.info(JSON.toJSONString(smallDataCgiReplyList));
-        // 3 入库读表
-        insertSmallData(smallDataCgiReplyList, keyedSet, bucketDataParam);
+        // 加锁,防止并发请求重复触发数据生成(readStrategyOrderSmallData + setSmallPageUrl + insertSmallData)
+        String LOCK_KEY = "getResult:" + bucketDataParam.getGhId();
+        boolean locked = redisUtils.tryLockWithRetry(LOCK_KEY, "1", 60, 10, 1000);
+        if (!locked) {
+            log.warn("ThirdPartyPushMessageStrategyV1 getResult 获取锁失败,直接读取已有数据,ghId:" + bucketDataParam.getGhId());
+            return getReplyBucketData(configJsonObject, keyedSet, bucketDataParam.getGhId());
+        }
+        try {
+            // 1 处理文章--算法引擎--排序文章数据
+//            getWenzhangData();
+            // 2 处理小程序--读取离线数据表--获取策略排序小程序数据
+            List<CgiReplyBucketData> smallDataCgiReplyList = readStrategyOrderSmallData(keyedSet, bucketDataParam);
+            // 2.1 获取小程序落地页地址 http调用
+            smallDataCgiReplyList = setSmallPageUrl(smallDataCgiReplyList, bucketDataParam.getChannel());
+            log.info(JSON.toJSONString(smallDataCgiReplyList));
+            // 3 入库读表
+            insertSmallData(smallDataCgiReplyList, keyedSet, bucketDataParam);
+        } finally {
+            redisUtils.del(LOCK_KEY);
+        }
         // 4 组装分桶数据
         return getReplyBucketData(configJsonObject, keyedSet, bucketDataParam.getGhId());
     }
@@ -151,23 +162,9 @@ public class ThirdPartyPushMessageStrategyV1 implements ReplyStrategyService {
         if (CollectionUtils.isEmpty(smallDataCgiReplyList)) {
             return;
         }
-        // 循环获取锁,锁5s,每获取一次锁,未获取到锁,等待1s,最多等待10次
-        int tryCount = 0;
+        // 循环获取锁,锁30s,每次重试间隔 1s,最多重试10次
         String LOCK_KEY = "insertSmallData:" + bucketDataParam.getGhId();
-        boolean locked = false;
-        while (tryCount < 10) {
-            if (redisUtils.tryLock(LOCK_KEY, "1", 5)) {
-                locked = true;
-                break;
-            } else {
-                try {
-                    Thread.sleep(1000);
-                } catch (Exception e) {
-                    log.error("ThirdPartyPushMessageStrategyV1 insertSmallData 尝试获取锁失败,ghId:" + bucketDataParam.getGhId() + ",tryCount:" + tryCount, e);
-                }
-                tryCount++;
-            }
-        }
+        boolean locked = redisUtils.tryLockWithRetry(LOCK_KEY, "1", 30, 10, 1000);
         // 未获取到锁,直接返回,避免重复入库
         if (!locked) {
             log.error("ThirdPartyPushMessageStrategyV1 insertSmallData 获取锁失败,放弃入库,ghId:" + bucketDataParam.getGhId());
@@ -192,6 +189,19 @@ public class ThirdPartyPushMessageStrategyV1 implements ReplyStrategyService {
                 List<CgiReplyBucketData> cgiReplyBucketData1 = cgiReplyBucketDataMapper.selectByExample(cgiReplyBucketDataExample);
                 Map<Long, CgiReplyBucketData> cgiReplyBucketDataMap = cgiReplyBucketData1.stream()
                         .collect(Collectors.toMap(CgiReplyBucketData::getMiniVideoId, x -> x, (a, b) -> b));
+                // 锁内二次校验:若已存在相同视频ID的 is_delete=0 记录,说明并发请求已处理,跳过本次入库
+                if (!CollectionUtils.isEmpty(cgiReplyBucketData1) && cgiReplyBucketData1.size() == collect.size()) {
+                    Set<Long> existVideoIds = cgiReplyBucketDataMap.keySet();
+                    Set<Long> toInsertVideoIds = collect.stream()
+                            .map(x -> x.getMiniVideoId() != null ? x.getMiniVideoId() : MessageUtil.getVideoId(x.getMiniPagePath()))
+                            .filter(Objects::nonNull)
+                            .collect(Collectors.toSet());
+                    if (existVideoIds.equals(toInsertVideoIds)) {
+                        log.info("ThirdPartyPushMessageStrategyV1 insertSmallData 锁内二次校验:数据已被并发请求处理,跳过,key:{},ghId:{}",
+                                key, bucketDataParam.getGhId());
+                        continue;
+                    }
+                }
                 // 清上个版本的策略数据
                 cgiReplyBucketDataMapperExt.updateDeleteStatus(bucketDataParam.getGhId(), key, 1, 1);
                 List<ContentPlatformGzhPlanVideo> gzhPlanVideoList = contentPlatformPlanService.getGzhPlanVideoListByCooperateAccountId(bucketDataParam.getGhId());

+ 89 - 53
api-module/src/main/java/com/tzld/piaoquan/api/service/strategy/impl/WeComPushMessageStrategyV1.java

@@ -73,7 +73,7 @@ public class WeComPushMessageStrategyV1 implements ReplyStrategyService {
 
     @Override
     public ReplyBucketData getResult(BucketDataParam bucketDataParam) {
-        log.info("ThirdPartyPushMessageStrategyV1 start");
+        log.info("WeComPushMessageStrategyV1 start");
         // 0 获取策略key
         JSONObject configJsonObject;
         if (Objects.equals(StrategyStatusEnum.DEFAULT.status, bucketDataParam.getStrategyStatus())) {
@@ -82,15 +82,26 @@ public class WeComPushMessageStrategyV1 implements ReplyStrategyService {
             configJsonObject = JSON.parseObject(bucketStrategyConfig);
         }
         Set<String> keyedSet = configJsonObject.keySet();
-        // 1 处理文章--算法引擎--排序文章数据
-//        getWenzhangData();
-        // 2 处理小程序--读取离线数据表--获取策略排序小程序数据
-        List<CgiReplyBucketData> smallDataCgiReplyList = readStrategyOrderSmallData(keyedSet, bucketDataParam);
-        // 2.1 获取小程序落地页地址 http调用
-        smallDataCgiReplyList = setSmallPageUrl(smallDataCgiReplyList);
-        log.info(JSON.toJSONString(smallDataCgiReplyList));
-        // 3 入库读表
-        insertSmallData(smallDataCgiReplyList, keyedSet, bucketDataParam);
+        // 加锁,防止并发请求重复触发数据生成(readStrategyOrderSmallData + setSmallPageUrl + insertSmallData)
+        String LOCK_KEY = "getResult:" + bucketDataParam.getGhId();
+        boolean locked = redisUtils.tryLockWithRetry(LOCK_KEY, "1", 60, 10, 1000);
+        if (!locked) {
+            log.warn("WeComPushMessageStrategyV1 getResult 获取锁失败,直接读取已有数据,ghId:" + bucketDataParam.getGhId());
+            return getReplyBucketData(configJsonObject, keyedSet, bucketDataParam.getGhId());
+        }
+        try {
+            // 1 处理文章--算法引擎--排序文章数据
+//            getWenzhangData();
+            // 2 处理小程序--读取离线数据表--获取策略排序小程序数据
+            List<CgiReplyBucketData> smallDataCgiReplyList = readStrategyOrderSmallData(keyedSet, bucketDataParam);
+            // 2.1 获取小程序落地页地址 http调用
+            smallDataCgiReplyList = setSmallPageUrl(smallDataCgiReplyList);
+            log.info(JSON.toJSONString(smallDataCgiReplyList));
+            // 3 入库读表
+            insertSmallData(smallDataCgiReplyList, keyedSet, bucketDataParam);
+        } finally {
+            redisUtils.del(LOCK_KEY);
+        }
         // 4 组装分桶数据
         return getReplyBucketData(configJsonObject, keyedSet, bucketDataParam.getGhId());
     }
@@ -143,54 +154,79 @@ public class WeComPushMessageStrategyV1 implements ReplyStrategyService {
         if (CollectionUtils.isEmpty(smallDataCgiReplyList)) {
             return;
         }
-        for (String key : keyedSet) {
-            if ("base".equals(key)) {
-                continue;
-            }
-            List<CgiReplyBucketData> collect = smallDataCgiReplyList.stream()
-                    .filter(x -> x.getStrategy().equals(key))
-                    .filter(x -> x.getGhId().equals(bucketDataParam.getGhId()))
-                    .collect(Collectors.toList());
-            if (CollectionUtils.isEmpty(collect)) {
-                log.info("WeComPushMessageStrategyV1 insertSmallData 数据未变更跳过,key:{},ghId:{},data:{}",
-                        key, bucketDataParam.getGhId(), JSON.toJSONString(smallDataCgiReplyList));
-                continue;
-            }
-            CgiReplyBucketDataExample cgiReplyBucketDataExample = new CgiReplyBucketDataExample();
-            cgiReplyBucketDataExample.createCriteria().andIsDeleteEqualTo(0).andMsgTypeEqualTo(1).andStrategyEqualTo(key).andGhIdEqualTo(bucketDataParam.getGhId());
-            List<CgiReplyBucketData> cgiReplyBucketData1 = cgiReplyBucketDataMapper.selectByExample(cgiReplyBucketDataExample);
-            Map<Long, CgiReplyBucketData> cgiReplyBucketDataMap = cgiReplyBucketData1.stream()
-                    .collect(Collectors.toMap(CgiReplyBucketData::getMiniVideoId, x -> x, (a, b) -> b));
-            // 清上个版本的策略数据
-            cgiReplyBucketDataMapperExt.updateDeleteStatus(bucketDataParam.getGhId(), key, 1, 1);
-            // 入库
-            for (CgiReplyBucketData cgiReplyBucketData : collect) {
-                CgiReplyBucketData oldData = cgiReplyBucketDataMap.get(cgiReplyBucketData.getMiniVideoId());
-                if (Objects.isNull(oldData)) {
-                    oldData = cgiReplyBucketDataMapperExt.getOldCgiReplyData(cgiReplyBucketData.getMiniVideoId());
+        // 循环获取锁,锁30s,每次重试间隔 1s,最多重试10次
+        String LOCK_KEY = "insertSmallData:" + bucketDataParam.getGhId();
+        boolean locked = redisUtils.tryLockWithRetry(LOCK_KEY, "1", 30, 10, 1000);
+        // 未获取到锁,直接返回,避免重复入库
+        if (!locked) {
+            log.error("WeComPushMessageStrategyV1 insertSmallData 获取锁失败,放弃入库,ghId:" + bucketDataParam.getGhId());
+            return;
+        }
+        try {
+            for (String key : keyedSet) {
+                if ("base".equals(key)) {
+                    continue;
+                }
+                List<CgiReplyBucketData> collect = smallDataCgiReplyList.stream()
+                        .filter(x -> x.getStrategy().equals(key))
+                        .filter(x -> x.getGhId().equals(bucketDataParam.getGhId()))
+                        .collect(Collectors.toList());
+                if (CollectionUtils.isEmpty(collect)) {
+                    log.info("WeComPushMessageStrategyV1 insertSmallData 数据未变更跳过,key:{},ghId:{},data:{}",
+                            key, bucketDataParam.getGhId(), JSON.toJSONString(smallDataCgiReplyList));
+                    continue;
+                }
+                CgiReplyBucketDataExample cgiReplyBucketDataExample = new CgiReplyBucketDataExample();
+                cgiReplyBucketDataExample.createCriteria().andIsDeleteEqualTo(0).andMsgTypeEqualTo(1).andStrategyEqualTo(key).andGhIdEqualTo(bucketDataParam.getGhId());
+                List<CgiReplyBucketData> cgiReplyBucketData1 = cgiReplyBucketDataMapper.selectByExample(cgiReplyBucketDataExample);
+                Map<Long, CgiReplyBucketData> cgiReplyBucketDataMap = cgiReplyBucketData1.stream()
+                        .collect(Collectors.toMap(CgiReplyBucketData::getMiniVideoId, x -> x, (a, b) -> b));
+                // 锁内二次校验:若已存在相同视频ID的 is_delete=0 记录,说明并发请求已处理,跳过本次入库
+                if (!CollectionUtils.isEmpty(cgiReplyBucketData1) && cgiReplyBucketData1.size() == collect.size()) {
+                    Set<Long> existVideoIds = cgiReplyBucketDataMap.keySet();
+                    Set<Long> toInsertVideoIds = collect.stream()
+                            .map(x -> x.getMiniVideoId() != null ? x.getMiniVideoId() : MessageUtil.getVideoId(x.getMiniPagePath()))
+                            .filter(Objects::nonNull)
+                            .collect(Collectors.toSet());
+                    if (existVideoIds.equals(toInsertVideoIds)) {
+                        log.info("WeComPushMessageStrategyV1 insertSmallData 锁内二次校验:数据已被并发请求处理,跳过,key:{},ghId:{}",
+                                key, bucketDataParam.getGhId());
+                        continue;
+                    }
                 }
-                if (Objects.nonNull(oldData)) {
-                    if (StringUtils.isEmpty(cgiReplyBucketData.getTitle())) {
-                        cgiReplyBucketData.setTitle(oldData.getTitle());
+                // 清上个版本的策略数据
+                cgiReplyBucketDataMapperExt.updateDeleteStatus(bucketDataParam.getGhId(), key, 1, 1);
+                // 入库
+                for (CgiReplyBucketData cgiReplyBucketData : collect) {
+                    CgiReplyBucketData oldData = cgiReplyBucketDataMap.get(cgiReplyBucketData.getMiniVideoId());
+                    if (Objects.isNull(oldData)) {
+                        oldData = cgiReplyBucketDataMapperExt.getOldCgiReplyData(cgiReplyBucketData.getMiniVideoId());
                     }
-                    if (StringUtils.isEmpty(cgiReplyBucketData.getCoverUrl())) {
-                        cgiReplyBucketData.setCoverUrl(oldData.getCoverUrl());
+                    if (Objects.nonNull(oldData)) {
+                        if (StringUtils.isEmpty(cgiReplyBucketData.getTitle())) {
+                            cgiReplyBucketData.setTitle(oldData.getTitle());
+                        }
+                        if (StringUtils.isEmpty(cgiReplyBucketData.getCoverUrl())) {
+                            cgiReplyBucketData.setCoverUrl(oldData.getCoverUrl());
+                        }
                     }
+                    String pageUrl = videoMultiService.setVideoMultiTitleCoverPagePath(cgiReplyBucketData.getMiniVideoId(),
+                            cgiReplyBucketData.getMiniPagePath(), cgiReplyBucketData.getTitle(), cgiReplyBucketData.getCoverUrl());
+                    cgiReplyBucketData.setMiniPagePath(pageUrl);
+                    cgiReplyBucketData.setRootSourceId(MessageUtil.getRootSourceId(cgiReplyBucketData.getMiniPagePath()));
+                    cgiReplyBucketDataMapper.insertSelective(cgiReplyBucketData);
+                    String redisKey = "auto_reply_video_detail_" + cgiReplyBucketData.getRootSourceId();
+                    VideoCharacteristicVO vo = new VideoCharacteristicVO();
+                    vo.setGhId(cgiReplyBucketData.getGhId());
+                    vo.setName(bucketDataParam.getAccountName());
+                    vo.setVideoId(cgiReplyBucketData.getMiniVideoId());
+                    vo.setTitle(cgiReplyBucketData.getTitle());
+                    vo.setCover(cgiReplyBucketData.getCoverUrl());
+                    redisUtils.set(redisKey, JSONObject.toJSONString(vo), 3L * 24 * 60 * 60);
                 }
-                String pageUrl = videoMultiService.setVideoMultiTitleCoverPagePath(cgiReplyBucketData.getMiniVideoId(),
-                        cgiReplyBucketData.getMiniPagePath(), cgiReplyBucketData.getTitle(), cgiReplyBucketData.getCoverUrl());
-                cgiReplyBucketData.setMiniPagePath(pageUrl);
-                cgiReplyBucketData.setRootSourceId(MessageUtil.getRootSourceId(cgiReplyBucketData.getMiniPagePath()));
-                cgiReplyBucketDataMapper.insertSelective(cgiReplyBucketData);
-                String redisKey = "auto_reply_video_detail_" + cgiReplyBucketData.getRootSourceId();
-                VideoCharacteristicVO vo = new VideoCharacteristicVO();
-                vo.setGhId(cgiReplyBucketData.getGhId());
-                vo.setName(bucketDataParam.getAccountName());
-                vo.setVideoId(cgiReplyBucketData.getMiniVideoId());
-                vo.setTitle(cgiReplyBucketData.getTitle());
-                vo.setCover(cgiReplyBucketData.getCoverUrl());
-                redisUtils.set(redisKey, JSONObject.toJSONString(vo), 3L * 24 * 60 * 60);
             }
+        } finally {
+            redisUtils.del(LOCK_KEY);
         }
     }
 

+ 26 - 0
common-module/src/main/java/com/tzld/piaoquan/growth/common/utils/RedisUtils.java

@@ -191,6 +191,32 @@ public class RedisUtils {
         return false;
     }
 
+    /**
+     * 带重试的尝试获取分布式锁
+     *
+     * @param key              锁key
+     * @param value            锁value
+     * @param expireSeconds    锁超时时间(秒)
+     * @param maxRetry         最大重试次数
+     * @param retryIntervalMs  每次重试间隔(毫秒)
+     * @return true:获取锁成功,false:达到最大重试次数仍未获取到锁
+     */
+    public boolean tryLockWithRetry(String key, String value, long expireSeconds, int maxRetry, long retryIntervalMs) {
+        for (int i = 0; i < maxRetry; i++) {
+            if (tryLock(key, value, expireSeconds)) {
+                return true;
+            }
+            try {
+                Thread.sleep(retryIntervalMs);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.warn("tryLockWithRetry 被中断,放弃获取锁,key:{}", key);
+                return false;
+            }
+        }
+        return false;
+    }
+
     public void listLeftPush(String key, String value) {
         redisTemplate.opsForList().leftPush(key, value);