|
|
@@ -0,0 +1,168 @@
|
|
|
+package com.tzld.videoVector.job;
|
|
|
+
|
|
|
+import com.tzld.videoVector.dao.mapper.pgVector.DeconstructContentMapper;
|
|
|
+import com.tzld.videoVector.model.entity.DeconstructResult;
|
|
|
+import com.tzld.videoVector.model.po.pgVector.DeconstructContent;
|
|
|
+import com.tzld.videoVector.model.po.pgVector.DeconstructContentExample;
|
|
|
+import com.tzld.videoVector.service.DeconstructService;
|
|
|
+import com.tzld.videoVector.service.VectorizeService;
|
|
|
+import com.xxl.job.core.biz.model.ReturnT;
|
|
|
+import com.xxl.job.core.handler.annotation.XxlJob;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 素材解构结果轮询定时任务
|
|
|
+ * <p>
|
|
|
+ * 扫描 deconstruct_content 中未完成(status=0 PENDING 或 status=1 RUNNING)的记录,
|
|
|
+ * 调用解构 API 查询结果,更新状态。
|
|
|
+ * 若解构成功(status=2),自动触发 VectorizeService 向量化。
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class MaterialDeconstructCheckJob {
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private DeconstructContentMapper deconstructContentMapper;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private DeconstructService deconstructService;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private VectorizeService vectorizeService;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 素材解构结果轮询
|
|
|
+ * 扫描未完成的解构记录,查询 API 更新状态,完成后自动向量化
|
|
|
+ */
|
|
|
+ @XxlJob("checkMaterialDeconstructJob")
|
|
|
+ public ReturnT<String> checkMaterialDeconstructJob(String param) {
|
|
|
+ log.info("开始执行素材解构结果轮询任务, param: {}", param);
|
|
|
+
|
|
|
+ try {
|
|
|
+ int totalChecked = 0;
|
|
|
+ int totalCompleted = 0;
|
|
|
+ int totalVectorized = 0;
|
|
|
+ int totalFailed = 0;
|
|
|
+ int pageNum = 0;
|
|
|
+ int pageSize = 50;
|
|
|
+
|
|
|
+ while (true) {
|
|
|
+ // 分页查询未完成的解构记录(status=0 或 status=1)
|
|
|
+ List<DeconstructContent> contents = queryPendingContents(pageNum, pageSize);
|
|
|
+ if (CollectionUtils.isEmpty(contents)) {
|
|
|
+ log.info("第 {} 页无待查询数据,轮询结束", pageNum);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ log.info("第 {} 页查询到 {} 条未完成的解构记录", pageNum, contents.size());
|
|
|
+
|
|
|
+ for (DeconstructContent content : contents) {
|
|
|
+ totalChecked++;
|
|
|
+ int result = checkAndUpdateSingleContent(content);
|
|
|
+ if (result == 1) {
|
|
|
+ totalCompleted++;
|
|
|
+ try {
|
|
|
+ vectorizeService.vectorizeContent(content);
|
|
|
+ totalVectorized++;
|
|
|
+ log.info("解构完成并向量化成功,contentId={}, taskId={}",
|
|
|
+ content.getId(), content.getTaskId());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("向量化失败,contentId={}, taskId={}, error={}",
|
|
|
+ content.getId(), content.getTaskId(), e.getMessage(), e);
|
|
|
+ }
|
|
|
+ } else if (result == -1) {
|
|
|
+ totalFailed++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (contents.size() < pageSize) {
|
|
|
+ log.info("第 {} 页数据量 {} 小于 pageSize {},轮询结束",
|
|
|
+ pageNum, contents.size(), pageSize);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ pageNum++;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("素材解构结果轮询完成,检查: {}, 完成: {}, 向量化: {}, 失败: {}",
|
|
|
+ totalChecked, totalCompleted, totalVectorized, totalFailed);
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("素材解构结果轮询任务异常: {}", e.getMessage(), e);
|
|
|
+ return new ReturnT<>(ReturnT.FAIL_CODE, "任务异常: " + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 分页查询未完成的解构内容(status=0 PENDING 或 status=1 RUNNING)
|
|
|
+ */
|
|
|
+ private List<DeconstructContent> queryPendingContents(int pageNum, int pageSize) {
|
|
|
+ DeconstructContentExample example = new DeconstructContentExample();
|
|
|
+ DeconstructContentExample.Criteria criteria = example.createCriteria();
|
|
|
+ // status IN (0, 1)
|
|
|
+ List<Short> pendingStatuses = java.util.Arrays.asList((short) 0, (short) 1);
|
|
|
+ criteria.andStatusIn(pendingStatuses);
|
|
|
+ example.setOrderByClause("id ASC LIMIT " + pageSize + " OFFSET " + (pageNum * pageSize));
|
|
|
+ return deconstructContentMapper.selectByExample(example);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 检查并更新单条解构内容的状态
|
|
|
+ *
|
|
|
+ * @return 1=解构成功, -1=解构失败, 0=未完成或未变化
|
|
|
+ */
|
|
|
+ private int checkAndUpdateSingleContent(DeconstructContent content) {
|
|
|
+ try {
|
|
|
+ String taskId = content.getTaskId();
|
|
|
+ if (!StringUtils.hasText(taskId)) {
|
|
|
+ log.info("contentId={} 的 taskId 为空,跳过", content.getId());
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ DeconstructResult result = deconstructService.getDeconstructResult(taskId);
|
|
|
+ if (result == null) {
|
|
|
+ log.info("查询解构结果返回空,taskId={}", taskId);
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ Short newStatus = result.getStatus() != null
|
|
|
+ ? result.getStatus().shortValue() : content.getStatus();
|
|
|
+
|
|
|
+ if (newStatus.equals(content.getStatus())) {
|
|
|
+ log.info("解构状态未变化,taskId={}, status={}", taskId, newStatus);
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ content.setStatus(newStatus);
|
|
|
+ content.setResultJson(result.getResult());
|
|
|
+ content.setFailureReason(result.getReason());
|
|
|
+ content.setPointUrl(result.getPointUrl());
|
|
|
+ content.setWeightUrl(result.getWeightUrl());
|
|
|
+ content.setPatternUrl(result.getPatternUrl());
|
|
|
+ content.setUpdateTime(new Date());
|
|
|
+ deconstructContentMapper.updateByPrimaryKeySelective(content);
|
|
|
+
|
|
|
+ log.info("更新解构状态,contentId={}, taskId={}, status={} -> {}",
|
|
|
+ content.getId(), taskId, content.getStatus(), newStatus);
|
|
|
+
|
|
|
+ if (newStatus == 2) {
|
|
|
+ return 1;
|
|
|
+ } else if (newStatus == 3) {
|
|
|
+ log.error("解构失败,contentId={}, taskId={}, reason={}",
|
|
|
+ content.getId(), taskId, result.getReason());
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+ return 0;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理解构结果失败,contentId={}, error={}",
|
|
|
+ content.getId(), e.getMessage(), e);
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|