| 
					
				 | 
			
			
				@@ -84,8 +84,8 @@ public class FlowPoolService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             redisTemplate.opsForSet().remove(String.format(KEY_QUICK_WITH_LEVEL_FORMAT, appType), values); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            redisTemplate.opsForSet().remove(KEY_WITH_LEVEL_FORMAT_V2, values); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        asyncDelDistributeCountV2(videoFlowPoolMap, (appType, level, values) -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            redisTemplate.opsForSet().remove(String.format(KEY_WITH_LEVEL_FORMAT_V2, appType, level), values); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             redisTemplate.opsForSet().remove(KEY_QUICK_WITH_LEVEL_FORMAT_V2, values); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -124,50 +124,12 @@ public class FlowPoolService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             redisTemplate.opsForZSet().remove(String.format(KEY_QUICK_WITH_LEVEL_SCORE_FORMAT, appType), values); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            redisTemplate.opsForZSet().remove(KEY_WITH_LEVEL_SCORE_FORMAT_V2, values); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        asyncDelDistributeCountV2(videoFlowPoolMap, (appType, level, values) -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            redisTemplate.opsForZSet().remove(String.format(KEY_WITH_LEVEL_SCORE_FORMAT_V2, appType, level), values); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             redisTemplate.opsForZSet().remove(KEY_QUICK_WITH_LEVEL_SCORE_FORMAT_V2, values); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    public Map<Long, Integer> getDistributeCountWithScore(Map<Long, String> videoFlowPoolMap) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        if (MapUtils.isEmpty(videoFlowPoolMap)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            return Collections.emptyMap(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        Map<Long, Integer> result = getDistributeCount(videoFlowPoolMap); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        // 处理脏数据:分发数<0 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        Map<Long, String> dirties = videoFlowPoolMap.entrySet().stream() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                .filter(e -> result.get(e.getKey()) <= 0) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                .collect(Collectors.toMap( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        e -> e.getKey(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        e -> e.getValue() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                )); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        asyncDelDistributeCountWithScore(dirties); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        return result; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    public void updateDistributeCountWithScore(List<Video> videos) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        if (CollectionUtils.isEmpty(videos)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            return; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        Map<Long, String> removeMap = updateDistributeCount(videos); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        asyncDelDistributeCountWithScore(removeMap); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    private void asyncDelDistributeCountWithScore(Map<Long, String> videoFlowPoolMap) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            String key = String.format(KEY_WITH_SCORE_FORMAT, appType, level); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            Long count = redisTemplate.opsForZSet().remove(key, values); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            redisTemplate.opsForZSet().remove(String.format(KEY_QUICK_WITH_SCORE_FORMAT, appType), values); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            redisTemplate.opsForZSet().remove(KEY_WITH_SCORE_FORMAT_V2, values); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            redisTemplate.opsForZSet().remove(KEY_QUICK_WITH_SCORE_FORMAT_V2, values); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     private Map<Long, Integer> getDistributeCount(Map<Long, String> videoFlowPoolMap) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         // 为了保证有序 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         List<Map.Entry<Long, String>> entries = videoFlowPoolMap.entrySet().stream() 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -225,70 +187,27 @@ public class FlowPoolService { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    public Map<Long, Integer> getSupplyDistributeCountWithLevel(Map<Long, String> videoFlowPoolMap) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        if (MapUtils.isEmpty(videoFlowPoolMap)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            return Collections.emptyMap(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        Map<Long, Integer> result = getSupplyDistributeCount(videoFlowPoolMap); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        // 处理脏数据:分发数<0 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        Map<Long, String> dirties = videoFlowPoolMap.entrySet().stream() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                .filter(e -> result.get(e.getKey()) <= 0) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                .collect(Collectors.toMap( 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        e -> e.getKey(), 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                        e -> e.getValue() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                )); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        asyncDelSupplyDistributeCountWithLevel(dirties); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        return result; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    private Map<Long, Integer> getSupplyDistributeCount(Map<Long, String> videoFlowPoolMap) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        // 为了保证有序 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        List<Map.Entry<Long, String>> entries = videoFlowPoolMap.entrySet().stream() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                .sorted(Comparator.comparingLong(e -> e.getKey())) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                .collect(Collectors.toList()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        List<String> keys = entries.stream() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                .map(v -> String.format(supplyLocalDistributeCountFormat, v.getKey(), v.getValue())) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                .collect(Collectors.toList()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        List<String> counts = redisTemplate.opsForValue().multiGet(keys); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        Map<Long, Integer> result = new HashMap<>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        for (int i = 0; i < entries.size(); i++) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            result.put(entries.get(i).getKey(), NumberUtils.toInt(counts.get(i), 0)); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        return result; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    private Map<Long, String> updateSupplyDistributeCount(List<Video> videos) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        // TODO 异步更新 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        Map<Long, String> removeMap = new HashMap<>(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        videos.stream().forEach(v -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            String key = String.format(supplyLocalDistributeCountFormat, v.getVideoId(), v.getFlowPool()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            Long count = redisTemplate.opsForValue().decrement(key); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            if (count <= 0) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-                removeMap.put(v.getVideoId(), v.getFlowPool()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        return removeMap; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    public void updateSupplyDistributeCountWithLevel(List<Video> videos) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        if (CollectionUtils.isEmpty(videos)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    private void asyncDelDistributeCountV2(Map<Long, String> videoFlowPoolMap, 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                                           TripleConsumer<Integer, String, String[]> flowPoolRemoveConsumer) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        if (MapUtils.isEmpty(videoFlowPoolMap)) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             return; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        Map<Long, String> removeMap = updateSupplyDistributeCount(videos); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        asyncDelSupplyDistributeCountWithLevel(removeMap); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				- 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        pool.execute(() -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            List<String> keys = videoFlowPoolMap.entrySet().stream() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    .map(v -> String.format(localDistributeCountFormat, v.getKey(), v.getValue())) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                    .collect(Collectors.toList()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            redisTemplate.delete(keys); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    private void asyncDelSupplyDistributeCountWithLevel(Map<Long, String> videoFlowPoolMap) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        asyncDelDistributeCount(videoFlowPoolMap, (appType, level, values) -> { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            String key = String.format(KEY_WITH_LEVEL_SUPPLY_FORMAT, appType, level); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-            Long count = redisTemplate.opsForSet().remove(key, values); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            Map<String, Double> levelWeight = flowPoolConfigService.getLevelWeight4FlowPoolWithLevel(); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            String[] values = new String[videoFlowPoolMap.size()]; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            int i = 0; 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            for (Map.Entry v : videoFlowPoolMap.entrySet()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                values[i++] = String.format(valueFormat, v.getKey(), v.getValue()); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            for (String level : levelWeight.keySet()) { 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+                flowPoolRemoveConsumer.accept(0, level, values); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+            } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         }); 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     } 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 } 
			 |