瀏覽代碼

support compress

丁云鹏 2 月之前
父節點
當前提交
71c0192d36

+ 6 - 0
recommend-feature-service/pom.xml

@@ -124,6 +124,12 @@
             <artifactId>commons-collections4</artifactId>
             <version>4.1</version>
         </dependency>
+        <!-- Snappy compression library -->
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>1.1.8.4</version>
+        </dependency>
     </dependencies>
     <build>
         <finalName>recommend-feature-service</finalName>

+ 18 - 4
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/service/FeatureV2Service.java

@@ -7,10 +7,13 @@ import com.tzld.piaoquan.recommend.feature.model.feature.FeatureKeyProto;
 import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureRequest;
 import com.tzld.piaoquan.recommend.feature.model.feature.MultiGetFeatureResponse;
 import com.tzld.piaoquan.recommend.feature.util.CommonCollectionUtils;
+import com.tzld.piaoquan.recommend.feature.util.CompressionUtil;
+import com.tzld.piaoquan.recommend.feature.util.JSONUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Service;
 
@@ -30,6 +33,13 @@ public class FeatureV2Service {
     @ApolloJsonValue("${dts.config.v2:}")
     private List<DTSConfig> dtsConfigs;
 
+    @ApolloJsonValue("${dts.config:}")
+    private List<DTSConfig> newDtsConfigs;
+
+    @Value("${compress.switch: false}")
+    private boolean compressSwitch;
+
+
     public MultiGetFeatureResponse multiGetFeature(MultiGetFeatureRequest request) {
         if (request.getFeatureKeyCount() == 0) {
             return MultiGetFeatureResponse.newBuilder()
@@ -39,8 +49,11 @@ public class FeatureV2Service {
         // 目前都在一个Redis,所以放在一个list简化处理
         List<String> redisKeys = CommonCollectionUtils.toList(request.getFeatureKeyList(), fk -> redisKey(fk));
         List<String> values = redisTemplate.opsForValue().multiGet(redisKeys);
+        if (compressSwitch) {
+            values = CommonCollectionUtils.toList(values, CompressionUtil::snappyDecompress);
+        }
 
-        // log.info("feature key {} value {}", JSONUtils.toJson(redisKeys), JSONUtils.toJson(values));
+        log.info("feature key {} value {}", JSONUtils.toJson(redisKeys), JSONUtils.toJson(values));
 
         MultiGetFeatureResponse.Builder builder = MultiGetFeatureResponse.newBuilder();
         builder.setResult(Result.newBuilder().setCode(1));
@@ -53,9 +66,10 @@ public class FeatureV2Service {
 
     // Note:写入和读取的key生成规则应保持一致
     private String redisKey(FeatureKeyProto fk) {
-        Optional<DTSConfig> optional = dtsConfigs.stream()
-                .filter(c -> c.getOdps() != null && StringUtils.equals(c.getOdps().getTable(), fk.getTableName()))
-                .findFirst();
+
+        Optional<DTSConfig> optional = (compressSwitch ? newDtsConfigs : dtsConfigs)
+                .stream().filter(c -> c.getOdps() != null && StringUtils.equals(c.getOdps().getTable(),
+                        fk.getTableName())).findFirst();
         if (!optional.isPresent()) {
             log.error("table {} not config", fk.getTableName());
             return "";

+ 86 - 0
recommend-feature-service/src/main/java/com/tzld/piaoquan/recommend/feature/util/CompressionUtil.java

@@ -0,0 +1,86 @@
+package com.tzld.piaoquan.recommend.feature.util;
+
+import lombok.extern.slf4j.Slf4j;
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+import org.apache.commons.lang3.StringUtils;
+import org.xerial.snappy.Snappy;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+
+/**
+ * @author dyp
+ */
+@Slf4j
+public class CompressionUtil {
+    public static String lz4Compress(String input) {
+        byte[] data = input.getBytes(StandardCharsets.UTF_8);
+        LZ4Factory factory = LZ4Factory.fastestInstance();
+        LZ4Compressor compressor = factory.fastCompressor();
+        int maxCompressedLength = compressor.maxCompressedLength(data.length);
+        byte[] compressed = new byte[maxCompressedLength];
+        int compressedLength = compressor.compress(data, 0, data.length, compressed, 0, maxCompressedLength);
+        byte[] result = new byte[compressedLength];
+        System.arraycopy(compressed, 0, result, 0, compressedLength);
+        return Base64.getEncoder().encodeToString(result);
+    }
+
+    public static String lz4Decompress(String input) {
+        // 将Base64编码的字符串解码为字节数组
+        byte[] data = Base64.getDecoder().decode(input);
+        LZ4Factory factory = LZ4Factory.fastestInstance();
+        LZ4SafeDecompressor decompressor = factory.safeDecompressor();
+
+        // 创建一个缓冲区来存储解压缩后的数据
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+        // 使用ByteBuffer来处理压缩数据
+        ByteBuffer buffer = ByteBuffer.wrap(data);
+
+        // 假设每次读取的块大小为4KB(可以根据实际情况调整)
+        byte[] chunk = new byte[4096];
+        byte[] decompressedChunk = new byte[4096 * 2]; // 解压缩后的块可能会更大
+
+        while (buffer.hasRemaining()) {
+            int remaining = Math.min(buffer.remaining(), chunk.length);
+            buffer.get(chunk, 0, remaining);
+
+            // 解压缩当前块
+            int decompressedLength = decompressor.decompress(chunk, 0, remaining, decompressedChunk, 0);
+
+            // 将解压缩后的数据写入输出流
+            outputStream.write(decompressedChunk, 0, decompressedLength);
+        }
+
+        // 返回解压后的数据
+        return new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
+    }
+
+    public static String snappyCompress(String input) throws IOException {
+        byte[] inputBytes = input.getBytes(StandardCharsets.UTF_8);
+        byte[] compressedBytes = Snappy.compress(inputBytes);
+        return Base64.getEncoder().encodeToString(compressedBytes);
+    }
+
+    // 将Snappy压缩后的String解压缩回String
+    public static String snappyDecompress(String compressedInput) {
+        if (StringUtils.isBlank(compressedInput)) {
+            return "";
+        }
+        try {
+            byte[] compressedBytes = Base64.getDecoder().decode(compressedInput);
+            byte[] decompressedBytes = Snappy.uncompress(compressedBytes);
+            return new String(decompressedBytes, StandardCharsets.UTF_8);
+        } catch (Exception e) {
+            log.error("snappyDecompress error compressedInput {}", compressedInput, e);
+            return "";
+        }
+    }
+
+}