Kaynağa Gözat

修改并发任务

xueyiming 7 ay önce
ebeveyn
işleme
d0d94949fd

+ 1 - 1
long-article-server/src/main/java/com/tzld/piaoquan/longarticle/job/PlanAccountJob.java

@@ -41,7 +41,7 @@ public class PlanAccountJob {
     @XxlJob("corePlanAccountJob")
     public ReturnT<String> corePlanAccount(String param) {
         try {
-            coreService.core(param);
+            coreService.core();
         } catch (Exception e) {
             LarkRobotUtil.sendMessage("长文系统核心流程异常,请及时查看,@薛一鸣");
             log.error("corePlanAccountJob error", e);

+ 2 - 2
long-article-server/src/main/java/com/tzld/piaoquan/longarticle/service/local/CoreService.java

@@ -4,9 +4,9 @@ public interface CoreService {
 
     void initPlanAccount();
 
-    void matchContent();
+    void matchContent() throws InterruptedException;
 
-    void core(String param) throws InterruptedException;
+    void core() throws InterruptedException;
 
     void getPushStatus();
 }

+ 47 - 58
long-article-server/src/main/java/com/tzld/piaoquan/longarticle/service/local/impl/CoreServiceImpl.java

@@ -173,9 +173,9 @@ public class CoreServiceImpl implements CoreService {
 
 
     // 定义一个阻塞队列
-    private static final ArrayBlockingQueue<MatchContent> queue = new ArrayBlockingQueue<>(100000);
+    private static final ArrayBlockingQueue<MatchContent> matchQueue = new ArrayBlockingQueue<>(100000);
 
-    private static final ArrayBlockingQueue<PlanAccount> queue1 = new ArrayBlockingQueue<>(100000);
+    private static final ArrayBlockingQueue<PlanAccount> coreQueue = new ArrayBlockingQueue<>(100000);
 
 
     private static final int size = 5;
@@ -186,51 +186,19 @@ public class CoreServiceImpl implements CoreService {
 
 
     @Override
-    public void matchContent() {
-        // 启动生产者线程
-        new Thread(() -> {
-            List<PlanAccount> matchPlanAccounts = planAccountService.getMatchPlanAccount();
-            if (CollectionUtils.isEmpty(matchPlanAccounts)) {
-                return;
-            }
-            for (PlanAccount planAccount : matchPlanAccounts) {
-                GetContentsParam param = new GetContentsParam();
-                param.setAccountId(planAccount.getAccountId());
-                param.setPlanId(planAccount.getPlanId());
-                LongArticleSystemContentVO longArticleSystemContentVO = aigcService.getContentItemList(param);
-                if (longArticleSystemContentVO == null || CollectionUtils.isEmpty(longArticleSystemContentVO.getContentItemList())) {
-                    continue;
-                }
-                log.info("longArticleSystemContentVO totalCount={}", longArticleSystemContentVO.getTotalCount());
-                for (ContentItemVO contentItemVO : longArticleSystemContentVO.getContentItemList()) {
-                    MatchContent matchContent = new MatchContent();
-                    matchContent.setSourceId(contentItemVO.getSourceId());
-                    matchContent.setGhId(planAccount.getGhId());
-                    matchContent.setPublishContentId(contentItemVO.getPublishContentId());
-                    matchContent.setAccountName(planAccount.getAccountName());
-                    matchContent.setContent(contentItemVO.getContent());
-                    matchContent.setTitle(contentItemVO.getTitle());
-                    matchContent.setFlowPoolLevelTag(contentItemVO.getFlowPoolLevelTag());
-                    try {
-                        queue.put(matchContent);
-                    } catch (InterruptedException e) {
-                        throw new RuntimeException(e);
-                    }
-                }
-            }
-        }).start();
-
-
+    public void matchContent() throws InterruptedException {
         if (matchPoolExecutor.getCorePoolSize() - matchPoolExecutor.getActiveCount() > 0) {
             int threadSize = matchPoolExecutor.getCorePoolSize() - matchPoolExecutor.getActiveCount();
+            log.info("threadNum={}", threadSize);
+            CountDownLatch countDownLatch = new CountDownLatch(threadSize);
             // 启动消费者线程
             for (int i = 0; i < threadSize; i++) {
                 matchPoolExecutor.submit(new Thread(() -> {
                     log.info("启动匹配小程序线程");
                     while (true) {
                         try {
-                            // 超过 10 分钟没有数据,销毁当前线程
-                            MatchContent matchContent = queue.poll(5, TimeUnit.MINUTES); // 等待最多 5 分钟
+                            // 超过 5 分钟没有数据,销毁当前线程
+                            MatchContent matchContent = matchQueue.poll(5, TimeUnit.MINUTES); // 等待最多 5 分钟
                             if (matchContent == null) {
                                 break; // 退出当前线程
                             }
@@ -270,11 +238,41 @@ public class CoreServiceImpl implements CoreService {
                         }
                     }
                     log.info("启动匹配小程序线程结束");
+                    countDownLatch.countDown();
                 }));
             }
-        }
-
 
+            List<PlanAccount> matchPlanAccounts = planAccountService.getMatchPlanAccount();
+            if (CollectionUtils.isEmpty(matchPlanAccounts)) {
+                return;
+            }
+            for (PlanAccount planAccount : matchPlanAccounts) {
+                GetContentsParam param = new GetContentsParam();
+                param.setAccountId(planAccount.getAccountId());
+                param.setPlanId(planAccount.getPlanId());
+                LongArticleSystemContentVO longArticleSystemContentVO = aigcService.getContentItemList(param);
+                if (longArticleSystemContentVO == null || CollectionUtils.isEmpty(longArticleSystemContentVO.getContentItemList())) {
+                    continue;
+                }
+                log.info("longArticleSystemContentVO totalCount={}", longArticleSystemContentVO.getTotalCount());
+                for (ContentItemVO contentItemVO : longArticleSystemContentVO.getContentItemList()) {
+                    MatchContent matchContent = new MatchContent();
+                    matchContent.setSourceId(contentItemVO.getSourceId());
+                    matchContent.setGhId(planAccount.getGhId());
+                    matchContent.setPublishContentId(contentItemVO.getPublishContentId());
+                    matchContent.setAccountName(planAccount.getAccountName());
+                    matchContent.setContent(contentItemVO.getContent());
+                    matchContent.setTitle(contentItemVO.getTitle());
+                    matchContent.setFlowPoolLevelTag(contentItemVO.getFlowPoolLevelTag());
+                    try {
+                        matchQueue.put(matchContent);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            countDownLatch.await();
+        }
     }
 
 
@@ -407,12 +405,11 @@ public class CoreServiceImpl implements CoreService {
 
 
     @Override
-    public void core(String param) throws InterruptedException {
-
+    public void core() throws InterruptedException {
         if (corePoolExecutor.getCorePoolSize() - corePoolExecutor.getActiveCount() > 0) {
             int threadSize = corePoolExecutor.getCorePoolSize() - corePoolExecutor.getActiveCount();
             log.info("threadNum={}", threadSize);
-            CountDownLatch latch = new CountDownLatch(threadSize);
+            CountDownLatch countDownLatch = new CountDownLatch(threadSize);
             // 启动消费者线程
             for (int i = 0; i < threadSize; i++) {
                 corePoolExecutor.submit(new Thread(() -> {
@@ -420,7 +417,7 @@ public class CoreServiceImpl implements CoreService {
                     while (true) {
                         try {
                             // 超过 5 分钟没有数据,销毁当前线程
-                            PlanAccount planAccount = queue1.poll(5, TimeUnit.MINUTES); // 等待最多 5 分钟
+                            PlanAccount planAccount = coreQueue.poll(5, TimeUnit.MINUTES); // 等待最多 5 分钟
                             if (planAccount == null) {
                                 break; // 退出当前线程
                             }
@@ -430,30 +427,22 @@ public class CoreServiceImpl implements CoreService {
                         }
                     }
                     log.info("核心操作线程结束");
-                    latch.countDown();
+                    countDownLatch.countDown();
                 }));
             }
             List<PlanAccount> planAccounts;
-            if (StringUtils.isEmpty(param)) {
-                planAccounts = planAccountService.getNormalPlanAccount();
-            } else {
-                planAccounts = planAccountService.getPlanAccount(param);
-            }
+            planAccounts = planAccountService.getNormalPlanAccount();
             if (CollectionUtils.isEmpty(planAccounts)) {
                 return;
             }
             for (PlanAccount planAccount : planAccounts) {
-                boolean flag = checkPlanAccount(planAccount);
-                if (!flag && StringUtils.isEmpty(param)) {
-                    continue;
-                }
-                queue1.put(planAccount);
+                coreQueue.put(planAccount);
             }
-            latch.await();
+            countDownLatch.await();
         }
     }
 
-    private void operatePlanAccount(PlanAccount planAccount) {
+    public void operatePlanAccount(PlanAccount planAccount) {
         boolean flag = checkPlanAccount(planAccount);
         if (!flag) {
             return;