|
|
@@ -1,7 +1,5 @@
|
|
|
package com.tzld.piaoquan.recommend.feature.service;
|
|
|
|
|
|
-import com.ctrip.framework.apollo.model.ConfigChangeEvent;
|
|
|
-import com.ctrip.framework.apollo.spring.annotation.ApolloConfigChangeListener;
|
|
|
import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
|
|
|
import com.google.common.base.Strings;
|
|
|
import com.tzld.piaoquan.recommend.feature.model.common.Result;
|
|
|
@@ -17,12 +15,8 @@ import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
import org.springframework.data.redis.core.RedisTemplate;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
-import javax.annotation.PostConstruct;
|
|
|
-import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
import java.util.Optional;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
/**
|
|
|
* @author dyp
|
|
|
@@ -36,210 +30,46 @@ public class FeatureV2Service {
|
|
|
|
|
|
@ApolloJsonValue("${dts.config:}")
|
|
|
private List<DTSConfig> newDtsConfigs;
|
|
|
-
|
|
|
- // 配置缓存:tableName -> DTSConfig,避免每次遍历查找
|
|
|
- // 使用 ConcurrentHashMap 保证线程安全,volatile 保证可见性
|
|
|
- private volatile Map<String, DTSConfig> configCache = new ConcurrentHashMap<>();
|
|
|
-
|
|
|
- // 用于缓存重建的锁对象,避免使用 this 锁
|
|
|
- private final Object cacheLock = new Object();
|
|
|
-
|
|
|
- // ========== redisKey 方法耗时统计(用于性能分析) ==========
|
|
|
- // 统计 redisKey 方法的总耗时和调用次数
|
|
|
- private volatile long redisKeyMaxTime = 0;
|
|
|
- private final Object statsLock = new Object();
|
|
|
|
|
|
public MultiGetFeatureResponse multiGetFeature(MultiGetFeatureRequest request) {
|
|
|
- long startTime = System.currentTimeMillis();
|
|
|
int keyCount = request.getFeatureKeyCount();
|
|
|
-
|
|
|
+
|
|
|
if (keyCount == 0) {
|
|
|
return MultiGetFeatureResponse.newBuilder()
|
|
|
.setResult(Result.newBuilder().setCode(1))
|
|
|
.build();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// 1. 生成Redis Key列表
|
|
|
- long keyGenStart = System.currentTimeMillis();
|
|
|
List<String> redisKeys = CommonCollectionUtils.toList(request.getFeatureKeyList(), this::redisKey);
|
|
|
- long keyGenTime = System.currentTimeMillis() - keyGenStart;
|
|
|
-
|
|
|
+
|
|
|
// 2. Redis批量查询
|
|
|
- long redisStart = System.currentTimeMillis();
|
|
|
List<String> values = redisTemplate.opsForValue().multiGet(redisKeys);
|
|
|
- long redisTime = System.currentTimeMillis() - redisStart;
|
|
|
-
|
|
|
+
|
|
|
// 3. 数据解压缩
|
|
|
- long decompressStart = System.currentTimeMillis();
|
|
|
values = CommonCollectionUtils.toList(values, CompressionUtil::snappyDecompress);
|
|
|
- long decompressTime = System.currentTimeMillis() - decompressStart;
|
|
|
-
|
|
|
+
|
|
|
// 4. 构建响应
|
|
|
- long buildStart = System.currentTimeMillis();
|
|
|
MultiGetFeatureResponse.Builder builder = MultiGetFeatureResponse.newBuilder();
|
|
|
builder.setResult(Result.newBuilder().setCode(1));
|
|
|
for (int i = 0; i < keyCount; i++) {
|
|
|
builder.putFeature(request.getFeatureKeyList().get(i).getUniqueKey(),
|
|
|
Strings.nullToEmpty(values.get(i)));
|
|
|
}
|
|
|
- MultiGetFeatureResponse response = builder.build();
|
|
|
- long buildTime = System.currentTimeMillis() - buildStart;
|
|
|
-
|
|
|
- long totalTime = System.currentTimeMillis() - startTime;
|
|
|
-
|
|
|
- // 性能日志:超过阈值或关键指标记录
|
|
|
- if (totalTime > 100 || keyCount > 50) {
|
|
|
- log.warn("multiGetFeature performance: total={}ms, keyCount={}, keyGen={}ms, redis={}ms, decompress={}ms, build={}ms",
|
|
|
- totalTime, keyCount, keyGenTime, redisTime, decompressTime, buildTime);
|
|
|
- } else if (log.isDebugEnabled()) {
|
|
|
- log.debug("multiGetFeature performance: total={}ms, keyCount={}, keyGen={}ms, redis={}ms, decompress={}ms, build={}ms",
|
|
|
- totalTime, keyCount, keyGenTime, redisTime, decompressTime, buildTime);
|
|
|
- }
|
|
|
-
|
|
|
- return response;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 初始化配置缓存
|
|
|
- */
|
|
|
- @PostConstruct
|
|
|
- public void init() {
|
|
|
- refreshConfigCache();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 监听 Apollo 配置变更,动态刷新配置缓存
|
|
|
- * 当 dts.config 配置更新时,所有实例都会收到通知并刷新本地缓存
|
|
|
- */
|
|
|
- @ApolloConfigChangeListener(interestedKeys = {"dts.config"})
|
|
|
- public void onConfigChange(ConfigChangeEvent changeEvent) {
|
|
|
- if (changeEvent.isChanged("dts.config")) {
|
|
|
- try {
|
|
|
- // ApolloJsonValue 会自动更新 newDtsConfigs,这里只需要刷新缓存
|
|
|
- refreshConfigCache();
|
|
|
- log.info("DTS config updated. Cache refreshed. Config count: {}", configCache.size());
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("Failed to refresh DTS config cache", e);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 刷新配置缓存
|
|
|
- * 从 newDtsConfigs 重新构建 configCache
|
|
|
- */
|
|
|
- private void refreshConfigCache() {
|
|
|
- if (newDtsConfigs == null || newDtsConfigs.isEmpty()) {
|
|
|
- log.warn("newDtsConfigs is null or empty, clear configCache");
|
|
|
- configCache.clear();
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- Map<String, DTSConfig> newCache = new HashMap<>();
|
|
|
- for (DTSConfig c : newDtsConfigs) {
|
|
|
- if (c.getOdps() != null && StringUtils.isNotBlank(c.getOdps().getTable())) {
|
|
|
- newCache.put(c.getOdps().getTable(), c);
|
|
|
- }
|
|
|
- }
|
|
|
- configCache = newCache;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 获取配置,如果缓存中没有则尝试从 newDtsConfigs 查找并更新缓存
|
|
|
- * 使用双重检查锁定模式,但优化了逻辑:即使缓存不为空,如果缺少某个key也会尝试更新
|
|
|
- */
|
|
|
- private DTSConfig getConfig(String tableName) {
|
|
|
- // 第一次检查:直接从缓存获取
|
|
|
- DTSConfig config = configCache.get(tableName);
|
|
|
- if (config != null) {
|
|
|
- return config;
|
|
|
- }
|
|
|
-
|
|
|
- // 缓存中没有,尝试从 newDtsConfigs 查找
|
|
|
- if (newDtsConfigs == null || newDtsConfigs.isEmpty()) {
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- // 双重检查锁定:防止并发重建缓存
|
|
|
- synchronized (cacheLock) {
|
|
|
- // 第二次检查:可能其他线程已经更新了缓存
|
|
|
- config = configCache.get(tableName);
|
|
|
- if (config != null) {
|
|
|
- return config;
|
|
|
- }
|
|
|
-
|
|
|
- // 从 newDtsConfigs 中查找该 tableName 的配置
|
|
|
- for (DTSConfig c : newDtsConfigs) {
|
|
|
- if (c.getOdps() != null && StringUtils.isNotBlank(c.getOdps().getTable())
|
|
|
- && c.getOdps().getTable().equals(tableName)) {
|
|
|
- // 找到配置,更新缓存(只更新单个key,不影响其他key)
|
|
|
- configCache.put(tableName, c);
|
|
|
- return c;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // 如果整个缓存为空,重建整个缓存(兼容旧逻辑)
|
|
|
- if (configCache.isEmpty()) {
|
|
|
- refreshConfigCache();
|
|
|
- return configCache.get(tableName);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- return null;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 生成 Redis Key(历史逻辑版本:每次从 newDtsConfigs 中 stream 查找)
|
|
|
- * 当前使用此方法,用于统计耗时,后续可切换到 redisKeyV2 使用缓存优化
|
|
|
- */
|
|
|
- private String redisKey(FeatureKeyProto fk) {
|
|
|
- long startTime = System.currentTimeMillis();
|
|
|
-
|
|
|
- try {
|
|
|
- Optional<DTSConfig> optional = newDtsConfigs.stream()
|
|
|
- .filter(c -> c.getOdps() != null && StringUtils.equals(c.getOdps().getTable(), fk.getTableName()))
|
|
|
- .findFirst();
|
|
|
- if (!optional.isPresent()) {
|
|
|
- log.error("table {} not config", fk.getTableName());
|
|
|
- return "";
|
|
|
- }
|
|
|
- DTSConfig config = optional.get();
|
|
|
-
|
|
|
- // Note:写入和读取的key生成规则应保持一致
|
|
|
- List<String> fields = config.getRedis().getKey();
|
|
|
- if (org.apache.commons.collections4.CollectionUtils.isEmpty(fields)) {
|
|
|
- return config.getRedis().getPrefix();
|
|
|
- }
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
- if (StringUtils.isNotBlank(config.getRedis().getPrefix())) {
|
|
|
- sb.append(config.getRedis().getPrefix());
|
|
|
- }
|
|
|
- for (String field : fields) {
|
|
|
- sb.append(":");
|
|
|
- sb.append(fk.getFieldValueMap().get(field));
|
|
|
- }
|
|
|
- return sb.toString();
|
|
|
- } finally {
|
|
|
- // 统计最大耗时(单位:毫秒)
|
|
|
- long elapsedTime = System.currentTimeMillis() - startTime;
|
|
|
- synchronized (statsLock) {
|
|
|
- if (elapsedTime > redisKeyMaxTime) {
|
|
|
- redisKeyMaxTime = elapsedTime;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
+ return builder.build();
|
|
|
}
|
|
|
|
|
|
// Note:写入和读取的key生成规则应保持一致
|
|
|
- private String redisKeyV2(FeatureKeyProto fk) {
|
|
|
- String tableName = fk.getTableName();
|
|
|
+ private String redisKey(FeatureKeyProto fk) {
|
|
|
|
|
|
- // 从缓存获取配置,如果不存在则查找并缓存
|
|
|
- DTSConfig config = getConfig(tableName);
|
|
|
- if (config == null) {
|
|
|
- log.error("table {} not config", tableName);
|
|
|
+ Optional<DTSConfig> optional = newDtsConfigs.stream()
|
|
|
+ .filter(c -> c.getOdps() != null && StringUtils.equals(c.getOdps().getTable(), fk.getTableName()))
|
|
|
+ .findFirst();
|
|
|
+ if (!optional.isPresent()) {
|
|
|
+ log.error("table {} not config", fk.getTableName());
|
|
|
return "";
|
|
|
}
|
|
|
+ DTSConfig config = optional.get();
|
|
|
|
|
|
// Note:写入和读取的key生成规则应保持一致
|
|
|
List<String> fields = config.getRedis().getKey();
|