|
@@ -20,6 +20,7 @@ import org.springframework.transaction.annotation.Transactional;
|
|
|
import org.springframework.util.CollectionUtils;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
import java.util.List;
|
|
|
import java.util.UUID;
|
|
|
import java.util.concurrent.*;
|
|
@@ -61,12 +62,19 @@ public class MatchVideoServiceImpl {
|
|
|
|
|
|
|
|
|
// 定义一个阻塞队列
|
|
|
- private static final ArrayBlockingQueue<MatchVideo> matchVideoQueue = new ArrayBlockingQueue<>(100000);
|
|
|
+ private static final ArrayBlockingQueue<MatchVideo> matchKimiVideoQueue = new ArrayBlockingQueue<>(100000);
|
|
|
|
|
|
|
|
|
- private static final int size = 5;
|
|
|
+ private static final int size = 2;
|
|
|
// 定义一个线程池,设置消费线程的数量
|
|
|
- private static final ThreadPoolExecutor matchVideoPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(size);
|
|
|
+ private static final ThreadPoolExecutor matchKimiVideoPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(size);
|
|
|
+
|
|
|
+
|
|
|
+ // 定义一个阻塞队列
|
|
|
+ private static final ArrayBlockingQueue<MatchVideo> matchCrawlerVideoQueue = new ArrayBlockingQueue<>(100000);
|
|
|
+
|
|
|
+ // 定义一个线程池,设置消费线程的数量
|
|
|
+ private static final ThreadPoolExecutor matchCrawlerVideoPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(size);
|
|
|
|
|
|
@Transactional
|
|
|
public void addMatchVideo(MatchContent matchContent) {
|
|
@@ -102,23 +110,23 @@ public class MatchVideoServiceImpl {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void getMatchVideo() throws InterruptedException {
|
|
|
- if (matchVideoPoolExecutor.getCorePoolSize() - matchVideoPoolExecutor.getActiveCount() > 0) {
|
|
|
- int threadSize = matchVideoPoolExecutor.getCorePoolSize() - matchVideoPoolExecutor.getActiveCount();
|
|
|
+ public void matchKimiVideo() throws InterruptedException {
|
|
|
+ if (matchKimiVideoPoolExecutor.getCorePoolSize() - matchKimiVideoPoolExecutor.getActiveCount() > 0) {
|
|
|
+ int threadSize = matchKimiVideoPoolExecutor.getCorePoolSize() - matchKimiVideoPoolExecutor.getActiveCount();
|
|
|
log.info("threadNum={}", threadSize);
|
|
|
CountDownLatch countDownLatch = new CountDownLatch(threadSize);
|
|
|
// 启动消费者线程
|
|
|
for (int i = 0; i < threadSize; i++) {
|
|
|
- matchVideoPoolExecutor.submit(new Thread(() -> {
|
|
|
- log.info("启动匹配小程序线程");
|
|
|
+ matchKimiVideoPoolExecutor.execute(new Thread(() -> {
|
|
|
+ log.info("启动Kimi搜索线程");
|
|
|
while (true) {
|
|
|
try {
|
|
|
// 超过 5 分钟没有数据,销毁当前线程
|
|
|
- MatchVideo matchVideo = matchVideoQueue.poll(5, TimeUnit.MINUTES); // 等待最多 5 分钟
|
|
|
+ MatchVideo matchVideo = matchKimiVideoQueue.poll(5, TimeUnit.MINUTES); // 等待最多 5 分钟
|
|
|
if (matchVideo == null) {
|
|
|
break; // 退出当前线程
|
|
|
}
|
|
|
- processMatchContent(matchVideo);
|
|
|
+ processKimiMatchContent(matchVideo);
|
|
|
} catch (InterruptedException e) {
|
|
|
throw new RuntimeException(e);
|
|
|
}
|
|
@@ -127,22 +135,19 @@ public class MatchVideoServiceImpl {
|
|
|
countDownLatch.countDown();
|
|
|
}));
|
|
|
}
|
|
|
- List<Integer> targetStatus = new ArrayList<Integer>() {{
|
|
|
- add(0);
|
|
|
- add(1);
|
|
|
- add(2);
|
|
|
- }};
|
|
|
+ List<Integer> targetStatus = Arrays.asList(0, 1, 2);
|
|
|
List<MatchVideo> matchVideos;
|
|
|
//循环增加游标位置,通过主键索引过滤已经处理的数据
|
|
|
do {
|
|
|
Integer id = (Integer) redisTemplate.opsForValue().get("last_match_video_id");
|
|
|
if (id == null) {
|
|
|
+ redisTemplate.opsForValue().set("last_match_video_id", 0);
|
|
|
id = 0;
|
|
|
}
|
|
|
MatchVideoExample example = new MatchVideoExample();
|
|
|
example.createCriteria().andIdGreaterThan(id);
|
|
|
example.setOrderByClause("id asc");
|
|
|
- Page page = new Page<>();
|
|
|
+ Page<Object> page = new Page<>();
|
|
|
page.setCurrentPage(1);
|
|
|
page.setPageSize(5000);
|
|
|
example.setPage(page);
|
|
@@ -157,6 +162,8 @@ public class MatchVideoServiceImpl {
|
|
|
if (flag) {
|
|
|
Integer lastId = matchVideos.get(matchVideos.size() - 1).getId();
|
|
|
redisTemplate.opsForValue().set("last_match_video_id", lastId);
|
|
|
+ } else {
|
|
|
+ break;
|
|
|
}
|
|
|
} while (CollectionUtils.isEmpty(matchVideos));
|
|
|
|
|
@@ -166,51 +173,104 @@ public class MatchVideoServiceImpl {
|
|
|
if (id == null) {
|
|
|
break;
|
|
|
}
|
|
|
-
|
|
|
int pageSize = 5000;
|
|
|
MatchVideoExample example = new MatchVideoExample();
|
|
|
- example.createCriteria().andIdGreaterThan(id).andContentStatusIn(targetStatus);
|
|
|
+ example.createCriteria().andIdGreaterThan(id).andContentStatusEqualTo(0);
|
|
|
example.setOrderByClause("id asc");
|
|
|
- Page page = new Page<>();
|
|
|
+ Page<Object> page = new Page<>();
|
|
|
page.setCurrentPage(pageNum);
|
|
|
page.setPageSize(pageSize);
|
|
|
example.setPage(page);
|
|
|
matchVideos = matchVideoMapper.selectByExample(example);
|
|
|
- matchVideoQueue.addAll(matchVideos);
|
|
|
+ matchKimiVideoQueue.addAll(matchVideos);
|
|
|
pageNum++;
|
|
|
} while (CollectionUtils.isEmpty(matchVideos));
|
|
|
countDownLatch.await();
|
|
|
-
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
- public void processMatchContent(MatchVideo matchVideo) {
|
|
|
- if (matchVideo.getContentStatus() == 0) {
|
|
|
- //1.执行kimi任务
|
|
|
- LongArticlesText kimiText = kimiService.getKimiText(matchVideo.getContentId());
|
|
|
- if (kimiText == null) {
|
|
|
- //TODO 查询信息重新生成kimi信息
|
|
|
- return;
|
|
|
+ public void processKimiMatchContent(MatchVideo matchVideo) {
|
|
|
+ if (matchVideo.getContentStatus() != 0) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ //1.执行kimi任务
|
|
|
+ LongArticlesText kimiText = kimiService.getKimiText(matchVideo.getContentId());
|
|
|
+ if (kimiText == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ //执行kimi任务
|
|
|
+ if (kimiText.getKimiStatus() == 0) {
|
|
|
+ String lockKey = String.format(KIMI_LOCK_KEY, matchVideo.getContentId());
|
|
|
+ String lockValue = UUID.randomUUID().toString();
|
|
|
+ boolean lock = redisLock.tryLock(lockKey, lockValue, 300, TimeUnit.SECONDS);
|
|
|
+ if (lock) {
|
|
|
+ boolean res = kimiService.updateKimiContent(matchVideo.getContentId());
|
|
|
+ if (res) {
|
|
|
+ updateStatus(matchVideo.getId(), 1);
|
|
|
+ }
|
|
|
+ redisLock.unlock(lockKey, lockValue);
|
|
|
}
|
|
|
- //执行kimi任务
|
|
|
- if (kimiText.getKimiStatus() == 0) {
|
|
|
- String lockKey = String.format(KIMI_LOCK_KEY, matchVideo.getContentId());
|
|
|
- String lockValue = UUID.randomUUID().toString();
|
|
|
- boolean lock = redisLock.tryLock(lockKey, lockValue, 300, TimeUnit.SECONDS);
|
|
|
- if (lock) {
|
|
|
- kimiText = kimiService.getAndUpdateContent(matchVideo.getContentId());
|
|
|
- redisLock.unlock(lockKey, lockValue);
|
|
|
- if (kimiText == null) {
|
|
|
- return;
|
|
|
+ } else {
|
|
|
+ //更新状态为kimi执行完成
|
|
|
+ updateStatus(matchVideo.getId(), 1);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ public void matchCrawlerVideo() throws InterruptedException {
|
|
|
+ if (matchCrawlerVideoPoolExecutor.getCorePoolSize() - matchCrawlerVideoPoolExecutor.getActiveCount() > 0) {
|
|
|
+ int threadSize = matchCrawlerVideoPoolExecutor.getCorePoolSize() - matchCrawlerVideoPoolExecutor.getActiveCount();
|
|
|
+ log.info("threadNum={}", threadSize);
|
|
|
+ CountDownLatch countDownLatch = new CountDownLatch(threadSize);
|
|
|
+ // 启动消费者线程
|
|
|
+ for (int i = 0; i < threadSize; i++) {
|
|
|
+ matchCrawlerVideoPoolExecutor.execute(new Thread(() -> {
|
|
|
+ log.info("启动匹配小程序线程");
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ // 超过 5 分钟没有数据,销毁当前线程
|
|
|
+ MatchVideo matchVideo = matchCrawlerVideoQueue.poll(5, TimeUnit.MINUTES); // 等待最多 5 分钟
|
|
|
+ if (matchVideo == null) {
|
|
|
+ break; // 退出当前线程
|
|
|
+ }
|
|
|
+ processCrawlerMatchContent(matchVideo);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
}
|
|
|
- } else {
|
|
|
- return;
|
|
|
- }
|
|
|
- } else {
|
|
|
- //更新状态为kimi执行完成
|
|
|
- updateStatus(matchVideo.getId(), 1);
|
|
|
+ log.info("启动匹配小程序线程结束");
|
|
|
+ countDownLatch.countDown();
|
|
|
+ }));
|
|
|
}
|
|
|
+ int pageNum = 1;
|
|
|
+ List<MatchVideo> matchVideos;
|
|
|
+ List<Integer> status = Arrays.asList(1, 2);
|
|
|
+ do {
|
|
|
+ Integer id = (Integer) redisTemplate.opsForValue().get("last_match_video_id");
|
|
|
+ if (id == null) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ int pageSize = 5000;
|
|
|
+ MatchVideoExample example = new MatchVideoExample();
|
|
|
+ example.createCriteria().andIdGreaterThan(id).andContentStatusIn(status);
|
|
|
+ example.setOrderByClause("id asc");
|
|
|
+ Page<Object> page = new Page<>();
|
|
|
+ page.setCurrentPage(pageNum);
|
|
|
+ page.setPageSize(pageSize);
|
|
|
+ example.setPage(page);
|
|
|
+ matchVideos = matchVideoMapper.selectByExample(example);
|
|
|
+ matchKimiVideoQueue.addAll(matchVideos);
|
|
|
+ pageNum++;
|
|
|
+ } while (CollectionUtils.isEmpty(matchVideos));
|
|
|
+ countDownLatch.await();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public void processCrawlerMatchContent(MatchVideo matchVideo) {
|
|
|
+ if (matchVideo.getContentStatus() != 1 || matchVideo.getContentStatus() != 2) {
|
|
|
+ return;
|
|
|
}
|
|
|
boolean existCrawlerVideo = crawlerVideoService.existCrawlerVideo(matchVideo.getContentId());
|
|
|
if (!existCrawlerVideo) {
|