Explorar el Código

syncVideoDetailJob 优化

wangyunpeng hace 5 días
padre
commit
59c64db30b

+ 72 - 39
core/src/main/java/com/tzld/videoVector/job/VideoDetailSyncJob.java

@@ -18,6 +18,10 @@ import java.time.LocalDate;
 import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -41,7 +45,7 @@ public class VideoDetailSyncJob {
     private RedisUtils redisUtils;
 
     /** ODPS SQL IN 子句批次大小 */
-    private static final int ODPS_BATCH_SIZE = 1000;
+    private static final int ODPS_BATCH_SIZE = 5000;
 
     // ========================== 维度字段(取最新日期) ==========================
 
@@ -128,7 +132,7 @@ public class VideoDetailSyncJob {
 
     /**
      * 处理单批次视频详情的 ODPS 查询与 Redis 写入
-     * DIMENSION_FIELDS 仅在 365d 时查询一次,写入所有日期维度的 Redis key
+     * 优化:维度查询与所有指标查询并行执行(P1+P3),Redis 写入改为 Pipeline 批量提交(P2)
      */
     private void processBatchVideoDetail(List<Long> batchIds, String dtMax,
                                          AtomicInteger totalSuccess, AtomicInteger totalFail,
@@ -138,60 +142,89 @@ public class VideoDetailSyncJob {
                     .map(String::valueOf)
                     .collect(Collectors.joining(","));
 
-            // 1. 查询维度字段(仅用 365 天范围查询一次,结果写入所有 key)
             String dtMinFor365 = LocalDate.now().minusDays(365).format(DateTimeFormatter.BASIC_ISO_DATE);
-            Map<Long, JSONObject> dimensionMap = queryDimensionData(idsStr, dtMinFor365, dtMax);
-            log.info("批次 {}-{} 维度查询完成(365d范围),获取 {} 条记录", batchStart, batchEnd, dimensionMap.size());
-
-            // 2. 每个日期维度单独查询指标,合并维度字段后写入对应 Redis key
-            for (int days : VectorConstants.SUPPORTED_DATE_RANGES) {
-                String dtMinForMetrics = LocalDate.now().minusDays(days).format(DateTimeFormatter.BASIC_ISO_DATE);
-                String metricsSql = buildMetricsSql(idsStr, dtMinForMetrics, dtMax);
-
-                // 收集该维度的指标数据
-                Map<Long, JSONObject> metricsMap = new HashMap<>();
-                OdpsUtil.getOdpsDataStream(metricsSql, record -> {
-                    try {
-                        Long videoId = record.getBigint("视频id");
-                        if (videoId != null) {
-                            metricsMap.put(videoId, buildVideoDetail(record));
-                        }
-                    } catch (Exception e) {
-                        log.error("处理{}天指标记录失败: {}", days, e.getMessage());
-                    }
-                });
 
-                // 合并维度字段(来自365d查询) + 该维度指标,写入 Redis key
-                Set<Long> videoIdsForDay = new HashSet<>();
-                videoIdsForDay.addAll(dimensionMap.keySet());
-                videoIdsForDay.addAll(metricsMap.keySet());
+            // 线程池大小 = 1个维度查询 + N个指标查询
+            int parallelism = VectorConstants.SUPPORTED_DATE_RANGES.size() + 1;
+            ExecutorService executor = Executors.newFixedThreadPool(parallelism);
+
+            try {
+                // P3: 维度查询与指标查询并行执行(而非先等维度再串行跑6个指标)
+                CompletableFuture<Map<Long, JSONObject>> dimFuture = CompletableFuture
+                        .supplyAsync(() -> queryDimensionData(idsStr, dtMinFor365, dtMax), executor)
+                        .exceptionally(ex -> {
+                            log.error("批次 {}-{} 维度查询失败: {}", batchStart, batchEnd, ex.getMessage());
+                            return new HashMap<>();
+                        });
+
+                // P1: 6个指标查询(3/7/15/30/180/365天)并行执行
+                Map<Integer, CompletableFuture<Map<Long, JSONObject>>> metricsFutures = new LinkedHashMap<>();
+                for (int days : VectorConstants.SUPPORTED_DATE_RANGES) {
+                    String dtMinForMetrics = LocalDate.now().minusDays(days).format(DateTimeFormatter.BASIC_ISO_DATE);
+                    String metricsSql = buildMetricsSql(idsStr, dtMinForMetrics, dtMax);
+                    metricsFutures.put(days, CompletableFuture
+                            .supplyAsync(() -> {
+                                Map<Long, JSONObject> m = new HashMap<>();
+                                OdpsUtil.getOdpsDataStream(metricsSql, record -> {
+                                    try {
+                                        Long videoId = record.getBigint("视频id");
+                                        if (videoId != null) {
+                                            m.put(videoId, buildVideoDetail(record));
+                                        }
+                                    } catch (Exception e) {
+                                        log.error("处理{}天指标记录失败: {}", days, e.getMessage());
+                                    }
+                                });
+                                return m;
+                            }, executor)
+                            .exceptionally(ex -> {
+                                log.error("批次 {}-{} {}天指标查询失败: {}", batchStart, batchEnd, days, ex.getMessage());
+                                return new HashMap<>();
+                            }));
+                }
+
+                // 等待维度查询结果
+                Map<Long, JSONObject> dimensionMap = dimFuture.join();
+                log.info("批次 {}-{} 维度查询完成(365d范围),获取 {} 条记录", batchStart, batchEnd, dimensionMap.size());
 
-                for (Long videoId : videoIdsForDay) {
-                    try {
+                // P2: 收集所有待写入 Redis 的 key-value,后续 Pipeline 批量提交
+                Map<String, String> redisKeyValues = new LinkedHashMap<>();
+
+                for (Map.Entry<Integer, CompletableFuture<Map<Long, JSONObject>>> entry : metricsFutures.entrySet()) {
+                    int days = entry.getKey();
+                    Map<Long, JSONObject> metricsMap = entry.getValue().join();
+
+                    Set<Long> videoIdsForDay = new HashSet<>();
+                    videoIdsForDay.addAll(dimensionMap.keySet());
+                    videoIdsForDay.addAll(metricsMap.keySet());
+
+                    for (Long videoId : videoIdsForDay) {
                         JSONObject detail = new JSONObject();
-                        // 写入维度字段(365d查询结果复用到所有key)
                         JSONObject dimension = dimensionMap.get(videoId);
                         if (dimension != null) {
                             detail.putAll(dimension);
                         }
-                        // 写入该日期维度的指标数据
                         JSONObject metrics = metricsMap.get(videoId);
                         if (metrics != null) {
                             detail.putAll(metrics);
                         }
                         if (!detail.isEmpty()) {
-                            // key格式: video:detail:3d:12345
                             String redisKey = VectorConstants.VIDEO_DETAIL_DAYS_KEY_PREFIX + days + "d:" + videoId;
-                            redisUtils.set(redisKey, detail.toJSONString(),
-                                    VectorConstants.VIDEO_DETAIL_EXPIRE_SECONDS);
-                            totalSuccess.incrementAndGet();
+                            redisKeyValues.put(redisKey, detail.toJSONString());
                         }
-                    } catch (Exception e) {
-                        log.error("写入Redis失败,days={}, videoId={}: {}", days, videoId, e.getMessage());
-                        totalFail.incrementAndGet();
                     }
+                    log.info("批次 {}-{} {}天指标查询完成,待写入 {} 条", batchStart, batchEnd, days, videoIdsForDay.size());
                 }
-                log.info("批次 {}-{} {}天指标查询并写入完成,处理 {} 条", batchStart, batchEnd, days, videoIdsForDay.size());
+
+                // P2: Pipeline 批量写入 Redis(单次网络往返,替代逐条 SET)
+                if (!redisKeyValues.isEmpty()) {
+                    redisUtils.batchSetWithExpire(redisKeyValues, VectorConstants.VIDEO_DETAIL_EXPIRE_SECONDS);
+                    totalSuccess.addAndGet(redisKeyValues.size());
+                    log.info("批次 {}-{} Pipeline Redis 写入完成,共 {} 条", batchStart, batchEnd, redisKeyValues.size());
+                }
+
+            } finally {
+                executor.shutdown();
             }
 
         } catch (Exception e) {

+ 20 - 0
core/src/main/java/com/tzld/videoVector/util/RedisUtils.java

@@ -12,6 +12,7 @@ import org.springframework.util.CollectionUtils;
 
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -258,4 +259,23 @@ public class RedisUtils {
     public List<String> mGet(List<String> keys) {
         return redisTemplate.opsForValue().multiGet(keys);
     }
+
+    /**
+     * Writes all key-value pairs through one Redis pipeline, each with the same TTL.
+     * Keys and values are encoded by the template's string serializer; replies are discarded.
+     * @param keyValues     key -> value mapping; a null or empty map is a no-op
+     * @param expireSeconds time-to-live applied to every key, in seconds
+     */
+    public void batchSetWithExpire(Map<String, String> keyValues, long expireSeconds) {
+        if (keyValues == null || keyValues.isEmpty()) {
+            return;
+        }
+        RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
+        redisTemplate.executePipelined((RedisCallback<Object>) redisConnection -> {
+            // Serializer-encoded bytes: String.getBytes() would use the platform default charset.
+            keyValues.forEach((key, value) -> redisConnection.setEx(
+                    serializer.serialize(key), expireSeconds, serializer.serialize(value)));
+            return null;
+        }, serializer);
+    }
 }