Bläddra i källkod

Merge branch 'wyp/0606-publishContentCache' of Server/long-article-recommend into master

wangyunpeng 1 månad sedan
förälder
incheckning
0b2bbf996d

+ 11 - 1
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/remote/aigc/AIGCWaitingPublishContentService.java

@@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSONObject;
 import com.tzld.longarticle.recommend.server.model.dto.Content;
 import com.tzld.longarticle.recommend.server.model.entity.longArticle.PublishContentGzhWaiting;
 import com.tzld.longarticle.recommend.server.repository.longArticle.PublishContentGzhWaitingRepository;
+import com.tzld.longarticle.recommend.server.service.XxlJobService;
 import lombok.extern.slf4j.Slf4j;
 import okhttp3.*;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -15,6 +16,7 @@ import javax.annotation.PostConstruct;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -27,6 +29,9 @@ public class AIGCWaitingPublishContentService {
     @Autowired
     private PublishContentGzhWaitingRepository publishContentGzhWaitingRepository;
 
+    @Autowired
+    private XxlJobService xxlJobService;
+
     private OkHttpClient client;
 
     @PostConstruct
@@ -87,7 +92,12 @@ public class AIGCWaitingPublishContentService {
     public List<Content> getAllContentByDB(String planId, String accountId) {
         List<PublishContentGzhWaiting> list = publishContentGzhWaitingRepository.getByPlanIdAndPublishAccountIdAndStatus(planId, accountId, 1);
         if (CollectionUtil.isEmpty(list)) {
-            return getAllContent(planId, accountId);
+            List<Content> contentList = getAllContent(planId, accountId);
+            if (CollectionUtil.isNotEmpty(contentList)) {
+                Long now = System.currentTimeMillis();
+                CompletableFuture.runAsync(() -> xxlJobService.savePublishContentCache(contentList, planId, accountId, now));
+            }
+            return contentList;
         }
         List<Content> result = new ArrayList<>();
         for (PublishContentGzhWaiting item : list) {

+ 47 - 33
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/XxlJobService.java

@@ -60,6 +60,7 @@ import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.dao.DuplicateKeyException;
 import org.springframework.data.domain.Page;
 import org.springframework.data.domain.PageRequest;
@@ -146,6 +147,9 @@ public class XxlJobService {
             "\"20241030070010871546586\"]}")
     private static List<String> producePlanIds;
 
+    @Value("${sync.publish.content.thread.pool.size:5}")
+    private static Integer syncPublishContentThreadPoolSize;
+
     @XxlJob("checkPublishPlan")
     public ReturnT<String> checkPublishPlan(String param) {
         Long todayStart = DateUtils.getTodayStart();
@@ -984,43 +988,17 @@ public class XxlJobService {
         List<PublishPlanAccountDTO> accountList = publishContentMapper.getPublishPlanAccounts(planIds);
         Collections.shuffle(accountList);
         CountDownLatch cdl = new CountDownLatch(accountList.size());
+
+        ExecutorService thread = new CommonThreadPoolExecutor(
+                syncPublishContentThreadPoolSize, syncPublishContentThreadPoolSize, 0L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                new ThreadFactoryBuilder().setNameFormat("syncGzhWaitingPublishContent-%d").build(),
+                new ThreadPoolExecutor.AbortPolicy());
         for (PublishPlanAccountDTO account : accountList) {
             thread.submit(() -> {
                 try {
                     List<Content> contentList = aigcWaitingPublishContentService.getAllContent(account.getPlanId(), account.getAccountId());
-                    if (CollectionUtil.isEmpty(contentList)) {
-                        return;
-                    }
-                    List<String> contentIds = contentList.stream().map(Content::getId).collect(Collectors.toList());
-                    // 不存在状态置0
-                    longArticleBaseMapper.updatePublishContentGzhWaitingStatus(account.getPlanId(), account.getAccountId(), contentIds, 0, now);
-                    List<PublishContentGzhWaiting> existList = publishContentGzhWaitingRepository.getByIdIn(contentIds);
-                    List<String> existContentIds = existList.stream().map(PublishContentGzhWaiting::getId).collect(Collectors.toList());
-                    Map<String, PublishContentGzhWaiting> existMap = existList.stream().collect(
-                            Collectors.toMap(PublishContentGzhWaiting::getId, Function.identity()));
-                    List<PublishContentGzhWaiting> saveList = new ArrayList<>();
-                    for (Content content : contentList) {
-                        // 已存在更新
-                        if (existContentIds.contains(content.getId())) {
-                            PublishContentGzhWaiting existItem = existMap.get(content.getId());
-                            if (existItem.getStatus().equals(0)) {
-                                existItem.setStatus(1);
-                                existItem.setUpdateTimestamp(now);
-                                publishContentGzhWaitingRepository.save(existItem);
-                            }
-//                              setPublishContentValue(existItem, content, plan.getId(), account.getId(), now);
-//                              publishContentGzhWaitingRepository.save(existItem);
-                            continue;
-                        }
-                        // 新增
-                        PublishContentGzhWaiting item = new PublishContentGzhWaiting();
-                        setPublishContentValue(item, content, account.getPlanId(), account.getAccountId(), now);
-                        saveList.add(item);
-                    }
-                    if (CollectionUtil.isNotEmpty(saveList)) {
-                        longArticleBaseMapper.batchInsertPublishContentGzhWaiting(saveList);
-                    }
-                    log.info("syncGzhWaitingPublishContent success planId: {} accountId: {}", account.getPlanId(), account.getAccountId());
+                    savePublishContentCache(contentList, account.getPlanId(), account.getAccountId(), now);
                 } finally {
                     cdl.countDown();
                 }
@@ -1053,4 +1031,40 @@ public class XxlJobService {
         item.setUpdateTimestamp(updateTimestamp);
     }
 
+    public void savePublishContentCache(List<Content> contentList, String planId, String accountId, Long now) {
+        if (CollectionUtil.isEmpty(contentList)) {
+            return;
+        }
+        List<String> contentIds = contentList.stream().map(Content::getId).collect(Collectors.toList());
+        // 不存在状态置0
+        longArticleBaseMapper.updatePublishContentGzhWaitingStatus(planId, accountId, contentIds, 0, now);
+        List<PublishContentGzhWaiting> existList = publishContentGzhWaitingRepository.getByIdIn(contentIds);
+        List<String> existContentIds = existList.stream().map(PublishContentGzhWaiting::getId).collect(Collectors.toList());
+        Map<String, PublishContentGzhWaiting> existMap = existList.stream().collect(
+                Collectors.toMap(PublishContentGzhWaiting::getId, Function.identity()));
+        List<PublishContentGzhWaiting> saveList = new ArrayList<>();
+        for (Content content : contentList) {
+            // 已存在更新
+            if (existContentIds.contains(content.getId())) {
+                PublishContentGzhWaiting existItem = existMap.get(content.getId());
+                if (existItem.getStatus().equals(0)) {
+                    existItem.setStatus(1);
+                    existItem.setUpdateTimestamp(now);
+                    publishContentGzhWaitingRepository.save(existItem);
+                }
+//                setPublishContentValue(existItem, content, planId, accountId, now);
+//                publishContentGzhWaitingRepository.save(existItem);
+                continue;
+            }
+            // 新增
+            PublishContentGzhWaiting item = new PublishContentGzhWaiting();
+            setPublishContentValue(item, content, planId, accountId, now);
+            saveList.add(item);
+        }
+        if (CollectionUtil.isNotEmpty(saveList)) {
+            longArticleBaseMapper.batchInsertPublishContentGzhWaiting(saveList);
+        }
+        log.info("syncGzhWaitingPublishContent success planId: {} accountId: {}", planId, accountId);
+    }
+
 }