瀏覽代碼

匹配增加线程

xueyiming 8 月之前
父節點
當前提交
4f33b8a34d

+ 23 - 0
long-article-server/src/main/java/com/tzld/piaoquan/longarticle/model/bo/MatchContent.java

@@ -0,0 +1,23 @@
+package com.tzld.piaoquan.longarticle.model.bo;
+
+import lombok.Data;
+import lombok.ToString;
+
+@Data
+@ToString
+public class MatchContent {
+
+    private String sourceId;
+
+    private String ghId;
+
+    private String publishContentId;
+
+    private String accountName;
+
+    private String content;
+
+    private String title;
+
+    private String flowPoolLevelTag;
+}

+ 0 - 11
long-article-server/src/main/java/com/tzld/piaoquan/longarticle/service/local/impl/ContentServiceImpl.java

@@ -40,17 +40,6 @@ public class ContentServiceImpl implements ContentService {
     private RedisTemplate<String, Object> redisTemplate;
 
     public MatchVideo getContent(String contentId, String ghId) {
-        String key = contentId + "_" + ghId;
-        String traceId = (String) redisTemplate.opsForValue().get(key);
-        if (StringUtils.isNotEmpty(traceId)) {
-            MatchVideoExample matchVideoExample = new MatchVideoExample();
-            matchVideoExample.createCriteria().andTraceIdEqualTo(traceId);
-            List<MatchVideo> matchVideos = matchVideoMapper.selectByExample(matchVideoExample);
-            if (!CollectionUtils.isEmpty(matchVideos)) {
-                redisTemplate.delete(key);
-                return matchVideos.get(0);
-            }
-        }
         MatchVideoExample matchVideoExample = new MatchVideoExample();
         matchVideoExample.createCriteria().andGhIdEqualTo(ghId).andContentIdEqualTo(contentId);
         List<MatchVideo> matchVideos = matchVideoMapper.selectByExample(matchVideoExample);

+ 95 - 51
long-article-server/src/main/java/com/tzld/piaoquan/longarticle/service/local/impl/CoreServiceImpl.java

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
 import com.tzld.piaoquan.longarticle.common.enums.ContentStatusEnum;
 import com.tzld.piaoquan.longarticle.common.enums.PublishGzhPushTypeEnum;
 import com.tzld.piaoquan.longarticle.dao.mapper.*;
+import com.tzld.piaoquan.longarticle.model.bo.MatchContent;
 import com.tzld.piaoquan.longarticle.model.bo.VideoDetail;
 import com.tzld.piaoquan.longarticle.model.dto.*;
 import com.tzld.piaoquan.longarticle.model.po.*;
@@ -27,6 +28,9 @@ import org.springframework.util.CollectionUtils;
 import java.math.BigDecimal;
 import java.time.LocalTime;
 import java.util.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -162,66 +166,106 @@ public class CoreServiceImpl implements CoreService {
         return plan;
     }
 
+
+    // 定义一个阻塞队列
+    private static final ArrayBlockingQueue<MatchContent> queue = new ArrayBlockingQueue<>(100000);
+
+    private static final int size = 5;
+    // 定义一个线程池,设置消费线程的数量
+    private static final ExecutorService executorService = Executors.newFixedThreadPool(size);
+
+
     @Override
     public void matchContent() {
-        //查询状态为0的请求匹配
-        List<PlanAccount> matchPlanAccounts = planAccountService.getMatchPlanAccount();
-        if (CollectionUtils.isEmpty(matchPlanAccounts)) {
-            return;
-        }
-        for (PlanAccount planAccount : matchPlanAccounts) {
-            LongArticleSystemGetContentsParam param = new LongArticleSystemGetContentsParam();
-            param.setAccountId(planAccount.getAccountId());
-            param.setPlanId(planAccount.getPlanId());
-            LongArticleSystemContentVO longArticleSystemContentVO = aigcService.getContentItemList(param);
-            if (longArticleSystemContentVO == null) {
-                continue;
+        // 启动生产者线程
+        new Thread(() -> {
+            List<PlanAccount> matchPlanAccounts = planAccountService.getMatchPlanAccount();
+            if (CollectionUtils.isEmpty(matchPlanAccounts)) {
+                return;
             }
-            log.info("longArticleSystemContentVO total={} accountId={}", longArticleSystemContentVO.getTotalCount(), param.getAccountId());
-            //没有待匹配的文章  更新状态为1
-            if (longArticleSystemContentVO.getTotalCount() == 0
-                    || CollectionUtils.isEmpty(longArticleSystemContentVO.getContentItemList())) {
-                continue;
-            }
-            for (ContentItemVO contentItemVO : longArticleSystemContentVO.getContentItemList()) {
-                MatchVideo content = contentService.getContent(contentItemVO.getSourceId(), planAccount.getGhId());
-                if (content != null) {
-                    Integer contentStatus = content.getContentStatus();
-                    if (ContentStatusEnum.isSuccess(contentStatus)) {
-                        MatchMiniprogramStatusParam statusParam = new MatchMiniprogramStatusParam();
-                        statusParam.setStatus(2);
-                        statusParam.setPublishContentId(contentItemVO.getPublishContentId());
-                        aigcService.updateMatchMiniprogramStatus(statusParam);
-                        continue;
-                    }
-                    if (ContentStatusEnum.isFail(contentStatus)) {
-                        MatchMiniprogramStatusParam statusParam = new MatchMiniprogramStatusParam();
-                        statusParam.setStatus(3);
-                        statusParam.setPublishContentId(contentItemVO.getPublishContentId());
-                        String errorMessage = ContentStatusEnum.getErrorMessage(contentStatus);
-                        statusParam.setErrorMsg(errorMessage);
-                        aigcService.updateMatchMiniprogramStatus(statusParam);
-                    }
-                } else {
-                    MiniprogramCardRequest request = new MiniprogramCardRequest();
-                    request.setGhId(planAccount.getGhId());
-                    request.setAccountName(planAccount.getAccountName());
-                    request.setContent(contentItemVO.getContent());
-                    request.setTitle(contentItemVO.getTitle());
-                    //请求到新服务
-                    request.setStrategy("strategy_v2");
-                    request.setArticleId(contentItemVO.getSourceId());
-                    request.setFlowPoolLevelTag(contentItemVO.getFlowPoolLevelTag());
-                    String traceId = matchService.matchMiniprogramVideo(request);
-                    if (StringUtils.isNotEmpty(traceId)) {
-                        String key = contentItemVO.getSourceId() + "_" + planAccount.getGhId();
-                        redisTemplate.opsForValue().set(key, traceId, 24, TimeUnit.HOURS);
+            for (PlanAccount planAccount : matchPlanAccounts) {
+                LongArticleSystemGetContentsParam param = new LongArticleSystemGetContentsParam();
+                param.setAccountId(planAccount.getAccountId());
+                param.setPlanId(planAccount.getPlanId());
+                LongArticleSystemContentVO longArticleSystemContentVO = aigcService.getContentItemList(param);
+                if (longArticleSystemContentVO == null || CollectionUtils.isEmpty(longArticleSystemContentVO.getContentItemList())) {
+                    continue;
+                }
+                for (ContentItemVO contentItemVO : longArticleSystemContentVO.getContentItemList()) {
+                    MatchContent matchContent = new MatchContent();
+                    matchContent.setSourceId(contentItemVO.getSourceId());
+                    matchContent.setGhId(planAccount.getGhId());
+                    matchContent.setPublishContentId(contentItemVO.getPublishContentId());
+                    matchContent.setAccountName(planAccount.getAccountName());
+                    matchContent.setContent(contentItemVO.getContent());
+                    matchContent.setTitle(contentItemVO.getTitle());
+                    matchContent.setFlowPoolLevelTag(contentItemVO.getFlowPoolLevelTag());
+                    try {
+                        queue.put(matchContent);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
                     }
                 }
             }
+        }).start();
+
+        // 启动消费者线程
+        for (int i = 0; i < size; i++) {
+            executorService.submit(new Thread(() -> {
+                while (true) {
+                    try {
+                        MatchContent matchContent = queue.take();
+                        MatchVideo content = contentService.getContent(matchContent.getSourceId(), matchContent.getGhId());
+                        if (content != null) {
+                            Integer contentStatus = content.getContentStatus();
+                            if (ContentStatusEnum.isSuccess(contentStatus)) {
+                                MatchMiniprogramStatusParam statusParam = new MatchMiniprogramStatusParam();
+                                statusParam.setStatus(2);
+                                statusParam.setPublishContentId(matchContent.getPublishContentId());
+                                aigcService.updateMatchMiniprogramStatus(statusParam);
+                                continue;
+                            }
+                            if (ContentStatusEnum.isFail(contentStatus)) {
+                                MatchMiniprogramStatusParam statusParam = new MatchMiniprogramStatusParam();
+                                statusParam.setStatus(3);
+                                statusParam.setPublishContentId(matchContent.getPublishContentId());
+                                String errorMessage = ContentStatusEnum.getErrorMessage(contentStatus);
+                                statusParam.setErrorMsg(errorMessage);
+                                aigcService.updateMatchMiniprogramStatus(statusParam);
+                            }
+                        } else {
+                            MiniprogramCardRequest request = new MiniprogramCardRequest();
+                            request.setGhId(matchContent.getGhId());
+                            request.setAccountName(matchContent.getAccountName());
+                            request.setContent(matchContent.getContent());
+                            request.setTitle(matchContent.getTitle());
+                            //请求到新服务
+                            request.setStrategy("strategy_v2");
+                            request.setArticleId(matchContent.getSourceId());
+                            request.setFlowPoolLevelTag(matchContent.getFlowPoolLevelTag());
+                            matchService.matchMiniprogramVideo(request);
+                        }
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }));
         }
+
+        // 关闭线程池
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            executorService.shutdown();
+            try {
+                if (!executorService.awaitTermination(20, TimeUnit.MINUTES)) {
+                    executorService.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                executorService.shutdownNow();
+            }
+        }));
     }
 
+
     private boolean effectiveTime(String startWindow, String endWindow, TimeZoneUtil.Timezone timezone) {
         if (!org.springframework.util.StringUtils.hasText(startWindow) || !org.springframework.util.StringUtils.hasText(endWindow)) {
             return true;