瀏覽代碼

修复匹配线程

xueyiming 7 月之前
父節點
當前提交
b3779e2431

+ 51 - 51
long-article-server/src/main/java/com/tzld/piaoquan/longarticle/service/local/impl/CoreServiceImpl.java

@@ -33,10 +33,7 @@ import org.springframework.util.CollectionUtils;
 import java.math.BigDecimal;
 import java.time.LocalTime;
 import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -180,19 +177,11 @@ public class CoreServiceImpl implements CoreService {
 
     private static final int size = 5;
     // 定义一个线程池,设置消费线程的数量
-    private static ExecutorService executorService = Executors.newFixedThreadPool(size);
+    private static final ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(size);
 
 
     @Override
     public void matchContent() {
-
-        if (executorService.isShutdown() || executorService.isTerminated()) {
-            executorService = Executors.newFixedThreadPool(size);
-        } else {
-            executorService.shutdownNow();
-            executorService = Executors.newFixedThreadPool(size);
-        }
-
         // 启动生产者线程
         new Thread(() -> {
             List<PlanAccount> matchPlanAccounts = planAccountService.getMatchPlanAccount();
@@ -204,6 +193,7 @@ public class CoreServiceImpl implements CoreService {
                 param.setAccountId(planAccount.getAccountId());
                 param.setPlanId(planAccount.getPlanId());
                 LongArticleSystemContentVO longArticleSystemContentVO = aigcService.getContentItemList(param);
+                log.info("longArticleSystemContentVO={}", longArticleSystemContentVO);
                 if (longArticleSystemContentVO == null || CollectionUtils.isEmpty(longArticleSystemContentVO.getContentItemList())) {
                     continue;
                 }
@@ -226,49 +216,59 @@ public class CoreServiceImpl implements CoreService {
         }).start();
 
 
-        // 启动消费者线程
-        for (int i = 0; i < size; i++) {
-            executorService.submit(new Thread(() -> {
-                while (true) {
-                    try {
-                        MatchContent matchContent = queue.take();
-                        MatchVideo content = contentService.getContent(matchContent.getSourceId(), matchContent.getGhId(), 2);
-                        if (content != null) {
-                            Integer contentStatus = content.getContentStatus();
-                            if (ContentStatusEnum.isSuccess(contentStatus)) {
-                                MatchMiniprogramStatusParam statusParam = new MatchMiniprogramStatusParam();
-                                statusParam.setStatus(2);
-                                statusParam.setPublishContentId(matchContent.getPublishContentId());
-                                aigcService.updateMatchMiniprogramStatus(statusParam);
-                                continue;
+        if (threadPoolExecutor.getCorePoolSize() - threadPoolExecutor.getActiveCount() > 0) {
+            int threadSize = threadPoolExecutor.getCorePoolSize() - threadPoolExecutor.getActiveCount();
+            // 启动消费者线程
+            for (int i = 0; i < threadSize; i++) {
+                threadPoolExecutor.submit(new Thread(() -> {
+                    while (true) {
+                        try {
+                            // 超过 10 分钟没有数据,销毁当前线程
+                            MatchContent matchContent = queue.poll(5, TimeUnit.MINUTES); // 等待最多 10 分钟
+                            if (matchContent == null) {
+                                break; // 退出当前线程
                             }
-                            if (ContentStatusEnum.isFail(contentStatus)) {
-                                MatchMiniprogramStatusParam statusParam = new MatchMiniprogramStatusParam();
-                                statusParam.setStatus(3);
-                                statusParam.setPublishContentId(matchContent.getPublishContentId());
-                                String errorMessage = ContentStatusEnum.getErrorMessage(contentStatus);
-                                statusParam.setErrorMsg(errorMessage);
-                                aigcService.updateMatchMiniprogramStatus(statusParam);
+                            log.info("matchContent={}", matchContent);
+                            MatchVideo content = contentService.getContent(matchContent.getSourceId(), matchContent.getGhId(), 2);
+                            if (content != null) {
+                                Integer contentStatus = content.getContentStatus();
+                                if (ContentStatusEnum.isSuccess(contentStatus)) {
+                                    MatchMiniprogramStatusParam statusParam = new MatchMiniprogramStatusParam();
+                                    statusParam.setStatus(2);
+                                    statusParam.setPublishContentId(matchContent.getPublishContentId());
+                                    aigcService.updateMatchMiniprogramStatus(statusParam);
+                                    continue;
+                                }
+                                if (ContentStatusEnum.isFail(contentStatus)) {
+                                    MatchMiniprogramStatusParam statusParam = new MatchMiniprogramStatusParam();
+                                    statusParam.setStatus(3);
+                                    statusParam.setPublishContentId(matchContent.getPublishContentId());
+                                    String errorMessage = ContentStatusEnum.getErrorMessage(contentStatus);
+                                    statusParam.setErrorMsg(errorMessage);
+                                    aigcService.updateMatchMiniprogramStatus(statusParam);
+                                }
+                            } else {
+                                MiniprogramCardRequest request = new MiniprogramCardRequest();
+                                request.setGhId(matchContent.getGhId());
+                                request.setAccountName(matchContent.getAccountName());
+                                request.setContent(matchContent.getContent());
+                                request.setTitle(matchContent.getTitle());
+                                //请求到新服务
+                                request.setStrategy("strategy_v2");
+                                request.setArticleId(matchContent.getSourceId());
+                                request.setFlowPoolLevelTag(matchContent.getFlowPoolLevelTag());
+                                request.setPublishFlag(2);
+                                matchService.matchMiniprogramVideo(request);
                             }
-                        } else {
-                            MiniprogramCardRequest request = new MiniprogramCardRequest();
-                            request.setGhId(matchContent.getGhId());
-                            request.setAccountName(matchContent.getAccountName());
-                            request.setContent(matchContent.getContent());
-                            request.setTitle(matchContent.getTitle());
-                            //请求到新服务
-                            request.setStrategy("strategy_v2");
-                            request.setArticleId(matchContent.getSourceId());
-                            request.setFlowPoolLevelTag(matchContent.getFlowPoolLevelTag());
-                            request.setPublishFlag(2);
-                            matchService.matchMiniprogramVideo(request);
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
                         }
-                    } catch (InterruptedException e) {
-                        throw new RuntimeException(e);
                     }
-                }
-            }));
+                }));
+            }
         }
+
+
     }