SummarizeUnderstandingJob.java 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package com.tzld.piaoquan.featurestools.job;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.tzld.piaoquan.featurestools.dao.mapper.CreativeVideoSummarizeMapper;
  4. import com.tzld.piaoquan.featurestools.dao.mapper.CreativeVideoUnderstanderMapper;
  5. import com.tzld.piaoquan.featurestools.model.bo.EmbeddingResult;
  6. import com.tzld.piaoquan.featurestools.model.po.CreativeVideoSummarize;
  7. import com.tzld.piaoquan.featurestools.model.po.CreativeVideoSummarizeExample;
  8. import com.tzld.piaoquan.featurestools.model.po.CreativeVideoUnderstander;
  9. import com.tzld.piaoquan.featurestools.model.po.CreativeVideoUnderstanderExample;
  10. import com.tzld.piaoquan.featurestools.service.CreativeVideoSummarizeService;
  11. import com.tzld.piaoquan.featurestools.service.TextEmbeddingService;
  12. import com.tzld.piaoquan.featurestools.util.DeepSeekAPI;
  13. import com.tzld.piaoquan.featurestools.util.page.Page;
  14. import com.xxl.job.core.biz.model.ReturnT;
  15. import com.xxl.job.core.handler.annotation.XxlJob;
  16. import lombok.extern.slf4j.Slf4j;
  17. import org.apache.commons.lang3.StringUtils;
  18. import org.springframework.beans.factory.annotation.Autowired;
  19. import org.springframework.stereotype.Component;
  20. import org.springframework.transaction.annotation.Transactional;
  21. import org.springframework.util.CollectionUtils;
  22. import java.util.List;
  23. import java.util.concurrent.*;
  24. @Slf4j
  25. @Component
  26. public class SummarizeUnderstandingJob {
  27. private static final ThreadPoolExecutor understandingPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
  28. private static final ArrayBlockingQueue<CreativeVideoUnderstander> understandingQueue = new ArrayBlockingQueue<>(100000);
  29. @Autowired
  30. private CreativeVideoUnderstanderMapper creativeVideoUnderstanderMapper;
  31. @Autowired
  32. private TextEmbeddingService textEmbeddingService;
  33. @Autowired
  34. private CreativeVideoSummarizeService creativeVideoSummarizeService;
  35. @Autowired
  36. private CreativeVideoSummarizeMapper creativeVideoSummarizeMapper;
  37. @XxlJob("summarizeUnderstandingJob")
  38. public ReturnT<String> summarizeUnderstanding(String param) throws InterruptedException {
  39. if (understandingPoolExecutor.getCorePoolSize() - understandingPoolExecutor.getActiveCount() > 0) {
  40. int threadSize = understandingPoolExecutor.getCorePoolSize() - understandingPoolExecutor.getActiveCount();
  41. log.info("threadNum={}", threadSize);
  42. CountDownLatch countDownLatch = new CountDownLatch(threadSize);
  43. // 启动消费者线程
  44. for (int i = 0; i < threadSize; i++) {
  45. understandingPoolExecutor.execute(new Thread(() -> {
  46. log.info("启动视频理解总结线程");
  47. while (true) {
  48. try {
  49. // 超过 5 分钟没有数据,销毁当前线程
  50. CreativeVideoUnderstander creativeVideoUnderstander = understandingQueue.poll(5, TimeUnit.MINUTES); // 等待最多 5 分钟
  51. log.info("videoQueue size={}", understandingQueue.size());
  52. if (creativeVideoUnderstander == null) {
  53. break; // 退出当前线程
  54. }
  55. System.out.println("creativeVideoUnderstander=" + creativeVideoUnderstander);
  56. processSummarizeUnderstanding(creativeVideoUnderstander);
  57. } catch (Exception e) {
  58. log.error("视频理解总结线程异常", e);
  59. }
  60. }
  61. log.info("视频理解总结线程结束");
  62. countDownLatch.countDown();
  63. }));
  64. }
  65. CreativeVideoUnderstanderExample example = new CreativeVideoUnderstanderExample();
  66. example.createCriteria().andStatusEqualTo(1);
  67. long count = creativeVideoUnderstanderMapper.countByExample(example);
  68. System.out.println("count = " + count);
  69. int pageSize = 1000;
  70. int pageNum = (int) (count / pageSize) + 1;
  71. for (int i = 1; i <= pageNum; i++) {
  72. Page page = new Page<>(i, pageSize);
  73. example.setPage(page);
  74. List<CreativeVideoUnderstander> creativeVideoUnderstanderList = creativeVideoUnderstanderMapper.selectByExampleWithBLOBs(example);
  75. if (!CollectionUtils.isEmpty(creativeVideoUnderstanderList)) {
  76. understandingQueue.addAll(creativeVideoUnderstanderList);
  77. }
  78. }
  79. countDownLatch.await();
  80. }
  81. return ReturnT.SUCCESS;
  82. }
  83. private void processSummarizeUnderstanding(CreativeVideoUnderstander creativeVideoUnderstander) {
  84. log.info("processSummarizeUnderstanding creativeVideoUnderstander = {}", creativeVideoUnderstander);
  85. String understanderText = creativeVideoUnderstander.getUnderstanderText();
  86. if (StringUtils.isEmpty(understanderText)) {
  87. return;
  88. }
  89. String chat = DeepSeekAPI.chat(understanderText);
  90. if (StringUtils.isEmpty(chat)) {
  91. return;
  92. }
  93. JSONObject jsonObject = JSONObject.parseObject(chat);
  94. String hook = String.join(",", jsonObject.getJSONArray("hook").toJavaList(String.class));
  95. String value = String.join(",", jsonObject.getJSONArray("value").toJavaList(String.class));
  96. String urgency = String.join(",", jsonObject.getJSONArray("urgency").toJavaList(String.class));
  97. log.info("hook = " + hook);
  98. log.info("value = " + value);
  99. log.info("urgency = " + urgency);
  100. if (StringUtils.isEmpty(hook) && StringUtils.isEmpty(value) && StringUtils.isEmpty(urgency)) {
  101. return;
  102. }
  103. EmbeddingResult hookResult = null;
  104. EmbeddingResult valueResult = null;
  105. EmbeddingResult urgencyResult = null;
  106. if (StringUtils.isNotEmpty(hook)) {
  107. hookResult = textEmbeddingService.getEmbedding(hook);
  108. }
  109. if (StringUtils.isNotEmpty(value)) {
  110. valueResult = textEmbeddingService.getEmbedding(value);
  111. }
  112. if (StringUtils.isNotEmpty(urgency)) {
  113. urgencyResult = textEmbeddingService.getEmbedding(urgency);
  114. }
  115. log.info("hookResult = " + hookResult);
  116. log.info("valueResult = " + valueResult);
  117. log.info("urgencyResult = " + urgencyResult);
  118. if (hookResult == null && valueResult == null && urgencyResult == null) {
  119. return;
  120. }
  121. CreativeVideoSummarize hookCreativeVideoSummarize = null;
  122. CreativeVideoSummarize valueCreativeVideoSummarize = null;
  123. CreativeVideoSummarize urgencyCreativeVideoSummarize = null;
  124. if (hookResult != null) {
  125. hookCreativeVideoSummarize = new CreativeVideoSummarize(creativeVideoUnderstander.getId()
  126. , creativeVideoUnderstander.getCreativeId(), "creative_hook_embedding", hook,
  127. hookResult.getWords(), hookResult.getEmbeddingRes());
  128. }
  129. if (valueResult != null) {
  130. valueCreativeVideoSummarize = new CreativeVideoSummarize(creativeVideoUnderstander.getId()
  131. , creativeVideoUnderstander.getCreativeId(), "creative_why_embedding", value,
  132. valueResult.getWords(), valueResult.getEmbeddingRes());
  133. }
  134. if (urgencyResult != null) {
  135. urgencyCreativeVideoSummarize = new CreativeVideoSummarize(creativeVideoUnderstander.getId()
  136. , creativeVideoUnderstander.getCreativeId(), "creative_action_embedding", urgency,
  137. urgencyResult.getWords(), urgencyResult.getEmbeddingRes());
  138. }
  139. creativeVideoSummarizeService.saveRes(creativeVideoUnderstander, hookCreativeVideoSummarize,
  140. valueCreativeVideoSummarize, urgencyCreativeVideoSummarize);
  141. }
  142. @XxlJob("refreshEmbeddingJob")
  143. public ReturnT<String> refreshEmbedding(String param) throws InterruptedException {
  144. long l = creativeVideoSummarizeMapper.countByExample(new CreativeVideoSummarizeExample());
  145. int pageSize = 1000;
  146. long pageNum = l / pageSize + 1;
  147. for (int i = 0; i < pageNum; i++) {
  148. CreativeVideoSummarizeExample example = new CreativeVideoSummarizeExample();
  149. example.setPage(new Page<>(i + 1, pageSize));
  150. List<CreativeVideoSummarize> creativeVideoSummarizes = creativeVideoSummarizeMapper.selectByExample(example);
  151. if (CollectionUtils.isEmpty(creativeVideoSummarizes)) {
  152. continue;
  153. }
  154. for (CreativeVideoSummarize creativeVideoSummarize : creativeVideoSummarizes) {
  155. EmbeddingResult result = textEmbeddingService.getEmbedding(creativeVideoSummarize.getAiWordSplit());
  156. creativeVideoSummarize.setEmbedding(result.getEmbeddingRes());
  157. creativeVideoSummarize.setNlpWordSplit(result.getWords());
  158. creativeVideoSummarizeMapper.updateByPrimaryKeyWithBLOBs(creativeVideoSummarize);
  159. }
  160. }
  161. return ReturnT.SUCCESS;
  162. }
  163. }