|
|
@@ -1,5 +1,7 @@
|
|
|
package com.tzld.piaoquan.recommend.feature.service;
|
|
|
|
|
|
+import com.ctrip.framework.apollo.model.ConfigChangeEvent;
|
|
|
+import com.ctrip.framework.apollo.spring.annotation.ApolloConfigChangeListener;
|
|
|
import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
|
|
|
import com.google.common.base.Strings;
|
|
|
import com.tzld.piaoquan.recommend.feature.model.common.Result;
|
|
|
@@ -15,8 +17,12 @@ import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
import org.springframework.data.redis.core.RedisTemplate;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.Optional;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
|
|
/**
|
|
|
* @author dyp
|
|
|
@@ -30,41 +36,210 @@ public class FeatureV2Service {
|
|
|
|
|
|
@ApolloJsonValue("${dts.config:}")
|
|
|
private List<DTSConfig> newDtsConfigs;
|
|
|
+
|
|
|
+ // 配置缓存:tableName -> DTSConfig,避免每次遍历查找
|
|
|
+ // 使用 ConcurrentHashMap 保证线程安全,volatile 保证可见性
|
|
|
+ private volatile Map<String, DTSConfig> configCache = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ // 用于缓存重建的锁对象,避免使用 this 锁
|
|
|
+ private final Object cacheLock = new Object();
|
|
|
+
|
|
|
+ // ========== redisKey 方法耗时统计(用于性能分析) ==========
|
|
|
+ // 统计 redisKey 方法的总耗时和调用次数
|
|
|
+ private volatile long redisKeyMaxTime = 0;
|
|
|
+ private final Object statsLock = new Object();
|
|
|
|
|
|
public MultiGetFeatureResponse multiGetFeature(MultiGetFeatureRequest request) {
|
|
|
- if (request.getFeatureKeyCount() == 0) {
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+ int keyCount = request.getFeatureKeyCount();
|
|
|
+
|
|
|
+ if (keyCount == 0) {
|
|
|
return MultiGetFeatureResponse.newBuilder()
|
|
|
.setResult(Result.newBuilder().setCode(1))
|
|
|
.build();
|
|
|
}
|
|
|
- // 目前都在一个Redis,所以放在一个list简化处理
|
|
|
- List<String> redisKeys = CommonCollectionUtils.toList(request.getFeatureKeyList(), fk -> redisKey(fk));
|
|
|
+
|
|
|
+ // 1. 生成Redis Key列表
|
|
|
+ long keyGenStart = System.currentTimeMillis();
|
|
|
+ List<String> redisKeys = CommonCollectionUtils.toList(request.getFeatureKeyList(), this::redisKey);
|
|
|
+ long keyGenTime = System.currentTimeMillis() - keyGenStart;
|
|
|
+
|
|
|
+ // 2. Redis批量查询
|
|
|
+ long redisStart = System.currentTimeMillis();
|
|
|
List<String> values = redisTemplate.opsForValue().multiGet(redisKeys);
|
|
|
- // 兼容老的数据
|
|
|
+ long redisTime = System.currentTimeMillis() - redisStart;
|
|
|
+
|
|
|
+ // 3. 数据解压缩
|
|
|
+ long decompressStart = System.currentTimeMillis();
|
|
|
values = CommonCollectionUtils.toList(values, CompressionUtil::snappyDecompress);
|
|
|
-
|
|
|
- //log.info("feature key {} value {}", JSONUtils.toJson(redisKeys), JSONUtils.toJson(values));
|
|
|
-
|
|
|
+ long decompressTime = System.currentTimeMillis() - decompressStart;
|
|
|
+
|
|
|
+ // 4. 构建响应
|
|
|
+ long buildStart = System.currentTimeMillis();
|
|
|
MultiGetFeatureResponse.Builder builder = MultiGetFeatureResponse.newBuilder();
|
|
|
builder.setResult(Result.newBuilder().setCode(1));
|
|
|
- for (int i = 0; i < request.getFeatureKeyCount(); i++) {
|
|
|
+ for (int i = 0; i < keyCount; i++) {
|
|
|
builder.putFeature(request.getFeatureKeyList().get(i).getUniqueKey(),
|
|
|
Strings.nullToEmpty(values.get(i)));
|
|
|
}
|
|
|
- return builder.build();
|
|
|
+ MultiGetFeatureResponse response = builder.build();
|
|
|
+ long buildTime = System.currentTimeMillis() - buildStart;
|
|
|
+
|
|
|
+ long totalTime = System.currentTimeMillis() - startTime;
|
|
|
+
|
|
|
+ // 性能日志:超过阈值或关键指标记录
|
|
|
+ if (totalTime > 100 || keyCount > 50) {
|
|
|
+ log.warn("multiGetFeature performance: total={}ms, keyCount={}, keyGen={}ms, redis={}ms, decompress={}ms, build={}ms",
|
|
|
+ totalTime, keyCount, keyGenTime, redisTime, decompressTime, buildTime);
|
|
|
+ } else if (log.isDebugEnabled()) {
|
|
|
+ log.debug("multiGetFeature performance: total={}ms, keyCount={}, keyGen={}ms, redis={}ms, decompress={}ms, build={}ms",
|
|
|
+ totalTime, keyCount, keyGenTime, redisTime, decompressTime, buildTime);
|
|
|
+ }
|
|
|
+
|
|
|
+ return response;
|
|
|
}
|
|
|
|
|
|
- // Note:写入和读取的key生成规则应保持一致
|
|
|
+ /**
|
|
|
+ * 初始化配置缓存
|
|
|
+ */
|
|
|
+ @PostConstruct
|
|
|
+ public void init() {
|
|
|
+ refreshConfigCache();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 监听 Apollo 配置变更,动态刷新配置缓存
|
|
|
+ * 当 dts.config 配置更新时,所有实例都会收到通知并刷新本地缓存
|
|
|
+ */
|
|
|
+ @ApolloConfigChangeListener(interestedKeys = {"dts.config"})
|
|
|
+ public void onConfigChange(ConfigChangeEvent changeEvent) {
|
|
|
+ if (changeEvent.isChanged("dts.config")) {
|
|
|
+ try {
|
|
|
+ // ApolloJsonValue 会自动更新 newDtsConfigs,这里只需要刷新缓存
|
|
|
+ refreshConfigCache();
|
|
|
+ log.info("DTS config updated. Cache refreshed. Config count: {}", configCache.size());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("Failed to refresh DTS config cache", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 刷新配置缓存
|
|
|
+ * 从 newDtsConfigs 重新构建 configCache
|
|
|
+ */
|
|
|
+ private void refreshConfigCache() {
|
|
|
+ if (newDtsConfigs == null || newDtsConfigs.isEmpty()) {
|
|
|
+ log.warn("newDtsConfigs is null or empty, clear configCache");
|
|
|
+ configCache.clear();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<String, DTSConfig> newCache = new HashMap<>();
|
|
|
+ for (DTSConfig c : newDtsConfigs) {
|
|
|
+ if (c.getOdps() != null && StringUtils.isNotBlank(c.getOdps().getTable())) {
|
|
|
+ newCache.put(c.getOdps().getTable(), c);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ configCache = newCache;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取配置,如果缓存中没有则尝试从 newDtsConfigs 查找并更新缓存
|
|
|
+ * 使用双重检查锁定模式,但优化了逻辑:即使缓存不为空,如果缺少某个key也会尝试更新
|
|
|
+ */
|
|
|
+ private DTSConfig getConfig(String tableName) {
|
|
|
+ // 第一次检查:直接从缓存获取
|
|
|
+ DTSConfig config = configCache.get(tableName);
|
|
|
+ if (config != null) {
|
|
|
+ return config;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 缓存中没有,尝试从 newDtsConfigs 查找
|
|
|
+ if (newDtsConfigs == null || newDtsConfigs.isEmpty()) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 双重检查锁定:防止并发重建缓存
|
|
|
+ synchronized (cacheLock) {
|
|
|
+ // 第二次检查:可能其他线程已经更新了缓存
|
|
|
+ config = configCache.get(tableName);
|
|
|
+ if (config != null) {
|
|
|
+ return config;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 从 newDtsConfigs 中查找该 tableName 的配置
|
|
|
+ for (DTSConfig c : newDtsConfigs) {
|
|
|
+ if (c.getOdps() != null && StringUtils.isNotBlank(c.getOdps().getTable())
|
|
|
+ && c.getOdps().getTable().equals(tableName)) {
|
|
|
+ // 找到配置,更新缓存(只更新单个key,不影响其他key)
|
|
|
+ configCache.put(tableName, c);
|
|
|
+ return c;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 如果整个缓存为空,重建整个缓存(兼容旧逻辑)
|
|
|
+ if (configCache.isEmpty()) {
|
|
|
+ refreshConfigCache();
|
|
|
+ return configCache.get(tableName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 生成 Redis Key(历史逻辑版本:每次从 newDtsConfigs 中 stream 查找)
|
|
|
+ * 当前使用此方法,用于统计耗时,后续可切换到 redisKeyV2 使用缓存优化
|
|
|
+ */
|
|
|
private String redisKey(FeatureKeyProto fk) {
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+
|
|
|
+ try {
|
|
|
+ Optional<DTSConfig> optional = newDtsConfigs.stream()
|
|
|
+ .filter(c -> c.getOdps() != null && StringUtils.equals(c.getOdps().getTable(), fk.getTableName()))
|
|
|
+ .findFirst();
|
|
|
+ if (!optional.isPresent()) {
|
|
|
+ log.error("table {} not config", fk.getTableName());
|
|
|
+ return "";
|
|
|
+ }
|
|
|
+ DTSConfig config = optional.get();
|
|
|
+
|
|
|
+ // Note:写入和读取的key生成规则应保持一致
|
|
|
+ List<String> fields = config.getRedis().getKey();
|
|
|
+ if (org.apache.commons.collections4.CollectionUtils.isEmpty(fields)) {
|
|
|
+ return config.getRedis().getPrefix();
|
|
|
+ }
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ if (StringUtils.isNotBlank(config.getRedis().getPrefix())) {
|
|
|
+ sb.append(config.getRedis().getPrefix());
|
|
|
+ }
|
|
|
+ for (String field : fields) {
|
|
|
+ sb.append(":");
|
|
|
+ sb.append(fk.getFieldValueMap().get(field));
|
|
|
+ }
|
|
|
+ return sb.toString();
|
|
|
+ } finally {
|
|
|
+ // 统计最大耗时(单位:毫秒)
|
|
|
+ long elapsedTime = System.currentTimeMillis() - startTime;
|
|
|
+ synchronized (statsLock) {
|
|
|
+ if (elapsedTime > redisKeyMaxTime) {
|
|
|
+ redisKeyMaxTime = elapsedTime;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Note:写入和读取的key生成规则应保持一致
|
|
|
+ private String redisKeyV2(FeatureKeyProto fk) {
|
|
|
+ String tableName = fk.getTableName();
|
|
|
|
|
|
- Optional<DTSConfig> optional = newDtsConfigs.stream()
|
|
|
- .filter(c -> c.getOdps() != null && StringUtils.equals(c.getOdps().getTable(), fk.getTableName()))
|
|
|
- .findFirst();
|
|
|
- if (!optional.isPresent()) {
|
|
|
- log.error("table {} not config", fk.getTableName());
|
|
|
+ // 从缓存获取配置,如果不存在则查找并缓存
|
|
|
+ DTSConfig config = getConfig(tableName);
|
|
|
+ if (config == null) {
|
|
|
+ log.error("table {} not config", tableName);
|
|
|
return "";
|
|
|
}
|
|
|
- DTSConfig config = optional.get();
|
|
|
|
|
|
// Note:写入和读取的key生成规则应保持一致
|
|
|
List<String> fields = config.getRedis().getKey();
|