Prechádzať zdrojové kódy

增加 增长的头部

wangyunpeng 6 dní pred
rodič
commit
3274366b18

+ 92 - 48
core/src/main/java/com/tzld/videoVector/job/ChannelDemandMatchJob.java

@@ -66,9 +66,9 @@ public class ChannelDemandMatchJob {
     private static final String RECALL_CACHE_PREFIX = "channel_demand:recall:";
 
     /**
-     * 召回结果缓存过期时间(秒),默认2小时
+     * 召回结果缓存过期时间(秒)
      */
-    private static final long RECALL_CACHE_EXPIRE = 2 * 60 * 60;
+    private static final long RECALL_CACHE_EXPIRE = 6 * 60 * 60;
 
     /**
      * 需求匹配并发线程数(单个渠道内部的需求匹配)
@@ -216,19 +216,19 @@ public class ChannelDemandMatchJob {
         String sql = buildDemandSql(config, dt, minUv, minRov);
         log.info("查询ODPS需求, 渠道: {}, sql长度: {}", channelName, sql.length());
 
-        // 3. 流水线化:ODPS流式读取 → BlockingQueue → 匹配线程池消费
-        BlockingQueue<ChannelDemandMatchResult> demandQueue = new LinkedBlockingQueue<>(200);
+        // 3. ODPS流式读取 → 队列收集(无界队列,避免生产者阻塞)
+        BlockingQueue<ChannelDemandMatchResult> demandQueue = new LinkedBlockingQueue<>();
         // 毒丸对象,标记生产者结束
         ChannelDemandMatchResult poisonPill = new ChannelDemandMatchResult();
         AtomicInteger producedCount = new AtomicInteger(0);
 
-        // 生产者:独立线程执行ODPS流式读取(避免与消费者共用CONFIG_EXECUTOR导致线程饥饿死锁)
+        // 生产者:独立线程执行ODPS流式读取
         Thread producerThread = new Thread(() -> {
             try {
                 OdpsUtil.getOdpsDataStream(sql, record -> {
                     try {
                         ChannelDemandMatchResult result = parseDemandRecord(record, config, dt);
-                        demandQueue.put(result); // 队列满时阻塞,形成背压
+                        demandQueue.put(result);
                         producedCount.incrementAndGet();
                     } catch (InterruptedException e) {
                         Thread.currentThread().interrupt();
@@ -249,50 +249,59 @@ public class ChannelDemandMatchJob {
         producerThread.setDaemon(true);
         producerThread.start();
 
-        // 消费者:匹配线程池从队列中取需求并执行向量匹配
-        ExecutorService matchExecutor = Executors.newFixedThreadPool(MATCH_THREAD_POOL_SIZE);
-        List<Future<?>> matchFutures = new ArrayList<>();
-
+        // 4. 等待生产者完成,将队列中所有需求收集到列表
+        List<ChannelDemandMatchResult> allDemands = new ArrayList<>();
         try {
             while (true) {
-                ChannelDemandMatchResult demand = demandQueue.poll(5, TimeUnit.SECONDS);
-                if (demand == null) {
-                    // 超时检查生产者线程是否已结束
-                    if (!producerThread.isAlive()) {
-                        // 排空队列中剩余元素
-                        demand = demandQueue.poll();
-                        if (demand == null) {
-                            break;
-                        }
-                    } else {
-                        continue;
-                    }
-                }
+                ChannelDemandMatchResult demand = demandQueue.take();
                 if (demand == poisonPill) {
-                    break; // 生产者结束标记
+                    break;
                 }
-
-                totalDemands.incrementAndGet();
-                ChannelDemandMatchResult finalDemand = demand;
-                Future<?> future = matchExecutor.submit(() -> {
-                    try {
-                        matchDemandAndSave(finalDemand, topN);
-                        totalMatched.incrementAndGet();
-                    } catch (Exception e) {
-                        log.error("匹配需求失败, pointType={}, element={}: {}",
-                                finalDemand.getPointType(), finalDemand.getStandardElement(), e.getMessage());
-                        finalDemand.setMatchStatus((short) 3);
-                        resultMapper.insertSelective(finalDemand);
-                        totalFailed.incrementAndGet();
-                    }
-                });
-                matchFutures.add(future);
+                allDemands.add(demand);
             }
         } catch (InterruptedException e) {
-            log.error("渠道 {} 消费者被中断", channelName);
+            log.error("渠道 {} 收集需求被中断", channelName);
             Thread.currentThread().interrupt();
         }
 
+        try {
+            producerThread.join(60_000);
+        } catch (InterruptedException e) {
+            log.error("渠道 {} 等待ODPS生产者超时", channelName);
+            Thread.currentThread().interrupt();
+        }
+
+        log.info("渠道 {} 收集完成, 共 {} 条需求", channelName, allDemands.size());
+
+        // 5. 按 (crowdSegment, standardElement, dimension) 分组,按 totalRov 倒序,每组取前 25%
+        List<ChannelDemandMatchResult> filteredDemands = filterTopRovByGroup(allDemands);
+        log.info("渠道 {} 分组按totalRov倒序取前25%后剩余 {} 条需求", channelName, filteredDemands.size());
+
+        if (filteredDemands.isEmpty()) {
+            return;
+        }
+
+        // 6. 并发匹配过滤后的需求
+        ExecutorService matchExecutor = Executors.newFixedThreadPool(MATCH_THREAD_POOL_SIZE);
+        List<Future<?>> matchFutures = new ArrayList<>();
+        for (ChannelDemandMatchResult demand : filteredDemands) {
+            totalDemands.incrementAndGet();
+            ChannelDemandMatchResult finalDemand = demand;
+            Future<?> future = matchExecutor.submit(() -> {
+                try {
+                    matchDemandAndSave(finalDemand, topN);
+                    totalMatched.incrementAndGet();
+                } catch (Exception e) {
+                    log.error("匹配需求失败, pointType={}, element={}: {}",
+                            finalDemand.getPointType(), finalDemand.getStandardElement(), e.getMessage());
+                    finalDemand.setMatchStatus((short) 3);
+                    resultMapper.insertSelective(finalDemand);
+                    totalFailed.incrementAndGet();
+                }
+            });
+            matchFutures.add(future);
+        }
+
         // 等待所有匹配任务完成
         matchExecutor.shutdown();
         try {
@@ -303,15 +312,50 @@ public class ChannelDemandMatchJob {
             Thread.currentThread().interrupt();
         }
 
-        // 等待生产者线程确认完成(正常情况下此时已完成)
-        try {
-            producerThread.join(60_000);
-        } catch (InterruptedException e) {
-            log.error("渠道 {} 等待ODPS生产者超时", channelName);
-            Thread.currentThread().interrupt();
+        log.info("渠道 {} 处理完成, 生产: {} 条, 实际匹配: {} 条", channelName, producedCount.get(), filteredDemands.size());
+    }
+
+    /**
+     * 仅对 dimension = '增长的头部' 的需求执行:按 (crowdSegment, channelLevel3, dimension) 分组,
+     * 按 totalRov 倒序,每组取前 25%(向上取整,至少1条);其他 dimension 的数据原样保留。
+     */
+    private static final String DIMENSION_GROWTH_HEAD = "增长的头部";
+
+    private List<ChannelDemandMatchResult> filterTopRovByGroup(List<ChannelDemandMatchResult> demands) {
+        if (CollectionUtils.isEmpty(demands)) {
+            return Collections.emptyList();
+        }
+        List<ChannelDemandMatchResult> toFilter = new ArrayList<>();
+        List<ChannelDemandMatchResult> keepAsIs = new ArrayList<>();
+        for (ChannelDemandMatchResult d : demands) {
+            if (DIMENSION_GROWTH_HEAD.equals(d.getDimension())) {
+                toFilter.add(d);
+            } else {
+                keepAsIs.add(d);
+            }
         }
 
-        log.info("渠道 {} 流水线处理完成, 生产: {} 条需求", channelName, producedCount.get());
+        List<ChannelDemandMatchResult> result = new ArrayList<>(keepAsIs);
+
+        if (toFilter.isEmpty()) {
+            return result;
+        }
+
+        Map<String, List<ChannelDemandMatchResult>> grouped = toFilter.stream()
+                .collect(Collectors.groupingBy(d ->
+                        nullToEmpty(d.getCrowdSegment()) + "|"
+                                + nullToEmpty(d.getChannelLevel3()) + "|"
+                                + nullToEmpty(d.getDimension())));
+        for (List<ChannelDemandMatchResult> group : grouped.values()) {
+            group.sort((a, b) -> {
+                double rovA = a.getTotalRov() != null ? a.getTotalRov() : 0.0;
+                double rovB = b.getTotalRov() != null ? b.getTotalRov() : 0.0;
+                return Double.compare(rovB, rovA);
+            });
+            int topCount = Math.max(1, (int) Math.ceil(group.size() * 0.25));
+            result.addAll(group.subList(0, Math.min(topCount, group.size())));
+        }
+        return result;
     }
 
     /**

+ 1 - 1
core/src/main/java/com/tzld/videoVector/service/impl/VideoSearchServiceImpl.java

@@ -723,7 +723,7 @@ public class VideoSearchServiceImpl implements VideoSearchService {
         // 等待所有配置搜索完成并汇总
         for (CompletableFuture<List<VideoMatchResult>> future : futures) {
             try {
-                result.addAll(future.get(30, TimeUnit.SECONDS));
+                result.addAll(future.get(60, TimeUnit.SECONDS));
             } catch (TimeoutException e) {
                 log.error("配置搜索超时: {}", e.getMessage());
             } catch (Exception e) {