|
|
@@ -0,0 +1,418 @@
|
|
|
+package com.tzld.videoVector.job;
|
|
|
+
|
|
|
+import com.tzld.videoVector.common.constant.VectorConstants;
|
|
|
+import com.tzld.videoVector.dao.mapper.pgVector.ContentVectorMapper;
|
|
|
+import com.tzld.videoVector.dao.mapper.pgVector.DeconstructContentMapper;
|
|
|
+import com.tzld.videoVector.dao.mapper.pgVector.DeconstructVectorConfigMapper;
|
|
|
+import com.tzld.videoVector.dao.mapper.pgVector.VideoVectorMapper;
|
|
|
+import com.tzld.videoVector.dao.mapper.videoVector.deconstruct.MysqlDeconstructContentVectorMapper;
|
|
|
+import com.tzld.videoVector.dao.mapper.videoVector.deconstruct.MysqlDeconstructContentMapper;
|
|
|
+import com.tzld.videoVector.dao.mapper.videoVector.deconstruct.MysqlDeconstructVectorConfigMapper;
|
|
|
+import com.tzld.videoVector.model.po.pgVector.DeconstructContent;
|
|
|
+import com.tzld.videoVector.model.po.pgVector.DeconstructVectorConfig;
|
|
|
+import com.tzld.videoVector.model.po.pgVector.DeconstructVectorConfigExample;
|
|
|
+import com.tzld.videoVector.model.po.videoVector.deconstruct.*;
|
|
|
+import com.xxl.job.core.biz.model.ReturnT;
|
|
|
+import com.xxl.job.core.handler.annotation.XxlJob;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.data.redis.core.RedisTemplate;
|
|
|
+import org.springframework.data.redis.core.StringRedisTemplate;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.LinkedHashSet;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Set;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 数据迁移任务
|
|
|
+ * 1. MySQL deconstruct_content / deconstruct_vector_config → PG
|
|
|
+ * 2. MySQL deconstruct_content_vector → PG content_vectors
|
|
|
+ * 3. Redis 视频向量缓存 → PG video_vectors
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class DataMigrationJob {
|
|
|
+
|
|
|
+ // ==================== MySQL 数据源 Mapper ====================
|
|
|
+ @Resource
|
|
|
+ private MysqlDeconstructContentMapper mysqlContentMapper;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private MysqlDeconstructVectorConfigMapper mysqlConfigMapper;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private MysqlDeconstructContentVectorMapper mysqlContentVectorMapper;
|
|
|
+
|
|
|
+ // ==================== PG 数据源 Mapper ====================
|
|
|
+ @Resource
|
|
|
+ private DeconstructContentMapper pgContentMapper;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private DeconstructVectorConfigMapper pgConfigMapper;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private ContentVectorMapper pgContentVectorMapper;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private VideoVectorMapper pgVideoVectorMapper;
|
|
|
+
|
|
|
+ // ==================== Redis ====================
|
|
|
+ @Resource
|
|
|
+ private RedisTemplate<String, Object> redisTemplate;
|
|
|
+
|
|
|
+ // 向量值是纯 JSON 字符串,没有 Jackson 类型信息,需用 StringRedisTemplate 读取
|
|
|
+ @Resource
|
|
|
+ private StringRedisTemplate stringRedisTemplate;
|
|
|
+
|
|
|
+ // ==================== 迁移批次大小 ====================
|
|
|
+ private static final int BATCH_SIZE = 200;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * MySQL deconstruct_content 表迁移到 PG
|
|
|
+ */
|
|
|
+ @XxlJob("migrateContentToPgJob")
|
|
|
+ public ReturnT<String> migrateContentToPgJob(String param) {
|
|
|
+ log.info("开始迁移 deconstruct_content 数据到 PG, param: {}", param);
|
|
|
+
|
|
|
+ try {
|
|
|
+ // 从 MySQL 分页读取
|
|
|
+ MysqlDeconstructContentExample example =
|
|
|
+ new MysqlDeconstructContentExample();
|
|
|
+ example.setOrderByClause("id ASC");
|
|
|
+ List<MysqlDeconstructContent> allRecords =
|
|
|
+ mysqlContentMapper.selectByExampleWithBLOBs(example);
|
|
|
+
|
|
|
+ if (CollectionUtils.isEmpty(allRecords)) {
|
|
|
+ log.info("MySQL deconstruct_content 无数据,跳过");
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("查询到 {} 条 deconstruct_content 记录待迁移", allRecords.size());
|
|
|
+
|
|
|
+ int successCount = 0;
|
|
|
+ int skipCount = 0;
|
|
|
+ int failCount = 0;
|
|
|
+
|
|
|
+ for (MysqlDeconstructContent mysql : allRecords) {
|
|
|
+ try {
|
|
|
+ // 转换为 PG 实体
|
|
|
+ DeconstructContent pg = convertContent(mysql);
|
|
|
+ // 插入 PG(使用 insertSelective 避免主键冲突时可手动处理)
|
|
|
+ pgContentMapper.insertSelective(pg);
|
|
|
+ successCount++;
|
|
|
+ } catch (Exception e) {
|
|
|
+ if (e.getMessage() != null && e.getMessage().contains("duplicate key")) {
|
|
|
+ skipCount++;
|
|
|
+ } else {
|
|
|
+ failCount++;
|
|
|
+ log.error("迁移 content id={} 失败: {}", mysql.getId(), e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("deconstruct_content 迁移完成,成功: {}, 跳过(已存在): {}, 失败: {}",
|
|
|
+ successCount, skipCount, failCount);
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("deconstruct_content 迁移失败: {}", e.getMessage(), e);
|
|
|
+ return new ReturnT<>(ReturnT.FAIL_CODE, "迁移失败: " + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * MySQL deconstruct_vector_config 表迁移到 PG
|
|
|
+ */
|
|
|
+ @XxlJob("migrateVectorConfigToPgJob")
|
|
|
+ public ReturnT<String> migrateVectorConfigToPgJob(String param) {
|
|
|
+ log.info("开始迁移 deconstruct_vector_config 数据到 PG, param: {}", param);
|
|
|
+
|
|
|
+ try {
|
|
|
+ MysqlDeconstructVectorConfigExample example =
|
|
|
+ new MysqlDeconstructVectorConfigExample();
|
|
|
+ example.setOrderByClause("id ASC");
|
|
|
+ List<MysqlDeconstructVectorConfig> allRecords =
|
|
|
+ mysqlConfigMapper.selectByExample(example);
|
|
|
+
|
|
|
+ if (CollectionUtils.isEmpty(allRecords)) {
|
|
|
+ log.info("MySQL deconstruct_vector_config 无数据,跳过");
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("查询到 {} 条 deconstruct_vector_config 记录待迁移", allRecords.size());
|
|
|
+
|
|
|
+ int successCount = 0;
|
|
|
+ int skipCount = 0;
|
|
|
+ int failCount = 0;
|
|
|
+
|
|
|
+ for (MysqlDeconstructVectorConfig mysql : allRecords) {
|
|
|
+ try {
|
|
|
+ DeconstructVectorConfig pg = convertVectorConfig(mysql);
|
|
|
+ pgConfigMapper.insertSelective(pg);
|
|
|
+ successCount++;
|
|
|
+ } catch (Exception e) {
|
|
|
+ if (e.getMessage() != null && e.getMessage().contains("duplicate key")) {
|
|
|
+ skipCount++;
|
|
|
+ } else {
|
|
|
+ failCount++;
|
|
|
+ log.error("迁移 vector_config id={} 失败: {}", mysql.getId(), e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("deconstruct_vector_config 迁移完成,成功: {}, 跳过(已存在): {}, 失败: {}",
|
|
|
+ successCount, skipCount, failCount);
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("deconstruct_vector_config 迁移失败: {}", e.getMessage(), e);
|
|
|
+ return new ReturnT<>(ReturnT.FAIL_CODE, "迁移失败: " + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * MySQL deconstruct_content_vector 表迁移到 PG content_vectors
|
|
|
+ * 将 vector_data(JSON 数组)转为 pgvector embedding
|
|
|
+ */
|
|
|
+ @XxlJob("migrateContentVectorToPgJob")
|
|
|
+ public ReturnT<String> migrateContentVectorToPgJob(String param) {
|
|
|
+ log.info("开始迁移 deconstruct_content_vector 数据到 PG content_vectors, param: {}", param);
|
|
|
+
|
|
|
+ try {
|
|
|
+ MysqlDeconstructContentVectorExample example = new MysqlDeconstructContentVectorExample();
|
|
|
+ example.setOrderByClause("id ASC");
|
|
|
+ List<MysqlDeconstructContentVector> allRecords = mysqlContentVectorMapper.selectByExampleWithBLOBs(example);
|
|
|
+
|
|
|
+ if (CollectionUtils.isEmpty(allRecords)) {
|
|
|
+ log.info("MySQL deconstruct_content_vector 无数据,跳过");
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("查询到 {} 条 deconstruct_content_vector 记录待迁移", allRecords.size());
|
|
|
+
|
|
|
+ int successCount = 0;
|
|
|
+ int skipCount = 0;
|
|
|
+ int failCount = 0;
|
|
|
+
|
|
|
+ for (MysqlDeconstructContentVector mysql : allRecords) {
|
|
|
+ try {
|
|
|
+ String vectorData = mysql.getVectorData();
|
|
|
+ if (!StringUtils.hasText(vectorData)) {
|
|
|
+ log.debug("content_vector id={} vectorData 为空,跳过", mysql.getId());
|
|
|
+ skipCount++;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // vectorData 是 JSON 数组格式 "[0.1,0.2,...]",pgvector 接受此格式
|
|
|
+ pgContentVectorMapper.upsertWithEmbedding(
|
|
|
+ mysql.getContentId(),
|
|
|
+ mysql.getTaskId(),
|
|
|
+ mysql.getConfigCode(),
|
|
|
+ mysql.getSourceField(),
|
|
|
+ mysql.getSourcePath(),
|
|
|
+ mysql.getTextHash(),
|
|
|
+ mysql.getEmbeddingModel(),
|
|
|
+ mysql.getSegmentIndex(),
|
|
|
+ mysql.getSegmentTotal(),
|
|
|
+ mysql.getSourceText(),
|
|
|
+ vectorData
|
|
|
+ );
|
|
|
+ successCount++;
|
|
|
+ } catch (Exception e) {
|
|
|
+ if (e.getMessage() != null && e.getMessage().contains("duplicate key")) {
|
|
|
+ skipCount++;
|
|
|
+ } else {
|
|
|
+ failCount++;
|
|
|
+ log.error("迁移 content_vector id={} 失败: {}", mysql.getId(), e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("deconstruct_content_vector → content_vectors 迁移完成,成功: {}, 跳过: {}, 失败: {}",
|
|
|
+ successCount, skipCount, failCount);
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("content_vector 迁移失败: {}", e.getMessage(), e);
|
|
|
+ return new ReturnT<>(ReturnT.FAIL_CODE, "迁移失败: " + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Redis 视频向量缓存迁移到 PG video_vectors
|
|
|
+ * Redis 存储结构:
|
|
|
+ * Key: video:vector:{configCode}:{videoId} Value: JSON 数组字符串
|
|
|
+ * Key: video:vector:{configCode}:ids 类型: Set(存储所有 videoId)
|
|
|
+ */
|
|
|
+ @XxlJob("migrateRedisVectorToPgJob")
|
|
|
+ public ReturnT<String> migrateRedisVectorToPgJob(String param) {
|
|
|
+ log.info("开始迁移 Redis 视频向量缓存到 PG video_vectors, param: {}", param);
|
|
|
+
|
|
|
+ try {
|
|
|
+ // 1. 获取所有启用的配置编码
|
|
|
+ List<String> configCodes = getAllConfigCodes();
|
|
|
+ if (configCodes.isEmpty()) {
|
|
|
+ log.warn("未获取到任何配置编码");
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
+ }
|
|
|
+ log.info("待迁移的配置编码: {}", configCodes);
|
|
|
+
|
|
|
+ int totalSuccess = 0;
|
|
|
+ int totalSkip = 0;
|
|
|
+ int totalFail = 0;
|
|
|
+
|
|
|
+ // 2. 逐个配置迁移
|
|
|
+ for (String configCode : configCodes) {
|
|
|
+ log.info("开始迁移配置 {} 的向量数据", configCode);
|
|
|
+
|
|
|
+ // 从 Redis Set 获取所有 videoId
|
|
|
+ String idsKey = VectorConstants.VECTOR_KEY_PREFIX + configCode + ":ids";
|
|
|
+ Set<Object> idMembers = redisTemplate.opsForSet().members(idsKey);
|
|
|
+
|
|
|
+ if (idMembers == null || idMembers.isEmpty()) {
|
|
|
+ log.info("配置 {} 在 Redis 中无 videoId 索引,跳过", configCode);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("配置 {} 共有 {} 个 videoId 待迁移", configCode, idMembers.size());
|
|
|
+
|
|
|
+ // Redis value 可能是 Integer/Long/String,统一转为 String
|
|
|
+ List<String> idList = new ArrayList<>();
|
|
|
+ for (Object member : idMembers) {
|
|
|
+ idList.add(String.valueOf(member));
|
|
|
+ }
|
|
|
+
|
|
|
+ // 分批处理
|
|
|
+ for (int i = 0; i < idList.size(); i += BATCH_SIZE) {
|
|
|
+ int end = Math.min(i + BATCH_SIZE, idList.size());
|
|
|
+ List<String> batch = idList.subList(i, end);
|
|
|
+
|
|
|
+ for (String idStr : batch) {
|
|
|
+ try {
|
|
|
+ Long videoId = Long.parseLong(idStr);
|
|
|
+ String vectorKey = VectorConstants.VECTOR_KEY_PREFIX + configCode + ":" + videoId;
|
|
|
+ String vectorJson = stringRedisTemplate.opsForValue().get(vectorKey);
|
|
|
+
|
|
|
+ if (!StringUtils.hasText(vectorJson)) {
|
|
|
+ totalSkip++;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Redis 中存储的是归一化后的 JSON 数组 "[0.1,0.2,...]"
|
|
|
+ // pgvector 接受此格式
|
|
|
+ pgVideoVectorMapper.upsertVector(videoId, configCode, vectorJson);
|
|
|
+ totalSuccess++;
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
+ log.warn("非法 videoId: {}", idStr);
|
|
|
+ totalSkip++;
|
|
|
+ } catch (Exception e) {
|
|
|
+ if (e.getMessage() != null && e.getMessage().contains("duplicate key")) {
|
|
|
+ totalSkip++;
|
|
|
+ } else {
|
|
|
+ totalFail++;
|
|
|
+ log.error("迁移 Redis 向量失败,configCode={}, videoId={}, error={}",
|
|
|
+ configCode, idStr, e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("配置 {} 进度: {}/{}", configCode, Math.min(end, idList.size()), idList.size());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("Redis 视频向量迁移完成,总成功: {}, 总跳过: {}, 总失败: {}",
|
|
|
+ totalSuccess, totalSkip, totalFail);
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Redis 向量迁移失败: {}", e.getMessage(), e);
|
|
|
+ return new ReturnT<>(ReturnT.FAIL_CODE, "迁移失败: " + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // ==================== 工具方法 ====================
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取所有非多点模式的配置编码(从 PG 查询 + 默认编码)
|
|
|
+ * 多点模式(extract_rule 非空)的数据不迁移,由 Job 重新生成
|
|
|
+ */
|
|
|
+ private List<String> getAllConfigCodes() {
|
|
|
+ Set<String> codes = new LinkedHashSet<>();
|
|
|
+ codes.add(VectorConstants.DEFAULT_CONFIG_CODE);
|
|
|
+
|
|
|
+ try {
|
|
|
+ DeconstructVectorConfigExample example = new DeconstructVectorConfigExample();
|
|
|
+ example.createCriteria().andEnabledEqualTo((short) 1);
|
|
|
+ List<DeconstructVectorConfig> configs = pgConfigMapper.selectByExample(example);
|
|
|
+ if (configs != null) {
|
|
|
+ for (DeconstructVectorConfig config : configs) {
|
|
|
+ if (StringUtils.hasText(config.getConfigCode())
|
|
|
+ && !StringUtils.hasText(config.getExtractRule())) {
|
|
|
+ // 仅迁移非多点模式(extract_rule 为空)的配置
|
|
|
+ codes.add(config.getConfigCode());
|
|
|
+ } else if (StringUtils.hasText(config.getExtractRule())) {
|
|
|
+ log.info("跳过多点模式配置: {}(extract_rule 非空,由 Job 重新生成)", config.getConfigCode());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("获取配置编码失败: {}", e.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ return new ArrayList<>(codes);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * MySQL DeconstructContent → PG DeconstructContent 实体转换
|
|
|
+ * 主要差异:Byte → Short
|
|
|
+ */
|
|
|
+ private DeconstructContent convertContent(
|
|
|
+ MysqlDeconstructContent mysql) {
|
|
|
+ DeconstructContent pg = new DeconstructContent();
|
|
|
+ // 不设置 id,让 PG 自增生成
|
|
|
+ pg.setTaskId(mysql.getTaskId());
|
|
|
+ pg.setBizType(mysql.getBizType() != null ? mysql.getBizType().shortValue() : null);
|
|
|
+ pg.setContentType(mysql.getContentType() != null ? mysql.getContentType().shortValue() : null);
|
|
|
+ pg.setChannelContentId(mysql.getChannelContentId());
|
|
|
+ pg.setTitle(mysql.getTitle());
|
|
|
+ pg.setBodyText(mysql.getBodyText());
|
|
|
+ pg.setVideoUrl(mysql.getVideoUrl());
|
|
|
+ pg.setImages(mysql.getImages());
|
|
|
+ pg.setChannelAccountId(mysql.getChannelAccountId());
|
|
|
+ pg.setChannelAccountName(mysql.getChannelAccountName());
|
|
|
+ pg.setStatus(mysql.getStatus() != null ? mysql.getStatus().shortValue() : null);
|
|
|
+ pg.setResultJson(mysql.getResultJson());
|
|
|
+ pg.setFailureReason(mysql.getFailureReason());
|
|
|
+ pg.setPointUrl(mysql.getPointUrl());
|
|
|
+ pg.setWeightUrl(mysql.getWeightUrl());
|
|
|
+ pg.setPatternUrl(mysql.getPatternUrl());
|
|
|
+ pg.setCreateTime(mysql.getCreateTime());
|
|
|
+ pg.setUpdateTime(mysql.getUpdateTime());
|
|
|
+ return pg;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * MySQL DeconstructVectorConfig → PG DeconstructVectorConfig 实体转换
|
|
|
+ */
|
|
|
+ private DeconstructVectorConfig convertVectorConfig(
|
|
|
+ MysqlDeconstructVectorConfig mysql) {
|
|
|
+ DeconstructVectorConfig pg = new DeconstructVectorConfig();
|
|
|
+ pg.setConfigCode(mysql.getConfigCode());
|
|
|
+ pg.setConfigName(mysql.getConfigName());
|
|
|
+ pg.setBizType(mysql.getBizType() != null ? mysql.getBizType().shortValue() : null);
|
|
|
+ pg.setContentType(mysql.getContentType() != null ? mysql.getContentType().shortValue() : null);
|
|
|
+ pg.setSourceField(mysql.getSourceField());
|
|
|
+ pg.setSourcePath(mysql.getSourcePath());
|
|
|
+ pg.setExtractRule(mysql.getExtractRule());
|
|
|
+ pg.setEmbeddingModel(mysql.getEmbeddingModel());
|
|
|
+ pg.setDimension(mysql.getDimension());
|
|
|
+ pg.setMaxLength(mysql.getMaxLength());
|
|
|
+ pg.setEnableSegment(mysql.getEnableSegment() != null ? mysql.getEnableSegment().shortValue() : null);
|
|
|
+ pg.setSegmentSize(mysql.getSegmentSize());
|
|
|
+ pg.setPriority(mysql.getPriority());
|
|
|
+ pg.setEnabled(mysql.getEnabled() != null ? mysql.getEnabled().shortValue() : null);
|
|
|
+ pg.setCreateTime(mysql.getCreateTime());
|
|
|
+ pg.setUpdateTime(mysql.getUpdateTime());
|
|
|
+ return pg;
|
|
|
+ }
|
|
|
+}
|