Kaynağa Gözat

Merge branch 'refs/heads/ljd/feature-eureka-fix' into test

# Conflicts:
#	recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/config/RedisTemplateConfig.java
#	recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/config/TairTemplateConfig.java
#	recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/service/FeatureV2Service.java
jiandong.liu 3 gün önce
ebeveyn
işleme
5a78841f10

+ 1 - 1
recommend-feature-client/pom.xml

@@ -10,7 +10,7 @@
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>recommend-feature-client</artifactId>
-    <version>1.1.24</version>
+    <version>1.1.35</version>
 
     <dependencies>
         <dependency>

+ 102 - 60
recommend-feature-client/src/main/java/com/tzld/piaoquan/recommend/feature/client/FeatureV2Client.java

@@ -11,137 +11,179 @@ import net.devh.boot.grpc.client.inject.GrpcClient;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * FeatureV2 gRPC 客户端
- * 
+ *
  * 优化说明:
  * 1. 增加异常处理:捕获 StatusRuntimeException,避免错误向上传播
  * 2. 增加重试机制:对 UNAVAILABLE 等可恢复错误自动重试(50ms 延迟)
  * 3. 增加降级处理:重试失败后返回空结果,保证服务稳定
  * 4. 增加详细日志:记录错误详情,便于问题排查
- * 
+ *
  * @author dyp
  */
 @Component
 @Slf4j
 public class FeatureV2Client {
-    
+
     @GrpcClient("recommend-feature")
     private FeatureV2ServiceGrpc.FeatureV2ServiceBlockingStub client;
-    
+
     /**
-     * 最大重试次数
-     * 说明:对于网络连接问题,重试可以触发连接重建
+     * 单次 gRPC 请求的拆包大小(按 key 数量)
+     *
+     * 说明:避免单次响应过大触发 gRPC 默认 4MB 限制/链路取消;用户侧指定默认 500。
      */
-    private static final int MAX_RETRY_ATTEMPTS = 2;
-    
+    private static final int BATCH_SIZE = 500;
+
     /**
-     * 重试延迟(毫秒)
-     * 说明:50ms 快速重试,给连接重建预留时间
+     * 拆包并发线程池
+     *
+     * 说明:机器 4 核时并发建议 4~8,这里按 cpu*2 并限制在 [4, 8]。
      */
-    private static final long RETRY_DELAY_MS = 50;
+    private static final int BATCH_PARALLELISM = Math.max(4,
+            Math.min(8, Runtime.getRuntime().availableProcessors() * 2));
+
+    private static final ExecutorService BATCH_EXECUTOR = new ThreadPoolExecutor(
+            BATCH_PARALLELISM,
+            BATCH_PARALLELISM,
+            60L,
+            TimeUnit.SECONDS,
+            // 有界队列 + CallerRuns:避免极端情况下无界堆积;队列满时回退到调用线程执行。
+            new LinkedBlockingQueue<>(2048),
+            new ThreadFactory() {
+                private final AtomicInteger idx = new AtomicInteger(1);
+
+                @Override
+                public Thread newThread(Runnable r) {
+                    Thread t = new Thread(r);
+                    t.setName("feature-v2-client-batch-" + idx.getAndIncrement());
+                    t.setDaemon(true);
+                    return t;
+                }
+            },
+            new ThreadPoolExecutor.CallerRunsPolicy()
+    );
 
     /**
      * 批量获取特征数据
-     * 
+     *
      * @param protos 特征请求列表
      * @return 特征数据 Map,key 为 uniqueKey,value 为特征值 JSON 字符串
      */
     public Map<String, String> multiGetFeature(List<FeatureKeyProto> protos) {
         if (CollectionUtils.isEmpty(protos)) {
+            log.warn("multiGetFeature: protos is empty, return empty map");
             return Collections.emptyMap();
         }
-        
-        // 从第 0 次尝试开始
-        return multiGetFeatureWithRetry(protos, 0);
+
+        long startTime = System.currentTimeMillis();
+
+        // 小请求不拆包:保持原行为
+        if (protos.size() <= BATCH_SIZE) {
+            Map<String, String> result = multiGetFeatureWithRetry(protos, 0);
+            return result;
+        }
+
+        // 大请求拆包并发执行,最终合并结果;单个 batch 失败时返回部分结果
+        int total = protos.size();
+        int batchCount = (total + BATCH_SIZE - 1) / BATCH_SIZE;
+        List<CompletableFuture<Map<String, String>>> futures = new ArrayList<>(batchCount);
+        for (int start = 0; start < total; start += BATCH_SIZE) {
+            int end = Math.min(start + BATCH_SIZE, total);
+            List<FeatureKeyProto> batch = protos.subList(start, end);
+            futures.add(CompletableFuture.supplyAsync(() -> multiGetFeatureWithRetry(batch, 0), BATCH_EXECUTOR));
+        }
+
+        Map<String, String> merged = new HashMap<>();
+        int exceptionBatchCount = 0;
+        for (CompletableFuture<Map<String, String>> f : futures) {
+            try {
+                Map<String, String> part = f.join();
+                if (part != null && !part.isEmpty()) {
+                    merged.putAll(part);
+                }
+            } catch (Exception e) {
+                // 理论上 multiGetFeatureWithRetry 已吞掉异常并返回空;这里兜底,确保其它 batch 不受影响
+                exceptionBatchCount++;
+                log.error("multiGetFeature: batch future failed, ignored for partial result", e);
+            }
+        }
+
+        log.info("multiGetFeature: end, protos.size={}, batchSize={}, batchCount={}, parallelism={}, merged.size={}, exceptionBatchCount={}, cost={}",
+                protos.size(), BATCH_SIZE, batchCount, BATCH_PARALLELISM, merged.size(), exceptionBatchCount,
+                System.currentTimeMillis() - startTime);
+        return merged;
     }
-    
+
     /**
      * 带重试的特征获取方法
-     * 
+     *
      * @param protos 特征请求列表
      * @param attemptCount 当前重试次数(从 0 开始)
      * @return 特征数据 Map
      */
     private Map<String, String> multiGetFeatureWithRetry(List<FeatureKeyProto> protos, int attemptCount) {
+
         MultiGetFeatureRequest request = MultiGetFeatureRequest.newBuilder()
                 .addAllFeatureKey(protos)
                 .build();
-        
+
         try {
             // 调用 gRPC 服务,设置 3 秒超时
             MultiGetFeatureResponse response = client
                     .withDeadlineAfter(3, TimeUnit.SECONDS)
                     .multiGetFeature(request);
-            
+
             // 响应为空或没有结果
             if (response == null || !response.hasResult()) {
                 log.info("multiGetFeature grpc error: response is null or has no result, attempt={}", attemptCount);
                 return Collections.emptyMap();
             }
-            
+
             // 业务错误码检查
             if (response.getResult().getCode() != 1) {
                 log.info("multiGetFeature grpc code={}, msg={}, attempt={}", response.getResult().getCode(),
                         response.getResult().getMessage(), attemptCount);
                 return Collections.emptyMap();
             }
-            
+
             // 特征数据为空
             if (response.getFeatureCount() == 0) {
                 log.info("multiGetFeature no feature, attempt={}", attemptCount);
                 return Collections.emptyMap();
             }
-            
+
             // 成功返回特征数据
             return response.getFeatureMap();
-            
+
         } catch (StatusRuntimeException e) {
             Status.Code code = e.getStatus().getCode();
             String description = e.getStatus().getDescription();
-            
-            // 记录详细的错误信息
-            log.error("gRPC call failed: code={}, description={}, attempt={}/{}, protos.size={}", 
-                    code, description, attemptCount + 1, MAX_RETRY_ATTEMPTS + 1, protos.size(), e);
-            
-            // 判断是否应该重试
-            if (shouldRetry(code) && attemptCount < MAX_RETRY_ATTEMPTS) {
-                log.warn("Retrying gRPC call after {}ms, attempt={}/{}, reason={}", RETRY_DELAY_MS, attemptCount + 1, MAX_RETRY_ATTEMPTS, code);
-                
-                // 等待一段时间后重试(给连接重建预留时间)
-                try {
-                    Thread.sleep(RETRY_DELAY_MS);
-                } catch (InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                    log.warn("Retry sleep interrupted", ie);
-                }
-                
-                // 递归调用,进行重试
-                return multiGetFeatureWithRetry(protos, attemptCount + 1);
-            }
-            
-            // 重试失败或不可重试的错误,降级返回空结果
-            log.error("gRPC call failed after {} attempts, returning empty result for graceful degradation. code={}", 
-                    attemptCount + 1, code);
+
+            // 记录详细的错误信息(使用 error 级别确保一定会输出)
+            log.error("gRPC call failed: code={}, description={}, protos.size={}, exception={}",
+                    code, description, protos.size(), e.getClass().getName(), e);
+            return Collections.emptyMap();
+        } catch (Exception e) {
+            // 捕获所有其他异常(不应该发生,但为了安全起见)
+            log.error("multiGetFeatureWithRetry: unexpected exception, protos.size={}, exception={}",
+                    attemptCount + 1, protos.size(), e.getClass().getName(), e);
             return Collections.emptyMap();
         }
     }
-    
-    /**
-     * 判断错误是否应该重试
-     * 
-     * @param code gRPC 状态码
-     * @return true 表示应该重试,false 表示不应该重试
-     */
-    private boolean shouldRetry(Status.Code code) {
-        // UNAVAILABLE: 连接不可用(如网络断开、连接关闭)- 应该重试
-        // DEADLINE_EXCEEDED: 超时 - 应该重试
-        // RESOURCE_EXHAUSTED: 资源耗尽(如连接池满)- 应该重试
-        return code == Status.Code.UNAVAILABLE || code == Status.Code.DEADLINE_EXCEEDED || code == Status.Code.RESOURCE_EXHAUSTED;
-    }
+
 }

+ 12 - 0
recommend-feature-client/src/main/java/com/tzld/piaoquan/recommend/feature/domain/ad/base/AdRankItem.java

@@ -59,11 +59,23 @@ public class AdRankItem implements Comparable<AdRankItem> {
     private Long id;
     // 广告id
     private Long skuId;
+
+    private Long adSkuId;
+    private String adSkuCode;
+    private String adSkuName;
+
     //客户id
     private Long customerId;
     //行业
     private String profession;
 
+    private String adProfessionName;
+
+    private String adCategoryId;
+    private String adCategoryName;
+
+    private Integer landingPageType;
+
     // 特征
     private Map<String, Object> ext = new HashMap<>();
 

+ 2 - 2
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/config/RedisTemplateConfig.java

@@ -20,7 +20,7 @@ import java.time.Duration;
 @Configuration
 public class RedisTemplateConfig {
 
-    @Value("${spring.redis.timeout:1000}")
+    @Value("${spring.redis.timeout:5000}")
     private long redisTimeout;
 
     @Bean("redisPool")
@@ -31,7 +31,7 @@ public class RedisTemplateConfig {
 
     @Bean("redisConfig")
     @ConfigurationProperties(prefix = "spring.redis")
-    public RedisStandaloneConfiguration tairConfig() {
+    public RedisStandaloneConfiguration redisConfig() {
         return new RedisStandaloneConfiguration();
     }
 

+ 1 - 1
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/config/TairTemplateConfig.java

@@ -20,7 +20,7 @@ import java.time.Duration;
 @Configuration
 public class TairTemplateConfig {
 
-    @Value("${spring.tair.timeout:1000}")
+    @Value("${spring.tair.timeout:5000}")
     private long tairTimeout;
 
     @Bean("tairPool")

+ 65 - 12
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/service/FeatureV2Service.java

@@ -7,12 +7,14 @@ import com.tzld.piaoquan.recommend.feature.model.common.Result;
 import com.tzld.piaoquan.recommend.feature.model.feature.FeatureKeyProto;
 import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureRequest;
 import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureResponse;
+import com.tzld.piaoquan.recommend.feature.util.CommonCollectionUtils;
 import com.tzld.piaoquan.recommend.feature.util.CompressionUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Service;
 
@@ -20,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
@@ -39,6 +42,9 @@ public class FeatureV2Service {
     @ApolloJsonValue("${dts.config:}")
     private List<DTSConfig> newDtsConfigs;
 
+    @Value("${feature.mget.batch.size:300}")
+    private Integer batchSize;
+
     public MultiGetFeatureResponse multiGetFeature(MultiGetFeatureRequest request) {
         int keyCount = request.getFeatureKeyCount();
 
@@ -48,27 +54,74 @@ public class FeatureV2Service {
                     .build();
         }
 
-        List<FeatureKeyProto> featureKeys = request.getFeatureKeyList();
+        long startTime = System.currentTimeMillis();
 
-        // 1. 生成 Redis Key 列表
-        List<String> redisKeys = featureKeys.stream()
-                .map(this::redisKey)
-                .collect(Collectors.toList());
+        // 1. 生成Redis Key列表
+        List<String> redisKeys = CommonCollectionUtils.toList(request.getFeatureKeyList(), this::redisKey);
+
+        int safeBatchSize = (batchSize == null || batchSize <= 0) ? 300 : batchSize;
+
+        // 2. 分批并行查询(必须保证返回值与 key 顺序一致)
+        List<BatchFuture> futures = new ArrayList<>();
+        for (int start = 0; start < redisKeys.size(); start += safeBatchSize) {
+            int end = Math.min(start + safeBatchSize, redisKeys.size());
+            List<String> batchKeys = redisKeys.subList(start, end);
+            Future<List<String>> future = ThreadPoolFactory.multiGetFeaturePool()
+                    .submit(() -> redisTemplate.opsForValue().multiGet(batchKeys));
+            futures.add(new BatchFuture(start, end - start, future));
+        }
 
-        // 2. 分批并行查询 Redis
-        List<String> values = batchMultiGet(redisKeys);
+        // 3. 收集结果(整体最多等待约 2s;超时/异常的批次用 null 填充,避免错位)
+        List<String> allValues = new ArrayList<>(Collections.nCopies(keyCount, null));
+        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(2000);
+        for (BatchFuture bf : futures) {
+            long remainingNanos = deadlineNanos - System.nanoTime();
+            if (remainingNanos <= 0) {
+                bf.future.cancel(true);
+                continue;
+            }
+            try {
+                List<String> batchValues = bf.future.get(remainingNanos, TimeUnit.NANOSECONDS);
+                if (batchValues == null) {
+                    continue;
+                }
+                int copyLen = Math.min(bf.size, batchValues.size());
+                for (int i = 0; i < copyLen; i++) {
+                    allValues.set(bf.startIndex + i, batchValues.get(i));
+                }
+            } catch (TimeoutException e) {
+                bf.future.cancel(true);
+                log.warn("Batch mGet timeout, startIndex={}, size={}", bf.startIndex, bf.size);
+            } catch (Exception e) {
+                log.error("Batch mGet failed, startIndex={}, size={}", bf.startIndex, bf.size, e);
+            }
+        }
 
-        // 3. 并行解压缩
-        values = batchDecompress(values);
+        // 4. 解压缩
+        List<String> values = CommonCollectionUtils.toList(allValues, CompressionUtil::snappyDecompress);
 
-        // 4. 构建响应
+        // 5. 构建响应
         MultiGetFeatureResponse.Builder builder = MultiGetFeatureResponse.newBuilder();
         builder.setResult(Result.newBuilder().setCode(1));
         for (int i = 0; i < keyCount; i++) {
-            builder.putFeature(featureKeys.get(i).getUniqueKey(),
+            builder.putFeature(request.getFeatureKeyList().get(i).getUniqueKey(),
                     Strings.nullToEmpty(values.get(i)));
         }
-        return builder.build();
+        MultiGetFeatureResponse build = builder.build();
+        log.info("multiGetFeature, cost={}ms", System.currentTimeMillis() - startTime);
+        return build;
+    }
+
+    private static final class BatchFuture {
+        private final int startIndex;
+        private final int size;
+        private final Future<List<String>> future;
+
+        private BatchFuture(int startIndex, int size, Future<List<String>> future) {
+            this.startIndex = startIndex;
+            this.size = size;
+            this.future = future;
+        }
     }
 
     /**

+ 1 - 1
recommend-feature-service/src/main/resources/application-dev.yml

@@ -8,7 +8,7 @@ grpc:
 eureka:
   instance:
     prefer-ip-address: true #是否优先使用IP地址作为主机名的标识,默认false
-    instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port} #注册到eureka上的唯一实例ID
+    instance-id: ${spring.application.name}:${POD_NAME}:${server.port} #注册到eureka上的唯一实例ID
     lease-renewal-interval-in-seconds: 10 #表示eureka client发送心跳给server端的频率,默认30
     lease-expiration-duration-in-seconds: 30 #表示eureka server至上一次收到client的心跳之后,等待下一次心跳的超时时间,在这个时间内若没收到下一次心跳,则将移除该instance,默认90
   client:

+ 1 - 1
recommend-feature-service/src/main/resources/application-pre.yml

@@ -4,7 +4,7 @@ server:
 eureka:
   instance:
     prefer-ip-address: true #是否优先使用IP地址作为主机名的标识,默认false
-    instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port} #注册到eureka上的唯一实例ID
+    instance-id: ${spring.application.name}:${POD_NAME}:${server.port} #注册到eureka上的唯一实例ID
     lease-renewal-interval-in-seconds: 10 #表示eureka client发送心跳给server端的频率,默认30
     lease-expiration-duration-in-seconds: 30 #表示eureka server至上一次收到client的心跳之后,等待下一次心跳的超时时间,在这个时间内若没收到下一次心跳,则将移除该instance,默认90
   client:

+ 3 - 3
recommend-feature-service/src/main/resources/application-prod.yml

@@ -28,7 +28,7 @@ thread:
 eureka:
   instance:
     prefer-ip-address: true #是否优先使用IP地址作为主机名的标识,默认false
-    instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port} #注册到eureka上的唯一实例ID
+    instance-id: ${spring.application.name}:${POD_NAME}:${server.port} #注册到eureka上的唯一实例ID
     lease-renewal-interval-in-seconds: 10 #表示eureka client发送心跳给server端的频率,默认30
     lease-expiration-duration-in-seconds: 30 #表示eureka server至上一次收到client的心跳之后,等待下一次心跳的超时时间,在这个时间内若没收到下一次心跳,则将移除该instance,默认90
   client:
@@ -41,7 +41,7 @@ spring:
     hostName: r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com
     port: 6379
     password: Wqsd@2019
-    timeout: 1000
+    timeout: 5000  # 增加到5秒,防止批量查询超时
     lettuce:
       pool:
         # 增大连接池容量,防止高并发时连接等待
@@ -53,7 +53,7 @@ spring:
     hostName: r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com
     port: 6379
     password: Wqsd@2019
-    timeout: 1000
+    timeout: 5000  # 增加到5秒,防止批量查询超时
     lettuce:
       pool:
         max-active: 64

+ 1 - 1
recommend-feature-service/src/main/resources/application-test.yml

@@ -4,7 +4,7 @@ server:
 eureka:
   instance:
     prefer-ip-address: true #是否优先使用IP地址作为主机名的标识,默认false
-    instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port} #注册到eureka上的唯一实例ID
+    instance-id: ${spring.application.name}:${POD_NAME}:${server.port} #注册到eureka上的唯一实例ID
     lease-renewal-interval-in-seconds: 10 #表示eureka client发送心跳给server端的频率,默认30
     lease-expiration-duration-in-seconds: 30 #表示eureka server至上一次收到client的心跳之后,等待下一次心跳的超时时间,在这个时间内若没收到下一次心跳,则将移除该instance,默认90
   client:

+ 6 - 0
recommend-feature-service/src/main/resources/application.yml

@@ -1,8 +1,14 @@
 spring:
+  lifecycle:
+    timeout-per-shutdown-phase: 30s
   profiles:
     active: dev
   application:
     name: recommend-feature
+
+server:
+  shutdown: graceful
+
 logging:
   file:
     path: /datalog/weblog/${spring.application.name}/