فهرست منبع

Merge branch 'feature_20260612_zhaohaipeng_compression' of algorithm/recommend-feature into master

zhaohaipeng 1 هفته پیش
والد
کامیت
f8ba94cb0f

+ 7 - 1
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/service/RedisService.java

@@ -5,12 +5,12 @@ import com.tzld.piaoquan.recommend.feature.produce.util.CompressionUtil;
 import com.tzld.piaoquan.recommend.feature.produce.util.JSONUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang3.RandomUtils;
 import org.apache.commons.lang3.StringUtils;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.Pipeline;
 
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -91,6 +91,12 @@ public class RedisService implements Serializable {
                 pipeline.setex(e.getKey(), expireSeconds, CompressionUtil.snappyCompress(e.getValue()));
             } catch (Exception ex) {
             }
+
+            try {
+                String newKey = String.format("%s:v2", e.getKey());
+                pipeline.setex(newKey.getBytes(StandardCharsets.UTF_8), expireSeconds, CompressionUtil.snappyCompressV2(e.getValue()));
+            } catch (Exception ignore) {
+            }
         }
         pipeline.sync();
 

+ 5 - 0
recommend-feature-produce/src/main/java/com/tzld/piaoquan/recommend/feature/produce/util/CompressionUtil.java

@@ -66,6 +66,11 @@ public class CompressionUtil {
         return Base64.getEncoder().encodeToString(compressedBytes);
     }
 
+    public static byte[] snappyCompressV2(String input) throws IOException{
+        byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8);
+        return Snappy.compress(inputBytes);
+    }
+
     // 将Snappy压缩后的String解压缩回String
     public static String snappyDecompress(String compressedInput) throws IOException {
         byte[] compressedBytes = Base64.getDecoder().decode(compressedInput);

+ 86 - 7
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/service/FeatureV2Service.java

@@ -21,7 +21,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.*;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 /**
  * @author dyp
@@ -33,12 +36,19 @@ public class FeatureV2Service {
     @Autowired
     private RedisTemplate<String, String> redisTemplate;
 
+    @Qualifier("byteRedisTemplate")
+    @Autowired
+    private RedisTemplate<String, byte[]> byteRedisTemplate;
+
     @ApolloJsonValue("${dts.config:}")
     private List<DTSConfig> newDtsConfigs;
 
     @Value("${feature.mget.batch.size:300}")
     private Integer batchSize;
 
+    @Value("${remove.base64.switch:false}")
+    private Boolean removeBase64Switch;
+
     public MultiGetFeatureResponse multiGetFeature(MultiGetFeatureRequest request) {
         int keyCount = request.getFeatureKeyCount();
 
@@ -48,6 +58,12 @@ public class FeatureV2Service {
                     .build();
         }
 
+
+        // 移除Base64编解码的key
+        if (Boolean.TRUE.equals(removeBase64Switch)) {
+            return this.multiGetFeatureV2(request);
+        }
+
         long startTime = System.currentTimeMillis();
 
         // 1. 生成Redis Key列表
@@ -56,19 +72,19 @@ public class FeatureV2Service {
         int safeBatchSize = (batchSize == null || batchSize <= 0) ? 300 : batchSize;
 
         // 2. 分批并行查询(必须保证返回值与 key 顺序一致)
-        List<BatchFuture> futures = new ArrayList<>();
+        List<BatchFuture<String>> futures = new ArrayList<>();
         for (int start = 0; start < redisKeys.size(); start += safeBatchSize) {
             int end = Math.min(start + safeBatchSize, redisKeys.size());
             List<String> batchKeys = redisKeys.subList(start, end);
             Future<List<String>> future = ThreadPoolFactory.multiGetFeaturePool()
                     .submit(() -> redisTemplate.opsForValue().multiGet(batchKeys));
-            futures.add(new BatchFuture(start, end - start, future));
+            futures.add(new BatchFuture<>(start, end - start, future));
         }
 
         // 3. 收集结果(整体最多等待约 2s;超时/异常的批次用 null 填充,避免错位)
         List<String> allValues = new ArrayList<>(Collections.nCopies(keyCount, null));
         long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(2000);
-        for (BatchFuture bf : futures) {
+        for (BatchFuture<String> bf : futures) {
             long remainingNanos = deadlineNanos - System.nanoTime();
             if (remainingNanos <= 0) {
                 bf.future.cancel(true);
@@ -106,12 +122,75 @@ public class FeatureV2Service {
         return build;
     }
 
-    private static final class BatchFuture {
+    private MultiGetFeatureResponse multiGetFeatureV2(MultiGetFeatureRequest request) {
+        long startTime = System.currentTimeMillis();
+        int keyCount = request.getFeatureKeyCount();
+
+        // 1. 生成Redis Key列表, v2去除了base64编码 key格式也变了
+        List<String> redisKeys = request.getFeatureKeyList().stream()
+                .map(this::redisKey)
+                .map(s -> String.format("%s:v2", s))
+                .collect(Collectors.toList());
+
+        int safeBatchSize = (batchSize == null || batchSize <= 0) ? 300 : batchSize;
+
+        // 2. 分批并行查询(必须保证返回值与 key 顺序一致)
+        List<BatchFuture<byte[]>> futures = new ArrayList<>();
+        for (int start = 0; start < redisKeys.size(); start += safeBatchSize) {
+            int end = Math.min(start + safeBatchSize, redisKeys.size());
+            List<String> batchKeys = redisKeys.subList(start, end);
+            Future<List<byte[]>> future = ThreadPoolFactory.multiGetFeaturePool()
+                    .submit(() -> byteRedisTemplate.opsForValue().multiGet(batchKeys));
+            futures.add(new BatchFuture<byte[]>(start, end - start, future));
+        }
+
+        // 3. 收集结果(整体最多等待约 2s;超时/异常的批次用 null 填充,避免错位)
+        List<byte[]> allValues = new ArrayList<>(Collections.nCopies(keyCount, null));
+        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(2000);
+        for (BatchFuture<byte[]> bf : futures) {
+            long remainingNanos = deadlineNanos - System.nanoTime();
+            if (remainingNanos <= 0) {
+                bf.future.cancel(true);
+                continue;
+            }
+            try {
+                List<byte[]> batchValues = bf.future.get(remainingNanos, TimeUnit.NANOSECONDS);
+                if (batchValues == null) {
+                    continue;
+                }
+                int copyLen = Math.min(bf.size, batchValues.size());
+                for (int i = 0; i < copyLen; i++) {
+                    allValues.set(bf.startIndex + i, batchValues.get(i));
+                }
+            } catch (TimeoutException e) {
+                bf.future.cancel(true);
+                log.warn("Batch mGetV2 timeout, startIndex={}, size={}", bf.startIndex, bf.size);
+            } catch (Exception e) {
+                log.error("Batch mGetV2 failed, startIndex={}, size={}", bf.startIndex, bf.size, e);
+            }
+        }
+
+        // 4. 解压缩
+        List<String> values = CommonCollectionUtils.toList(allValues, CompressionUtil::snappyDecompressV2);
+
+        // 5. 构建响应
+        MultiGetFeatureResponse.Builder builder = MultiGetFeatureResponse.newBuilder();
+        builder.setResult(Result.newBuilder().setCode(1));
+        for (int i = 0; i < keyCount; i++) {
+            builder.putFeature(request.getFeatureKeyList().get(i).getUniqueKey(),
+                    Strings.nullToEmpty(values.get(i)));
+        }
+        MultiGetFeatureResponse build = builder.build();
+        log.info("multiGetFeatureV2, cost={}ms", System.currentTimeMillis() - startTime);
+        return build;
+    }
+
+    private static final class BatchFuture<T> {
         private final int startIndex;
         private final int size;
-        private final Future<List<String>> future;
+        private final Future<List<T>> future;
 
-        private BatchFuture(int startIndex, int size, Future<List<String>> future) {
+        private BatchFuture(int startIndex, int size, Future<List<T>> future) {
             this.startIndex = startIndex;
             this.size = size;
             this.future = future;

+ 13 - 0
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/util/CompressionUtil.java

@@ -12,6 +12,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
+import java.util.Objects;
 
 
 /**
@@ -83,4 +84,16 @@ public class CompressionUtil {
         }
     }
 
+    public static String snappyDecompressV2(byte[] compressedInput) {
+        if (Objects.isNull(compressedInput)) {
+            return "";
+        }
+        try {
+            byte[] decompressedBytes = Snappy.uncompress(compressedInput);
+            return new String(decompressedBytes, StandardCharsets.UTF_8);
+        } catch (Exception e) {
+            log.error("snappyDecompressV2 error compressedInput {}", compressedInput, e);
+            return "";
+        }
+    }
 }

+ 2 - 2
recommend-feature-service/src/main/resources/application-dev.yml

@@ -18,7 +18,7 @@ eureka:
 
 spring:
   redis:
-    hostName: r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com
+    hostName: r-bp1wwqqkjqwwkxgbup.redis.rds.aliyuncs.com
     port: 6379
     password: Wqsd@2019
     timeout: 1000
@@ -29,7 +29,7 @@ spring:
         max-idle: 8
         min-idle: 0
   tair:
-    hostName: r-bp1pi8wyv6lzvgjy5z.redis.rds.aliyuncs.com
+    hostName: r-bp1wwqqkjqwwkxgbup.redis.rds.aliyuncs.com
     port: 6379
     password: Wqsd@2019
     timeout: 1000

+ 32 - 0
recommend-feature-service/src/test/java/com/tzld/piaoquan/recommend/feature/service/FeatureV2ServiceTest.java

@@ -0,0 +1,32 @@
+package com.tzld.piaoquan.recommend.feature.service;
+
+import com.tzld.piaoquan.recommend.feature.util.CompressionUtil;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.redis.core.RedisTemplate;
+
+import java.util.Arrays;
+import java.util.List;
+
+@SpringBootTest
+public class FeatureV2ServiceTest {
+
+    @Autowired
+    @Qualifier("byteRedisTemplate")
+    private RedisTemplate<String, byte[]> byteRedisTemplate;
+
+    @Test
+    public void featureV2Test() {
+
+        List<String> keys = Arrays.asList(
+                "snappy:alg_videoid_feature:70282758:v2",
+                "snappy:alg_videoid_feature:70268863:v2"
+        );
+
+        for (byte[] bytes : byteRedisTemplate.opsForValue().multiGet(keys)) {
+            System.out.println(CompressionUtil.snappyDecompressV2(bytes));
+        }
+    }
+}