Forráskód Böngészése

需求匹配增加分发维度 不同维度过滤比例配置

wangyunpeng 2 napja
szülő
commit
c711fcecf4

+ 139 - 100
core/src/main/java/com/tzld/videoVector/job/ChannelDemandMatchJob.java

@@ -15,6 +15,7 @@ import com.tzld.videoVector.service.VideoSearchService;
 import com.tzld.videoVector.util.OdpsUtil;
 import com.tzld.videoVector.util.RedisUtils;
 import com.tzld.videoVector.util.VectorUtils;
+import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.handler.annotation.XxlJob;
 import com.alibaba.fastjson.JSON;
@@ -119,6 +120,14 @@ public class ChannelDemandMatchJob {
     @Value("${channel.demand.dimension-stat.min-uv-ratio:0.002}")
     private double dimensionStatMinUvRatio;
 
+    /**
+     * Channel/dimension filter configuration: the top-N keep ratio for each channel + dimension pair (the placeholder default is an empty JSON object).
+     * Format: {"渠道A": {"增长的头部": 0.25, "传播的头部": 0.3}}
+     * key=channelName, innerKey=dimension, innerValue=topRatio
+     */
+    @ApolloJsonValue("${channel.demand.filter.top-rov.config:{}}")
+    private Map<String, Map<String, Double>> topRovFilterConfig;
+
     /**
      * 多渠道配置并发执行线程池
      */
@@ -266,123 +275,86 @@ public class ChannelDemandMatchJob {
             return;
         }
 
-        // 2. 构造ODPS SQL并查询需求数据
-        String sql = buildDemandSql(config, dt, minUv, minRov);
-        log.info("查询ODPS需求, 渠道: {}, sql长度: {}", channelName, sql.length());
+        // 2. 解析空间维度,逐维度查询并处理(查一批处理一批,避免内存积压)
+        List<String> dimensions = parseJsonArray(config.getFilterDimensions());
+        if (CollectionUtils.isEmpty(dimensions)) {
+            // 未配置维度过滤时,传null表示不限制维度
+            dimensions = Collections.singletonList(null);
+        }
 
-        // 3. ODPS流式读取 → 队列收集(无界队列,避免生产者阻塞)
-        BlockingQueue<ChannelDemandMatchResult> demandQueue = new LinkedBlockingQueue<>();
-        // 毒丸对象,标记生产者结束
-        ChannelDemandMatchResult poisonPill = new ChannelDemandMatchResult();
         AtomicInteger producedCount = new AtomicInteger(0);
+        int totalFilteredCount = 0;
+        for (String dimension : dimensions) {
+            // 2a. 查询单个维度的需求数据
+            List<ChannelDemandMatchResult> dimResults = queryDemandsForDimension(config, dt, minUv, minRov, dimension, channelName, producedCount);
 
-        // 生产者:独立线程执行ODPS流式读取
-        Thread producerThread = new Thread(() -> {
-            try {
-                OdpsUtil.getOdpsDataStream(sql, record -> {
+            // 2b. 对该维度数据执行分组过滤
+            List<ChannelDemandMatchResult> filteredDemands = filterTopRovByGroup(dimResults, channelName);
+            log.info("渠道 {} dimension {} 分组过滤后 {} -> {} 条", channelName, dimension, dimResults.size(), filteredDemands.size());
+
+            if (filteredDemands.isEmpty()) {
+                continue;
+            }
+            totalFilteredCount += filteredDemands.size();
+
+            // 2c. 并发匹配该批过滤后的需求,完成后释放引用进入下一维度
+            ExecutorService matchExecutor = Executors.newFixedThreadPool(MATCH_THREAD_POOL_SIZE);
+            List<Future<?>> matchFutures = new ArrayList<>();
+            for (ChannelDemandMatchResult demand : filteredDemands) {
+                totalDemands.incrementAndGet();
+                ChannelDemandMatchResult finalDemand = demand;
+                Future<?> future = matchExecutor.submit(() -> {
                     try {
-                        ChannelDemandMatchResult result = parseDemandRecord(record, config, dt);
-                        demandQueue.put(result);
-                        producedCount.incrementAndGet();
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                        log.error("ODPS生产者被中断, 渠道: {}", channelName);
+                        matchDemandAndSave(finalDemand, topN);
+                        totalMatched.incrementAndGet();
                     } catch (Exception e) {
-                        log.error("解析ODPS需求记录失败: {}", e.getMessage());
+                        log.error("匹配需求失败, pointType={}, element={}: {}",
+                                finalDemand.getPointType(), finalDemand.getStandardElement(), e.getMessage());
+                        finalDemand.setMatchStatus((short) 3);
+                        resultMapper.insertSelective(finalDemand);
+                        totalFailed.incrementAndGet();
                     }
                 });
-            } finally {
-                // 放入毒丸标记结束
-                try {
-                    demandQueue.put(poisonPill);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
+                matchFutures.add(future);
             }
-        }, "odps-producer-" + channelName);
-        producerThread.setDaemon(true);
-        producerThread.start();
 
-        // 4. 等待生产者完成,将队列中所有需求收集到列表
-        List<ChannelDemandMatchResult> allDemands = new ArrayList<>();
-        try {
-            while (true) {
-                ChannelDemandMatchResult demand = demandQueue.take();
-                if (demand == poisonPill) {
-                    break;
-                }
-                allDemands.add(demand);
+            matchExecutor.shutdown();
+            try {
+                matchExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                log.error("渠道 {} dimension {} 匹配任务被中断", channelName, dimension);
+                matchExecutor.shutdownNow();
+                Thread.currentThread().interrupt();
             }
-        } catch (InterruptedException e) {
-            log.error("渠道 {} 收集需求被中断", channelName);
-            Thread.currentThread().interrupt();
-        }
-
-        try {
-            producerThread.join(60_000);
-        } catch (InterruptedException e) {
-            log.error("渠道 {} 等待ODPS生产者超时", channelName);
-            Thread.currentThread().interrupt();
-        }
-
-        log.info("渠道 {} 收集完成, 共 {} 条需求", channelName, allDemands.size());
-
-        // 5. 按 (crowdSegment, standardElement, dimension) 分组,按 totalRov 倒序,每组取前 25%
-        List<ChannelDemandMatchResult> filteredDemands = filterTopRovByGroup(allDemands);
-        log.info("渠道 {} 分组按totalRov倒序取前25%后剩余 {} 条需求", channelName, filteredDemands.size());
-
-        if (filteredDemands.isEmpty()) {
-            return;
-        }
-
-        // 6. 并发匹配过滤后的需求
-        ExecutorService matchExecutor = Executors.newFixedThreadPool(MATCH_THREAD_POOL_SIZE);
-        List<Future<?>> matchFutures = new ArrayList<>();
-        for (ChannelDemandMatchResult demand : filteredDemands) {
-            totalDemands.incrementAndGet();
-            ChannelDemandMatchResult finalDemand = demand;
-            Future<?> future = matchExecutor.submit(() -> {
-                try {
-                    matchDemandAndSave(finalDemand, topN);
-                    totalMatched.incrementAndGet();
-                } catch (Exception e) {
-                    log.error("匹配需求失败, pointType={}, element={}: {}",
-                            finalDemand.getPointType(), finalDemand.getStandardElement(), e.getMessage());
-                    finalDemand.setMatchStatus((short) 3);
-                    resultMapper.insertSelective(finalDemand);
-                    totalFailed.incrementAndGet();
-                }
-            });
-            matchFutures.add(future);
-        }
-
-        // 等待所有匹配任务完成
-        matchExecutor.shutdown();
-        try {
-            matchExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            log.error("渠道 {} 匹配任务被中断", channelName);
-            matchExecutor.shutdownNow();
-            Thread.currentThread().interrupt();
         }
 
-        log.info("渠道 {} 处理完成, 生产: {} 条, 实际匹配: {} 条", channelName, producedCount.get(), filteredDemands.size());
+        log.info("渠道 {} 处理完成, 生产: {} 条, 实际匹配: {} 条", channelName, producedCount.get(), totalFilteredCount);
     }
 
     /**
-     * 仅对 dimension = '增长的头部' 的需求执行:按 (crowdSegment, channelLevel3, dimension) 分组,
-     * 按 totalRov 倒序,每组取前 25%(向上取整,至少1条);其他 dimension 的数据原样保留。
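+     * Filters demands according to the Apollo configuration for the given channel and dimension: demands are grouped by (crowdSegment, channelLevel3, dimension),
+     * sorted by totalRov descending, and each group keeps the configured ratio (rounded up, at least 5 entries, or the whole group when it has fewer); demands whose dimension has no configured ratio are kept unchanged.
+     *
+     * @param demands     the demands to filter
+     * @param channelName channel name, used to look up the Apollo configuration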
      */
-    private static final String DIMENSION_GROWTH_HEAD = "增长的头部";
-
-    private List<ChannelDemandMatchResult> filterTopRovByGroup(List<ChannelDemandMatchResult> demands) {
+    private List<ChannelDemandMatchResult> filterTopRovByGroup(List<ChannelDemandMatchResult> demands, String channelName) {
         if (CollectionUtils.isEmpty(demands)) {
             return Collections.emptyList();
         }
+        // 获取该渠道的维度→比例配置
+        Map<String, Double> channelFilterConfig = topRovFilterConfig != null
+                ? topRovFilterConfig.getOrDefault(channelName, Collections.emptyMap())
+                : Collections.emptyMap();
+
+        if (channelFilterConfig.isEmpty()) {
+            return new ArrayList<>(demands);
+        }
+
         List<ChannelDemandMatchResult> toFilter = new ArrayList<>();
         List<ChannelDemandMatchResult> keepAsIs = new ArrayList<>();
         for (ChannelDemandMatchResult d : demands) {
-            if (DIMENSION_GROWTH_HEAD.equals(d.getDimension())) {
+            if (channelFilterConfig.containsKey(d.getDimension())) {
                 toFilter.add(d);
             } else {
                 keepAsIs.add(d);
@@ -406,7 +378,10 @@ public class ChannelDemandMatchJob {
                 double rovB = b.getTotalRov() != null ? b.getTotalRov() : 0.0;
                 return Double.compare(rovB, rovA);
             });
-            int topCount = Math.max(1, (int) Math.ceil(group.size() * 0.25));
+            // 从分组中取任一元素的dimension获取配置比例
+            String dim = group.get(0).getDimension();
+            double ratio = channelFilterConfig.getOrDefault(dim, 0.25);
+            int topCount = Math.max(5, (int) Math.ceil(group.size() * ratio));
             result.addAll(group.subList(0, Math.min(topCount, group.size())));
         }
         return result;
@@ -840,10 +815,73 @@ public class ChannelDemandMatchJob {
         return sb.toString();
     }
 
+    /**
+     * Runs the ODPS demand query for a single dimension and returns the parsed rows.
+     *
+     * <p>The SQL comes from {@code buildDemandSql}. A daemon producer thread streams the result through
+     * {@code OdpsUtil.getOdpsDataStream}, parses each record with {@code parseDemandRecord} and puts it on a
+     * {@code LinkedBlockingQueue} created without a capacity argument; a record whose parsing throws is
+     * logged and dropped. The producer always enqueues a sentinel object in its {@code finally} block, and
+     * the calling thread takes from the queue until it sees that sentinel (compared by identity), so the
+     * collection loop also ends when streaming itself fails.
+     *
+     * <p>If the calling thread is interrupted while waiting, the interrupt flag is restored and the rows
+     * collected so far are returned. NOTE(review): the producer is not stopped in that case, and the
+     * following {@code join} is cut short because the flag is still set; confirm that a partial result
+     * is acceptable to callers.
+     *
+     * @param config        channel config, forwarded to {@code buildDemandSql} and {@code parseDemandRecord}
+     * @param dt            forwarded to {@code buildDemandSql} and {@code parseDemandRecord}
+     * @param minUv         forwarded to {@code buildDemandSql}
+     * @param minRov        forwarded to {@code buildDemandSql}
+     * @param dimension     a single dimension value; pass null to apply no dimension filter
+     * @param channelName   channel name, used in log messages and in the producer thread name
+     * @param producedCount counter incremented once for every record that was parsed and queued
+     * @return the collected rows, never null; possibly partial if the calling thread was interrupted
+     */
+    private List<ChannelDemandMatchResult> queryDemandsForDimension(ChannelDemandMatchConfig config, String dt,
+                                                                     int minUv, double minRov, String dimension,
+                                                                     String channelName, AtomicInteger producedCount) {
+        String sql = buildDemandSql(config, dt, minUv, minRov, dimension);
+        log.info("查询ODPS需求, 渠道: {}, dimension: {}, sql长度: {}", channelName, dimension, sql.length());
+
+        BlockingQueue<ChannelDemandMatchResult> demandQueue = new LinkedBlockingQueue<>();
+        ChannelDemandMatchResult poisonPill = new ChannelDemandMatchResult();
+
+        Thread producerThread = new Thread(() -> {
+            try {
+                OdpsUtil.getOdpsDataStream(sql, record -> {
+                    try {
+                        ChannelDemandMatchResult result = parseDemandRecord(record, config, dt);
+                        demandQueue.put(result);
+                        producedCount.incrementAndGet();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        log.error("ODPS生产者被中断, 渠道: {}, dimension: {}", channelName, dimension);
+                    } catch (Exception e) {
+                        log.error("解析ODPS需求记录失败: {}", e.getMessage());
+                    }
+                });
+            } finally {
+                try {
+                    demandQueue.put(poisonPill);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }, "odps-producer-" + channelName + "-" + dimension);
+        producerThread.setDaemon(true);
+        producerThread.start();
+
+        List<ChannelDemandMatchResult> results = new ArrayList<>();
+        try {
+            while (true) {
+                ChannelDemandMatchResult demand = demandQueue.take();
+                if (demand == poisonPill) {
+                    break;
+                }
+                results.add(demand);
+            }
+        } catch (InterruptedException e) {
+            log.error("渠道 {} dimension {} 收集需求被中断", channelName, dimension);
+            Thread.currentThread().interrupt();
+        }
+
+        try {
+            producerThread.join(60_000);
+        } catch (InterruptedException e) {
+            log.error("渠道 {} dimension {} 等待ODPS生产者超时", channelName, dimension);
+            Thread.currentThread().interrupt();
+        }
+
+        log.info("渠道 {} dimension {} 查询到 {} 条需求", channelName, dimension, results.size());
+        return results;
+    }
+
     /**
      * 构造ODPS查询需求SQL(返回所有字段)
      */
-    private String buildDemandSql(ChannelDemandMatchConfig config, String dt, int minUv, double minRov) {
+    private String buildDemandSql(ChannelDemandMatchConfig config, String dt, int minUv, double minRov, String dimension) {
         StringBuilder sb = new StringBuilder();
         sb.append("SELECT 日期");
         sb.append(",线上应用动作");
@@ -893,9 +931,10 @@ public class ChannelDemandMatchJob {
         if (minRov > 0) {
             sb.append(" AND 效率值 >= ").append(minRov);
         }
-        // 空间维度过滤
-        List<String> dimensions = parseJsonArray(config.getFilterDimensions());
-        appendInCondition(sb, "驱动维度_空间", dimensions);
+        // 空间维度过滤:单个维度精确匹配
+        if (dimension != null) {
+            sb.append(" AND 驱动维度_空间 = '").append(dimension.replace("'", "''")).append("'");
+        }
         // 时间维度过滤
         List<String> timeDimensions = parseJsonArray(config.getFilterTimeDimensions());
         appendInCondition(sb, "驱动维度_时间", timeDimensions);