|
@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutionException;
|
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.ExecutorService;
|
|
|
import java.util.concurrent.Future;
|
|
import java.util.concurrent.Future;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
|
|
|
|
|
public class PAIScorerV2 extends AbstractScorer {
|
|
public class PAIScorerV2 extends AbstractScorer {
|
|
|
|
|
|
|
@@ -102,31 +103,45 @@ public class PAIScorerV2 extends AbstractScorer {
|
|
|
for (List<AdRankItem> batch : batches) {
|
|
for (List<AdRankItem> batch : batches) {
|
|
|
futures.add(executor.submit(() -> {
|
|
futures.add(executor.submit(() -> {
|
|
|
long queueWaitTime = System.currentTimeMillis() - submitTime;
|
|
long queueWaitTime = System.currentTimeMillis() - submitTime;
|
|
|
|
|
+ long batchStartTime = System.currentTimeMillis();
|
|
|
try {
|
|
try {
|
|
|
multipleCtrScore(batch, userFeatureMap, sceneFeatureMap, model);
|
|
multipleCtrScore(batch, userFeatureMap, sceneFeatureMap, model);
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
LOGGER.error("Error during multipleCtrScore batch execution", e);
|
|
LOGGER.error("Error during multipleCtrScore batch execution", e);
|
|
|
}
|
|
}
|
|
|
- LOGGER.info("PAIScorerV2 batch done batchSize={} queueWaitTime={}ms",
|
|
|
|
|
- batch.size(), queueWaitTime);
|
|
|
|
|
|
|
+ long batchExecuteTime = System.currentTimeMillis() - batchStartTime;
|
|
|
|
|
+ if (queueWaitTime > 100) {
|
|
|
|
|
+ LOGGER.error("PAIScorerV2 batch queue wait too long batchSize={} queueWaitTime={}ms executeTime={}ms",
|
|
|
|
|
+ batch.size(), queueWaitTime, batchExecuteTime);
|
|
|
|
|
+ }
|
|
|
return batch;
|
|
return batch;
|
|
|
}));
|
|
}));
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// 合并结果
|
|
// 合并结果
|
|
|
List<AdRankItem> merged = new ArrayList<>();
|
|
List<AdRankItem> merged = new ArrayList<>();
|
|
|
|
|
+ int batchIndex = 0;
|
|
|
for (Future<List<AdRankItem>> future : futures) {
|
|
for (Future<List<AdRankItem>> future : futures) {
|
|
|
|
|
+ long getStartTime = System.currentTimeMillis();
|
|
|
try {
|
|
try {
|
|
|
merged.addAll(future.get(400, TimeUnit.MILLISECONDS));
|
|
merged.addAll(future.get(400, TimeUnit.MILLISECONDS));
|
|
|
|
|
+ } catch (TimeoutException e) {
|
|
|
|
|
+ long waitTime = System.currentTimeMillis() - getStartTime;
|
|
|
|
|
+ LOGGER.error("PAIScorerV2 batch timeout batchIndex={} waitTime={}ms totalElapsed={}ms",
|
|
|
|
|
+ batchIndex, waitTime, System.currentTimeMillis() - startTime);
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
- LOGGER.error("Execution error in batch", e);
|
|
|
|
|
|
|
+ LOGGER.error("Execution error in batch batchIndex={}", batchIndex, e);
|
|
|
}
|
|
}
|
|
|
|
|
+ batchIndex++;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
Collections.sort(merged);
|
|
Collections.sort(merged);
|
|
|
|
|
|
|
|
- LOGGER.info("PAIScorerV2 rankByJava done itemSize={} batchCount={} totalTime={}ms",
|
|
|
|
|
- items.size(), batches.size(), System.currentTimeMillis() - startTime);
|
|
|
|
|
|
|
+ long totalTime = System.currentTimeMillis() - startTime;
|
|
|
|
|
+ if (totalTime > 300) {
|
|
|
|
|
+ LOGGER.error("PAIScorerV2 rankByJava slow itemSize={} batchCount={} totalTime={}ms",
|
|
|
|
|
+ items.size(), batches.size(), totalTime);
|
|
|
|
|
+ }
|
|
|
return merged;
|
|
return merged;
|
|
|
}
|
|
}
|
|
|
|
|
|