Browse Source

待发布文章提前拉取 并发

wangyunpeng 1 tháng trước cách đây
mục cha
commit
e21eb93852

+ 3 - 0
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/mapper/aigc/PublishContentMapper.java

@@ -2,6 +2,7 @@ package com.tzld.longarticle.recommend.server.mapper.aigc;
 
 import com.tzld.longarticle.recommend.server.model.dto.*;
 import com.tzld.longarticle.recommend.server.model.dto.aigc.PublishAccountRemarkDTO;
+import com.tzld.longarticle.recommend.server.model.dto.aigc.PublishPlanAccountDTO;
 import com.tzld.longarticle.recommend.server.model.entity.aigc.PublishAccount;
 import com.tzld.longarticle.recommend.server.model.entity.aigc.PublishContent;
 import com.tzld.longarticle.recommend.server.model.entity.aigc.PublishPlanMiniprogramTask;
@@ -25,6 +26,8 @@ public interface PublishContentMapper {
 
     List<PublishAccount> getPublishAccounts(String planId, Long todayStart);
 
+    List<PublishPlanAccountDTO> getPublishPlanAccounts(List<String> planIds);
+
     List<ProduceContentDTO> getSourceProduceContentByTitles(List<String> titleList);
 
     List<AccountTypeFansDTO> getAccountTypeFans();

+ 9 - 0
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/model/dto/aigc/PublishPlanAccountDTO.java

@@ -0,0 +1,9 @@
+package com.tzld.longarticle.recommend.server.model.dto.aigc;
+
+import lombok.Data;
+
+@Data
+public class PublishPlanAccountDTO {
+    private String planId;
+    private String accountId;
+}

+ 1 - 1
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/repository/aigc/PublishPlanRepository.java

@@ -13,5 +13,5 @@ public interface PublishPlanRepository extends JpaRepository<PublishPlan, String
 
     PublishPlan getById(String planId);
 
-    List<PublishPlan> getByChannelAndPlanStatus(Integer channel, Integer planStatus);
+    List<PublishPlan> getByChannelAndContentModalAndPlanTypeAndPlanStatus(Integer channel, Integer contentModal, Integer planType, Integer planStatus);
 }

+ 50 - 35
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/XxlJobService.java

@@ -23,6 +23,7 @@ import com.tzld.longarticle.recommend.server.mapper.growth.NewPushMessageCallbac
 import com.tzld.longarticle.recommend.server.mapper.longArticle.LongArticleBaseMapper;
 import com.tzld.longarticle.recommend.server.model.dto.*;
 import com.tzld.longarticle.recommend.server.model.dto.aigc.BadCrawlerAccountDTO;
+import com.tzld.longarticle.recommend.server.model.dto.aigc.PublishPlanAccountDTO;
 import com.tzld.longarticle.recommend.server.model.entity.aigc.*;
 import com.tzld.longarticle.recommend.server.model.entity.crawler.AccountAvgInfo;
 import com.tzld.longarticle.recommend.server.model.entity.crawler.GetOffVideoCrawler;
@@ -977,44 +978,58 @@ public class XxlJobService {
     @XxlJob("syncGzhWaitingPublishContent")
     public ReturnT<String> syncGzhWaitingPublishContent(String param) {
         Long now = System.currentTimeMillis();
-        List<PublishPlan> planList = publishPlanRepository.getByChannelAndPlanStatus(ChannelEnum.wx.getVal(), 1);
-        for (PublishPlan plan : planList) {
-            List<PublishAccount> accountList = publishContentMapper.getPublishAccounts(plan.getId(), null);
-            for (PublishAccount account : accountList) {
-                List<Content> contentList = aigcWaitingPublishContentService.getAllContent(plan.getId(), account.getId());
-                if (CollectionUtil.isEmpty(contentList)) {
-                    continue;
-                }
-                List<String> contentIds = contentList.stream().map(Content::getId).collect(Collectors.toList());
-                // 不存在状态置0
-                longArticleBaseMapper.updatePublishContentGzhWaitingStatus(plan.getId(), account.getId(), contentIds, 0, now);
-                List<PublishContentGzhWaiting> existList = publishContentGzhWaitingRepository.getByIdIn(contentIds);
-                List<String> existContentIds = existList.stream().map(PublishContentGzhWaiting::getId).collect(Collectors.toList());
-                Map<String, PublishContentGzhWaiting> existMap = existList.stream().collect(
-                        Collectors.toMap(PublishContentGzhWaiting::getId, Function.identity()));
-                List<PublishContentGzhWaiting> saveList = new ArrayList<>();
-                for (Content content : contentList) {
-                    // 已存在更新
-                    if (existContentIds.contains(content.getId())) {
-                        PublishContentGzhWaiting existItem = existMap.get(content.getId());
-                        if (existItem.getStatus().equals(0)) {
-                            existItem.setStatus(1);
-                            existItem.setUpdateTimestamp(now);
-                            publishContentGzhWaitingRepository.save(existItem);
+        List<PublishPlan> planList = publishPlanRepository.getByChannelAndContentModalAndPlanTypeAndPlanStatus(ChannelEnum.wx.getVal(),
+                3, 1, 1);
+        List<String> planIds = planList.stream().map(PublishPlan::getId).collect(Collectors.toList());
+        List<PublishPlanAccountDTO> accountList = publishContentMapper.getPublishPlanAccounts(planIds);
+        Collections.shuffle(accountList);
+        CountDownLatch cdl = new CountDownLatch(accountList.size());
+        for (PublishPlanAccountDTO account : accountList) {
+            thread.submit(() -> {
+                try {
+                    List<Content> contentList = aigcWaitingPublishContentService.getAllContent(account.getPlanId(), account.getAccountId());
+                    if (CollectionUtil.isEmpty(contentList)) {
+                        return;
+                    }
+                    List<String> contentIds = contentList.stream().map(Content::getId).collect(Collectors.toList());
+                    // 不存在状态置0
+                    longArticleBaseMapper.updatePublishContentGzhWaitingStatus(account.getPlanId(), account.getAccountId(), contentIds, 0, now);
+                    List<PublishContentGzhWaiting> existList = publishContentGzhWaitingRepository.getByIdIn(contentIds);
+                    List<String> existContentIds = existList.stream().map(PublishContentGzhWaiting::getId).collect(Collectors.toList());
+                    Map<String, PublishContentGzhWaiting> existMap = existList.stream().collect(
+                            Collectors.toMap(PublishContentGzhWaiting::getId, Function.identity()));
+                    List<PublishContentGzhWaiting> saveList = new ArrayList<>();
+                    for (Content content : contentList) {
+                        // 已存在更新
+                        if (existContentIds.contains(content.getId())) {
+                            PublishContentGzhWaiting existItem = existMap.get(content.getId());
+                            if (existItem.getStatus().equals(0)) {
+                                existItem.setStatus(1);
+                                existItem.setUpdateTimestamp(now);
+                                publishContentGzhWaitingRepository.save(existItem);
+                            }
+//                              setPublishContentValue(existItem, content, plan.getId(), account.getId(), now);
+//                              publishContentGzhWaitingRepository.save(existItem);
+                            continue;
                         }
-//                        setPublishContentValue(existItem, content, plan.getId(), account.getId(), now);
-//                        publishContentGzhWaitingRepository.save(existItem);
-                        continue;
+                        // 新增
+                        PublishContentGzhWaiting item = new PublishContentGzhWaiting();
+                        setPublishContentValue(item, content, account.getPlanId(), account.getAccountId(), now);
+                        saveList.add(item);
                     }
-                    // 新增
-                    PublishContentGzhWaiting item = new PublishContentGzhWaiting();
-                    setPublishContentValue(item, content, plan.getId(), account.getId(), now);
-                    saveList.add(item);
-                }
-                if (CollectionUtil.isNotEmpty(saveList)) {
-                    longArticleBaseMapper.batchInsertPublishContentGzhWaiting(saveList);
+                    if (CollectionUtil.isNotEmpty(saveList)) {
+                        longArticleBaseMapper.batchInsertPublishContentGzhWaiting(saveList);
+                    }
+                    log.info("syncGzhWaitingPublishContent success planId: {} accountId: {}", account.getPlanId(), account.getAccountId());
+                } finally {
+                    cdl.countDown();
                 }
-            }
+            });
+        }
+        try {
+            cdl.await();
+        } catch (InterruptedException e) {
+            log.error("syncGzhWaitingPublishContent error", e);
         }
         return ReturnT.SUCCESS;
     }

+ 12 - 0
long-article-recommend-service/src/main/resources/mapper/aigc/PublishContentMapper.xml

@@ -63,6 +63,18 @@
             and content.id is null
         </if>
     </select>
+
+    <select id="getPublishPlanAccounts"
+            resultType="com.tzld.longarticle.recommend.server.model.dto.aigc.PublishPlanAccountDTO">
+        SELECT planAccount.plan_id, planAccount.account_id
+        FROM publish_plan_account planAccount
+        WHERE planAccount.plan_id in
+        <foreach collection="planIds" item="item" open="(" close=")" separator=",">
+            #{item}
+        </foreach>
+        and planAccount.publish_open_flag = 1
+    </select>
+
     <select id="getSourceProduceContentByTitles"
             resultType="com.tzld.longarticle.recommend.server.model.dto.ProduceContentDTO">
         select record.plan_exe_id as contentId,