xueyiming пре 5 месеци
родитељ
комит
336a1ad0df

+ 2 - 0
long-article-server/src/main/java/com/tzld/piaoquan/longarticle/common/enums/ContentStatusEnum.java

@@ -1,6 +1,8 @@
 package com.tzld.piaoquan.longarticle.common.enums;
 
 public enum ContentStatusEnum {
+
+    CRAWLER_SUCCESS(2, "爬取视频成功"),
     SUCCESS_3(3, "下载成功"),
     SUCCESS_4(4, "发布成功"),
     ERROR_95(95, "KIMI识别文章风险不处理"),

+ 12 - 0
long-article-server/src/main/java/com/tzld/piaoquan/longarticle/job/MatchVideoJob.java

@@ -38,6 +38,18 @@ public class MatchVideoJob {
         return ReturnT.SUCCESS;
     }
 
+    /**
+     * XXL-Job handler registered as "uploadCrawlerVideo"; runs
+     * {@code matchVideoService.uploadCrawlerVideo()}.
+     *
+     * <p>A failure of the upload step is logged and then reported through
+     * {@code LarkRobotUtil}; it is not propagated to the scheduler.
+     *
+     * @param param job parameter passed by the scheduler; not read by this handler
+     * @return {@code ReturnT.SUCCESS}, including when the upload step threw
+     */
+    @XxlJob("uploadCrawlerVideo")
+    public ReturnT<String> uploadCrawlerVideoJob(String param) {
+        try {
+            matchVideoService.uploadCrawlerVideo();
+        } catch (Exception e) {
+            // Log before alerting so the stack trace is recorded even if the notification call fails.
+            log.error("uploadCrawlerVideoJob error", e);
+            LarkRobotUtil.sendMessage("uploadCrawlerVideoJob异常,请及时查看,@薛一鸣");
+        }
+        return ReturnT.SUCCESS;
+    }
+
+
     @XxlJob("publishCrawlerVideo")
     public ReturnT<String> publishCrawlerVideoJob(String param) {
         try {

+ 35 - 10
long-article-server/src/main/java/com/tzld/piaoquan/longarticle/service/local/impl/CrawlerVideoServiceImpl.java

@@ -60,6 +60,7 @@ public class CrawlerVideoServiceImpl {
         if (b) {
             return true;
         }
+        //查询原contentId,如果已经爬取到视频,则不再爬取
         if (rootContentId != null) {
             CrawlerVideoExample example = new CrawlerVideoExample();
             example.createCriteria().andContentIdEqualTo(rootContentId);
@@ -67,32 +68,49 @@ public class CrawlerVideoServiceImpl {
             if (!CollectionUtils.isEmpty(crawlerVideos)) {
                 for (CrawlerVideo crawlerVideo : crawlerVideos) {
                     crawlerVideo.setContentId(contentId);
+                    crawlerVideo.setId(null);
                     crawlerVideoMapper.insertSelective(crawlerVideo);
                     return true;
                 }
             }
         }
         log.info("addCrawlerVideo contentId={} rootContentId={} kimiText={}", contentId, rootContentId, kimiText);
+        //爬虫爬取视频
+        int count = 0;
         List<CrawlerVideo> crawlerVideoList = searchVideosFromWeb(kimiText);
         if (!CollectionUtils.isEmpty(crawlerVideoList)) {
             CrawlerVideoExample example = new CrawlerVideoExample();
             example.createCriteria().andContentIdEqualTo(contentId);
             List<CrawlerVideo> crawlerVideos = crawlerVideoMapper.selectByExample(example);
-            // 使用 Lambda 表达式过滤 crawlerVideoList
-            List<CrawlerVideo> filteredList = crawlerVideoList.stream()
-                    .filter(video -> crawlerVideos.stream()
-                            .noneMatch(existingVideo ->
-                                    existingVideo.getOutVideoId().equals(video.getOutVideoId()) &&
-                                            existingVideo.getPlatform().equals(video.getPlatform())
-                            )
-                    )
-                    .collect(Collectors.toList());
+            List<CrawlerVideo> filteredList = crawlerVideoList;
+            if (!CollectionUtils.isEmpty(crawlerVideos)) {
+                count += crawlerVideos.size();
+                // 使用 Lambda 表达式过滤 crawlerVideoList 已经存在的视频
+                filteredList = crawlerVideoList.stream()
+                        .filter(video -> crawlerVideos.stream()
+                                .noneMatch(existingVideo ->
+                                        existingVideo.getOutVideoId().equals(video.getOutVideoId()) &&
+                                                existingVideo.getPlatform().equals(video.getPlatform())
+                                )
+                        )
+                        .collect(Collectors.toList());
+            }
             for (CrawlerVideo crawlerVideo : filteredList) {
                 crawlerVideo.setContentId(contentId);
                 crawlerVideo.setDownloadStatus(0);
                 crawlerVideoMapper.insertSelective(crawlerVideo);
+                count++;
             }
         }
+        return count >= MIN_NUM;
+    }
+
+
+    public boolean uploadCrawlerVideo(String contentId) {
+        boolean b = existUploadCrawlerVideo(contentId);
+        if (b) {
+            return true;
+        }
         boolean pushRes = pushOss(contentId);
         if (pushRes) {
             try {
@@ -112,13 +130,20 @@ public class CrawlerVideoServiceImpl {
         return pushRes;
     }
 
-    public boolean existCrawlerVideo(String contentId) {
+    /**
+     * Checks whether enough crawler videos of the given content have download status 2.
+     * NOTE(review): status 2 presumably means "uploaded" - confirm against the status definitions.
+     *
+     * @param contentId content id whose crawler-video rows are counted
+     * @return {@code true} when at least {@code MIN_NUM} rows match
+     */
+    public boolean existUploadCrawlerVideo(String contentId) {
         CrawlerVideoExample example = new CrawlerVideoExample();
         example.createCriteria().andContentIdEqualTo(contentId).andDownloadStatusEqualTo(2);
         long l = crawlerVideoMapper.countByExample(example);
         return l >= MIN_NUM;
     }
 
+    /**
+     * Checks whether the given content already has {@code MAX_NUM} or more crawler-video rows,
+     * regardless of their download status.
+     *
+     * @param contentId content id whose crawler-video rows are counted
+     * @return {@code true} when the row count is at least {@code MAX_NUM}
+     */
+    public boolean existCrawlerVideo(String contentId) {
+        CrawlerVideoExample byContentId = new CrawlerVideoExample();
+        byContentId.createCriteria().andContentIdEqualTo(contentId);
+        return crawlerVideoMapper.countByExample(byContentId) >= MAX_NUM;
+    }
+
 
     public List<CrawlerVideo> searchVideosFromWeb(LongArticlesText longArticlesText) {
         String articleSummary;

+ 123 - 15
long-article-server/src/main/java/com/tzld/piaoquan/longarticle/service/local/impl/MatchVideoServiceImpl.java

@@ -38,7 +38,12 @@ public class MatchVideoServiceImpl {
 
     private static final String CRAWLER_LOCK_KEY = "crawler_lock_key_%s";
 
-    private static final String CRAWLER_COUNT_KEY = "crawler_count_key_%s";
+    // Redis lock key template for the upload step; %s is replaced with the content id.
+    private static final String UPLOAD_CRAWLER_LOCK_KEY = "upload_crawler_lock_key_%s";
+
+    // Redis key template for the crawl failure counter; %s is replaced with the content id.
+    private static final String CRAWLER_FAIL_COUNT_KEY = "crawler_count_key_%s";
+
+    // Redis key template for the upload failure counter; %s is replaced with the content id.
+    private static final String UPLOAD_CRAWLER_FAIL_COUNT_KEY = "upload_crawler_count_key_%s";
+
 
     @Autowired
     KimiService kimiService;
@@ -70,8 +75,6 @@ public class MatchVideoServiceImpl {
 
     // 定义一个阻塞队列
     private static final ArrayBlockingQueue<MatchVideo> matchKimiVideoQueue = new ArrayBlockingQueue<>(100000);
-
-
     private static final int size = 5;
     // 定义一个线程池,设置消费线程的数量
     private static final ThreadPoolExecutor matchKimiVideoPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(size);
@@ -79,10 +82,14 @@ public class MatchVideoServiceImpl {
 
     // 定义一个阻塞队列
     private static final ArrayBlockingQueue<MatchVideo> matchCrawlerVideoQueue = new ArrayBlockingQueue<>(100000);
-
     // 定义一个线程池,设置消费线程的数量
     private static final ThreadPoolExecutor matchCrawlerVideoPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(size);
 
+    // Bounded blocking queue (capacity 100000) of MatchVideo rows waiting for the upload step.
+    private static final ArrayBlockingQueue<MatchVideo> uploadCrawlerVideoQueue = new ArrayBlockingQueue<>(100000);
+    // Fixed thread pool for the upload consumers; `size` sets the number of consumer threads.
+    private static final ThreadPoolExecutor uploadCrawlerVideoPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(size);
+
     @Transactional
     public void addMatchVideo(MatchContent matchContent) {
         MatchVideoExample example = new MatchVideoExample();
@@ -156,7 +163,7 @@ public class MatchVideoServiceImpl {
                 example.setOrderByClause("id asc");
                 Page<Object> page = new Page<>();
                 page.setCurrentPage(1);
-                page.setPageSize(5000);
+                page.setPageSize(1000);
                 example.setPage(page);
                 matchVideos = matchVideoMapper.selectByExample(example);
                 boolean flag = true;
@@ -186,7 +193,7 @@ public class MatchVideoServiceImpl {
                     example.setOrderByClause("id asc");
                     Page<Object> page = new Page<>();
                     page.setCurrentPage(1);
-                    page.setPageSize(5000);
+                    page.setPageSize(1000);
                     example.setPage(page);
                     matchVideos = matchVideoMapper.selectByExample(example);
                     if (CollectionUtils.isEmpty(matchVideos)) {
@@ -255,16 +262,15 @@ public class MatchVideoServiceImpl {
                 }));
             }
             List<MatchVideo> matchVideos;
-            List<Integer> status = Arrays.asList(1, 2);
             Integer id = (Integer) redisTemplate.opsForValue().get("last_match_video_id");
             if (id != null) {
                 do {
                     MatchVideoExample example = new MatchVideoExample();
-                    example.createCriteria().andIdGreaterThan(id).andContentStatusIn(status);
+                    example.createCriteria().andIdGreaterThan(id).andContentStatusEqualTo(1);
                     example.setOrderByClause("id asc");
                     Page<Object> page = new Page<>();
                     page.setCurrentPage(1);
-                    page.setPageSize(5000);
+                    page.setPageSize(1000);
                     example.setPage(page);
                     matchVideos = matchVideoMapper.selectByExample(example);
                     if (CollectionUtils.isEmpty(matchVideos)) {
@@ -280,7 +286,7 @@ public class MatchVideoServiceImpl {
 
 
     public void processCrawlerMatchContent(MatchVideo matchVideo) {
-        if (matchVideo.getContentStatus() != 1 && matchVideo.getContentStatus() != 2) {
+        if (matchVideo.getContentStatus() != 1) {
             return;
         }
         boolean existCrawlerVideo = crawlerVideoService.existCrawlerVideo(matchVideo.getContentId());
@@ -309,10 +315,10 @@ public class MatchVideoServiceImpl {
                 String rootContentId = getRootContentId(matchVideo.getContentId());
                 boolean res = crawlerVideoService.addCrawlerVideo(matchVideo.getContentId(), rootContentId, kimiText);
                 if (res) {
-                    updateStatus(matchVideo.getId(), ContentStatusEnum.SUCCESS_3.getStatusCode());
+                    updateStatus(matchVideo.getId(), ContentStatusEnum.CRAWLER_SUCCESS.getStatusCode());
                 } else {
                     //匹配失败记录
-                    String countKey = String.format(CRAWLER_COUNT_KEY, matchVideo.getContentId());
+                    String countKey = String.format(CRAWLER_FAIL_COUNT_KEY, matchVideo.getContentId());
                     Integer count = (Integer) redisTemplate.opsForValue().get(countKey);
                     if (count != null && count >= 3) {
                         //更新状态为失败
@@ -328,8 +334,8 @@ public class MatchVideoServiceImpl {
                 redisLock.unlock(lockKey, lockValue);
             }
         } else {
-            //更新状态为etl执行完成
-            updateStatus(matchVideo.getId(), ContentStatusEnum.SUCCESS_3.getStatusCode());
+            //更新状态为爬虫执行完成
+            updateStatus(matchVideo.getId(), ContentStatusEnum.CRAWLER_SUCCESS.getStatusCode());
         }
     }
 
@@ -353,6 +359,108 @@ public class MatchVideoServiceImpl {
         return null;
     }
 
+    /**
+     * Starts one consumer per idle pool thread and then enqueues every match-video row that
+     * is waiting for upload.
+     *
+     * <p>Rows are read in pages of 1000, ordered by id, with id greater than the value stored
+     * in Redis under {@code last_match_video_id} and content status {@code CRAWLER_SUCCESS}.
+     * The method does nothing when no pool thread is idle, and enqueues nothing when the
+     * Redis key is absent.
+     *
+     * <p>NOTE(review): {@code addAll} throws {@code IllegalStateException} once the queue's
+     * capacity of 100000 is reached; confirm whether blocking or skipping is wanted instead.
+     */
+    public void uploadCrawlerVideo() {
+        // getActiveCount() is only approximate, so read it once: a second read could yield a
+        // different (even non-positive) worker count than the one the guard was based on.
+        int threadSize = uploadCrawlerVideoPoolExecutor.getCorePoolSize() - uploadCrawlerVideoPoolExecutor.getActiveCount();
+        if (threadSize <= 0) {
+            return;
+        }
+        log.info("threadNum={}", threadSize);
+        // Start the consumers before filling the queue.
+        for (int i = 0; i < threadSize; i++) {
+            uploadCrawlerVideoPoolExecutor.execute(this::consumeUploadCrawlerVideoQueue);
+        }
+
+        Integer id = (Integer) redisTemplate.opsForValue().get("last_match_video_id");
+        if (id == null) {
+            return;
+        }
+        while (true) {
+            MatchVideoExample example = new MatchVideoExample();
+            example.createCriteria().andIdGreaterThan(id)
+                    .andContentStatusEqualTo(ContentStatusEnum.CRAWLER_SUCCESS.getStatusCode());
+            example.setOrderByClause("id asc");
+            Page<Object> page = new Page<>();
+            page.setCurrentPage(1);
+            page.setPageSize(1000);
+            example.setPage(page);
+            List<MatchVideo> matchVideos = matchVideoMapper.selectByExample(example);
+            if (CollectionUtils.isEmpty(matchVideos)) {
+                break;
+            }
+            // Keyset pagination: the next page starts after the last id of this page.
+            id = matchVideos.get(matchVideos.size() - 1).getId();
+            uploadCrawlerVideoQueue.addAll(matchVideos);
+        }
+    }
+
+    /**
+     * Consumer loop for the upload queue: processes rows until the queue has stayed empty
+     * for five minutes or the thread is interrupted.
+     */
+    private void consumeUploadCrawlerVideoQueue() {
+        log.info("启动上传小程序视频线程");
+        try {
+            while (true) {
+                // Wait at most five minutes for work, then let this consumer finish.
+                MatchVideo matchVideo = uploadCrawlerVideoQueue.poll(5, TimeUnit.MINUTES);
+                if (matchVideo == null) {
+                    break;
+                }
+                try {
+                    processUploadCrawlerVideo(matchVideo);
+                } catch (Exception e) {
+                    // One failing row must not stop the consumer; log it and continue.
+                    log.error("processUploadCrawlerVideo error", e);
+                }
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag instead of wrapping it in a RuntimeException.
+            Thread.currentThread().interrupt();
+            log.error("uploadCrawlerVideo consumer interrupted", e);
+        }
+        log.info("上传小程序视频线程结束");
+    }
+
+    /**
+     * Runs the upload step for one match-video row and updates its content status.
+     *
+     * <ul>
+     *   <li>Rows whose status is not 2 ({@code CRAWLER_SUCCESS}) are ignored.</li>
+     *   <li>If {@code existUploadCrawlerVideo} is already true, the row is set to {@code SUCCESS_3}.</li>
+     *   <li>If another row with the same content id has a failed status, that status is copied.</li>
+     *   <li>Otherwise the upload runs under a per-content Redis lock: success sets
+     *       {@code SUCCESS_3}; a failure increments a Redis counter (kept for 3 days), and a
+     *       failure seen when the counter is already 3 or more sets {@code ERROR_99}.</li>
+     * </ul>
+     *
+     * @param matchVideo row to process; skipped silently when the lock cannot be acquired
+     */
+    public void processUploadCrawlerVideo(MatchVideo matchVideo) {
+        // 2 is the CRAWLER_SUCCESS status code; rows in any other state are not handled here.
+        if (matchVideo.getContentStatus() != 2) {
+            return;
+        }
+        boolean existUploadCrawlerVideo = crawlerVideoService.existUploadCrawlerVideo(matchVideo.getContentId());
+        log.info("processUploadCrawlerVideo contentId={} existCrawlerVideo={}", matchVideo.getContentId(), existUploadCrawlerVideo);
+        if (existUploadCrawlerVideo) {
+            // Enough uploaded videos already exist; only the status needs advancing.
+            updateStatus(matchVideo.getId(), ContentStatusEnum.SUCCESS_3.getStatusCode());
+            return;
+        }
+        // If another row with the same content id already failed, copy its failure status.
+        MatchVideoExample example = new MatchVideoExample();
+        example.createCriteria().andContentIdEqualTo(matchVideo.getContentId());
+        List<MatchVideo> matchVideos = matchVideoMapper.selectByExample(example);
+        if (!CollectionUtils.isEmpty(matchVideos)) {
+            for (MatchVideo matchVideo1 : matchVideos) {
+                if (matchVideo1.getId().equals(matchVideo.getId())) {
+                    continue;
+                }
+                if (ContentStatusEnum.isFail(matchVideo1.getContentStatus())) {
+                    updateStatus(matchVideo.getId(), matchVideo1.getContentStatus());
+                    return;
+                }
+            }
+        }
+        String lockKey = String.format(UPLOAD_CRAWLER_LOCK_KEY, matchVideo.getContentId());
+        String lockValue = UUID.randomUUID().toString();
+        boolean lock = redisLock.tryLock(lockKey, lockValue, 10, TimeUnit.MINUTES);
+        if (!lock) {
+            return;
+        }
+        try {
+            boolean res = crawlerVideoService.uploadCrawlerVideo(matchVideo.getContentId());
+            if (res) {
+                updateStatus(matchVideo.getId(), ContentStatusEnum.SUCCESS_3.getStatusCode());
+                return;
+            }
+            // Record the failed attempt.
+            String countKey = String.format(UPLOAD_CRAWLER_FAIL_COUNT_KEY, matchVideo.getContentId());
+            Integer count = (Integer) redisTemplate.opsForValue().get(countKey);
+            if (count != null && count >= 3) {
+                // Too many failed attempts: mark the row as failed.
+                updateStatus(matchVideo.getId(), ContentStatusEnum.ERROR_99.getStatusCode());
+            } else {
+                Integer nextCount = count == null ? 1 : count + 1;
+                redisTemplate.opsForValue().set(countKey, nextCount, 3, TimeUnit.DAYS);
+            }
+        } finally {
+            // Release the lock even when the upload or a status update throws; otherwise the
+            // content stays locked until the 10-minute lease expires.
+            redisLock.unlock(lockKey, lockValue);
+        }
+    }
+
     public void publishCrawlerVideo() {
         List<MatchVideo> matchVideos;
         Integer id = (Integer) redisTemplate.opsForValue().get("last_match_video_id");
@@ -365,7 +473,7 @@ public class MatchVideoServiceImpl {
                 example.setOrderByClause("id asc");
                 Page<Object> page = new Page<>();
                 page.setCurrentPage(1);
-                page.setPageSize(5000);
+                page.setPageSize(1000);
                 example.setPage(page);
                 matchVideos = matchVideoMapper.selectByExample(example);
                 if (CollectionUtils.isEmpty(matchVideos)) {