jiandong.liu 4 дней назад
Родитель
Сommit
edfc2f4cee

+ 3 - 3
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/util/JSONUtils.java

@@ -10,13 +10,14 @@ import java.io.Serializable;
 @Slf4j
 public class JSONUtils implements Serializable {
 
+    private static final Gson GSON = new Gson();
 
     public static String toJson(Object obj) {
         if (obj == null) {
             return "";
         }
         try {
-            return new Gson().toJson(obj);
+            return GSON.toJson(obj);
         } catch (Exception e) {
             log.error("toJson exception", e);
             return "";
@@ -24,12 +25,11 @@ public class JSONUtils implements Serializable {
     }
 
     public static <T> T fromJson(String value, TypeToken<T> typeToken, T defaultValue) {
-
         if (StringUtils.isBlank(value)) {
             return defaultValue;
         }
         try {
-            return new Gson().fromJson(value, typeToken.getType());
+            return GSON.fromJson(value, typeToken.getType());
         } catch (Exception e) {
             log.error("fromJson error! value=[{}]", value, e);
         }

+ 6 - 0
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/Application.java

@@ -1,5 +1,7 @@
 package com.tzld.piaoquan.recommend.feature;
 
+import com.alibaba.fastjson.parser.ParserConfig;
+import com.alibaba.fastjson.serializer.SerializeConfig;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.data.redis.RedisReactiveAutoConfiguration;
@@ -23,6 +25,10 @@ import org.springframework.context.annotation.EnableAspectJAutoProxy;
 @EnableAspectJAutoProxy
 public class Application {
     public static void main(String[] args) {
+        // 禁用 FastJSON ASM,避免 Metaspace 泄漏
+        ParserConfig.getGlobalInstance().setAsmEnable(false);
+        SerializeConfig.getGlobalInstance().setAsmEnable(false);
+
         SpringApplication.run(Application.class, args);
     }
 }

+ 2 - 2
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/common/ThreadPoolFactory.java

@@ -17,14 +17,14 @@ public final class ThreadPoolFactory {
             0L, TimeUnit.SECONDS,
             new LinkedBlockingQueue<>(1000),
             new ThreadFactoryBuilder().setNameFormat("DEFAULT-%d").build(),
-            new ThreadPoolExecutor.AbortPolicy());
+            new ThreadPoolExecutor.CallerRunsPolicy());
     private final static ExecutorService MULTI_GET_FEATURE = new CommonThreadPoolExecutor(
             256,
             256,
             0L, TimeUnit.SECONDS,
             new LinkedBlockingQueue<>(1000),
             new ThreadFactoryBuilder().setNameFormat("MultiGetFeaturePool-%d").build(),
-            new ThreadPoolExecutor.AbortPolicy());
+            new ThreadPoolExecutor.CallerRunsPolicy());
 
     public static ExecutorService defaultPool() {
         return DEFAULT;

+ 96 - 9
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/service/FeatureV2Service.java

@@ -2,21 +2,26 @@ package com.tzld.piaoquan.recommend.feature.service;
 
 import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
 import com.google.common.base.Strings;
+import com.tzld.piaoquan.recommend.feature.common.ThreadPoolFactory;
 import com.tzld.piaoquan.recommend.feature.model.common.Result;
 import com.tzld.piaoquan.recommend.feature.model.feature.FeatureKeyProto;
 import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureRequest;
 import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureResponse;
-import com.tzld.piaoquan.recommend.feature.util.CommonCollectionUtils;
 import com.tzld.piaoquan.recommend.feature.util.CompressionUtil;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 /**
  * @author dyp
@@ -24,6 +29,9 @@ import java.util.Optional;
 @Service
 @Slf4j
 public class FeatureV2Service {
+
+    private static final int BATCH_SIZE = 300;
+
     @Qualifier("redisTemplate")
     @Autowired
     private RedisTemplate<String, String> redisTemplate;
@@ -40,25 +48,104 @@ public class FeatureV2Service {
                     .build();
         }
 
-        // 1. 生成Redis Key列表
-        List<String> redisKeys = CommonCollectionUtils.toList(request.getFeatureKeyList(), this::redisKey);
+        List<FeatureKeyProto> featureKeys = request.getFeatureKeyList();
+
+        // 1. 生成 Redis Key 列表
+        List<String> redisKeys = featureKeys.stream()
+                .map(this::redisKey)
+                .collect(Collectors.toList());
 
-        // 2. Redis批量查询
-        List<String> values = redisTemplate.opsForValue().multiGet(redisKeys);
+        // 2. 分批并行查询 Redis
+        List<String> values = batchMultiGet(redisKeys);
 
-        // 3. 数据解压缩
-        values = CommonCollectionUtils.toList(values, CompressionUtil::snappyDecompress);
+        // 3. 并行解压缩
+        values = batchDecompress(values);
 
         // 4. 构建响应
         MultiGetFeatureResponse.Builder builder = MultiGetFeatureResponse.newBuilder();
         builder.setResult(Result.newBuilder().setCode(1));
         for (int i = 0; i < keyCount; i++) {
-            builder.putFeature(request.getFeatureKeyList().get(i).getUniqueKey(),
+            builder.putFeature(featureKeys.get(i).getUniqueKey(),
                     Strings.nullToEmpty(values.get(i)));
         }
         return builder.build();
     }
 
+    /**
+     * 分批并行查询 Redis
+     */
+    private List<String> batchMultiGet(List<String> redisKeys) {
+        if (CollectionUtils.isEmpty(redisKeys)) {
+            return Collections.emptyList();
+        }
+
+        // 如果数量小于批次大小,直接查询
+        if (redisKeys.size() <= BATCH_SIZE) {
+            List<String> result = redisTemplate.opsForValue().multiGet(redisKeys);
+            return result != null ? result : Collections.emptyList();
+        }
+
+        // 分批
+        List<List<String>> batches = partition(redisKeys, BATCH_SIZE);
+
+        // 并行查询
+        List<CompletableFuture<List<String>>> futures = batches.stream()
+                .map(batch -> CompletableFuture.supplyAsync(() -> {
+                    List<String> result = redisTemplate.opsForValue().multiGet(batch);
+                    return result != null ? result : new ArrayList<String>();
+                }, ThreadPoolFactory.multiGetFeaturePool()))
+                .collect(Collectors.toList());
+
+        // 合并结果(保持顺序)
+        return futures.stream()
+                .map(CompletableFuture::join)
+                .flatMap(List::stream)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * 并行解压缩
+     */
+    private List<String> batchDecompress(List<String> values) {
+        if (CollectionUtils.isEmpty(values)) {
+            return Collections.emptyList();
+        }
+
+        // 如果数量小于批次大小,直接解压
+        if (values.size() <= BATCH_SIZE) {
+            return values.stream()
+                    .map(CompressionUtil::snappyDecompress)
+                    .collect(Collectors.toList());
+        }
+
+        // 分批并行解压
+        List<List<String>> batches = partition(values, BATCH_SIZE);
+
+        List<CompletableFuture<List<String>>> futures = batches.stream()
+                .map(batch -> CompletableFuture.supplyAsync(() ->
+                        batch.stream()
+                                .map(CompressionUtil::snappyDecompress)
+                                .collect(Collectors.toList()),
+                        ThreadPoolFactory.multiGetFeaturePool()))
+                .collect(Collectors.toList());
+
+        return futures.stream()
+                .map(CompletableFuture::join)
+                .flatMap(List::stream)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * 将列表分批
+     */
+    private <T> List<List<T>> partition(List<T> list, int size) {
+        List<List<T>> partitions = new ArrayList<>();
+        for (int i = 0; i < list.size(); i += size) {
+            partitions.add(list.subList(i, Math.min(i + size, list.size())));
+        }
+        return partitions;
+    }
+
     // Note:写入和读取的key生成规则应保持一致
     private String redisKey(FeatureKeyProto fk) {
 
@@ -73,7 +160,7 @@ public class FeatureV2Service {
 
         // Note:写入和读取的key生成规则应保持一致
         List<String> fields = config.getRedis().getKey();
-        if (org.apache.commons.collections4.CollectionUtils.isEmpty(fields)) {
+        if (CollectionUtils.isEmpty(fields)) {
             return config.getRedis().getPrefix();
         }
         StringBuilder sb = new StringBuilder();

+ 4 - 5
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/util/JSONUtils.java

@@ -1,6 +1,5 @@
 package com.tzld.piaoquan.recommend.feature.util;
 
-import com.alibaba.fastjson.JSONObject;
 import com.google.common.reflect.TypeToken;
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
@@ -9,13 +8,14 @@ import org.apache.commons.lang3.StringUtils;
 @Slf4j
 public class JSONUtils {
 
+    private static final Gson GSON = new Gson();
 
     public static String toJson(Object obj) {
         if (obj == null) {
             return "";
         }
         try {
-            return new Gson().toJson(obj);
+            return GSON.toJson(obj);
         } catch (Exception e) {
             log.error("toJson exception", e);
             return "";
@@ -23,14 +23,13 @@ public class JSONUtils {
     }
 
     public static <T> T fromJson(String value, TypeToken<T> typeToken, T defaultValue) {
-
         if (StringUtils.isBlank(value)) {
             return defaultValue;
         }
         try {
-            return JSONObject.parseObject(value, typeToken.getType());
+            return GSON.fromJson(value, typeToken.getType());
         } catch (Exception e) {
-            log.error("parseObject error! value=[{}]", value, e);
+            log.error("fromJson error! value=[{}]", value, e);
         }
         return defaultValue;
     }