فهرست منبع

processChannelConfig 执行优化

wangyunpeng 1 روز پیش
والد
کامیت
d0e71b99c7
1 فایل تغییر یافته به همراه 126 افزوده شده و 72 حذف شده
  1. 126 72
      core/src/main/java/com/tzld/videoVector/job/ChannelDemandMatchJob.java

+ 126 - 72
core/src/main/java/com/tzld/videoVector/job/ChannelDemandMatchJob.java

@@ -149,7 +149,8 @@ public class ChannelDemandMatchJob {
     }
 
     /**
-     * 处理单个渠道配置:查询ODPS需求数据 → 写入待匹配记录 → 逐条匹配
+     * 处理单个渠道配置:查询ODPS需求数据 → 边查边匹配(流水线化)
+     * 优化:ODPS流式读取与向量匹配并行执行,减少总体等待时间
      */
     private void processChannelConfig(ChannelDemandMatchConfig config, String dt,
                                       AtomicInteger totalDemands, AtomicInteger totalMatched, AtomicInteger totalFailed) {
@@ -173,88 +174,143 @@ public class ChannelDemandMatchJob {
         String sql = buildDemandSql(config, dt, minUv, minRov);
         log.info("查询ODPS需求, 渠道: {}, sql长度: {}", channelName, sql.length());
 
-        List<ChannelDemandMatchResult> demandResults = new ArrayList<>();
+        // 3. 流水线化:ODPS流式读取 → BlockingQueue → 匹配线程池消费
+        BlockingQueue<ChannelDemandMatchResult> demandQueue = new LinkedBlockingQueue<>(200);
+        // 毒丸对象,标记生产者结束
+        ChannelDemandMatchResult poisonPill = new ChannelDemandMatchResult();
+        AtomicInteger producedCount = new AtomicInteger(0);
 
-        OdpsUtil.getOdpsDataStream(sql, record -> {
+        // 生产者:ODPS流式读取,逐条放入队列
+        CompletableFuture<Void> producer = CompletableFuture.runAsync(() -> {
             try {
-                ChannelDemandMatchResult result = new ChannelDemandMatchResult();
-                result.setConfigId(config.getId());
-                result.setDt(dt);
-                result.setChannelName(record.getString("人群_渠道"));
-                result.setOnlineAction(record.getString("线上应用动作"));
-                result.setMatchExperimentId(record.getString("匹配实验id"));
-                result.setDemandId(record.getString("需求id"));
-                result.setCrowdSegment(record.getString("人群细分"));
-                result.setCrowdPackage(record.getString("人群包"));
-                result.setConversionTarget(record.getString("转化目标"));
-                result.setPartner(record.getString("合作方"));
-                result.setAccount(record.getString("账号"));
-                result.setSceneValue(record.getString("场景值"));
-                result.setDemandStrategy(record.getString("需求策略"));
-                result.setDriveDimensionTime(record.getString("驱动维度_时间"));
-                result.setDimension(record.getString("驱动维度_空间"));
-                result.setDemandFilterSortStrategy(record.getString("需求筛选排序策略"));
-                result.setDemandType(record.getString("需求类型"));
-                result.setDemandContentId(record.getString("需求内容id"));
-                result.setDemandContentTitle(record.getString("需求内容标题"));
-                result.setDemandContentTopic(record.getString("需求内容选题"));
-                result.setPointType(record.getString("需求特征点类型"));
-                result.setStandardElement(record.getString("需求特征点"));
-                result.setCategoryName(record.getString("需求特征类"));
-                result.setChannelLevel3(record.getString("三级渠道"));
-
-                // 统计指标
-                Double crowdCount = safeGetDouble(record, "人群总数量");
-                if (Objects.nonNull(crowdCount)) {
-                    result.setCrowdCount(crowdCount.intValue());
+                OdpsUtil.getOdpsDataStream(sql, record -> {
+                    try {
+                        ChannelDemandMatchResult result = parseDemandRecord(record, config, dt);
+                        demandQueue.put(result); // 队列满时阻塞,形成背压
+                        producedCount.incrementAndGet();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        log.error("ODPS生产者被中断, 渠道: {}", channelName);
+                    } catch (Exception e) {
+                        log.error("解析ODPS需求记录失败: {}", e.getMessage());
+                    }
+                });
+            } finally {
+                // 放入毒丸标记结束
+                try {
+                    demandQueue.put(poisonPill);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
                 }
-                result.setVisitUv(safeGetLong(record, "群体访问uv"));
-                result.setUvRatio(safeGetDouble(record, "uv占比"));
-                result.setVideoCount(safeGetInt(record, "内容数量"));
-                result.setTotalRov(safeGetDouble(record, "效率值"));
+            }
+        }, CONFIG_EXECUTOR);
 
-                result.setMatchStatus((short) 0); // 待匹配
+        // 消费者:匹配线程池从队列中取需求并执行向量匹配
+        ExecutorService matchExecutor = Executors.newFixedThreadPool(MATCH_THREAD_POOL_SIZE);
+        List<Future<?>> matchFutures = new ArrayList<>();
 
-                synchronized (demandResults) {
-                    demandResults.add(result);
+        try {
+            while (true) {
+                ChannelDemandMatchResult demand = demandQueue.poll(5, TimeUnit.SECONDS);
+                if (demand == null) {
+                    // 超时检查生产者是否已完成
+                    if (producer.isDone()) {
+                        // 排空队列中剩余元素
+                        demand = demandQueue.poll();
+                        if (demand == null) {
+                            break;
+                        }
+                    } else {
+                        continue;
+                    }
+                }
+                if (demand == poisonPill) {
+                    break; // 生产者结束标记
                 }
-            } catch (Exception e) {
-                log.error("解析ODPS需求记录失败: {}", e.getMessage());
-            }
-        });
-
-        log.info("渠道 {} 查询到 {} 条需求", channelName, demandResults.size());
-        totalDemands.addAndGet(demandResults.size());
 
-        if (demandResults.isEmpty()) {
-            return;
+                totalDemands.incrementAndGet();
+                ChannelDemandMatchResult finalDemand = demand;
+                Future<?> future = matchExecutor.submit(() -> {
+                    try {
+                        matchDemandAndSave(finalDemand, topN);
+                        totalMatched.incrementAndGet();
+                    } catch (Exception e) {
+                        log.error("匹配需求失败, pointType={}, element={}: {}",
+                                finalDemand.getPointType(), finalDemand.getStandardElement(), e.getMessage());
+                        finalDemand.setMatchStatus((short) 3);
+                        resultMapper.insertSelective(finalDemand);
+                        totalFailed.incrementAndGet();
+                    }
+                });
+                matchFutures.add(future);
+            }
+        } catch (InterruptedException e) {
+            log.error("渠道 {} 消费者被中断", channelName);
+            Thread.currentThread().interrupt();
         }
 
-        // 3. 并发执行向量匹配并写入结果
-        ExecutorService executor = Executors.newFixedThreadPool(MATCH_THREAD_POOL_SIZE);
-        for (ChannelDemandMatchResult demand : demandResults) {
-            executor.submit(() -> {
-                try {
-                    matchDemandAndSave(demand, topN);
-                    totalMatched.incrementAndGet();
-                } catch (Exception e) {
-                    log.error("匹配需求失败, pointType={}, element={}: {}",
-                            demand.getPointType(), demand.getStandardElement(), e.getMessage());
-                    // 写入一条失败记录
-                    demand.setMatchStatus((short) 3);
-                    resultMapper.insertSelective(demand);
-                    totalFailed.incrementAndGet();
-                }
-            });
-        }
-        executor.shutdown();
+        // 等待所有匹配任务完成
+        matchExecutor.shutdown();
         try {
-            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+            matchExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             log.error("渠道 {} 匹配任务被中断", channelName);
-            executor.shutdownNow();
+            matchExecutor.shutdownNow();
             Thread.currentThread().interrupt();
         }
+
+        // 等待生产者确认完成(正常情况下此时已完成)
+        try {
+            producer.get(1, TimeUnit.MINUTES);
+        } catch (Exception e) {
+            log.error("渠道 {} ODPS生产者异常: {}", channelName, e.getMessage());
+        }
+
+        log.info("渠道 {} 流水线处理完成, 生产: {} 条需求", channelName, producedCount.get());
+    }
+
+    /**
+     * Builds a ChannelDemandMatchResult from a single ODPS demand record: copies config.getId(), dt, the string columns and the numeric metrics, then sets match status 0.
+     */
+    private ChannelDemandMatchResult parseDemandRecord(Record record, ChannelDemandMatchConfig config, String dt) {
+        ChannelDemandMatchResult result = new ChannelDemandMatchResult();
+        result.setConfigId(config.getId());
+        result.setDt(dt);
+        result.setChannelName(record.getString("人群_渠道"));
+        result.setOnlineAction(record.getString("线上应用动作"));
+        result.setMatchExperimentId(record.getString("匹配实验id"));
+        result.setDemandId(record.getString("需求id"));
+        result.setCrowdSegment(record.getString("人群细分"));
+        result.setCrowdPackage(record.getString("人群包"));
+        result.setConversionTarget(record.getString("转化目标"));
+        result.setPartner(record.getString("合作方"));
+        result.setAccount(record.getString("账号"));
+        result.setSceneValue(record.getString("场景值"));
+        result.setDemandStrategy(record.getString("需求策略"));
+        result.setDriveDimensionTime(record.getString("驱动维度_时间"));
+        result.setDimension(record.getString("驱动维度_空间"));
+        result.setDemandFilterSortStrategy(record.getString("需求筛选排序策略"));
+        result.setDemandType(record.getString("需求类型"));
+        result.setDemandContentId(record.getString("需求内容id"));
+        result.setDemandContentTitle(record.getString("需求内容标题"));
+        result.setDemandContentTopic(record.getString("需求内容选题"));
+        result.setPointType(record.getString("需求特征点类型"));
+        result.setStandardElement(record.getString("需求特征点"));
+        result.setCategoryName(record.getString("需求特征类"));
+        result.setChannelLevel3(record.getString("三级渠道"));
+
+        // Statistical metrics, read through the safeGet* helpers
+        Double crowdCount = safeGetDouble(record, "人群总数量");
+        if (Objects.nonNull(crowdCount)) {
+            result.setCrowdCount(crowdCount.intValue()); // only set when present; intValue() drops the fractional part
+        }
+        result.setVisitUv(safeGetLong(record, "群体访问uv"));
+        result.setUvRatio(safeGetDouble(record, "uv占比"));
+        result.setVideoCount(safeGetInt(record, "内容数量"));
+        result.setTotalRov(safeGetDouble(record, "效率值"));
+
+        result.setMatchStatus((short) 0); // pending match
+        return result;
     }
 
     /**
@@ -291,9 +347,7 @@ public class ChannelDemandMatchJob {
             return;
         }
 
-        for (List<ChannelDemandMatchResult> partition : Lists.partition(allBatchRows, 1000)) {
-            resultMapperExt.batchInsert(partition);
-        }
+        resultMapperExt.batchInsert(allBatchRows);
         log.info("需求匹配完成, channelName={}, demandId={}, 写入{}条视频",
                 demand.getChannelName(), demand.getDemandId(), allBatchRows.size());
     }