|
@@ -1,28 +1,33 @@
|
|
|
package com.tzld.piaoquan.longarticle.service.local.impl;
|
|
|
|
|
|
-import cn.hutool.json.JSONObject;
|
|
|
-import com.alibaba.fastjson.JSONArray;
|
|
|
+import com.tzld.piaoquan.longarticle.common.enums.ContentStatusEnum;
|
|
|
+import com.tzld.piaoquan.longarticle.common.enums.SourceTypesEnum;
|
|
|
import com.tzld.piaoquan.longarticle.component.RedisLock;
|
|
|
-import com.tzld.piaoquan.longarticle.dao.mapper.CrawlerVideoMapper;
|
|
|
import com.tzld.piaoquan.longarticle.dao.mapper.LongArticlesTextMapper;
|
|
|
import com.tzld.piaoquan.longarticle.dao.mapper.MatchVideoMapper;
|
|
|
import com.tzld.piaoquan.longarticle.model.bo.MatchContent;
|
|
|
-import com.tzld.piaoquan.longarticle.model.po.*;
|
|
|
-import com.tzld.piaoquan.longarticle.model.vo.MatchVideoVo;
|
|
|
+import com.tzld.piaoquan.longarticle.model.dto.MiniprogramCardRequest;
|
|
|
+import com.tzld.piaoquan.longarticle.model.po.LongArticlesText;
|
|
|
+import com.tzld.piaoquan.longarticle.model.po.LongArticlesTextExample;
|
|
|
+import com.tzld.piaoquan.longarticle.model.po.MatchVideo;
|
|
|
+import com.tzld.piaoquan.longarticle.model.po.MatchVideoExample;
|
|
|
+import com.tzld.piaoquan.longarticle.model.vo.MatchMiniprogramStatusParam;
|
|
|
import com.tzld.piaoquan.longarticle.service.local.KimiService;
|
|
|
-import com.tzld.piaoquan.longarticle.utils.*;
|
|
|
-import com.tzld.piaoquan.longarticle.utils.other.*;
|
|
|
import com.tzld.piaoquan.longarticle.utils.page.Page;
|
|
|
-import org.apache.commons.lang3.StringUtils;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.data.redis.core.RedisTemplate;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
|
import org.springframework.util.CollectionUtils;
|
|
|
|
|
|
-import java.util.*;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Objects;
|
|
|
+import java.util.UUID;
|
|
|
+import java.util.concurrent.*;
|
|
|
|
|
|
+@Slf4j
|
|
|
@Service
|
|
|
public class MatchVideoServiceImpl {
|
|
|
|
|
@@ -50,6 +55,15 @@ public class MatchVideoServiceImpl {
|
|
|
@Autowired
|
|
|
private RedisTemplate<String, Object> redisTemplate;
|
|
|
|
|
|
+
|
|
|
+ // 定义一个阻塞队列
|
|
|
+ private static final ArrayBlockingQueue<MatchVideo> matchVideoQueue = new ArrayBlockingQueue<>(100000);
|
|
|
+
|
|
|
+
|
|
|
+ private static final int size = 5;
|
|
|
+ // 定义一个线程池,设置消费线程的数量
|
|
|
+ private static final ThreadPoolExecutor matchVideoPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(size);
|
|
|
+
|
|
|
@Transactional
|
|
|
public void addMatchVideo(MatchContent matchContent) {
|
|
|
MatchVideoExample example = new MatchVideoExample();
|
|
@@ -85,12 +99,39 @@ public class MatchVideoServiceImpl {
|
|
|
}
|
|
|
|
|
|
public void getMatchVideo() {
|
|
|
+ if (matchVideoPoolExecutor.getCorePoolSize() - matchVideoPoolExecutor.getActiveCount() > 0) {
|
|
|
+ int threadSize = matchVideoPoolExecutor.getCorePoolSize() - matchVideoPoolExecutor.getActiveCount();
|
|
|
+ log.info("threadNum={}", threadSize);
|
|
|
+ CountDownLatch countDownLatch = new CountDownLatch(threadSize);
|
|
|
+ // 启动消费者线程
|
|
|
+ for (int i = 0; i < threadSize; i++) {
|
|
|
+ matchVideoPoolExecutor.submit(new Thread(() -> {
|
|
|
+ log.info("启动匹配小程序线程");
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ // 超过 5 分钟没有数据,销毁当前线程
|
|
|
+ MatchVideo matchVideo = matchVideoQueue.poll(5, TimeUnit.MINUTES); // 等待最多 5 分钟
|
|
|
+ if (matchVideo == null) {
|
|
|
+ break; // 退出当前线程
|
|
|
+ }
|
|
|
+ processMatchContent(matchVideo);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.info("启动匹配小程序线程结束");
|
|
|
+ countDownLatch.countDown();
|
|
|
+ }));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
List<Integer> targetStatus = new ArrayList<Integer>() {{
|
|
|
add(0);
|
|
|
add(1);
|
|
|
add(2);
|
|
|
}};
|
|
|
- List<MatchVideo> matchVideos = null;
|
|
|
+ List<MatchVideo> matchVideos;
|
|
|
//循环增加游标位置,通过主键索引过滤已经处理的数据
|
|
|
do {
|
|
|
Integer id = (Integer) redisTemplate.opsForValue().get("last_match_video_id");
|
|
@@ -134,9 +175,7 @@ public class MatchVideoServiceImpl {
|
|
|
page.setPageSize(pageSize);
|
|
|
example.setPage(page);
|
|
|
matchVideos = matchVideoMapper.selectByExample(example);
|
|
|
- for (MatchVideo matchVideo : matchVideos) {
|
|
|
- //TODO 加入等待队列 多线程处理
|
|
|
- }
|
|
|
+ matchVideoQueue.addAll(matchVideos);
|
|
|
pageNum++;
|
|
|
} while (CollectionUtils.isEmpty(matchVideos));
|
|
|
|