|
@@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
|
import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
import java.util.concurrent.*;
|
|
|
|
|
|
|
|
@@ -106,4 +107,80 @@ public class ScorerPipeline {
|
|
|
|
|
|
return items;
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+ public List<AdRankItem> scoring(final Map<String, String> sceneFeatureMap,
|
|
|
+ final Map<String, String> userFeatureMap,
|
|
|
+ final List<AdRankItem> rankItems) {
|
|
|
+
|
|
|
+ if (CollectionUtils.isEmpty(scorers)) {
|
|
|
+ log.error("scorers is empty");
|
|
|
+ return rankItems;
|
|
|
+ }
|
|
|
+ List<AdRankItem> items = rankItems;
|
|
|
+
|
|
|
+ for (final AbstractScorer scorer : scorers) {
|
|
|
+ if (!scorer.isEnable()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ final int beforeSize = rankItems.size();
|
|
|
+ final long startTime = System.currentTimeMillis();
|
|
|
+
|
|
|
+ String fullScorerName = scorer.getScorerConfigInfo().getScorerName();
|
|
|
+ String[] scorerNames = fullScorerName.split("\\.");
|
|
|
+ final String scorerName = scorerNames.length > 0 ? scorerNames[scorerNames.length - 1] : fullScorerName;
|
|
|
+
|
|
|
+ final List<AdRankItem> scoreRankerItems = items;
|
|
|
+ Callable<List<AdRankItem>> callable = () -> scorer.scoring(sceneFeatureMap, userFeatureMap, scoreRankerItems);
|
|
|
+
|
|
|
+
|
|
|
+ List<AdRankItem> scoredItems = new ArrayList<AdRankItem>();
|
|
|
+ try {
|
|
|
+ List<Future<List<AdRankItem>>> futures = executorService.invokeAll(Arrays.asList(callable), SCORE_TIME_OUT, TimeUnit.MILLISECONDS);
|
|
|
+ for (Future<List<AdRankItem>> future : futures) {
|
|
|
+ try {
|
|
|
+ if (future.isDone() && !future.isCancelled() && future.get() != null) {
|
|
|
+ scoredItems.addAll(future.get());
|
|
|
+ } else {
|
|
|
+ LOGGER.error("score task is cancelled, scorename [{}] fail items [{}]",
|
|
|
+ new Object[]{scorerName, scoreRankerItems.size()});
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOGGER.error("thread pool exception scorename [{}], exception [{}]",
|
|
|
+ new Object[]{scorerName, ExceptionUtils.getFullStackTrace(e)});
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOGGER.error("thread pool exception uid [{}] scorename [{}], exception [{}]",
|
|
|
+ new Object[]{scorerName, ExceptionUtils.getFullStackTrace(e)});
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ if (CollectionUtils.isNotEmpty(scoreRankerItems)) {
|
|
|
+ items = scoreRankerItems;
|
|
|
+ } else {
|
|
|
+ items = new ArrayList<>(items);
|
|
|
+ }
|
|
|
+
|
|
|
+ int position = 0;
|
|
|
+ for (AdRankItem item : items) {
|
|
|
+ item.getRankerIndex().put(scorerName, position++);
|
|
|
+ item.getRankerScore().put(scorerName, item.getScore());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ long spentTime = System.currentTimeMillis() - startTime;
|
|
|
+ LOGGER.debug("after scorer [{}], spentTime [{}], before size [{}], remaining size [{}]",
|
|
|
+ new Object[]{scorerName, spentTime, beforeSize, scoreRankerItems.size()});
|
|
|
+ }
|
|
|
+
|
|
|
+ int position = 0;
|
|
|
+ for (AdRankItem item : items) {
|
|
|
+ item.getRankerIndex().put("finalScore", position++);
|
|
|
+ item.getRankerScore().put("finalScore", item.getScore());
|
|
|
+ }
|
|
|
+
|
|
|
+ return items;
|
|
|
+ }
|
|
|
}
|