xueyiming пре 7 месеци
родитељ
комит
f248d9d338

+ 1 - 1
long-article-server/src/main/java/com/tzld/piaoquan/longarticle/service/local/CoreService.java

@@ -6,7 +6,7 @@ public interface CoreService {
 
     void matchContent();
 
-    void core(String param);
+    void core(String param) throws InterruptedException;
 
     void getPushStatus();
 }

+ 111 - 78
long-article-server/src/main/java/com/tzld/piaoquan/longarticle/service/local/impl/CoreServiceImpl.java

@@ -35,10 +35,7 @@ import org.springframework.util.CollectionUtils;
 import java.math.BigDecimal;
 import java.time.LocalTime;
 import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -178,9 +175,15 @@ public class CoreServiceImpl implements CoreService {
     // 定义一个阻塞队列
     private static final ArrayBlockingQueue<MatchContent> queue = new ArrayBlockingQueue<>(100000);
 
+    private static final ArrayBlockingQueue<PlanAccount> queue1 = new ArrayBlockingQueue<>(100000);
+
+
     private static final int size = 5;
     // 定义一个线程池,设置消费线程的数量
-    private static final ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(size);
+    private static final ThreadPoolExecutor matchPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(size);
+
+    private static final int size1 = 10;
+    private static final ThreadPoolExecutor corePoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(size1);
 
 
     @Override
@@ -219,11 +222,11 @@ public class CoreServiceImpl implements CoreService {
         }).start();
 
 
-        if (threadPoolExecutor.getCorePoolSize() - threadPoolExecutor.getActiveCount() > 0) {
-            int threadSize = threadPoolExecutor.getCorePoolSize() - threadPoolExecutor.getActiveCount();
+        if (matchPoolExecutor.getCorePoolSize() - matchPoolExecutor.getActiveCount() > 0) {
+            int threadSize = matchPoolExecutor.getCorePoolSize() - matchPoolExecutor.getActiveCount();
             // 启动消费者线程
             for (int i = 0; i < threadSize; i++) {
-                threadPoolExecutor.submit(new Thread(() -> {
+                matchPoolExecutor.submit(new Thread(() -> {
                     log.info("启动匹配小程序线程");
                     while (true) {
                         try {
@@ -405,88 +408,118 @@ public class CoreServiceImpl implements CoreService {
 
 
     @Override
-    public void core(String param) {
-        List<PlanAccount> planAccounts;
-        if (StringUtils.isEmpty(param)) {
-            planAccounts = planAccountService.getNormalPlanAccount();
-        } else {
-            planAccounts = planAccountService.getPlanAccount(param);
+    public void core(String param) throws InterruptedException {
+
+        if (corePoolExecutor.getCorePoolSize() - corePoolExecutor.getActiveCount() > 0) {
+            int threadSize = corePoolExecutor.getCorePoolSize() - corePoolExecutor.getActiveCount();
+            CountDownLatch latch = new CountDownLatch(threadSize);
+            // 启动消费者线程
+            for (int i = 0; i < threadSize; i++) {
+                matchPoolExecutor.submit(new Thread(() -> {
+                    log.info("启动匹配小程序线程");
+                    while (true) {
+                        try {
+                            // 超过 5 分钟没有数据,销毁当前线程
+                            PlanAccount planAccount = queue1.poll(5, TimeUnit.MINUTES); // 等待最多 5 分钟
+                            if (planAccount == null) {
+                                break; // 退出当前线程
+                            }
+                            operatePlanAccount(planAccount);
+                        } catch (Exception e) {
+                            log.error("", e);
+                        }
+                    }
+                    log.info("核心操作线程结束");
+                    latch.countDown();
+                }));
+            }
+            List<PlanAccount> planAccounts;
+            if (StringUtils.isEmpty(param)) {
+                planAccounts = planAccountService.getNormalPlanAccount();
+            } else {
+                planAccounts = planAccountService.getPlanAccount(param);
+            }
+            if (CollectionUtils.isEmpty(planAccounts)) {
+                return;
+            }
+            for (PlanAccount planAccount : planAccounts) {
+                boolean flag = checkPlanAccount(planAccount);
+                if (!flag && StringUtils.isEmpty(param)) {
+                    continue;
+                }
+                queue1.put(planAccount);
+            }
+            latch.await();
         }
-        if (CollectionUtils.isEmpty(planAccounts)) {
+    }
+
+    private void operatePlanAccount(PlanAccount planAccount) {
+        //获取待排序数据  数量不足会返回空list
+        List<PublishArticleData> rankList = contentService.getWaitingSort(planAccount);
+        if (CollectionUtils.isEmpty(rankList)) {
             return;
         }
-        for (PlanAccount planAccount : planAccounts) {
-            boolean flag = checkPlanAccount(planAccount);
-            if (!flag && StringUtils.isEmpty(param)) {
-                continue;
-            }
-            //获取待排序数据  数量不足会返回空list
-            List<PublishArticleData> rankList = contentService.getWaitingSort(planAccount);
+        if (StringUtils.isNotEmpty(planAccount.getSortStrategy())) {
+            rankList = contentService.getSortList(planAccount);
             if (CollectionUtils.isEmpty(rankList)) {
+                return;
+            }
+        }
+        //文章处理  小程序查询和发布
+        List<PushContentParam> pushContentList = new ArrayList<>();
+        List<Long> sendIds = new ArrayList<>();
+        for (PublishArticleData publishArticleData : rankList) {
+            PushContentParam contentParam = new PushContentParam();
+            PublishContent publishContent = publicContentService.getPublishContent(planAccount, publishArticleData);
+            if (publishContent == null) {
                 continue;
             }
-            if (StringUtils.isNotEmpty(planAccount.getSortStrategy())) {
-                rankList = contentService.getSortList(planAccount);
-                if (CollectionUtils.isEmpty(rankList)) {
+            //获取小程序
+            List<PublishMiniprogram> publishMiniprogramList = publicContentService.getPublishMiniprograms(publishArticleData);
+            //不存在则重新生成
+            if (CollectionUtils.isEmpty(publishMiniprogramList)) {
+                MatchVideo matchVideo = contentService.getMatchVideo(publishContent, planAccount);
+                if (matchVideo == null) {
                     continue;
                 }
-            }
-            //文章处理  小程序查询和发布
-            List<PushContentParam> pushContentList = new ArrayList<>();
-            List<Long> sendIds = new ArrayList<>();
-            for (PublishArticleData publishArticleData : rankList) {
-                PushContentParam contentParam = new PushContentParam();
-                PublishContent publishContent = publicContentService.getPublishContent(planAccount, publishArticleData);
-                if (publishContent == null) {
+                List<VideoDetail> videoDetails = contentService.getPublishVideoDetail(publishContent, planAccount, matchVideo);
+                log.info("publishContentId={}, videoDetails={}", publishContent.getId(), videoDetails);
+                if (CollectionUtils.isEmpty(videoDetails)) {
                     continue;
                 }
-                //获取小程序
-                List<PublishMiniprogram> publishMiniprogramList = publicContentService.getPublishMiniprograms(publishArticleData);
-                //不存在则重新生成
-                if (CollectionUtils.isEmpty(publishMiniprogramList)) {
-                    MatchVideo matchVideo = contentService.getMatchVideo(publishContent, planAccount);
-                    if (matchVideo == null) {
-                        continue;
-                    }
-                    List<VideoDetail> videoDetails = contentService.getPublishVideoDetail(publishContent, planAccount, matchVideo);
-                    log.info("publishContentId={}, videoDetails={}", publishContent.getId(), videoDetails);
-                    if (CollectionUtils.isEmpty(videoDetails)) {
-                        continue;
-                    }
-                    //获取小程序卡片  判断封面是否可用
-                    List<PublishMiniprogramBo> publishMiniprogramBoList = cardService.generateCards(videoDetails, planAccount, publishContent);
-                    if (CollectionUtils.isEmpty(publishMiniprogramBoList) && publishMiniprogramBoList.size() < 2) {
-                        continue;
-                    }
-                    for (PublishMiniprogramBo publishMiniprogramBo : publishMiniprogramBoList) {
-                        //插入rootSource
-                        rootSourceService.addRootSource(publishMiniprogramBo, planAccount, publishArticleData);
-
-                        //拷贝小程序BO到PO中 写入数据库
-                        PublishMiniprogram publishMiniprogram = new PublishMiniprogram();
-                        BeanUtils.copyProperties(publishMiniprogramBo, publishMiniprogram);
-                        publishMiniprogramMapper.insertSelective(publishMiniprogram);
-                        publishMiniprogramList.add(publishMiniprogram);
-                    }
-                    contentService.updateMatchContent(publishContent, publishMiniprogramBoList, matchVideo);
+                //获取小程序卡片  判断封面是否可用
+                List<PublishMiniprogramBo> publishMiniprogramBoList = cardService.generateCards(videoDetails, planAccount, publishContent);
+                if (CollectionUtils.isEmpty(publishMiniprogramBoList) && publishMiniprogramBoList.size() < 2) {
+                    continue;
                 }
-                sendIds.add(publishContent.getId());
-                log.info("publishMiniprogramList={}", publishMiniprogramList);
-                List<PublishMiniprogramParam> publishCardList = getPublishCardList(publishMiniprogramList);
-                contentParam.setPublishContentId(publishContent.getPublishContentId());
-                contentParam.setMiniprogramCardList(publishCardList);
-                pushContentList.add(contentParam);
-            }
-            CreatePushTaskParam gzhPushParam = getCreatePushTaskParam(planAccount, pushContentList);
-            if (gzhPushParam == null) {
-                continue;
-            }
-            log.info("gzhPushParam={}", gzhPushParam);
-            String pushId = aigcService.createPushTask(gzhPushParam);
-            log.info("pushId = {}", pushId);
-            if (StringUtils.isNotEmpty(pushId)) {
-                publicContentService.updatePublishContentPushId(sendIds, pushId);
+                for (PublishMiniprogramBo publishMiniprogramBo : publishMiniprogramBoList) {
+                    //插入rootSource
+                    rootSourceService.addRootSource(publishMiniprogramBo, planAccount, publishArticleData);
+
+                    //拷贝小程序BO到PO中 写入数据库
+                    PublishMiniprogram publishMiniprogram = new PublishMiniprogram();
+                    BeanUtils.copyProperties(publishMiniprogramBo, publishMiniprogram);
+                    publishMiniprogramMapper.insertSelective(publishMiniprogram);
+                    publishMiniprogramList.add(publishMiniprogram);
+                }
+                contentService.updateMatchContent(publishContent, publishMiniprogramBoList, matchVideo);
             }
+            sendIds.add(publishContent.getId());
+            log.info("publishMiniprogramList={}", publishMiniprogramList);
+            List<PublishMiniprogramParam> publishCardList = getPublishCardList(publishMiniprogramList);
+            contentParam.setPublishContentId(publishContent.getPublishContentId());
+            contentParam.setMiniprogramCardList(publishCardList);
+            pushContentList.add(contentParam);
+        }
+        CreatePushTaskParam gzhPushParam = getCreatePushTaskParam(planAccount, pushContentList);
+        if (gzhPushParam == null) {
+            return;
+        }
+        log.info("gzhPushParam={}", gzhPushParam);
+        String pushId = aigcService.createPushTask(gzhPushParam);
+        log.info("pushId = {}", pushId);
+        if (StringUtils.isNotEmpty(pushId)) {
+            publicContentService.updatePublishContentPushId(sendIds, pushId);
         }
     }