123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- package com.tzld.piaoquan.featurestools.job;
- import com.tzld.piaoquan.featurestools.dao.mapper.CreativeVideoUnderstanderMapper;
- import com.tzld.piaoquan.featurestools.model.po.CreativeVideoUnderstander;
- import com.tzld.piaoquan.featurestools.model.po.CreativeVideoUnderstanderExample;
- import com.tzld.piaoquan.featurestools.model.vo.VideoUnderstandParam;
- import com.tzld.piaoquan.featurestools.model.vo.VideoUnderstandResult;
- import com.tzld.piaoquan.featurestools.service.VideoUnderstandService;
- import com.tzld.piaoquan.featurestools.util.DateUtil;
- import com.tzld.piaoquan.featurestools.util.page.Page;
- import com.xxl.job.core.biz.model.ReturnT;
- import com.xxl.job.core.handler.annotation.XxlJob;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.StringUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import org.springframework.util.CollectionUtils;
- import java.util.Date;
- import java.util.List;
- import java.util.concurrent.*;
- @Slf4j
- @Component
- public class CreativeVideoUnderstandJob {
- private static final ThreadPoolExecutor videoPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
- private static final ArrayBlockingQueue<CreativeVideoUnderstander> videoQueue = new ArrayBlockingQueue<>(100000);
- @Autowired
- private CreativeVideoUnderstanderMapper creativeVideoUnderstanderMapper;
- @Autowired
- private VideoUnderstandService videoUnderstandService;
- @XxlJob("videoUnderstanderJob")
- public ReturnT<String> videoUnderstander(String param) throws InterruptedException {
- if (videoPoolExecutor.getCorePoolSize() - videoPoolExecutor.getActiveCount() > 0) {
- int threadSize = videoPoolExecutor.getCorePoolSize() - videoPoolExecutor.getActiveCount();
- log.info("threadNum={}", threadSize);
- CountDownLatch countDownLatch = new CountDownLatch(threadSize);
- // 启动消费者线程
- for (int i = 0; i < threadSize; i++) {
- videoPoolExecutor.execute(new Thread(() -> {
- log.info("启动视频理解线程");
- while (true) {
- try {
- // 超过 5 分钟没有数据,销毁当前线程
- CreativeVideoUnderstander creativeVideoUnderstander = videoQueue.poll(5, TimeUnit.MINUTES); // 等待最多 5 分钟
- log.info("videoQueue size={}", videoQueue.size());
- if (creativeVideoUnderstander == null) {
- break; // 退出当前线程
- }
- System.out.println("creativeVideoUnderstander=" + creativeVideoUnderstander);
- processCreativeVideoUnderstander(creativeVideoUnderstander);
- } catch (Exception e) {
- log.error("视频理解线程异常", e);
- }
- }
- log.info("视频理解线程结束");
- countDownLatch.countDown();
- }));
- }
- CreativeVideoUnderstanderExample example = new CreativeVideoUnderstanderExample();
- example.createCriteria().andStatusEqualTo(0);
- long count = creativeVideoUnderstanderMapper.countByExample(example);
- int pageSize = 1000;
- int pageNum = (int) (count / pageSize) + 1;
- for (int i = 1; i <= pageNum; i++) {
- Page page = new Page<>(i, pageSize);
- example.setPage(page);
- List<CreativeVideoUnderstander> creativeVideoUnderstanderList = creativeVideoUnderstanderMapper.selectByExample(example);
- if (!CollectionUtils.isEmpty(creativeVideoUnderstanderList)) {
- videoQueue.addAll(creativeVideoUnderstanderList);
- }
- }
- countDownLatch.await();
- }
- return ReturnT.SUCCESS;
- }
- private void processCreativeVideoUnderstander(CreativeVideoUnderstander creativeVideoUnderstander) {
- VideoUnderstandParam videoUnderstandParam = new VideoUnderstandParam();
- videoUnderstandParam.setUrl(creativeVideoUnderstander.getVideoUrl());
- // videoUnderstandParam.setFileName(creativeVideoUnderstander.getVideoFileName());
- VideoUnderstandResult videoUnderstandResult = videoUnderstandService.getVideoUnderstandResult(videoUnderstandParam);
- System.out.println("videoUnderstandResult=" + videoUnderstandResult);
- if (videoUnderstandResult == null) {
- addRetryCount(creativeVideoUnderstander);
- return;
- }
- if (StringUtils.isNotEmpty(videoUnderstandResult.getError())) {
- log.error("videoUnderstandResult id={} error={}", creativeVideoUnderstander.getId(), videoUnderstandResult.getError());
- addRetryCount(creativeVideoUnderstander);
- return;
- }
- if (StringUtils.isNotEmpty(videoUnderstandResult.getFileName())) {
- creativeVideoUnderstander.setVideoFileName(videoUnderstandResult.getFileName());
- }
- if (StringUtils.isNotEmpty(videoUnderstandResult.getFileStatus())) {
- creativeVideoUnderstander.setVideoFileStatus(videoUnderstandResult.getFileStatus());
- }
- if (StringUtils.isNotEmpty(videoUnderstandResult.getExpireTime())) {
- Date date = DateUtil.parseDate(videoUnderstandResult.getExpireTime());
- if (date != null) {
- creativeVideoUnderstander.setVideoFileExpireTime(date);
- }
- }
- if (StringUtils.isNotEmpty(videoUnderstandResult.getUnderstanderText())) {
- creativeVideoUnderstander.setUnderstanderText(videoUnderstandResult.getUnderstanderText());
- creativeVideoUnderstander.setStatus(1);
- creativeVideoUnderstanderMapper.updateByPrimaryKeyWithBLOBs(creativeVideoUnderstander);
- return;
- }
- addRetryCount(creativeVideoUnderstander);
- }
- private void addRetryCount(CreativeVideoUnderstander creativeVideoUnderstander) {
- if (creativeVideoUnderstander.getRetryCount() >= 3) {
- creativeVideoUnderstander.setStatus(2);
- } else {
- creativeVideoUnderstander.setRetryCount(creativeVideoUnderstander.getRetryCount() + 1);
- }
- creativeVideoUnderstanderMapper.updateByPrimaryKeySelective(creativeVideoUnderstander);
- }
- }
|