|
@@ -4,6 +4,8 @@ import cn.hutool.core.collection.CollectionUtil;
|
|
|
import com.alibaba.fastjson.JSONArray;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.google.common.collect.Lists;
|
|
|
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
+import com.tzld.longarticle.recommend.server.common.CommonThreadPoolExecutor;
|
|
|
import com.tzld.longarticle.recommend.server.common.ThreadPoolFactory;
|
|
|
import com.tzld.longarticle.recommend.server.mapper.crawler.CrawlerBaseMapper;
|
|
|
import com.tzld.longarticle.recommend.server.mapper.longArticle.LongArticleBaseMapper;
|
|
@@ -16,7 +18,7 @@ import org.springframework.util.StringUtils;
|
|
|
|
|
|
import java.net.URLDecoder;
|
|
|
import java.util.*;
|
|
|
-import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.*;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
@Service
|
|
@@ -261,10 +263,14 @@ public class DataFlushService {
|
|
|
String productionPath = fromJSON.getString("productionPath");
|
|
|
String uid = getParamFromPath(productionPath, "su");
|
|
|
String videoId = getParamFromPath(productionPath, "id");
|
|
|
+ String rootSourceId = getParamFromPath(productionPath, "rootSourceId");
|
|
|
jsonObject.put("uid", uid);
|
|
|
if (StringUtils.hasText(videoId)) {
|
|
|
jsonObject.put("videoId", Long.valueOf(videoId));
|
|
|
}
|
|
|
+ if (StringUtils.hasText(rootSourceId)) {
|
|
|
+ jsonObject.put("rootSourceId", rootSourceId);
|
|
|
+ }
|
|
|
return jsonObject;
|
|
|
}
|
|
|
|
|
@@ -303,4 +309,66 @@ public class DataFlushService {
|
|
|
}
|
|
|
log.info("flushLongArticlesText updateNum:{}", updateNum);
|
|
|
}
|
|
|
+
|
|
|
+ private final static ExecutorService batchPool = new CommonThreadPoolExecutor(
|
|
|
+ 5,
|
|
|
+ 5,
|
|
|
+ 0L, TimeUnit.SECONDS,
|
|
|
+ new LinkedBlockingQueue<>(10000),
|
|
|
+ new ThreadFactoryBuilder().setNameFormat("batch-%d").build(),
|
|
|
+ new ThreadPoolExecutor.AbortPolicy());
|
|
|
+
|
|
|
+ public void updateLongArticleMatchVideosResponse(Long id) {
|
|
|
+ int pageSize = 1000;
|
|
|
+ if (Objects.isNull(id)) {
|
|
|
+ id = 0L;
|
|
|
+ }
|
|
|
+ int count = longArticleBaseMapper.countNeedMatchVideos(id);
|
|
|
+ CountDownLatch cdl = new CountDownLatch((count / 1000) + 1);
|
|
|
+ while (true) {
|
|
|
+ List<LongArticlesMatchVideos> matchVideosList = longArticleBaseMapper.getNeedMatchVideos(id, pageSize);
|
|
|
+ if (CollectionUtil.isEmpty(matchVideosList)) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ id = matchVideosList.stream().mapToLong(LongArticlesMatchVideos::getId).max().getAsLong();
|
|
|
+ Long finalId = id;
|
|
|
+ batchPool.submit(() -> {
|
|
|
+ try {
|
|
|
+ long start = System.currentTimeMillis();
|
|
|
+ List<String> traceIds = matchVideosList.stream().map(LongArticlesMatchVideos::getTraceId)
|
|
|
+ .distinct().collect(Collectors.toList());
|
|
|
+ List<LongArticlesVideo> longArticlesVideoList = crawlerBaseMapper.getLongArticlesVideo(traceIds);
|
|
|
+ Map<String, LongArticlesVideo> longArticlesVideoMap = longArticlesVideoList.stream().collect(
|
|
|
+ Collectors.toMap(LongArticlesVideo::getTraceId, o -> o, (existing, replacement) -> replacement));
|
|
|
+ CountDownLatch countDownLatch = new CountDownLatch(matchVideosList.size());
|
|
|
+ for (LongArticlesMatchVideos longArticlesMatchVideos : matchVideosList) {
|
|
|
+ pool.submit(() -> {
|
|
|
+ try {
|
|
|
+ LongArticlesVideo longArticlesVideo = longArticlesVideoMap.get(longArticlesMatchVideos.getTraceId());
|
|
|
+ if (Objects.nonNull(longArticlesVideo)) {
|
|
|
+ longArticlesMatchVideos.setResponse(getLongArticleVideoResponse(longArticlesVideo));
|
|
|
+ longArticleBaseMapper.updateLongArticleMatchVideosResponse(longArticlesMatchVideos);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ countDownLatch.countDown();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ countDownLatch.await();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ log.error("updateLongArticleMatchVideosResponse InterruptedException", e);
|
|
|
+ }
|
|
|
+ log.info("updateLongArticleMatchVideosResponse end id:{}, cost:{}", finalId, System.currentTimeMillis() - start);
|
|
|
+ } finally {
|
|
|
+ cdl.countDown();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ cdl.await();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ log.error("updateLongArticleMatchVideosResponse InterruptedException", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|