123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- package com.tzld.piaoquan.featurestools.job;
- import com.alibaba.fastjson.JSONObject;
- import com.tzld.piaoquan.featurestools.dao.mapper.CreativeVideoSummarizeMapper;
- import com.tzld.piaoquan.featurestools.dao.mapper.CreativeVideoUnderstanderMapper;
- import com.tzld.piaoquan.featurestools.model.bo.EmbeddingResult;
- import com.tzld.piaoquan.featurestools.model.po.CreativeVideoSummarize;
- import com.tzld.piaoquan.featurestools.model.po.CreativeVideoSummarizeExample;
- import com.tzld.piaoquan.featurestools.model.po.CreativeVideoUnderstander;
- import com.tzld.piaoquan.featurestools.model.po.CreativeVideoUnderstanderExample;
- import com.tzld.piaoquan.featurestools.service.CreativeVideoSummarizeService;
- import com.tzld.piaoquan.featurestools.service.TextEmbeddingService;
- import com.tzld.piaoquan.featurestools.util.DeepSeekAPI;
- import com.tzld.piaoquan.featurestools.util.page.Page;
- import com.xxl.job.core.biz.model.ReturnT;
- import com.xxl.job.core.handler.annotation.XxlJob;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang3.StringUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- import org.springframework.transaction.annotation.Transactional;
- import org.springframework.util.CollectionUtils;
- import java.util.List;
- import java.util.concurrent.*;
- @Slf4j
- @Component
- public class SummarizeUnderstandingJob {
- private static final ThreadPoolExecutor understandingPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
- private static final ArrayBlockingQueue<CreativeVideoUnderstander> understandingQueue = new ArrayBlockingQueue<>(100000);
- @Autowired
- private CreativeVideoUnderstanderMapper creativeVideoUnderstanderMapper;
- @Autowired
- private TextEmbeddingService textEmbeddingService;
- @Autowired
- private CreativeVideoSummarizeService creativeVideoSummarizeService;
- @Autowired
- private CreativeVideoSummarizeMapper creativeVideoSummarizeMapper;
- @XxlJob("summarizeUnderstandingJob")
- public ReturnT<String> summarizeUnderstanding(String param) throws InterruptedException {
- if (understandingPoolExecutor.getCorePoolSize() - understandingPoolExecutor.getActiveCount() > 0) {
- int threadSize = understandingPoolExecutor.getCorePoolSize() - understandingPoolExecutor.getActiveCount();
- log.info("threadNum={}", threadSize);
- CountDownLatch countDownLatch = new CountDownLatch(threadSize);
- // 启动消费者线程
- for (int i = 0; i < threadSize; i++) {
- understandingPoolExecutor.execute(new Thread(() -> {
- log.info("启动视频理解总结线程");
- while (true) {
- try {
- // 超过 5 分钟没有数据,销毁当前线程
- CreativeVideoUnderstander creativeVideoUnderstander = understandingQueue.poll(5, TimeUnit.MINUTES); // 等待最多 5 分钟
- log.info("videoQueue size={}", understandingQueue.size());
- if (creativeVideoUnderstander == null) {
- break; // 退出当前线程
- }
- System.out.println("creativeVideoUnderstander=" + creativeVideoUnderstander);
- processSummarizeUnderstanding(creativeVideoUnderstander);
- } catch (Exception e) {
- log.error("视频理解总结线程异常", e);
- }
- }
- log.info("视频理解总结线程结束");
- countDownLatch.countDown();
- }));
- }
- CreativeVideoUnderstanderExample example = new CreativeVideoUnderstanderExample();
- example.createCriteria().andStatusEqualTo(1);
- long count = creativeVideoUnderstanderMapper.countByExample(example);
- System.out.println("count = " + count);
- int pageSize = 1000;
- int pageNum = (int) (count / pageSize) + 1;
- for (int i = 1; i <= pageNum; i++) {
- Page page = new Page<>(i, pageSize);
- example.setPage(page);
- List<CreativeVideoUnderstander> creativeVideoUnderstanderList = creativeVideoUnderstanderMapper.selectByExampleWithBLOBs(example);
- if (!CollectionUtils.isEmpty(creativeVideoUnderstanderList)) {
- understandingQueue.addAll(creativeVideoUnderstanderList);
- }
- }
- countDownLatch.await();
- }
- return ReturnT.SUCCESS;
- }
- private void processSummarizeUnderstanding(CreativeVideoUnderstander creativeVideoUnderstander) {
- log.info("processSummarizeUnderstanding creativeVideoUnderstander = {}", creativeVideoUnderstander);
- String understanderText = creativeVideoUnderstander.getUnderstanderText();
- if (StringUtils.isEmpty(understanderText)) {
- return;
- }
- String chat = DeepSeekAPI.chat(understanderText);
- if (StringUtils.isEmpty(chat)) {
- return;
- }
- JSONObject jsonObject = JSONObject.parseObject(chat);
- String hook = String.join(",", jsonObject.getJSONArray("hook").toJavaList(String.class));
- String value = String.join(",", jsonObject.getJSONArray("value").toJavaList(String.class));
- String urgency = String.join(",", jsonObject.getJSONArray("urgency").toJavaList(String.class));
- log.info("hook = " + hook);
- log.info("value = " + value);
- log.info("urgency = " + urgency);
- if (StringUtils.isEmpty(hook) && StringUtils.isEmpty(value) && StringUtils.isEmpty(urgency)) {
- return;
- }
- EmbeddingResult hookResult = null;
- EmbeddingResult valueResult = null;
- EmbeddingResult urgencyResult = null;
- if (StringUtils.isNotEmpty(hook)) {
- hookResult = textEmbeddingService.getEmbedding(hook);
- }
- if (StringUtils.isNotEmpty(value)) {
- valueResult = textEmbeddingService.getEmbedding(value);
- }
- if (StringUtils.isNotEmpty(urgency)) {
- urgencyResult = textEmbeddingService.getEmbedding(urgency);
- }
- log.info("hookResult = " + hookResult);
- log.info("valueResult = " + valueResult);
- log.info("urgencyResult = " + urgencyResult);
- if (hookResult == null && valueResult == null && urgencyResult == null) {
- return;
- }
- CreativeVideoSummarize hookCreativeVideoSummarize = null;
- CreativeVideoSummarize valueCreativeVideoSummarize = null;
- CreativeVideoSummarize urgencyCreativeVideoSummarize = null;
- if (hookResult != null) {
- hookCreativeVideoSummarize = new CreativeVideoSummarize(creativeVideoUnderstander.getId()
- , creativeVideoUnderstander.getCreativeId(), "creative_hook_embedding", hook,
- hookResult.getWords(), hookResult.getEmbeddingRes());
- }
- if (valueResult != null) {
- valueCreativeVideoSummarize = new CreativeVideoSummarize(creativeVideoUnderstander.getId()
- , creativeVideoUnderstander.getCreativeId(), "creative_why_embedding", value,
- valueResult.getWords(), valueResult.getEmbeddingRes());
- }
- if (urgencyResult != null) {
- urgencyCreativeVideoSummarize = new CreativeVideoSummarize(creativeVideoUnderstander.getId()
- , creativeVideoUnderstander.getCreativeId(), "creative_action_embedding", urgency,
- urgencyResult.getWords(), urgencyResult.getEmbeddingRes());
- }
- creativeVideoSummarizeService.saveRes(creativeVideoUnderstander, hookCreativeVideoSummarize,
- valueCreativeVideoSummarize, urgencyCreativeVideoSummarize);
- }
- @XxlJob("refreshEmbeddingJob")
- public ReturnT<String> refreshEmbedding(String param) throws InterruptedException {
- long l = creativeVideoSummarizeMapper.countByExample(new CreativeVideoSummarizeExample());
- int pageSize = 1000;
- long pageNum = l / pageSize + 1;
- for (int i = 0; i < pageNum; i++) {
- CreativeVideoSummarizeExample example = new CreativeVideoSummarizeExample();
- example.setPage(new Page<>(i + 1, pageSize));
- List<CreativeVideoSummarize> creativeVideoSummarizes = creativeVideoSummarizeMapper.selectByExample(example);
- if (CollectionUtils.isEmpty(creativeVideoSummarizes)) {
- continue;
- }
- for (CreativeVideoSummarize creativeVideoSummarize : creativeVideoSummarizes) {
- EmbeddingResult result = textEmbeddingService.getEmbedding(creativeVideoSummarize.getAiWordSplit());
- creativeVideoSummarize.setEmbedding(result.getEmbeddingRes());
- creativeVideoSummarize.setNlpWordSplit(result.getWords());
- creativeVideoSummarizeMapper.updateByPrimaryKeyWithBLOBs(creativeVideoSummarize);
- }
- }
- return ReturnT.SUCCESS;
- }
- }
|