|
@@ -3,7 +3,6 @@ package com.tzld.videoVector.job;
|
|
|
import com.alibaba.fastjson.JSON;
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.alibaba.fastjson.JSONArray;
|
|
import com.alibaba.fastjson.JSONArray;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
-import com.aliyun.odps.data.Record;
|
|
|
|
|
import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
|
|
import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
|
|
|
import com.google.common.collect.Lists;
|
|
import com.google.common.collect.Lists;
|
|
|
import com.tzld.videoVector.api.AigcApiService;
|
|
import com.tzld.videoVector.api.AigcApiService;
|
|
@@ -11,11 +10,9 @@ import com.tzld.videoVector.api.VideoApiService;
|
|
|
import com.tzld.videoVector.common.constant.VectorConstants;
|
|
import com.tzld.videoVector.common.constant.VectorConstants;
|
|
|
import com.tzld.videoVector.dao.mapper.pgVector.DeconstructContentMapper;
|
|
import com.tzld.videoVector.dao.mapper.pgVector.DeconstructContentMapper;
|
|
|
import com.tzld.videoVector.dao.mapper.pgVector.DeconstructVectorConfigMapper;
|
|
import com.tzld.videoVector.dao.mapper.pgVector.DeconstructVectorConfigMapper;
|
|
|
|
|
+import com.tzld.videoVector.dao.mapper.pgVector.ext.VideoDeconstructResultMapperExt;
|
|
|
import com.tzld.videoVector.model.entity.DeconstructResult;
|
|
import com.tzld.videoVector.model.entity.DeconstructResult;
|
|
|
-import com.tzld.videoVector.model.po.pgVector.DeconstructContent;
|
|
|
|
|
-import com.tzld.videoVector.model.po.pgVector.DeconstructContentExample;
|
|
|
|
|
-import com.tzld.videoVector.model.po.pgVector.DeconstructVectorConfig;
|
|
|
|
|
-import com.tzld.videoVector.model.po.pgVector.DeconstructVectorConfigExample;
|
|
|
|
|
|
|
+import com.tzld.videoVector.model.po.pgVector.*;
|
|
|
import com.tzld.videoVector.service.DeconstructService;
|
|
import com.tzld.videoVector.service.DeconstructService;
|
|
|
import com.tzld.videoVector.service.EmbeddingService;
|
|
import com.tzld.videoVector.service.EmbeddingService;
|
|
|
import com.tzld.videoVector.service.VectorStoreService;
|
|
import com.tzld.videoVector.service.VectorStoreService;
|
|
@@ -76,6 +73,9 @@ public class VideoVectorJob {
|
|
|
@Resource
|
|
@Resource
|
|
|
private RedisUtils redisUtils;
|
|
private RedisUtils redisUtils;
|
|
|
|
|
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private VideoDeconstructResultMapperExt videoDeconstructResultMapperExt;
|
|
|
|
|
+
|
|
|
/** 本次 Job 执行中已缓存 decode 的 videoId,避免多配置下重复写入 */
|
|
/** 本次 Job 执行中已缓存 decode 的 videoId,避免多配置下重复写入 */
|
|
|
private final Set<Long> decodeCachedInThisRun = ConcurrentHashMap.newKeySet();
|
|
private final Set<Long> decodeCachedInThisRun = ConcurrentHashMap.newKeySet();
|
|
|
|
|
|
|
@@ -162,7 +162,7 @@ public class VideoVectorJob {
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* 处理单个配置下的 raw_result 视频向量化
|
|
* 处理单个配置下的 raw_result 视频向量化
|
|
|
- * 包含:空text检查清理、过滤已有向量、流式查询ODPS并并发embedding
|
|
|
|
|
|
|
+ * 包含:空text检查清理、过滤已有向量、从本地DB查询解构结果并并发embedding
|
|
|
*/
|
|
*/
|
|
|
private void processConfigForRawResult(DeconstructVectorConfig config, List<Long> auditPassedIds,
|
|
private void processConfigForRawResult(DeconstructVectorConfig config, List<Long> auditPassedIds,
|
|
|
AtomicInteger totalSuccessCount, AtomicInteger totalFailCount) {
|
|
AtomicInteger totalSuccessCount, AtomicInteger totalFailCount) {
|
|
@@ -182,22 +182,23 @@ public class VideoVectorJob {
|
|
|
}
|
|
}
|
|
|
log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
|
|
log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
|
|
|
|
|
|
|
|
- // 3. 流式查询并并发embedding
|
|
|
|
|
|
|
+ // 3. 从本地DB批量查询解构结果并并发embedding
|
|
|
ExecutorService embedExecutor = Executors.newFixedThreadPool(VectorConstants.EMBEDDING_PARALLELISM);
|
|
ExecutorService embedExecutor = Executors.newFixedThreadPool(VectorConstants.EMBEDDING_PARALLELISM);
|
|
|
Semaphore inFlightLimiter = new Semaphore(VectorConstants.MAX_EMBEDDING_IN_FLIGHT);
|
|
Semaphore inFlightLimiter = new Semaphore(VectorConstants.MAX_EMBEDDING_IN_FLIGHT);
|
|
|
List<Future<?>> futures = new ArrayList<>();
|
|
List<Future<?>> futures = new ArrayList<>();
|
|
|
|
|
|
|
|
try {
|
|
try {
|
|
|
for (List<Long> batchIds : Lists.partition(needProcessIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
|
|
for (List<Long> batchIds : Lists.partition(needProcessIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
|
|
|
- String idsStr = batchIds.stream().map(String::valueOf).collect(Collectors.joining(","));
|
|
|
|
|
- String sql = String.format(
|
|
|
|
|
- "SELECT content_id, raw_result FROM videoods.content_profile " +
|
|
|
|
|
- "WHERE status = 3 AND is_deleted = 0 AND content_id IN (%s);", idsStr);
|
|
|
|
|
-
|
|
|
|
|
- OdpsUtil.getOdpsDataStream(sql, record ->
|
|
|
|
|
- submitEmbeddingTask(record, config, embedExecutor, inFlightLimiter, futures,
|
|
|
|
|
- totalSuccessCount, totalFailCount, "raw_result")
|
|
|
|
|
- );
|
|
|
|
|
|
|
+ List<VideoDeconstructResult> results = videoDeconstructResultMapperExt
|
|
|
|
|
+ .selectResultsByVideoIds("result_json", batchIds);
|
|
|
|
|
+ for (VideoDeconstructResult r : results) {
|
|
|
|
|
+ if (!StringUtils.hasText(r.getResult())) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ submitLocalEmbeddingTask(r.getVideoId(), r.getResult(), config,
|
|
|
|
|
+ embedExecutor, inFlightLimiter, futures,
|
|
|
|
|
+ totalSuccessCount, totalFailCount, "raw_result");
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
} finally {
|
|
} finally {
|
|
|
awaitAndShutdown(futures, embedExecutor, 30, "embedding");
|
|
awaitAndShutdown(futures, embedExecutor, 30, "embedding");
|
|
@@ -208,15 +209,13 @@ public class VideoVectorJob {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 提交单条记录的 embedding 任务到线程池
|
|
|
|
|
|
|
+ * 提交单条记录的 embedding 任务到线程池(基于本地DB数据)
|
|
|
* 使用 Semaphore 控制在途并发数
|
|
* 使用 Semaphore 控制在途并发数
|
|
|
*/
|
|
*/
|
|
|
- private void submitEmbeddingTask(com.aliyun.odps.data.Record record, DeconstructVectorConfig config,
|
|
|
|
|
- ExecutorService executor, Semaphore inFlightLimiter,
|
|
|
|
|
- List<Future<?>> futures, AtomicInteger successCount,
|
|
|
|
|
- AtomicInteger failCount, String dataType) {
|
|
|
|
|
- Long videoId = Long.valueOf(record.getString(0));
|
|
|
|
|
- String rawData = record.getString(1);
|
|
|
|
|
|
|
+ private void submitLocalEmbeddingTask(Long videoId, String rawData, DeconstructVectorConfig config,
|
|
|
|
|
+ ExecutorService executor, Semaphore inFlightLimiter,
|
|
|
|
|
+ List<Future<?>> futures, AtomicInteger successCount,
|
|
|
|
|
+ AtomicInteger failCount, String dataType) {
|
|
|
if (videoId == null || !StringUtils.hasText(rawData)) {
|
|
if (videoId == null || !StringUtils.hasText(rawData)) {
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
@@ -516,38 +515,20 @@ public class VideoVectorJob {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 分页查询 videoId 列表
|
|
|
|
|
|
|
+ * 分页查询 videoId 列表(从本地解构结果表查询 result_json 来源)
|
|
|
* @param pageNum 页码(从0开始)
|
|
* @param pageNum 页码(从0开始)
|
|
|
* @param pageSize 每页数量
|
|
* @param pageSize 每页数量
|
|
|
* @return videoId 列表
|
|
* @return videoId 列表
|
|
|
*/
|
|
*/
|
|
|
private List<Long> queryVideoIdsByPage(int pageNum, int pageSize) {
|
|
private List<Long> queryVideoIdsByPage(int pageNum, int pageSize) {
|
|
|
int offset = pageNum * pageSize;
|
|
int offset = pageNum * pageSize;
|
|
|
- String sql = String.format(
|
|
|
|
|
- "SELECT content_id " +
|
|
|
|
|
- "FROM videoods.content_profile " +
|
|
|
|
|
- "WHERE status = 3 AND is_deleted = 0 " +
|
|
|
|
|
- "ORDER BY content_id " +
|
|
|
|
|
- "LIMIT %d, %d;",
|
|
|
|
|
- offset, pageSize);
|
|
|
|
|
- List<Record> records = OdpsUtil.getOdpsData(sql);
|
|
|
|
|
- if (records == null || records.isEmpty()) {
|
|
|
|
|
- return new ArrayList<>();
|
|
|
|
|
- }
|
|
|
|
|
- List<Long> videoIds = new ArrayList<>();
|
|
|
|
|
- for (Record record : records) {
|
|
|
|
|
- Long contentId = Long.valueOf(record.getString(0));
|
|
|
|
|
- if (contentId != null) {
|
|
|
|
|
- videoIds.add(contentId);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- return videoIds;
|
|
|
|
|
|
|
+ return videoDeconstructResultMapperExt.selectVideoIdsBySourcePaged("result_json", offset, pageSize);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* AIGC 来源视频向量化任务
|
|
* AIGC 来源视频向量化任务
|
|
|
- * 从 AIGC API(id=46)拉取视频列表,经过向量存在性检查和审核过滤后,
|
|
|
|
|
- * 调用 detail 接口获取解构详情(dataContent),提取选题文本并向量化写入 Redis
|
|
|
|
|
|
|
+ * 从本地数据库读取 aigc_deconstruct 来源的解构结果,
|
|
|
|
|
+ * 经过向量存在性检查和审核过滤后,提取文本并向量化
|
|
|
*
|
|
*
|
|
|
* @param param 参数
|
|
* @param param 参数
|
|
|
* @return 执行结果
|
|
* @return 执行结果
|
|
@@ -563,54 +544,48 @@ public class VideoVectorJob {
|
|
|
return ReturnT.SUCCESS;
|
|
return ReturnT.SUCCESS;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 2. 从 AIGC API 获取任务输入列表(bizUniqueId 为视频 ID),循环所有配置的任务ID
|
|
|
|
|
- Map<Long, Long> videoIdToTaskInstanceId = new HashMap<>();
|
|
|
|
|
- for (Integer taskId : aigcDeconstructTaskIds) {
|
|
|
|
|
- List<AigcApiService.AigcTaskInput> taskInputList = aigcApiService.getTaskInputList(taskId);
|
|
|
|
|
- if (CollectionUtils.isEmpty(taskInputList)) {
|
|
|
|
|
- log.info("AIGC API taskId={} 未返回任务输入数据", taskId);
|
|
|
|
|
- continue;
|
|
|
|
|
|
|
+ // 2. 从本地DB分页查询 aigc_deconstruct 来源的 videoId
|
|
|
|
|
+ AtomicInteger totalSuccessCount = new AtomicInteger(0);
|
|
|
|
|
+ AtomicInteger totalFailCount = new AtomicInteger(0);
|
|
|
|
|
+ int pageNum = 0;
|
|
|
|
|
+
|
|
|
|
|
+ while (true) {
|
|
|
|
|
+ int offset = pageNum * VectorConstants.PAGE_SIZE;
|
|
|
|
|
+ List<Long> videoIds = videoDeconstructResultMapperExt.selectVideoIdsBySourcePaged(
|
|
|
|
|
+ "aigc_deconstruct", offset, VectorConstants.PAGE_SIZE);
|
|
|
|
|
+ if (CollectionUtils.isEmpty(videoIds)) {
|
|
|
|
|
+ log.info("第 {} 页没有查询到数据,分页查询结束", pageNum);
|
|
|
|
|
+ break;
|
|
|
}
|
|
}
|
|
|
- log.info("taskId={} 获取到 {} 条 AIGC 任务输入数据", taskId, taskInputList.size());
|
|
|
|
|
|
|
+ log.info("第 {} 页查询到 {} 个 videoId", pageNum, videoIds.size());
|
|
|
|
|
|
|
|
- // 构建 videoId -> taskInstanceId 映射
|
|
|
|
|
- for (AigcApiService.AigcTaskInput input : taskInputList) {
|
|
|
|
|
- try {
|
|
|
|
|
- Long videoId = Long.parseLong(input.getBizUniqueId());
|
|
|
|
|
- videoIdToTaskInstanceId.put(videoId, input.getTaskInstanceId());
|
|
|
|
|
- } catch (NumberFormatException e) {
|
|
|
|
|
- log.warn("bizUniqueId 格式非法,跳过: {}", input.getBizUniqueId());
|
|
|
|
|
|
|
+ // 3. 审核过滤
|
|
|
|
|
+ List<Long> auditPassedIds = filterAuditPassedIds(videoIds);
|
|
|
|
|
+ if (auditPassedIds.isEmpty()) {
|
|
|
|
|
+ log.info("第 {} 页所有视频均未通过审核,跳过", pageNum);
|
|
|
|
|
+ if (videoIds.size() < VectorConstants.PAGE_SIZE) {
|
|
|
|
|
+ break;
|
|
|
}
|
|
}
|
|
|
|
|
+ pageNum++;
|
|
|
|
|
+ continue;
|
|
|
}
|
|
}
|
|
|
- }
|
|
|
|
|
- if (videoIdToTaskInstanceId.isEmpty()) {
|
|
|
|
|
- log.info("无有效 videoId,任务结束");
|
|
|
|
|
- return ReturnT.SUCCESS;
|
|
|
|
|
- }
|
|
|
|
|
- List<Long> allVideoIds = new ArrayList<>(videoIdToTaskInstanceId.keySet());
|
|
|
|
|
- log.info("共 {} 个有效 videoId(来自 {} 个任务)", allVideoIds.size(), aigcDeconstructTaskIds.size());
|
|
|
|
|
-
|
|
|
|
|
- // 4. 先进行审核过滤(只过滤一次,避免在 config 循环内重复调用)
|
|
|
|
|
- List<Long> auditPassedIds = filterAuditPassedIds(allVideoIds);
|
|
|
|
|
- if (auditPassedIds.isEmpty()) {
|
|
|
|
|
- log.info("所有视频均未通过审核,任务结束");
|
|
|
|
|
- return ReturnT.SUCCESS;
|
|
|
|
|
- }
|
|
|
|
|
- log.info("审核通过 {} 个视频", auditPassedIds.size());
|
|
|
|
|
|
|
+ log.info("第 {} 页审核通过 {} 个视频", pageNum, auditPassedIds.size());
|
|
|
|
|
|
|
|
- AtomicInteger totalSuccessCount = new AtomicInteger(0);
|
|
|
|
|
- AtomicInteger totalFailCount = new AtomicInteger(0);
|
|
|
|
|
|
|
+ // 4. 对每个配置并发处理
|
|
|
|
|
+ ExecutorService configExecutor = Executors.newFixedThreadPool(configs.size());
|
|
|
|
|
+ List<Future<?>> configFutures = new ArrayList<>();
|
|
|
|
|
+ for (DeconstructVectorConfig config : configs) {
|
|
|
|
|
+ configFutures.add(configExecutor.submit(() ->
|
|
|
|
|
+ processConfigForAigc(config, auditPassedIds, totalSuccessCount, totalFailCount)
|
|
|
|
|
+ ));
|
|
|
|
|
+ }
|
|
|
|
|
+ awaitAndShutdown(configFutures, configExecutor, 30, "配置并发");
|
|
|
|
|
|
|
|
- // 5. 对每个配置并发处理
|
|
|
|
|
- ExecutorService configExecutor = Executors.newFixedThreadPool(configs.size());
|
|
|
|
|
- List<Future<?>> configFutures = new ArrayList<>();
|
|
|
|
|
- for (DeconstructVectorConfig config : configs) {
|
|
|
|
|
- configFutures.add(configExecutor.submit(() ->
|
|
|
|
|
- processConfigForAigc(config, auditPassedIds, videoIdToTaskInstanceId,
|
|
|
|
|
- totalSuccessCount, totalFailCount)
|
|
|
|
|
- ));
|
|
|
|
|
|
|
+ if (videoIds.size() < VectorConstants.PAGE_SIZE) {
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+ pageNum++;
|
|
|
}
|
|
}
|
|
|
- awaitAndShutdown(configFutures, configExecutor, 30, "配置并发");
|
|
|
|
|
|
|
|
|
|
log.info("AIGC 来源视频向量化任务完成,总成功: {}, 总失败: {}", totalSuccessCount.get(), totalFailCount.get());
|
|
log.info("AIGC 来源视频向量化任务完成,总成功: {}, 总失败: {}", totalSuccessCount.get(), totalFailCount.get());
|
|
|
return ReturnT.SUCCESS;
|
|
return ReturnT.SUCCESS;
|
|
@@ -622,10 +597,9 @@ public class VideoVectorJob {
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* 处理单个配置下的 AIGC 视频向量化
|
|
* 处理单个配置下的 AIGC 视频向量化
|
|
|
- * 包含:空text检查清理、过滤已有向量、并发调用detail接口并向量化
|
|
|
|
|
|
|
+ * 包含:空text检查清理、过滤已有向量、从本地DB查询解构结果并向量化
|
|
|
*/
|
|
*/
|
|
|
private void processConfigForAigc(DeconstructVectorConfig config, List<Long> auditPassedIds,
|
|
private void processConfigForAigc(DeconstructVectorConfig config, List<Long> auditPassedIds,
|
|
|
- Map<Long, Long> videoIdToTaskInstanceId,
|
|
|
|
|
AtomicInteger totalSuccessCount, AtomicInteger totalFailCount) {
|
|
AtomicInteger totalSuccessCount, AtomicInteger totalFailCount) {
|
|
|
String configCode = config.getConfigCode();
|
|
String configCode = config.getConfigCode();
|
|
|
try {
|
|
try {
|
|
@@ -643,62 +617,61 @@ public class VideoVectorJob {
|
|
|
}
|
|
}
|
|
|
log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
|
|
log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
|
|
|
|
|
|
|
|
- // 3. 并发调用 detail 接口,提取文本并向量化
|
|
|
|
|
- ExecutorService detailExecutor = Executors.newFixedThreadPool(VectorConstants.AIGC_DETAIL_PARALLELISM);
|
|
|
|
|
- List<Future<?>> detailFutures = new ArrayList<>();
|
|
|
|
|
- for (Long videoId : needProcessIds) {
|
|
|
|
|
- detailFutures.add(detailExecutor.submit(() ->
|
|
|
|
|
- processAigcSingleVideo(videoId, config, videoIdToTaskInstanceId,
|
|
|
|
|
- totalSuccessCount, totalFailCount)
|
|
|
|
|
- ));
|
|
|
|
|
- }
|
|
|
|
|
- awaitAndShutdown(detailFutures, detailExecutor, 30, "AIGC detail");
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- log.error("配置 {} 处理异常: {}", configCode, e.getMessage(), e);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /**
|
|
|
|
|
- * 处理单个 AIGC 视频的向量化
|
|
|
|
|
- */
|
|
|
|
|
- private void processAigcSingleVideo(Long videoId, DeconstructVectorConfig config,
|
|
|
|
|
- Map<Long, Long> videoIdToTaskInstanceId,
|
|
|
|
|
- AtomicInteger successCount, AtomicInteger failCount) {
|
|
|
|
|
- String configCode = config.getConfigCode();
|
|
|
|
|
- try {
|
|
|
|
|
- Long taskInstanceId = videoIdToTaskInstanceId.get(videoId);
|
|
|
|
|
- if (taskInstanceId == null) {
|
|
|
|
|
- log.warn("videoId={} 无对应 taskInstanceId,跳过", videoId);
|
|
|
|
|
- failCount.incrementAndGet();
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- JSONObject dataContent = aigcApiService.getTaskCallbackDetail(taskInstanceId);
|
|
|
|
|
- if (dataContent == null) {
|
|
|
|
|
- log.warn("videoId={} taskInstanceId={} 获取 dataContent 失败,跳过", videoId, taskInstanceId);
|
|
|
|
|
- failCount.incrementAndGet();
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // 尝试缓存 decode 结果
|
|
|
|
|
- tryCacheDecodeResult(videoId, dataContent);
|
|
|
|
|
|
|
+ // 3. 从本地DB批量查询解构结果并并发embedding
|
|
|
|
|
+ ExecutorService embedExecutor = Executors.newFixedThreadPool(VectorConstants.EMBEDDING_PARALLELISM);
|
|
|
|
|
+ Semaphore inFlightLimiter = new Semaphore(VectorConstants.MAX_EMBEDDING_IN_FLIGHT);
|
|
|
|
|
+ List<Future<?>> futures = new ArrayList<>();
|
|
|
|
|
|
|
|
- List<String> texts = extractTextsFromDataContent(dataContent, config);
|
|
|
|
|
- if (CollectionUtils.isEmpty(texts)) {
|
|
|
|
|
- log.info("videoId={} 配置 {} 未提取到选题文本,跳过", videoId, configCode);
|
|
|
|
|
- failCount.incrementAndGet();
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ try {
|
|
|
|
|
+ for (List<Long> batchIds : Lists.partition(needProcessIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
|
|
|
|
|
+ List<VideoDeconstructResult> results = videoDeconstructResultMapperExt
|
|
|
|
|
+ .selectResultsByVideoIds("aigc_deconstruct", batchIds);
|
|
|
|
|
+ for (VideoDeconstructResult r : results) {
|
|
|
|
|
+ if (!StringUtils.hasText(r.getResult())) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ Long videoId = r.getVideoId();
|
|
|
|
|
+ JSONObject dataContent = JSON.parseObject(r.getResult());
|
|
|
|
|
+ if (dataContent == null) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ // 尝试缓存 decode 结果
|
|
|
|
|
+ tryCacheDecodeResult(videoId, dataContent);
|
|
|
|
|
|
|
|
- int storeCount = vectorizeAndStore(config, videoId, texts);
|
|
|
|
|
- if (storeCount > 0) {
|
|
|
|
|
- successCount.incrementAndGet();
|
|
|
|
|
- } else {
|
|
|
|
|
- failCount.incrementAndGet();
|
|
|
|
|
|
|
+ try {
|
|
|
|
|
+ inFlightLimiter.acquire();
|
|
|
|
|
+ } catch (InterruptedException e) {
|
|
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ futures.add(embedExecutor.submit(() -> {
|
|
|
|
|
+ try {
|
|
|
|
|
+ List<String> texts = extractTextsFromDataContent(dataContent, config);
|
|
|
|
|
+ if (CollectionUtils.isEmpty(texts)) {
|
|
|
|
|
+ log.info("videoId={} 配置 {} 未提取到选题文本,跳过", videoId, configCode);
|
|
|
|
|
+ totalFailCount.incrementAndGet();
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ int storeCount = vectorizeAndStore(config, videoId, texts);
|
|
|
|
|
+ if (storeCount > 0) {
|
|
|
|
|
+ totalSuccessCount.incrementAndGet();
|
|
|
|
|
+ } else {
|
|
|
|
|
+ totalFailCount.incrementAndGet();
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
|
|
|
|
|
+ totalFailCount.incrementAndGet();
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ inFlightLimiter.release();
|
|
|
|
|
+ }
|
|
|
|
|
+ }));
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ } finally {
|
|
|
|
|
+ awaitAndShutdown(futures, embedExecutor, 30, "AIGC embedding");
|
|
|
}
|
|
}
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
- log.error("处理 videoId={} 配置 {} 时发生异常: {}", videoId, configCode, e.getMessage(), e);
|
|
|
|
|
- failCount.incrementAndGet();
|
|
|
|
|
|
|
+ log.error("配置 {} 处理异常: {}", configCode, e.getMessage(), e);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -1013,8 +986,7 @@ public class VideoVectorJob {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 分页查询 result_log 视频池中的 videoId
|
|
|
|
|
- * 条件:播放量>10000 且有 result_log 记录(dt > 20240001)
|
|
|
|
|
|
|
+ * 分页查询 result_log 视频池中的 videoId(从本地解构结果表查询)
|
|
|
*
|
|
*
|
|
|
* @param pageNum 页码(从0开始)
|
|
* @param pageNum 页码(从0开始)
|
|
|
* @param pageSize 每页数量
|
|
* @param pageSize 每页数量
|
|
@@ -1022,33 +994,12 @@ public class VideoVectorJob {
|
|
|
*/
|
|
*/
|
|
|
private List<Long> queryResultLogVideoIdsByPage(int pageNum, int pageSize) {
|
|
private List<Long> queryResultLogVideoIdsByPage(int pageNum, int pageSize) {
|
|
|
int offset = pageNum * pageSize;
|
|
int offset = pageNum * pageSize;
|
|
|
- String sql = String.format(
|
|
|
|
|
- "SELECT v.id " +
|
|
|
|
|
- "FROM videoods.wx_video v " +
|
|
|
|
|
- "INNER JOIN loghubods.result_log r " +
|
|
|
|
|
- "ON v.id = r.video_id " +
|
|
|
|
|
- "WHERE v.play_count_total > 10000 " +
|
|
|
|
|
- "AND r.dt > 20240001 " +
|
|
|
|
|
- "ORDER BY v.id " +
|
|
|
|
|
- "LIMIT %d, %d;",
|
|
|
|
|
- offset, pageSize);
|
|
|
|
|
- List<Record> records = OdpsUtil.getOdpsData(sql);
|
|
|
|
|
- if (records == null || records.isEmpty()) {
|
|
|
|
|
- return new ArrayList<>();
|
|
|
|
|
- }
|
|
|
|
|
- List<Long> videoIds = new ArrayList<>();
|
|
|
|
|
- for (Record record : records) {
|
|
|
|
|
- Long videoId = Long.valueOf(record.getString(0));
|
|
|
|
|
- if (videoId != null) {
|
|
|
|
|
- videoIds.add(videoId);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- return videoIds;
|
|
|
|
|
|
|
+ return videoDeconstructResultMapperExt.selectVideoIdsBySourcePaged("result_log", offset, pageSize);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* 处理单个配置下的 result_log 视频向量化
|
|
* 处理单个配置下的 result_log 视频向量化
|
|
|
- * 包含:空text检查清理、过滤已有向量、流式查询ODPS并并发embedding
|
|
|
|
|
|
|
+ * 包含:空text检查清理、过滤已有向量、从本地DB查询解构结果并并发embedding
|
|
|
*/
|
|
*/
|
|
|
private void processConfigForResultLog(DeconstructVectorConfig config, List<Long> auditPassedIds,
|
|
private void processConfigForResultLog(DeconstructVectorConfig config, List<Long> auditPassedIds,
|
|
|
AtomicInteger totalSuccessCount, AtomicInteger totalFailCount) {
|
|
AtomicInteger totalSuccessCount, AtomicInteger totalFailCount) {
|
|
@@ -1068,22 +1019,23 @@ public class VideoVectorJob {
|
|
|
}
|
|
}
|
|
|
log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
|
|
log.info("配置 {} 需要处理 {} 个视频", configCode, needProcessIds.size());
|
|
|
|
|
|
|
|
- // 3. 流式查询并并发embedding
|
|
|
|
|
|
|
+ // 3. 从本地DB批量查询解构结果并并发embedding
|
|
|
ExecutorService embedExecutor = Executors.newFixedThreadPool(VectorConstants.EMBEDDING_PARALLELISM);
|
|
ExecutorService embedExecutor = Executors.newFixedThreadPool(VectorConstants.EMBEDDING_PARALLELISM);
|
|
|
Semaphore inFlightLimiter = new Semaphore(VectorConstants.MAX_EMBEDDING_IN_FLIGHT);
|
|
Semaphore inFlightLimiter = new Semaphore(VectorConstants.MAX_EMBEDDING_IN_FLIGHT);
|
|
|
List<Future<?>> futures = new ArrayList<>();
|
|
List<Future<?>> futures = new ArrayList<>();
|
|
|
|
|
|
|
|
try {
|
|
try {
|
|
|
for (List<Long> batchIds : Lists.partition(needProcessIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
|
|
for (List<Long> batchIds : Lists.partition(needProcessIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
|
|
|
- String idsStr = batchIds.stream().map(String::valueOf).collect(Collectors.joining(","));
|
|
|
|
|
- String sql = String.format(
|
|
|
|
|
- "SELECT video_id, data FROM loghubods.result_log " +
|
|
|
|
|
- "WHERE video_id IN (%s) AND dt > 20240001;", idsStr);
|
|
|
|
|
-
|
|
|
|
|
- OdpsUtil.getOdpsDataStream(sql, record ->
|
|
|
|
|
- submitEmbeddingTask(record, config, embedExecutor, inFlightLimiter, futures,
|
|
|
|
|
- totalSuccessCount, totalFailCount, "result_log")
|
|
|
|
|
- );
|
|
|
|
|
|
|
+ List<VideoDeconstructResult> results = videoDeconstructResultMapperExt
|
|
|
|
|
+ .selectResultsByVideoIds("result_log", batchIds);
|
|
|
|
|
+ for (VideoDeconstructResult r : results) {
|
|
|
|
|
+ if (!StringUtils.hasText(r.getResult())) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ submitLocalEmbeddingTask(r.getVideoId(), r.getResult(), config,
|
|
|
|
|
+ embedExecutor, inFlightLimiter, futures,
|
|
|
|
|
+ totalSuccessCount, totalFailCount, "result_log");
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
} finally {
|
|
} finally {
|
|
|
awaitAndShutdown(futures, embedExecutor, 30, "embedding");
|
|
awaitAndShutdown(futures, embedExecutor, 30, "embedding");
|
|
@@ -1197,6 +1149,276 @@ public class VideoVectorJob {
|
|
|
return false;
|
|
return false;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // ========================== 解构结果同步任务 ==========================
|
|
|
|
|
+
|
|
|
|
|
+    /**
+     * Synchronizes video deconstruction results into the local database.
+     *
+     * <p>Runs the three source-specific sync routines (result_json, aigc_deconstruct and
+     * result_log) concurrently on a dedicated three-thread pool. Both counters are shared by
+     * all three routines and are only read here for the summary log line.
+     *
+     * @param param job parameter; it is only logged by this method
+     * @return {@code ReturnT.SUCCESS} after the pool has been awaited and shut down, or a
+     *         {@code FAIL_CODE} result carrying the exception message if an exception
+     *         propagates out of submission or waiting
+     */
+    @XxlJob("syncDeconstructResultJob")
+    public ReturnT<String> syncDeconstructResultJob(String param) {
+        log.info("开始执行解构结果同步任务, param: {}", param);
+
+        AtomicInteger totalInsertCount = new AtomicInteger(0);
+        AtomicInteger totalSkipCount = new AtomicInteger(0);
+
+        try {
+            // One worker per source so the three syncs run side by side.
+            ExecutorService syncExecutor = Executors.newFixedThreadPool(3);
+            List<Future<?>> syncFutures = new ArrayList<>();
+            try {
+                syncFutures.add(syncExecutor.submit(() -> syncResultJsonSource(totalInsertCount, totalSkipCount)));
+                syncFutures.add(syncExecutor.submit(() -> syncAigcDeconstructSource(totalInsertCount, totalSkipCount)));
+                syncFutures.add(syncExecutor.submit(() -> syncResultLogSource(totalInsertCount, totalSkipCount)));
+            } finally {
+                // Always release the pool, even if a submit call fails part-way through;
+                // this mirrors the try/finally used by the embedding methods in this class.
+                awaitAndShutdown(syncFutures, syncExecutor, 60, "解构结果同步");
+            }
+
+            log.info("解构结果同步任务完成,总插入: {}, 总跳过: {}", totalInsertCount.get(), totalSkipCount.get());
+            return ReturnT.SUCCESS;
+        } catch (Exception e) {
+            log.error("解构结果同步任务执行失败: {}", e.getMessage(), e);
+            return new ReturnT<>(ReturnT.FAIL_CODE, "任务执行失败: " + e.getMessage());
+        }
+    }
|
|
|
|
|
+
|
|
|
|
|
+    /**
+     * Syncs the result_json source: copies raw_result rows from the ODPS content_profile
+     * table into the local deconstruct-result table.
+     *
+     * <p>Walks the ODPS ids page by page, filters out ids that already have a local
+     * result_json row, then streams the remaining rows from ODPS in IN-list chunks and
+     * flushes inserts every 200 rows.
+     *
+     * @param insertCount accumulator increased by the value returned from each batch insert
+     * @param skipCount   accumulator increased by the number of ids already present locally
+     */
+    private void syncResultJsonSource(AtomicInteger insertCount, AtomicInteger skipCount) {
+        log.info("开始同步 result_json 来源");
+
+        for (int page = 0; ; page++) {
+            List<Long> pageIds = queryOdpsVideoIdsByPage(page, VectorConstants.PAGE_SIZE);
+            if (CollectionUtils.isEmpty(pageIds)) {
+                break;
+            }
+
+            // Ids that already have a local result_json row are counted as skipped.
+            Set<Long> alreadyStored = new HashSet<>(
+                    videoDeconstructResultMapperExt.selectExistingVideoIds("result_json", pageIds));
+            List<Long> missingIds = new ArrayList<>();
+            for (Long id : pageIds) {
+                if (!alreadyStored.contains(id)) {
+                    missingIds.add(id);
+                }
+            }
+            skipCount.addAndGet(alreadyStored.size());
+
+            if (!missingIds.isEmpty()) {
+                // Query ODPS chunk by chunk to keep each IN list bounded.
+                for (List<Long> chunk : Lists.partition(missingIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
+                    String inClause = chunk.stream().map(String::valueOf).collect(Collectors.joining(","));
+                    String sql = String.format(
+                            "SELECT content_id, raw_result FROM videoods.content_profile " +
+                                    "WHERE status = 3 AND is_deleted = 0 AND content_id IN (%s);", inClause);
+
+                    List<VideoDeconstructResult> pending = new ArrayList<>();
+                    OdpsUtil.getOdpsDataStream(sql, record -> {
+                        String idText = record.getString(0);
+                        String rawResult = record.getString(1);
+                        // Skip rows with a blank id, a blank payload, or the literal "\N" payload
+                        // (presumably the ODPS null marker -- TODO confirm).
+                        if (!StringUtils.hasText(idText) || !StringUtils.hasText(rawResult)
+                                || "\\N".equals(rawResult)) {
+                            return;
+                        }
+                        VideoDeconstructResult row = new VideoDeconstructResult();
+                        row.setVideoId(Long.valueOf(idText));
+                        row.setSource("result_json");
+                        row.setResult(rawResult);
+                        pending.add(row);
+                        if (pending.size() >= 200) {
+                            // Pass a copy so clearing the buffer cannot affect the list given to the mapper.
+                            List<VideoDeconstructResult> flushCopy = new ArrayList<>(pending);
+                            insertCount.addAndGet(videoDeconstructResultMapperExt.batchInsertIgnore(flushCopy));
+                            pending.clear();
+                        }
+                    });
+                    // Flush whatever is left after the stream ends.
+                    if (!pending.isEmpty()) {
+                        insertCount.addAndGet(videoDeconstructResultMapperExt.batchInsertIgnore(pending));
+                    }
+                }
+            }
+
+            // A short page means the ODPS result set is exhausted.
+            if (pageIds.size() < VectorConstants.PAGE_SIZE) {
+                break;
+            }
+        }
+        log.info("result_json 来源同步完成");
+    }
|
|
|
|
|
+
|
|
|
|
|
+    /**
+     * Syncs the aigc_deconstruct source: pulls dataContent from the AIGC API and stores it
+     * in the local deconstruct-result table.
+     *
+     * <p>Steps: build a videoId -> taskInstanceId map from the task input lists of every
+     * configured task id; then, batch by batch, drop video ids that already have a local
+     * aigc_deconstruct row, fetch the callback detail for the remaining ids concurrently, and
+     * insert the fetched results in sub-batches of 200.
+     *
+     * @param insertCount accumulator increased by the value returned from each batch insert
+     * @param skipCount   accumulator increased by the number of ids already present locally
+     */
+    private void syncAigcDeconstructSource(AtomicInteger insertCount, AtomicInteger skipCount) {
+        log.info("开始同步 aigc_deconstruct 来源");
+
+        // Collect the task inputs of every configured task; bizUniqueId carries the video id.
+        Map<Long, Long> videoIdToTaskInstanceId = new HashMap<>();
+        for (Integer taskId : aigcDeconstructTaskIds) {
+            List<AigcApiService.AigcTaskInput> taskInputList = aigcApiService.getTaskInputList(taskId);
+            if (CollectionUtils.isEmpty(taskInputList)) {
+                continue;
+            }
+            for (AigcApiService.AigcTaskInput input : taskInputList) {
+                try {
+                    Long videoId = Long.parseLong(input.getBizUniqueId());
+                    videoIdToTaskInstanceId.put(videoId, input.getTaskInstanceId());
+                } catch (NumberFormatException e) {
+                    // Malformed (or null) ids are skipped on purpose, but leave a trace
+                    // so bad upstream data is visible in the logs.
+                    log.warn("bizUniqueId 格式非法,跳过: {}", input.getBizUniqueId());
+                }
+            }
+        }
+
+        if (videoIdToTaskInstanceId.isEmpty()) {
+            log.info("aigc_deconstruct 来源无有效数据");
+            return;
+        }
+
+        List<Long> allVideoIds = new ArrayList<>(videoIdToTaskInstanceId.keySet());
+        for (List<Long> batchIds : Lists.partition(allVideoIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
+            // Ids that already have a local aigc_deconstruct row are counted as skipped.
+            Set<Long> existingIds = new HashSet<>(
+                    videoDeconstructResultMapperExt.selectExistingVideoIds("aigc_deconstruct", batchIds));
+            skipCount.addAndGet(existingIds.size());
+
+            List<Long> needSyncIds = batchIds.stream()
+                    .filter(id -> !existingIds.contains(id))
+                    .collect(Collectors.toList());
+            if (needSyncIds.isEmpty()) {
+                // Nothing to fetch for this batch: do not spin up a thread pool for no work.
+                continue;
+            }
+
+            // Call the detail endpoint concurrently; workers append into a synchronized list.
+            ExecutorService executor = Executors.newFixedThreadPool(VectorConstants.AIGC_DETAIL_PARALLELISM);
+            List<Future<?>> futures = new ArrayList<>();
+            List<VideoDeconstructResult> batch = Collections.synchronizedList(new ArrayList<>());
+            try {
+                for (Long videoId : needSyncIds) {
+                    futures.add(executor.submit(() -> {
+                        try {
+                            Long taskInstanceId = videoIdToTaskInstanceId.get(videoId);
+                            if (taskInstanceId == null) {
+                                return;
+                            }
+                            JSONObject dataContent = aigcApiService.getTaskCallbackDetail(taskInstanceId);
+                            if (dataContent != null) {
+                                VideoDeconstructResult r = new VideoDeconstructResult();
+                                r.setVideoId(videoId);
+                                r.setSource("aigc_deconstruct");
+                                r.setResult(dataContent.toJSONString());
+                                batch.add(r);
+                            }
+                        } catch (Exception e) {
+                            // Best effort per video: one failure must not abort the batch.
+                            log.warn("同步 aigc videoId={} 失败: {}", videoId, e.getMessage(), e);
+                        }
+                    }));
+                }
+            } finally {
+                // Always release the pool, even if a submit call fails part-way through.
+                awaitAndShutdown(futures, executor, 30, "aigc同步");
+            }
+
+            // Insert from a snapshot: if a worker outlived the wait above it could still be
+            // appending, and partition views over a list that is changing are unsafe to iterate.
+            List<VideoDeconstructResult> fetched = new ArrayList<>(batch);
+            for (List<VideoDeconstructResult> subBatch : Lists.partition(fetched, 200)) {
+                insertCount.addAndGet(videoDeconstructResultMapperExt.batchInsertIgnore(subBatch));
+            }
+        }
+        log.info("aigc_deconstruct 来源同步完成");
+    }
|
|
|
|
|
+
|
|
|
|
|
+    /**
+     * Syncs the result_log source: copies the data column of ODPS result_log rows into the
+     * local deconstruct-result table.
+     *
+     * <p>Walks the ODPS video-pool ids page by page, filters out ids that already have a
+     * local result_log row, then streams the remaining rows from ODPS in IN-list chunks and
+     * flushes inserts every 200 rows.
+     *
+     * @param insertCount accumulator increased by the value returned from each batch insert
+     * @param skipCount   accumulator increased by the number of ids already present locally
+     */
+    private void syncResultLogSource(AtomicInteger insertCount, AtomicInteger skipCount) {
+        log.info("开始同步 result_log 来源");
+
+        for (int page = 0; ; page++) {
+            List<Long> pageIds = queryOdpsResultLogVideoIdsByPage(page, VectorConstants.PAGE_SIZE);
+            if (CollectionUtils.isEmpty(pageIds)) {
+                break;
+            }
+
+            // Ids that already have a local result_log row are counted as skipped.
+            Set<Long> alreadyStored = new HashSet<>(
+                    videoDeconstructResultMapperExt.selectExistingVideoIds("result_log", pageIds));
+            List<Long> missingIds = new ArrayList<>();
+            for (Long id : pageIds) {
+                if (!alreadyStored.contains(id)) {
+                    missingIds.add(id);
+                }
+            }
+            skipCount.addAndGet(alreadyStored.size());
+
+            if (!missingIds.isEmpty()) {
+                // Query ODPS chunk by chunk to keep each IN list bounded.
+                for (List<Long> chunk : Lists.partition(missingIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
+                    String inClause = chunk.stream().map(String::valueOf).collect(Collectors.joining(","));
+                    String sql = String.format(
+                            "SELECT video_id, data FROM loghubods.result_log " +
+                                    "WHERE video_id IN (%s) AND dt > 20240001;", inClause);
+
+                    List<VideoDeconstructResult> pending = new ArrayList<>();
+                    OdpsUtil.getOdpsDataStream(sql, record -> {
+                        String idText = record.getString(0);
+                        String data = record.getString(1);
+                        // Skip rows with a blank id, a blank payload, or the literal "\N" payload
+                        // (presumably the ODPS null marker -- TODO confirm).
+                        if (!StringUtils.hasText(idText) || !StringUtils.hasText(data)
+                                || "\\N".equals(data)) {
+                            return;
+                        }
+                        VideoDeconstructResult row = new VideoDeconstructResult();
+                        row.setVideoId(Long.valueOf(idText));
+                        row.setSource("result_log");
+                        row.setResult(data);
+                        pending.add(row);
+                        if (pending.size() >= 200) {
+                            // Pass a copy so clearing the buffer cannot affect the list given to the mapper.
+                            List<VideoDeconstructResult> flushCopy = new ArrayList<>(pending);
+                            insertCount.addAndGet(videoDeconstructResultMapperExt.batchInsertIgnore(flushCopy));
+                            pending.clear();
+                        }
+                    });
+                    // Flush whatever is left after the stream ends.
+                    if (!pending.isEmpty()) {
+                        insertCount.addAndGet(videoDeconstructResultMapperExt.batchInsertIgnore(pending));
+                    }
+                }
+            }
+
+            // A short page means the ODPS result set is exhausted.
+            if (pageIds.size() < VectorConstants.PAGE_SIZE) {
+                break;
+            }
+        }
+        log.info("result_log 来源同步完成");
+    }
|
|
|
|
|
+
|
|
|
|
|
+ // ========================== ODPS 查询(仅同步任务使用) ==========================
|
|
|
|
|
+
|
|
|
|
|
+    /**
+     * Fetches one page of content_id values from the ODPS content_profile table
+     * (used by the sync job only).
+     *
+     * @param pageNum  zero-based page index; the row offset is {@code pageNum * pageSize}
+     * @param pageSize maximum number of rows requested
+     * @return the ids of the page parsed as {@code Long}; a new empty list when ODPS
+     *         returns {@code null} or no rows
+     */
+    private List<Long> queryOdpsVideoIdsByPage(int pageNum, int pageSize) {
+        String sql = String.format(
+                "SELECT content_id FROM videoods.content_profile " +
+                        "WHERE status = 3 AND is_deleted = 0 ORDER BY content_id LIMIT %d, %d;",
+                pageNum * pageSize, pageSize);
+
+        List<com.aliyun.odps.data.Record> rows = OdpsUtil.getOdpsData(sql);
+        List<Long> ids = new ArrayList<>();
+        if (rows != null) {
+            // Column 0 is content_id, read as a string and parsed.
+            for (com.aliyun.odps.data.Record row : rows) {
+                ids.add(Long.valueOf(row.getString(0)));
+            }
+        }
+        return ids;
+    }
|
|
|
|
|
+
|
|
|
|
|
+    /**
+     * Fetches one page of video ids for the result_log video pool from ODPS
+     * (used by the sync job only).
+     *
+     * <p>The query joins wx_video with result_log and keeps videos whose play_count_total is
+     * above 10000 and whose result_log rows have dt greater than 20240001.
+     * NOTE(review): the SELECT has no DISTINCT, so a video with several matching result_log
+     * rows may appear more than once -- confirm whether that is intended.
+     *
+     * @param pageNum  zero-based page index; the row offset is {@code pageNum * pageSize}
+     * @param pageSize maximum number of rows requested
+     * @return the ids of the page parsed as {@code Long}; a new empty list when ODPS
+     *         returns {@code null} or no rows
+     */
+    private List<Long> queryOdpsResultLogVideoIdsByPage(int pageNum, int pageSize) {
+        int startRow = pageNum * pageSize;
+        String sql = String.format(
+                "SELECT v.id FROM videoods.wx_video v " +
+                        "INNER JOIN loghubods.result_log r ON v.id = r.video_id " +
+                        "WHERE v.play_count_total > 10000 AND r.dt > 20240001 " +
+                        "ORDER BY v.id LIMIT %d, %d;",
+                startRow, pageSize);
+
+        List<com.aliyun.odps.data.Record> rows = OdpsUtil.getOdpsData(sql);
+        boolean nothingFound = rows == null || rows.isEmpty();
+        if (nothingFound) {
+            return new ArrayList<>();
+        }
+
+        // Column 0 is v.id, read as a string and parsed.
+        List<Long> ids = new ArrayList<>();
+        rows.forEach(row -> ids.add(Long.valueOf(row.getString(0))));
+        return ids;
+    }
|
|
|
|
|
+
|
|
|
// ========================== Decode 缓存相关方法 ==========================
|
|
// ========================== Decode 缓存相关方法 ==========================
|
|
|
|
|
|
|
|
/**
|
|
/**
|