Browse Source

待发布内容预过滤 多机器并发

wangyunpeng 3 weeks ago
parent
commit
068bb08946

+ 56 - 3
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/XxlJobService.java

@@ -59,6 +59,7 @@ import com.tzld.longarticle.recommend.server.service.recommend.filter.FilterServ
 import com.tzld.longarticle.recommend.server.service.recommend.recall.RecallService;
 import com.tzld.longarticle.recommend.server.util.DateUtils;
 import com.tzld.longarticle.recommend.server.util.LarkRobotUtil;
+import com.tzld.longarticle.recommend.server.util.RedisUtil;
 import com.tzld.longarticle.recommend.server.util.feishu.FeishuMessageSender;
 import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.handler.annotation.XxlJob;
@@ -78,6 +79,7 @@ import org.springframework.dao.DuplicateKeyException;
 import org.springframework.data.domain.Page;
 import org.springframework.data.domain.PageRequest;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 
@@ -151,6 +153,9 @@ public class XxlJobService {
     @Autowired
     private PublishPlanSettingRepository publishPlanSettingRepository;
 
+    @Autowired
+    private RedisUtil redisUtil;
+
     @Autowired
     private RedisTemplate<String, String> redisTemplate;
 
@@ -1120,8 +1125,27 @@ public class XxlJobService {
         log.info("syncGzhWaitingPublishContent success planId: {} accountId: {}", planId, accountId);
     }
 
-    @XxlJob("ContentPreFilterJob")
+//    @XxlJob("ContentPreFilterJob")
+    @Scheduled(cron = "0 0 1 * * ?")
     public ReturnT<String> gzhWaitingPublishContentPreFilter(String param) {
+        boolean lockAcquired = false;
+        String lockKey = "ContentPreFilterJobLock-";
+        String requestId = UUID.randomUUID().toString();
+        Integer index = 0;
+        for (int i = 0; i < 6; i++) {
+            String redisKey = lockKey + i;
+            lockAcquired = redisUtil.tryAcquireLock(redisKey, requestId);
+            if (lockAcquired) {
+                index = i;
+                lockKey = redisKey;
+                log.info("ContentPreFilterJob lock success index: {}", index);
+                break;
+            }
+        }
+        if (!lockAcquired) {
+            log.error("ContentPreFilterJob lock failed");
+            return ReturnT.SUCCESS;
+        }
         List<PublishPlanAccountDTO> planAccountList = longArticleBaseMapper.getGroupPublishPlanAccounts();
         ExecutorService thread = new CommonThreadPoolExecutor(
                 preFilterPublishContentThreadPoolSize, preFilterPublishContentThreadPoolSize, 0L, TimeUnit.SECONDS,
@@ -1132,6 +1156,10 @@ public class XxlJobService {
         String dateStr = DateUtils.getCurrentDateStr("yyyyMMdd");
         for (PublishPlanAccountDTO item : planAccountList) {
             String redisKey = "ContentPreFilterJob:" + dateStr + ":" + item.getPlanId() + "-" + item.getAccountId();
+            if (!(calculateRemainder(item.getAccountId(), 6) == index)) {
+                continue;
+            }
+            Integer finalIndex = index;
             thread.submit(() -> {
                 boolean success = false;
                 try {
@@ -1154,11 +1182,14 @@ public class XxlJobService {
                             BeanUtils.copyProperties(filterContent, saveItem);
                             filterContentList.add(saveItem);
                         }
-                        log.info("ContentPreFilterJob success planId: {} accountId: {} filterContent: {}",
-                                item.getPlanId(), item.getAccountId(), filterContentList.size());
+                        log.info("ContentPreFilterJob success index:{} planId: {} accountId: {} filterContent: {}",
+                                finalIndex, item.getPlanId(), item.getAccountId(), filterContentList.size());
                         recommendService.updateWaitingContentFilter(filterContentList);
                         success = true;
                     }
+                } catch (Exception e) {
+                    log.error("ContentPreFilterJob error index:{} planId: {} accountId: {}",
+                            finalIndex, item.getPlanId(), item.getAccountId(), e);
                 } finally {
                     if (success) {
                         redisTemplate.opsForValue().set(redisKey, "1", 1, TimeUnit.DAYS);
@@ -1171,10 +1202,32 @@ public class XxlJobService {
             cdl.await();
         } catch (InterruptedException e) {
             log.error("gzhWaitingPublishContentPreFilter error", e);
+        } finally {
+            redisUtil.releaseLock(lockKey, requestId);
         }
         return ReturnT.SUCCESS;
     }
 
+    /**
+     * 计算长字符串数字余数
+     * @param numberStr 长字符串数字
+     * @param divisor 除数
+     * @return 余数
+     */
+    public int calculateRemainder(String numberStr, int divisor) {
+        int remainder = 0;
+        for (int i = 0; i < numberStr.length(); i++) {
+            char c = numberStr.charAt(i);
+            if (Character.isDigit(c)) {
+                int digit = c - '0';
+                remainder = (remainder * 10 + digit) % divisor;
+            } else {
+                throw new IllegalArgumentException("字符串包含非数字字符: " + c);
+            }
+        }
+        return remainder;
+    }
+
     private FilterParam buildFilterParam(String planId, Account account, List<Content> contentList) {
         PublishPlanSetting publishPlanSetting = publishPlanSettingRepository.getByPlanId(planId);
         String type = ArticleTypeEnum.QUNFA.getVal();