package com.tzld.piaoquan.featurestools.job;

import com.alibaba.fastjson.JSONObject;
import com.tzld.piaoquan.featurestools.dao.mapper.CreativeVideoSummarizeMapper;
import com.tzld.piaoquan.featurestools.dao.mapper.CreativeVideoUnderstanderMapper;
import com.tzld.piaoquan.featurestools.model.bo.EmbeddingResult;
import com.tzld.piaoquan.featurestools.model.po.CreativeVideoSummarize;
import com.tzld.piaoquan.featurestools.model.po.CreativeVideoSummarizeExample;
import com.tzld.piaoquan.featurestools.model.po.CreativeVideoUnderstander;
import com.tzld.piaoquan.featurestools.model.po.CreativeVideoUnderstanderExample;
import com.tzld.piaoquan.featurestools.service.CreativeVideoSummarizeService;
import com.tzld.piaoquan.featurestools.service.TextEmbeddingService;
import com.tzld.piaoquan.featurestools.util.DeepSeekAPI;
import com.tzld.piaoquan.featurestools.util.page.Page;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.concurrent.*;

/**
 * Scheduled jobs that turn video "understander" text into summarized hook / value / urgency
 * phrases with embeddings, and that recompute embeddings for already stored summaries.
 *
 * <p>{@link #summarizeUnderstanding(String)} runs a producer/consumer pipeline: the calling
 * thread pages through understander rows with status {@code 1} and feeds them into a bounded
 * queue, while up to {@value #CONSUMER_THREADS} pool threads drain the queue and process each row.
 */
@Slf4j
@Component
public class SummarizeUnderstandingJob {

    /** Number of consumer threads in the shared pool. */
    private static final int CONSUMER_THREADS = 5;

    /** Capacity of the hand-off queue between the paging producer and the consumers. */
    private static final int QUEUE_CAPACITY = 100000;

    /** Rows fetched per database page. */
    private static final int PAGE_SIZE = 1000;

    /** A consumer exits after waiting this long on an empty queue. */
    private static final long IDLE_TIMEOUT_MINUTES = 5;

    /** The producer gives up after waiting this long for free space in a full queue. */
    private static final long ENQUEUE_TIMEOUT_MINUTES = 5;

    private static final ThreadPoolExecutor understandingPoolExecutor =
            (ThreadPoolExecutor) Executors.newFixedThreadPool(CONSUMER_THREADS);

    private static final ArrayBlockingQueue<CreativeVideoUnderstander> understandingQueue =
            new ArrayBlockingQueue<>(QUEUE_CAPACITY);

    @Autowired
    private CreativeVideoUnderstanderMapper creativeVideoUnderstanderMapper;

    @Autowired
    private TextEmbeddingService textEmbeddingService;

    @Autowired
    private CreativeVideoSummarizeService creativeVideoSummarizeService;

    @Autowired
    private CreativeVideoSummarizeMapper creativeVideoSummarizeMapper;

    /**
     * Starts one consumer per idle pool thread, enqueues every understander row with status
     * {@code 1}, and blocks until all consumers started by this invocation have finished.
     *
     * <p>If no pool thread is idle (an earlier invocation is still running) the method does
     * nothing and returns success immediately.
     *
     * @param param job parameter supplied by the scheduler; not used
     * @return {@link ReturnT#SUCCESS}
     * @throws InterruptedException if the calling thread is interrupted while enqueuing rows or
     *                              while waiting for the consumers to finish
     * @throws IllegalStateException if the queue stays full for longer than the enqueue timeout
     */
    @XxlJob("summarizeUnderstandingJob")
    public ReturnT<String> summarizeUnderstanding(String param) throws InterruptedException {
        // Read the free capacity once so the guard and the latch size cannot disagree.
        int threadSize = understandingPoolExecutor.getCorePoolSize() - understandingPoolExecutor.getActiveCount();
        if (threadSize <= 0) {
            return ReturnT.SUCCESS;
        }
        log.info("threadNum={}", threadSize);

        CountDownLatch countDownLatch = new CountDownLatch(threadSize);
        // Start the consumer tasks.
        for (int i = 0; i < threadSize; i++) {
            understandingPoolExecutor.execute(() -> consumeQueue(countDownLatch));
        }

        enqueuePendingUnderstanders();
        countDownLatch.await();
        return ReturnT.SUCCESS;
    }

    /**
     * Consumer loop: processes queued rows until the queue has been empty for
     * {@value #IDLE_TIMEOUT_MINUTES} minutes or the thread is interrupted.
     *
     * @param countDownLatch latch counted down exactly once when this consumer exits
     */
    private void consumeQueue(CountDownLatch countDownLatch) {
        log.info("启动视频理解总结线程");
        try {
            while (true) {
                CreativeVideoUnderstander creativeVideoUnderstander;
                try {
                    // No data for the idle timeout means the producer is done: end this consumer.
                    creativeVideoUnderstander = understandingQueue.poll(IDLE_TIMEOUT_MINUTES, TimeUnit.MINUTES);
                } catch (InterruptedException e) {
                    // Restore the flag and stop instead of spinning on an interrupted thread.
                    Thread.currentThread().interrupt();
                    log.warn("summarize understanding consumer interrupted, stopping", e);
                    break;
                }
                log.info("videoQueue size={}", understandingQueue.size());
                if (creativeVideoUnderstander == null) {
                    break;
                }
                try {
                    processSummarizeUnderstanding(creativeVideoUnderstander);
                } catch (Exception e) {
                    // One bad row must not kill the consumer; log and move on to the next row.
                    log.error("视频理解总结线程异常", e);
                }
            }
        } finally {
            // Always release the latch, otherwise the job thread would wait forever.
            log.info("视频理解总结线程结束");
            countDownLatch.countDown();
        }
    }

    /**
     * Pages through all understander rows with status {@code 1} and puts them on the queue,
     * waiting for free space when the queue is full.
     *
     * @throws InterruptedException  if interrupted while waiting for queue space
     * @throws IllegalStateException if the queue stays full for longer than the enqueue timeout
     */
    private void enqueuePendingUnderstanders() throws InterruptedException {
        CreativeVideoUnderstanderExample example = new CreativeVideoUnderstanderExample();
        example.createCriteria().andStatusEqualTo(1);
        long count = creativeVideoUnderstanderMapper.countByExample(example);
        log.info("count = {}", count);

        // Ceiling division: no trailing empty page when count is a multiple of PAGE_SIZE.
        long pageNum = (count + PAGE_SIZE - 1) / PAGE_SIZE;
        // NOTE(review): if processing changes a row's status while paging is in progress, later
        // pages of this status-filtered query may shift and skip rows until the next run - confirm.
        for (int i = 1; i <= pageNum; i++) {
            example.setPage(new Page<>(i, PAGE_SIZE));
            List<CreativeVideoUnderstander> creativeVideoUnderstanderList =
                    creativeVideoUnderstanderMapper.selectByExampleWithBLOBs(example);
            if (CollectionUtils.isEmpty(creativeVideoUnderstanderList)) {
                continue;
            }
            for (CreativeVideoUnderstander creativeVideoUnderstander : creativeVideoUnderstanderList) {
                // A timed offer applies back-pressure; addAll would throw as soon as the queue is full.
                boolean accepted = understandingQueue.offer(
                        creativeVideoUnderstander, ENQUEUE_TIMEOUT_MINUTES, TimeUnit.MINUTES);
                if (!accepted) {
                    throw new IllegalStateException("Queue full");
                }
            }
        }
    }

    /**
     * Summarizes one understander row: sends its text to {@code DeepSeekAPI.chat}, reads the
     * {@code hook}, {@code value} and {@code urgency} arrays from the JSON reply, embeds each
     * non-empty joined string, and hands the results to
     * {@code creativeVideoSummarizeService.saveRes}.
     *
     * <p>Returns without saving when the text or reply is empty, when all three fields are
     * empty, or when no embedding could be obtained.
     *
     * @param creativeVideoUnderstander the row to process
     */
    private void processSummarizeUnderstanding(CreativeVideoUnderstander creativeVideoUnderstander) {
        log.info("processSummarizeUnderstanding creativeVideoUnderstander = {}", creativeVideoUnderstander);
        String understanderText = creativeVideoUnderstander.getUnderstanderText();
        if (StringUtils.isEmpty(understanderText)) {
            return;
        }
        String chat = DeepSeekAPI.chat(understanderText);
        if (StringUtils.isEmpty(chat)) {
            return;
        }
        JSONObject jsonObject = JSONObject.parseObject(chat);
        if (jsonObject == null) {
            log.warn("processSummarizeUnderstanding chat response is not a JSON object, chat = {}", chat);
            return;
        }

        String hook = joinJsonArray(jsonObject, "hook");
        String value = joinJsonArray(jsonObject, "value");
        String urgency = joinJsonArray(jsonObject, "urgency");
        log.info("hook = {}", hook);
        log.info("value = {}", value);
        log.info("urgency = {}", urgency);
        if (StringUtils.isEmpty(hook) && StringUtils.isEmpty(value) && StringUtils.isEmpty(urgency)) {
            return;
        }

        EmbeddingResult hookResult = embedIfPresent(hook);
        EmbeddingResult valueResult = embedIfPresent(value);
        EmbeddingResult urgencyResult = embedIfPresent(urgency);
        log.info("hookResult = {}", hookResult);
        log.info("valueResult = {}", valueResult);
        log.info("urgencyResult = {}", urgencyResult);
        if (hookResult == null && valueResult == null && urgencyResult == null) {
            return;
        }

        CreativeVideoSummarize hookCreativeVideoSummarize =
                buildSummarize(creativeVideoUnderstander, "creative_hook_embedding", hook, hookResult);
        CreativeVideoSummarize valueCreativeVideoSummarize =
                buildSummarize(creativeVideoUnderstander, "creative_why_embedding", value, valueResult);
        CreativeVideoSummarize urgencyCreativeVideoSummarize =
                buildSummarize(creativeVideoUnderstander, "creative_action_embedding", urgency, urgencyResult);
        creativeVideoSummarizeService.saveRes(creativeVideoUnderstander, hookCreativeVideoSummarize,
                valueCreativeVideoSummarize, urgencyCreativeVideoSummarize);
    }

    /**
     * Joins the string elements of the JSON array stored under {@code key} with commas.
     *
     * @return the joined string, or an empty string when the key is absent or null
     */
    private static String joinJsonArray(JSONObject jsonObject, String key) {
        if (jsonObject.get(key) == null) {
            return "";
        }
        return String.join(",", jsonObject.getJSONArray(key).toJavaList(String.class));
    }

    /**
     * Requests an embedding for {@code text}.
     *
     * @return the embedding result, or {@code null} when {@code text} is empty (the service
     *         itself may also return {@code null})
     */
    private EmbeddingResult embedIfPresent(String text) {
        if (StringUtils.isEmpty(text)) {
            return null;
        }
        return textEmbeddingService.getEmbedding(text);
    }

    /**
     * Builds the summarize record of the given type for one understander row.
     *
     * @return the new record, or {@code null} when no embedding result is available
     */
    private static CreativeVideoSummarize buildSummarize(CreativeVideoUnderstander creativeVideoUnderstander,
                                                         String type, String text, EmbeddingResult result) {
        if (result == null) {
            return null;
        }
        return new CreativeVideoSummarize(creativeVideoUnderstander.getId(),
                creativeVideoUnderstander.getCreativeId(), type, text, result.getWords(), result.getEmbeddingRes());
    }

    /**
     * Recomputes the embedding and word split of every stored summarize row from its
     * {@code aiWordSplit} text and writes the row back.
     *
     * <p>Rows with empty text, or for which no embedding is returned, are skipped and left
     * unchanged.
     *
     * @param param job parameter supplied by the scheduler; not used
     * @return {@link ReturnT#SUCCESS}
     * @throws InterruptedException declared for the job signature; not thrown by this body
     */
    @XxlJob("refreshEmbeddingJob")
    public ReturnT<String> refreshEmbedding(String param) throws InterruptedException {
        long total = creativeVideoSummarizeMapper.countByExample(new CreativeVideoSummarizeExample());
        // Ceiling division: no trailing empty page when total is a multiple of PAGE_SIZE.
        long pageNum = (total + PAGE_SIZE - 1) / PAGE_SIZE;
        for (int i = 1; i <= pageNum; i++) {
            CreativeVideoSummarizeExample example = new CreativeVideoSummarizeExample();
            example.setPage(new Page<>(i, PAGE_SIZE));
            List<CreativeVideoSummarize> creativeVideoSummarizes = creativeVideoSummarizeMapper.selectByExample(example);
            if (CollectionUtils.isEmpty(creativeVideoSummarizes)) {
                continue;
            }
            for (CreativeVideoSummarize creativeVideoSummarize : creativeVideoSummarizes) {
                EmbeddingResult result = embedIfPresent(creativeVideoSummarize.getAiWordSplit());
                if (result == null) {
                    // Same null handling as the summarize path: skip instead of aborting the whole refresh.
                    log.warn("refreshEmbedding skipped, no embedding for creativeVideoSummarize = {}",
                            creativeVideoSummarize);
                    continue;
                }
                creativeVideoSummarize.setEmbedding(result.getEmbeddingRes());
                creativeVideoSummarize.setNlpWordSplit(result.getWords());
                creativeVideoSummarizeMapper.updateByPrimaryKeyWithBLOBs(creativeVideoSummarize);
            }
        }
        return ReturnT.SUCCESS;
    }
}