package com.tzld.piaoquan.featurestools.job;

import com.tzld.piaoquan.featurestools.dao.mapper.CreativeVideoUnderstanderMapper;
import com.tzld.piaoquan.featurestools.model.po.CreativeVideoUnderstander;
import com.tzld.piaoquan.featurestools.model.po.CreativeVideoUnderstanderExample;
import com.tzld.piaoquan.featurestools.model.vo.VideoUnderstandParam;
import com.tzld.piaoquan.featurestools.model.vo.VideoUnderstandResult;
import com.tzld.piaoquan.featurestools.service.VideoUnderstandService;
import com.tzld.piaoquan.featurestools.util.DateUtil;
import com.tzld.piaoquan.featurestools.util.page.Page;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

/**
 * Scheduled job that loads {@link CreativeVideoUnderstander} rows with status 0,
 * pushes them onto a shared in-memory queue and lets a fixed pool of worker
 * threads send each one to {@link VideoUnderstandService}.
 *
 * <p>Status values written by this class: {@code 1} when an understanding text
 * was obtained, {@code 2} when the retry budget is exhausted. Rows that fail
 * with retries left keep their status and get their retry count incremented.
 */
@Slf4j
@Component
public class CreativeVideoUnderstandJob {

    /** Number of worker threads in the shared pool. */
    private static final int WORKER_COUNT = 5;
    /** Capacity of the shared hand-off queue between the loader and the workers. */
    private static final int QUEUE_CAPACITY = 100000;
    /** A worker exits after the queue has been empty for this many minutes. */
    private static final long IDLE_TIMEOUT_MINUTES = 5;
    /** Number of rows fetched per database page. */
    private static final int PAGE_SIZE = 1000;
    /** Retry count at which a row is marked as failed instead of being retried. */
    private static final int MAX_RETRY_COUNT = 3;

    private static final int STATUS_PENDING = 0;
    private static final int STATUS_SUCCESS = 1;
    private static final int STATUS_FAILED = 2;

    private static final ThreadPoolExecutor videoPoolExecutor =
            (ThreadPoolExecutor) Executors.newFixedThreadPool(WORKER_COUNT);

    private static final BlockingQueue<CreativeVideoUnderstander> videoQueue =
            new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    @Autowired
    private CreativeVideoUnderstanderMapper creativeVideoUnderstanderMapper;

    @Autowired
    private VideoUnderstandService videoUnderstandService;

    /**
     * Job entry point. Starts one worker per currently idle pool slot, enqueues
     * every row with status 0, then blocks until all workers started by this
     * invocation have exited (each exits after the queue stays empty for
     * {@value #IDLE_TIMEOUT_MINUTES} minutes).
     *
     * @param param job parameter supplied by the scheduler; not used
     * @return {@link ReturnT#SUCCESS} in all non-exceptional cases, including
     *         when no pool slot is free and nothing is done
     * @throws InterruptedException if the calling thread is interrupted while
     *         enqueuing rows or while waiting for the workers to finish
     */
    @XxlJob("videoUnderstanderJob")
    public ReturnT<String> videoUnderstander(String param) throws InterruptedException {
        // Read the free-slot count once so the check and the latch size agree.
        int threadSize = videoPoolExecutor.getCorePoolSize() - videoPoolExecutor.getActiveCount();
        if (threadSize <= 0) {
            return ReturnT.SUCCESS;
        }
        log.info("threadNum={}", threadSize);

        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
        // Start the consumer workers before producing so the bounded queue drains.
        for (int i = 0; i < threadSize; i++) {
            videoPoolExecutor.execute(() -> consumeQueue(countDownLatch));
        }

        enqueuePendingRecords();
        countDownLatch.await();
        return ReturnT.SUCCESS;
    }

    /**
     * Worker loop: takes records from the queue and processes them until the
     * queue has been empty for the idle timeout or the thread is interrupted.
     * The latch is always counted down on exit so the job thread cannot hang.
     */
    private void consumeQueue(CountDownLatch countDownLatch) {
        log.info("启动视频理解线程");
        try {
            while (true) {
                try {
                    // No data within the idle timeout: let this worker finish.
                    CreativeVideoUnderstander creativeVideoUnderstander =
                            videoQueue.poll(IDLE_TIMEOUT_MINUTES, TimeUnit.MINUTES);
                    log.info("videoQueue size={}", videoQueue.size());
                    if (creativeVideoUnderstander == null) {
                        break;
                    }
                    log.info("creativeVideoUnderstander={}", creativeVideoUnderstander);
                    processCreativeVideoUnderstander(creativeVideoUnderstander);
                } catch (InterruptedException e) {
                    // Preserve the interrupt and stop instead of looping on it.
                    Thread.currentThread().interrupt();
                    log.error("视频理解线程异常", e);
                    break;
                } catch (Exception e) {
                    // A failure on one record must not kill the worker.
                    log.error("视频理解线程异常", e);
                }
            }
        } finally {
            log.info("视频理解线程结束");
            countDownLatch.countDown();
        }
    }

    /**
     * Pages through all rows with status 0 and puts them on the queue, blocking
     * when the queue is full rather than failing.
     *
     * <p>NOTE(review): workers update row status while this method is still
     * paging over the same status filter, so later pages may shift and some
     * rows may be skipped until the next run - confirm whether that is acceptable.
     */
    private void enqueuePendingRecords() throws InterruptedException {
        CreativeVideoUnderstanderExample example = new CreativeVideoUnderstanderExample();
        example.createCriteria().andStatusEqualTo(STATUS_PENDING);
        long count = creativeVideoUnderstanderMapper.countByExample(example);
        int pageNum = (int) (count / PAGE_SIZE) + 1;
        for (int i = 1; i <= pageNum; i++) {
            example.setPage(new Page<>(i, PAGE_SIZE));
            List<CreativeVideoUnderstander> creativeVideoUnderstanderList =
                    creativeVideoUnderstanderMapper.selectByExample(example);
            if (CollectionUtils.isEmpty(creativeVideoUnderstanderList)) {
                continue;
            }
            for (CreativeVideoUnderstander creativeVideoUnderstander : creativeVideoUnderstanderList) {
                // put() waits for space; addAll() would throw once the queue is full.
                videoQueue.put(creativeVideoUnderstander);
            }
        }
    }

    /**
     * Sends one record to the video-understanding service and persists the outcome.
     * A null result, a result carrying an error, or a result without understanding
     * text all count as a failed attempt and go through {@link #addRetryCount}.
     *
     * @param creativeVideoUnderstander the record to process; mutated in place
     */
    private void processCreativeVideoUnderstander(CreativeVideoUnderstander creativeVideoUnderstander) {
        VideoUnderstandParam videoUnderstandParam = new VideoUnderstandParam();
        videoUnderstandParam.setUrl(creativeVideoUnderstander.getVideoUrl());
        VideoUnderstandResult videoUnderstandResult =
                videoUnderstandService.getVideoUnderstandResult(videoUnderstandParam);
        log.info("videoUnderstandResult={}", videoUnderstandResult);

        if (videoUnderstandResult == null) {
            addRetryCount(creativeVideoUnderstander);
            return;
        }
        if (StringUtils.isNotEmpty(videoUnderstandResult.getError())) {
            log.error("videoUnderstandResult id={} error={}",
                    creativeVideoUnderstander.getId(), videoUnderstandResult.getError());
            addRetryCount(creativeVideoUnderstander);
            return;
        }

        // Copy over whatever file metadata the service returned, even if the
        // attempt ends up counted as a failure below.
        if (StringUtils.isNotEmpty(videoUnderstandResult.getFileName())) {
            creativeVideoUnderstander.setVideoFileName(videoUnderstandResult.getFileName());
        }
        if (StringUtils.isNotEmpty(videoUnderstandResult.getFileStatus())) {
            creativeVideoUnderstander.setVideoFileStatus(videoUnderstandResult.getFileStatus());
        }
        if (StringUtils.isNotEmpty(videoUnderstandResult.getExpireTime())) {
            Date date = DateUtil.parseDate(videoUnderstandResult.getExpireTime());
            if (date != null) {
                creativeVideoUnderstander.setVideoFileExpireTime(date);
            }
        }

        if (StringUtils.isNotEmpty(videoUnderstandResult.getUnderstanderText())) {
            creativeVideoUnderstander.setUnderstanderText(videoUnderstandResult.getUnderstanderText());
            creativeVideoUnderstander.setStatus(STATUS_SUCCESS);
            creativeVideoUnderstanderMapper.updateByPrimaryKeyWithBLOBs(creativeVideoUnderstander);
            return;
        }
        addRetryCount(creativeVideoUnderstander);
    }

    /**
     * Records a failed attempt: marks the record with status 2 once its retry
     * count has reached {@value #MAX_RETRY_COUNT}, otherwise increments the
     * retry count, then persists the non-null fields of the record.
     *
     * @param creativeVideoUnderstander the record whose attempt failed
     */
    private void addRetryCount(CreativeVideoUnderstander creativeVideoUnderstander) {
        if (creativeVideoUnderstander.getRetryCount() >= MAX_RETRY_COUNT) {
            creativeVideoUnderstander.setStatus(STATUS_FAILED);
        } else {
            creativeVideoUnderstander.setRetryCount(creativeVideoUnderstander.getRetryCount() + 1);
        }
        creativeVideoUnderstanderMapper.updateByPrimaryKeySelective(creativeVideoUnderstander);
    }
}