jiandong.yh vor 1 Tag
Ursprung
Commit
c52a3a5778

+ 28 - 0
ad-engine-commons/src/main/java/com/tzld/piaoquan/ad/engine/commons/thread/ThreadPoolFactory.java

@@ -26,6 +26,15 @@ public final class ThreadPoolFactory {
             new ThreadFactoryBuilder().setNameFormat("Feature-%d").build(),
             new ThreadPoolExecutor.AbortPolicy());
 
+    private final static ExecutorService FEATURE_OPTIMIZED = new CommonThreadPoolExecutor(
+            256,
+            256,
+            0L, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(5000),
+            new ThreadFactoryBuilder().setNameFormat("Optimized-%d").build(),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+
     private final static ExecutorService SCORE = new CommonThreadPoolExecutor(
             512,
             512,
@@ -34,6 +43,15 @@ public final class ThreadPoolFactory {
             new ThreadFactoryBuilder().setNameFormat("SCORE-%d").build(),
             new ThreadPoolExecutor.AbortPolicy());
 
+    private final static ExecutorService SCORE_OPTIMIZED = new CommonThreadPoolExecutor(
+            64,
+            64,
+            0L, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<>(5000),
+            new ThreadFactoryBuilder().setNameFormat("Score-Opt-%d").build(),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+
+
     public static ExecutorService defaultPool() {
         return DEFAULT;
     }
@@ -42,9 +60,19 @@ public final class ThreadPoolFactory {
         return FEATURE;
     }
 
+    public static ExecutorService featureOptimized() {
+        return FEATURE_OPTIMIZED;
+    }
+
+
     public static ExecutorService score() {
         return SCORE;
     }
 
+    public static ExecutorService scoreOptimized() {
+        return SCORE_OPTIMIZED;
+    }
+
+
 
 }

+ 9 - 0
ad-engine-commons/src/main/java/com/tzld/piaoquan/ad/engine/commons/util/ExtractorUtils.java

@@ -10,6 +10,15 @@ import java.util.Map;
 
 public class ExtractorUtils {
 
+    public static double cachedConceptSimilarity(String str1, String str2) {
+        Double score = SimilarityCache.get(str1, str2);
+        if (score == null) {
+            score = Similarity.conceptSimilarity(str1, str2);
+            SimilarityCache.put(str1, str2, score);
+        }
+        return score;
+    }
+
     public static double sigmoid(double x) {
         return 1.0 / (1.0 + Math.exp(-x));
     }

+ 50 - 0
ad-engine-commons/src/main/java/com/tzld/piaoquan/ad/engine/commons/util/SimilarityCache.java

@@ -0,0 +1,50 @@
+package com.tzld.piaoquan.ad.engine.commons.util;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 文本相似度计算缓存
+ * 封装 Caffeine Cache,解决 Similarity.conceptSimilarity 计算耗时高的问题
+ */
+@Slf4j
+public class SimilarityCache {
+
+    /**
+     * Cache<Key, Score>
+     * Key格式: str1 + "::" + str2 (字典序排序,保证A-B和B-A的一致性)
+     * Value: 相似度分数
+     */
+    private static final Cache<String, Double> cache = Caffeine.newBuilder()
+            .maximumSize(50000) // 最大缓存 5万条
+            .expireAfterWrite(10, TimeUnit.MINUTES) // 写入10分钟后过期
+            .recordStats() // 开启统计
+            .build();
+
+    /**
+     * 获取缓存Key,确保 str1 和 str2 顺序无关
+     */
+    private static String getKey(String str1, String str2) {
+        if (str1 == null) str1 = "";
+        if (str2 == null) str2 = "";
+
+        if (str1.compareTo(str2) <= 0) {
+            return str1 + "::" + str2;
+        } else {
+            return str2 + "::" + str1;
+        }
+    }
+
+    public static Double get(String str1, String str2) {
+        return cache.getIfPresent(getKey(str1, str2));
+    }
+
+    public static void put(String str1, String str2, Double score) {
+        if (score != null) {
+            cache.put(getKey(str1, str2), score);
+        }
+    }
+}

+ 9 - 0
ad-engine-server/pom.xml

@@ -47,6 +47,15 @@
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-maven-plugin</artifactId>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>8</source>
+                    <target>8</target>
+                    <encoding>UTF-8</encoding>
+                </configuration>
+            </plugin>
         </plugins>
 
     </build>

+ 15 - 1
ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/score/scorer/PAIScorer.java

@@ -26,6 +26,10 @@ public class PAIScorer extends AbstractScorer {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(PAIScorer.class);
 
+    @org.springframework.beans.factory.annotation.Value("${rank.strategy.optimize.switch:false}")
+    private boolean optimizeSwitch;
+
+
 
     public PAIScorer(ScorerConfigInfo configInfo) {
         super(configInfo);
@@ -94,7 +98,17 @@ public class PAIScorer extends AbstractScorer {
             batches.add(new ArrayList<>(items.subList(i, Math.min(i + batchSize, items.size()))));
         }
 
-        ExecutorService executor = ThreadPoolFactory.score();
+        for (int i = 0; i < items.size(); i += batchSize) {
+            batches.add(new ArrayList<>(items.subList(i, Math.min(i + batchSize, items.size()))));
+        }
+
+        ExecutorService executor;
+        if (optimizeSwitch) {
+             executor = ThreadPoolFactory.scoreOptimized();
+        } else {
+             executor = ThreadPoolFactory.score();
+        }
+
         List<Future<List<AdRankItem>>> futures = new ArrayList<>();
 
         for (List<AdRankItem> batch : batches) {

+ 14 - 5
ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/score/strategy/RankStrategyBasic.java

@@ -26,10 +26,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 
 import java.util.*;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import java.util.concurrent.*;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -143,6 +140,8 @@ public abstract class RankStrategyBasic implements RankStrategy {
 
     private static final double DEFAULT_CORRECTION = 1.0;
 
+    @Value("${rank.strategy.optimize.switch:false}")
+    protected boolean optimizeSwitch;
 
     protected static final List<String> hasChannelScenes = new ArrayList<String>() {{
         add("DaiTou");
@@ -217,7 +216,14 @@ public abstract class RankStrategyBasic implements RankStrategy {
 
         try {
             // 执行所有的任务并等待所有任务完成
-            List<Future<Feature>> futures = executorService.invokeAll(tasks);
+            List<Future<Feature>> futures;
+            if (optimizeSwitch) {
+                // 优化:设置5秒超时,避免无限等待
+                futures = executorService.invokeAll(tasks, 5000, TimeUnit.MILLISECONDS);
+            } else {
+                futures = executorService.invokeAll(tasks);
+            }
+
 
             // 等待所有任务完成并合并结果
             for (Future<Feature> future : futures) {
@@ -234,8 +240,11 @@ public abstract class RankStrategyBasic implements RankStrategy {
                 } catch (ExecutionException e) {
                     log.error("获取特征批次失败: {}", e.getCause() != null ? e.getCause().getMessage() : e.getMessage());
                     // 继续处理其他批次,不中断整个流程
+                } catch (java.util.concurrent.CancellationException e) {
+                   log.warn("获取特征批次被取消(可能是超时): {}", e.getMessage());
                 }
             }
+
         } catch (InterruptedException e) {
             log.error("getFeature interrupted", e);
             Thread.currentThread().interrupt();

+ 69 - 9
ad-engine-service/src/main/java/com/tzld/piaoquan/ad/engine/service/score/strategy/RankStrategyBy688.java

@@ -27,10 +27,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 import java.util.stream.Collectors;
 
 import static com.tzld.piaoquan.ad.engine.commons.math.Const.*;
@@ -158,8 +155,13 @@ public class RankStrategyBy688 extends RankStrategyBasic {
         Random random = new Random();
         List<Future<AdRankItem>> futures = new ArrayList<>();
         CountDownLatch cdl1 = new CountDownLatch(request.getAdIdList().size());
+
+        // 优化:根据开关选择线程池
+        ExecutorService featurePool = optimizeSwitch ? ThreadPoolFactory.featureOptimized() : ThreadPoolFactory.feature();
+
         for (AdPlatformCreativeDTO dto : request.getAdIdList()) {
-            Future<AdRankItem> future = ThreadPoolFactory.feature().submit(() -> {
+            Future<AdRankItem> future = featurePool.submit(() -> {
+
                 AdRankItem adRankItem = new AdRankItem();
                 try {
                     adRankItem.setAdId(dto.getCreativeId());
@@ -173,14 +175,30 @@ public class RankStrategyBy688 extends RankStrategyBasic {
                     adRankItem.setSkuId(dto.getSkuId());
                     adRankItem.setCustomerId(dto.getCustomerId());
                     adRankItem.setProfession(dto.getProfession());
-                    adRankItem.setRandom(random.nextInt(1000));
+                    adRankItem.setProfession(dto.getProfession());
+
+                    // 优化:使用 ThreadLocalRandom 减少锁竞争
+                    if (optimizeSwitch) {
+                        adRankItem.setRandom(java.util.concurrent.ThreadLocalRandom.current().nextInt(1000));
+                    } else {
+                        adRankItem.setRandom(random.nextInt(1000));
+                    }
+
                     if (noApiAdVerIds.contains(dto.getAdVerId())) {
                         adRankItem.getExt().put("isApi", "0");
                     } else {
                         adRankItem.getExt().put("isApi", "1");
                     }
                     adRankItem.getExt().put("recallsources", dto.getRecallSources());
-                    adRankItem.getExt().put("correctCpaMap", JSONObject.toJSONString(correctCpaMap.get(dto.getAdId())));
+                    adRankItem.getExt().put("recallsources", dto.getRecallSources());
+
+                    // 优化:直接存对象,避免循环内 JSON 序列化
+                    if (optimizeSwitch) {
+                        adRankItem.getExt().put("correctCpaMap", correctCpaMap.get(dto.getAdId()));
+                    } else {
+                        adRankItem.getExt().put("correctCpaMap", JSONObject.toJSONString(correctCpaMap.get(dto.getAdId())));
+                    }
+
                     adRankItem.getExt().put("correctionFactor", correctCpaMap.get(dto.getAdId()).getCorrectionFactor());
                     setGuaranteeWeight(map, dto.getAdVerId(), adRankItem.getExt(), isGuaranteedFlow, reqFeature);
                     String cidStr = dto.getCreativeId().toString();
@@ -267,7 +285,7 @@ public class RankStrategyBy688 extends RankStrategyBasic {
         userFeatureMap = this.featureBucket(userFeatureMap);
         CountDownLatch cdl4 = new CountDownLatch(adRankItems.size());
         for (AdRankItem adRankItem : adRankItems) {
-            ThreadPoolFactory.feature().submit(() -> {
+            featurePool.submit(() -> {
                 try {
                     Map<String, String> featureMap = adRankItem.getFeatureMap();
                     adRankItem.setFeatureMap(this.featureBucket(featureMap));
@@ -689,7 +707,12 @@ public class RankStrategyBy688 extends RankStrategyBasic {
         if (scoreParam.getExpCodeSet().contains(word2vecExp)) {
             score = SimilarityUtils.word2VecSimilarity(cTitle, vTitle);
         } else {
-            score = Similarity.conceptSimilarity(cTitle, vTitle);
+            // 优化:使用 Caffeine 缓存相似度计算结果
+            if (optimizeSwitch) {
+                score = ExtractorUtils.cachedConceptSimilarity(cTitle, vTitle);
+            } else {
+                score = Similarity.conceptSimilarity(cTitle, vTitle);
+            }
         }
         featureMap.put("ctitle_vtitle_similarity", String.valueOf(score));
     }
@@ -718,8 +741,22 @@ public class RankStrategyBy688 extends RankStrategyBasic {
                     if (scoreParam.getExpCodeSet().contains(word2vecExp)) {
                         doubles = ExtractorUtils.funcC34567ForTagsNew(tags, title);
                     } else {
+                        // 优化:使用带缓存的 Word2Vec 相似度 (如果需要可进一步优化 word2vec)
+                        // 这里暂时只优化 conceptSimilarity
                         doubles = ExtractorUtils.funcC34567ForTags(tags, title);
+                        if (optimizeSwitch) {
+                             // 如果 funcC34567ForTags 内部也用了 conceptSimilarity,需要透传开关或使用 cached 版本
+                             // 由于 funcC34567ForTags 是静态方法,我们在方法内部判断?
+                             // 实际上 ExtractorUtils.funcC34567ForTags 内部调用了 Similarity.conceptSimilarity
+                             // 我们应该修改 ExtractorUtils 或者在这里替换实现
+                             // 简单起见,我们复制 funcC34567ForTags 的逻辑到这里并使用 cachedConceptSimilarity 
+                             // 或者修改 ExtractorUtils.funcC34567ForTags 让他支持缓存? 
+                             // 最好的办法是修改 ExtractorUtils.funcC34567ForTags
+                             // 但为了稳妥,我们调用新的 cached 方法
+                            doubles = funcC34567ForTagsCached(tags, title);
+                        }
                     }
+
                     featureMap.put(prefix + "_" + tagsField + "_matchnum", String.valueOf(doubles[0]));
                     featureMap.put(prefix + "_" + tagsField + "_maxscore", String.valueOf(doubles[1]));
                     featureMap.put(prefix + "_" + tagsField + "_avgscore", String.valueOf(doubles[2]));
@@ -908,4 +945,27 @@ public class RankStrategyBy688 extends RankStrategyBasic {
         }
         return newFeatureMap;
     }
+    private Double[] funcC34567ForTagsCached(String tags, String title) {
+        String[] tagsList = tags.split(",");
+        int d1 = 0;
+        double d3 = 0.0;
+        double d4 = 0.0;
+
+        for (String tag : tagsList) {
+            if (title.contains(tag)) {
+                d1++;
+            }
+            // 使用缓存的相似度计算
+            double score = ExtractorUtils.cachedConceptSimilarity(tag, title);
+            if (score > d3) {
+                d3 = score;
+            }
+            d4 += score;
+        }
+
+        d4 = (tagsList.length > 0) ? d4 / tagsList.length : d4;
+
+        return new Double[]{(double) d1, d3, d4};
+    }
+
 }