Forráskód Böngészése

RootSourceId处理增加并发

wangyunpeng 7 hónapja
szülő
commit
f46380738f

+ 20 - 3
long-article-recommend-service/src/main/java/com/tzld/longarticle/recommend/server/service/XxlJobService.java

@@ -6,11 +6,11 @@ import com.alibaba.fastjson.JSONObject;
 import com.aliyun.odps.data.Record;
 import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 import com.google.common.collect.Lists;
+import com.tzld.longarticle.recommend.server.common.ThreadPoolFactory;
 import com.tzld.longarticle.recommend.server.common.enums.recommend.AccountBusinessTypeEnum;
 import com.tzld.longarticle.recommend.server.common.enums.recommend.FeishuRobotIdEnum;
 import com.tzld.longarticle.recommend.server.mapper.aigc.AigcBaseMapper;
 import com.tzld.longarticle.recommend.server.mapper.crawler.CrawlerBaseMapper;
-import com.tzld.longarticle.recommend.server.mapper.crawler.PushMessageCallbackMapper;
 import com.tzld.longarticle.recommend.server.mapper.growth.NewPushMessageCallbackMapper;
 import com.tzld.longarticle.recommend.server.model.dto.AccountTypeFansDTO;
 import com.tzld.longarticle.recommend.server.model.dto.NotPublishPlan;
@@ -46,6 +46,7 @@ import org.springframework.util.StringUtils;
 
 import java.time.LocalTime;
 import java.util.*;
+import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
 
 import static com.tzld.longarticle.recommend.server.common.constant.TimeConstant.MILLISECOND_DAY;
@@ -142,9 +143,17 @@ public class XxlJobService {
             List<GetOffVideoCrawler> getOffVideoCrawlerList = getOffVideoCrawlerRepository.getByPublishTimeBetween(timeStamp, timeStamp + 86400);
             List<String> traceIds = getOffVideoCrawlerList.stream().map(GetOffVideoCrawler::getTraceId).distinct().collect(Collectors.toList());
             List<LongArticlesVideo> longArticlesVideoList = longArticlesVideoRepository.getByTraceIdIn(traceIds);
+            CountDownLatch cdl = new CountDownLatch(longArticlesVideoList.size());
             for (LongArticlesVideo longArticlesVideo : longArticlesVideoList) {
-                processCrawlerEachData(longArticlesVideo);
+                ThreadPoolFactory.defaultPool().submit(() -> {
+                    try {
+                        processCrawlerEachData(longArticlesVideo);
+                    } finally {
+                        cdl.countDown();
+                    }
+                });
             }
+            cdl.await();
         } catch (Exception e) {
             log.error("migrateCrawlerRootSourceId exception: {}", e.getMessage(), e);
             return ReturnT.FAIL;
@@ -227,9 +236,17 @@ public class XxlJobService {
             List<GetOffVideoArticle> getOffVideoArticleList = getOffVideoArticleRepository.getByPublishTimeBetween(timeStamp, timeStamp + 86400);
             List<String> traceIds = getOffVideoArticleList.stream().map(GetOffVideoArticle::getTraceId).distinct().collect(Collectors.toList());
             List<LongArticlesMatchVideo> longArticlesMatchVideoList = longArticlesMatchVideoRepository.getByTraceIdIn(traceIds);
+            CountDownLatch cdl = new CountDownLatch(longArticlesMatchVideoList.size());
             for (LongArticlesMatchVideo longArticlesMatchVideo : longArticlesMatchVideoList) {
-                processArticleEachData(longArticlesMatchVideo);
+                ThreadPoolFactory.defaultPool().submit(() -> {
+                    try {
+                        processArticleEachData(longArticlesMatchVideo);
+                    } finally {
+                        cdl.countDown();
+                    }
+                });
             }
+            cdl.await();
         } catch (Exception e) {
             log.error("migrateArticleRootSourceId exception: {}", e.getMessage(), e);
             return ReturnT.FAIL;