Quellcode durchsuchen

wa:filter concurrent

丁云鹏 vor 1 Jahr
Ursprung
Commit
2e7337ff48

+ 45 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/AbstractFilterService.java

@@ -49,11 +49,55 @@ public abstract class AbstractFilterService {
         // log.info("filterByRiskVideos videoIds={}", JSONUtils.toJson(videoIds));
         videoIds = filterByPreViewed(param.getAppType(), param.getMid(), videoIds);
         // log.info("filterByPreViewed videoIds={}", JSONUtils.toJson(videoIds));
-        videoIds = filterByViewed(param.getAppType(), param.getMid(), param.getUid(), videoIds);
+        if (param.isConcurrent()) {
+            videoIds = filterByViewedConcurrent(param.getAppType(), param.getMid(), param.getUid(), videoIds);
+        } else {
+            videoIds = filterByViewed(param.getAppType(), param.getMid(), param.getUid(), videoIds);
+        }
         // log.info("filterByViewed videoIds={}", JSONUtils.toJson(videoIds));
         return videoIds;
     }
 
+    private List<Long> filterByViewedConcurrent(int appType, String mid, String uid, List<Long> videoIds) {
+        // TODO uid为空时,还需要过滤么?
+        if (StringUtils.isBlank(mid)
+                || CollectionUtils.isEmpty(videoIds)) {
+            return videoIds;
+        }
+
+        int chunkSize = 20;
+        Collection<List<Long>> chunks = videoIds.stream()
+                .collect(Collectors.groupingBy(it -> it / chunkSize))
+                .values();
+
+        CountDownLatch cdl = new CountDownLatch(chunks.size());
+        List<Future<List<Long>>> futures = new ArrayList<>();
+        for (final List<Long> ids : chunks) {
+            Future<List<Long>> future = pool.submit(() ->
+                    viewedService.filterViewedVideo(appType, mid, uid, ids));
+            futures.add(future);
+        }
+        try {
+            cdl.await(1000, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            log.error("filter error", e);
+            return null;
+        }
+
+        List<Long> result = new ArrayList<>();
+        for (Future<List<Long>> f : futures) {
+            try {
+                result.addAll(f.get());
+            } catch (Exception e) {
+                log.error("future get error ", e);
+            }
+        }
+
+        return result;
+
+
+    }
+
     private List<Long> filterByViewed(int appType, String mid, String uid, List<Long> videoIds) {
         // TODO uid为空时,还需要过滤么?
         if (StringUtils.isBlank(mid)

+ 2 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/FilterParam.java

@@ -24,4 +24,6 @@ public class FilterParam {
     private String regionCode;
     private int forceTruncation = 1000;
     private Set<String> abExpCodes;
+
+    private boolean concurrent; // hardcode 临时解决过滤慢的问题
 }