yaodaoseng 1 هفته پیش
والد
کامیت
5b0fbad145

+ 82 - 4
recommend-feature-client/src/main/java/com/tzld/piaoquan/recommend/feature/client/FeatureV2Client.java

@@ -11,10 +11,18 @@ import net.devh.boot.grpc.client.inject.GrpcClient;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * FeatureV2 gRPC 客户端
@@ -46,6 +54,42 @@ public class FeatureV2Client {
      */
     private static final long RETRY_DELAY_MS = 50;
 
+    /**
+     * 单次 gRPC 请求的拆包大小(按 key 数量)
+     *
+     * 说明:避免单次响应过大触发 gRPC 默认 4MB 限制/链路取消;用户侧指定默认 500。
+     */
+    private static final int BATCH_SIZE = 500;
+
+    /**
+     * 拆包并发线程池
+     *
+     * 说明:机器 4 核时并发建议 4~8,这里按 cpu*2 并限制在 [4, 8]。
+     */
+    private static final int BATCH_PARALLELISM = Math.max(4,
+            Math.min(8, Runtime.getRuntime().availableProcessors() * 2));
+
+    private static final ExecutorService BATCH_EXECUTOR = new ThreadPoolExecutor(
+            BATCH_PARALLELISM,
+            BATCH_PARALLELISM,
+            60L,
+            TimeUnit.SECONDS,
+            // 有界队列 + CallerRuns:避免极端情况下无界堆积;队列满时回退到调用线程执行。
+            new LinkedBlockingQueue<>(2048),
+            new ThreadFactory() {
+                private final AtomicInteger idx = new AtomicInteger(1);
+
+                @Override
+                public Thread newThread(Runnable r) {
+                    Thread t = new Thread(r);
+                    t.setName("feature-v2-client-batch-" + idx.getAndIncrement());
+                    t.setDaemon(true);
+                    return t;
+                }
+            },
+            new ThreadPoolExecutor.CallerRunsPolicy()
+    );
+
     /**
      * 批量获取特征数据
      * 
@@ -59,10 +103,44 @@ public class FeatureV2Client {
         }
 
         long startTime = System.currentTimeMillis();
-        // 从第 0 次尝试开始
-        Map<String, String> result = multiGetFeatureWithRetry(protos, 0);
-        log.info("multiGetFeature: end, result.size={}, cost={}", result != null ? result.size() : 0, System.currentTimeMillis() - startTime);
-        return result;
+
+        // 小请求不拆包:保持原行为
+        if (protos.size() <= BATCH_SIZE) {
+            Map<String, String> result = multiGetFeatureWithRetry(protos, 0);
+            log.info("multiGetFeature: end, protos.size={}, batchSize={}, batchCount=1, result.size={}, cost={}",
+                    protos.size(), BATCH_SIZE, result != null ? result.size() : 0, System.currentTimeMillis() - startTime);
+            return result;
+        }
+
+        // 大请求拆包并发执行,最终合并结果;单个 batch 失败时返回部分结果
+        int total = protos.size();
+        int batchCount = (total + BATCH_SIZE - 1) / BATCH_SIZE;
+        List<CompletableFuture<Map<String, String>>> futures = new ArrayList<>(batchCount);
+        for (int start = 0; start < total; start += BATCH_SIZE) {
+            int end = Math.min(start + BATCH_SIZE, total);
+            List<FeatureKeyProto> batch = protos.subList(start, end);
+            futures.add(CompletableFuture.supplyAsync(() -> multiGetFeatureWithRetry(batch, 0), BATCH_EXECUTOR));
+        }
+
+        Map<String, String> merged = new HashMap<>();
+        int exceptionBatchCount = 0;
+        for (CompletableFuture<Map<String, String>> f : futures) {
+            try {
+                Map<String, String> part = f.join();
+                if (part != null && !part.isEmpty()) {
+                    merged.putAll(part);
+                }
+            } catch (Exception e) {
+                // 理论上 multiGetFeatureWithRetry 已吞掉异常并返回空;这里兜底,确保其它 batch 不受影响
+                exceptionBatchCount++;
+                log.error("multiGetFeature: batch future failed, ignored for partial result", e);
+            }
+        }
+
+        log.info("multiGetFeature: end, protos.size={}, batchSize={}, batchCount={}, parallelism={}, merged.size={}, exceptionBatchCount={}, cost={}",
+                protos.size(), BATCH_SIZE, batchCount, BATCH_PARALLELISM, merged.size(), exceptionBatchCount,
+                System.currentTimeMillis() - startTime);
+        return merged;
     }
     
     /**