|
@@ -3,6 +3,7 @@ package com.tzld.piaoquan.recommend.server.service.flowpool;
|
|
|
import com.tzld.piaoquan.recommend.server.model.TripleConsumer;
|
|
|
import com.tzld.piaoquan.recommend.server.model.Video;
|
|
|
import com.tzld.piaoquan.recommend.server.service.ThreadPoolFactory;
|
|
|
+import com.tzld.piaoquan.recommend.server.util.JSONUtils;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.collections4.CollectionUtils;
|
|
|
import org.apache.commons.collections4.MapUtils;
|
|
@@ -72,34 +73,14 @@ public class FlowPoolService {
|
|
|
|
|
|
private void asyncDelDistributeCountWithLevel(Map<Long, String> videoFlowPoolMap) {
|
|
|
asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
- redisTemplate.opsForSet().remove(String.format(KEY_WITH_LEVEL_FORMAT, appType, level), values);
|
|
|
+ String key = String.format(KEY_WITH_LEVEL_FORMAT, appType, level);
|
|
|
+ Long count = redisTemplate.opsForSet().remove(key, values);
|
|
|
+ log.info("asyncDelDistributeCountWithLevel remove key={}, values={}, count={}",
|
|
|
+ key, JSONUtils.toJson(values), count);
|
|
|
redisTemplate.opsForSet().remove(String.format(KEY_QUICK_WITH_LEVEL_FORMAT, appType), values);
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- private void asyncDelDistributeCount(Map<Long, String> videoFlowPoolMap,
|
|
|
- TripleConsumer<Integer, String, List<String>> flowPoolRemoveConsumer) {
|
|
|
- if (MapUtils.isEmpty(videoFlowPoolMap)) {
|
|
|
- return;
|
|
|
- }
|
|
|
- pool.execute(() -> {
|
|
|
- List<String> keys = videoFlowPoolMap.entrySet().stream()
|
|
|
- .map(v -> String.format(localDistributeCountFormat, v.getKey(), v.getValue()))
|
|
|
- .collect(Collectors.toList());
|
|
|
- redisTemplate.delete(keys);
|
|
|
-
|
|
|
- Map<String, Double> levelWeight = flowPoolConfigService.getLevelWeight();
|
|
|
- List<String> values = videoFlowPoolMap.entrySet().stream()
|
|
|
- .map(v -> String.format(valueFormat, v.getKey(), v.getValue()))
|
|
|
- .collect(Collectors.toList());
|
|
|
- for (String level : levelWeight.keySet()) {
|
|
|
- for (int appType : appTypes) {
|
|
|
- flowPoolRemoveConsumer.accept(appType, level, values);
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-
|
|
|
public Map<Long, Integer> getDistributeCountWithLevelScore(Map<Long, String> videoFlowPoolMap) {
|
|
|
if (MapUtils.isEmpty(videoFlowPoolMap)) {
|
|
|
return Collections.emptyMap();
|
|
@@ -129,7 +110,10 @@ public class FlowPoolService {
|
|
|
|
|
|
private void asyncLocalDistributeCountWithLevelScore(Map<Long, String> videoFlowPoolMap) {
|
|
|
asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
- redisTemplate.opsForZSet().remove(String.format(KEY_WITH_LEVEL_SCORE_FORMAT, appType, level), values);
|
|
|
+ String key = String.format(KEY_WITH_LEVEL_SCORE_FORMAT, appType, level);
|
|
|
+ Long count = redisTemplate.opsForZSet().remove(key, values);
|
|
|
+ log.info("asyncLocalDistributeCountWithLevelScore remove key={}, values={}, count={}",
|
|
|
+ key, JSONUtils.toJson(values), count);
|
|
|
redisTemplate.opsForZSet().remove(String.format(KEY_QUICK_WITH_LEVEL_SCORE_FORMAT, appType), values);
|
|
|
});
|
|
|
}
|
|
@@ -160,7 +144,11 @@ public class FlowPoolService {
|
|
|
|
|
|
private void asyncDelDistributeCountWithScore(Map<Long, String> videoFlowPoolMap) {
|
|
|
asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> {
|
|
|
- redisTemplate.opsForZSet().remove(String.format(KEY_WITH_SCORE_FORMAT, appType, level), values);
|
|
|
+ String key = String.format(KEY_WITH_SCORE_FORMAT, appType, level);
|
|
|
+ Long count = redisTemplate.opsForZSet().remove(key, values);
|
|
|
+ log.info("asyncDelDistributeCountWithScore remove key={}, values={}, count={}",
|
|
|
+ key, JSONUtils.toJson(values), count);
|
|
|
+
|
|
|
redisTemplate.opsForZSet().remove(String.format(KEY_QUICK_WITH_SCORE_FORMAT, appType), values);
|
|
|
});
|
|
|
}
|
|
@@ -175,7 +163,8 @@ public class FlowPoolService {
|
|
|
.map(v -> String.format(localDistributeCountFormat, v.getKey(), v.getValue()))
|
|
|
.collect(Collectors.toList());
|
|
|
List<String> counts = redisTemplate.opsForValue().multiGet(keys);
|
|
|
-
|
|
|
+ log.info("getDistributeCount localDistributeCountKeys={}, counts={}", JSONUtils.toJson(keys),
|
|
|
+ JSONUtils.toJson(counts));
|
|
|
Map<Long, Integer> result = new HashMap<>();
|
|
|
for (int i = 0; i < entries.size(); i++) {
|
|
|
result.put(entries.get(i).getKey(), NumberUtils.toInt(counts.get(i), 0));
|
|
@@ -194,4 +183,32 @@ public class FlowPoolService {
|
|
|
});
|
|
|
return removeMap;
|
|
|
}
|
|
|
+
|
|
|
+ private void asyncDelDistributeCount(Map<Long, String> videoFlowPoolMap,
|
|
|
+ TripleConsumer<Integer, String, String[]> flowPoolRemoveConsumer) {
|
|
|
+ if (MapUtils.isEmpty(videoFlowPoolMap)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ pool.execute(() -> {
|
|
|
+ List<String> keys = videoFlowPoolMap.entrySet().stream()
|
|
|
+ .map(v -> String.format(localDistributeCountFormat, v.getKey(), v.getValue()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ log.info("asyncDelDistributeCount localDistributeCountKey={}", JSONUtils.toJson(keys));
|
|
|
+ redisTemplate.delete(keys);
|
|
|
+
|
|
|
+ Map<String, Double> levelWeight = flowPoolConfigService.getLevelWeight();
|
|
|
+ String[] values = new String[videoFlowPoolMap.size()];
|
|
|
+ int i = 0;
|
|
|
+ for (Map.Entry v : videoFlowPoolMap.entrySet()) {
|
|
|
+ values[i] = String.format(valueFormat, v.getKey(), v.getValue());
|
|
|
+ }
|
|
|
+ // remove 每小程序每层的数据
|
|
|
+ // TODO 现在视频只会出现在一个层级,所以可以做个优化
|
|
|
+ for (String level : levelWeight.keySet()) {
|
|
|
+ for (int appType : appTypes) {
|
|
|
+ flowPoolRemoveConsumer.accept(appType, level, values);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
}
|