丁云鹏 1 gadu atpakaļ
vecāks
revīzija
2fa753839d

+ 11 - 6
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/RecommendService.java

@@ -100,21 +100,24 @@ public class RecommendService {
         if (request.getVersionAuditStatus() == 1) {
             return specialMidRecommend(request);
         }
+        Stopwatch stopwatch = Stopwatch.createStarted();
         if (StringUtils.isNotBlank(request.getMid())
                 && redisTemplate.opsForSet().isMember("special:mid", request.getMid())) {
             return specialMidRecommend(request);
         }
+        log.info("sprcial mid cost={}", stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
+        stopwatch.reset().start();
 
-        Stopwatch stopwatch = Stopwatch.createStarted();
         RecommendParam param = genRecommendParam(request, recommendType);
-        log.info("genRecommendParam={}, cost={}", JSONUtils.toJson(param),
+        log.info("genRecommendParam={},genRecommendParam cost={}", JSONUtils.toJson(param),
                 stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
         stopwatch.reset().start();
         List<Video> videos = videoRecommend(param);
-        log.info("videoRecommend={}, cost={}", JSONUtils.toJson(videos), stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
+        log.info("videoRecommend={}, videoRecommend cost={}", JSONUtils.toJson(videos),
+                stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
         stopwatch.reset().start();
         updateCache(request, param, videos);
-        log.info("updateCache={}, cost={}", JSONUtils.toJson(videos), stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
+        log.info("updateCache={}, updateCache cost={}", JSONUtils.toJson(videos), stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
 
         // 更新position
         List<VideoProto> vps = new ArrayList<>();
@@ -276,11 +279,13 @@ public class RecommendService {
     private List<Video> videoRecommend(RecommendParam param) {
         Stopwatch stopwatch = Stopwatch.createStarted();
         RecallResult recallResult = recallService.recall(convertToRecallParam(param));
-        log.info("recallResult={}, cost={}", recallResult, stopwatch.elapsed(TimeUnit.MILLISECONDS));
+        log.info("recallResult={}, videoRecommend recallResult cost={}", recallResult,
+                stopwatch.elapsed(TimeUnit.MILLISECONDS));
         stopwatch.reset().start();
 
         RankResult rankResult = rankService.rank(convertToRankParam(param, recallResult));
-        log.info("rankResult={}, cost={}", rankResult, stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
+        log.info("rankResult={}, videoRecommend rank cost={}", rankResult,
+                stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
 
 
         if (rankResult == null || CollectionUtils.isEmpty(rankResult.getVideos())) {

+ 13 - 10
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/ViewedService.java

@@ -1,7 +1,9 @@
 package com.tzld.piaoquan.recommend.server.service;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 import com.google.common.reflect.TypeToken;
+import com.tzld.piaoquan.recommend.server.util.HttpClientFactory;
 import com.tzld.piaoquan.recommend.server.util.JSONUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.HttpEntity;
@@ -9,7 +11,6 @@ import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
@@ -19,6 +20,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author dyp
@@ -32,7 +34,8 @@ public class ViewedService {
     @Value("${video.filter.url:}")
     private String videoFilterUrl;
 
-    private CloseableHttpClient clients = HttpClients.createDefault();
+    private CloseableHttpClient client = HttpClientFactory.create(1000, 3000, 200, 1000, 0, 500);
+
 
     @PostConstruct
     public void init() {
@@ -41,18 +44,17 @@ public class ViewedService {
         viewedTypesMap.put(13, Lists.newArrayList(1));
     }
 
+
     // TODO 如果过滤失败,那么认为所有视频都被过滤掉
     public List<Long> filterViewedVideo(int appType, String mid, String uid, List<Long> videoIds) {
-        List<Integer> viewedTypes = viewedTypesMap.getOrDefault(appType, defaultViewedTypes);
 
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        List<Integer> viewedTypes = viewedTypesMap.getOrDefault(appType, defaultViewedTypes);
         CloseableHttpResponse chr = null;
-
         try {
-
             HttpPost post = new HttpPost(videoFilterUrl);
             post.addHeader("Connection", "close");
             post.addHeader("content-type", "application/json");
-
             Map<String, Object> param = new HashMap<>();
             param.put("appType", appType);
             param.put("mid", mid);
@@ -61,13 +63,13 @@ public class ViewedService {
             param.put("videoIds", videoIds);
             post.setEntity(new StringEntity(JSONUtils.toJson(param)));
 
-            log.info("request param={}", JSONUtils.toJson(param));
-            chr = clients.execute(post);
+            log.info("request={}", JSONUtils.toJson(param));
+            chr = client.execute(post);
+            log.info("response={}", JSONUtils.toJson(chr));
             if (chr == null
                     || chr.getStatusLine() == null
                     || chr.getStatusLine().getStatusCode() != 200) {
-                log.error("filterViewedVideo failed http status exception! videoFilterUrl={}, CloseableHttpResponse={}",
-                        videoFilterUrl, chr);
+                log.error("filterViewedVideo failed http status exception!");
                 return Collections.emptyList();
             }
             HttpEntity entity = chr.getEntity();
@@ -93,6 +95,7 @@ public class ViewedService {
         } catch (Exception e) {
             log.error("invoke http filterViewedVideo error", e);
         } finally {
+            log.info("filterViewedVideo cost={}", stopwatch.stop().elapsed(TimeUnit.MILLISECONDS));
             try {
                 if (chr != null) {
                     chr.close();

+ 1 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/filter/AbstractFilterService.java

@@ -1,5 +1,6 @@
 package com.tzld.piaoquan.recommend.server.service.filter;
 
+import com.google.common.base.Stopwatch;
 import com.tzld.piaoquan.recommend.server.service.PreViewedService;
 import com.tzld.piaoquan.recommend.server.service.ViewedService;
 import com.tzld.piaoquan.recommend.server.util.JSONUtils;

+ 1 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractFlowPoolWithLevelRecallStrategy.java

@@ -1,5 +1,6 @@
 package com.tzld.piaoquan.recommend.server.service.recall.strategy;
 
+import com.google.common.base.Stopwatch;
 import com.tzld.piaoquan.recommend.server.model.Video;
 import com.tzld.piaoquan.recommend.server.service.filter.FilterResult;
 import com.tzld.piaoquan.recommend.server.service.filter.FlowPoolWithLevelFilterService;

+ 136 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/util/HttpClientFactory.java

@@ -0,0 +1,136 @@
+package com.tzld.piaoquan.recommend.server.util;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.http.HttpRequestInterceptor;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.HttpClientConnectionManager;
+import org.apache.http.conn.socket.ConnectionSocketFactory;
+import org.apache.http.conn.socket.PlainConnectionSocketFactory;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultHttpRequestRetryHandler;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
+import org.apache.http.ssl.SSLContexts;
+import org.slf4j.MDC;
+
+import javax.net.ssl.SSLContext;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class HttpClientFactory {
+
+    private static final ScheduledExecutorService SCHEDULED_CLOSED_EXECUTOR = new ScheduledThreadPoolExecutor(1,
+            new BasicThreadFactory.Builder().namingPattern("http conn-closed-thread-%s").priority(Thread.NORM_PRIORITY).daemon(false).build(), (r, e) -> log.error(" monitor push reject task error={}", e.toString()));
+
+    private static final List<HttpClientConnectionManager> HTTP_CLIENT_CONNECTION_MANAGERS = Lists.newArrayList();
+
+    static {
+        SCHEDULED_CLOSED_EXECUTOR.schedule(() -> HTTP_CLIENT_CONNECTION_MANAGERS.forEach(HttpClientConnectionManager::closeExpiredConnections), 5, TimeUnit.SECONDS);
+    }
+
+    private static HttpRequestInterceptor getInterceptor() {
+        HttpRequestInterceptor requestInterceptor = (request, context) -> {
+            try {
+                String missSpanId = MDC.get("missSpanId");
+                String missTraceId = MDC.get("request-id");
+                if (missTraceId != null && !"".equals(missTraceId.trim())) {
+                    request.setHeader("request-id", missTraceId);
+                }
+                if (missSpanId != null && !"".equals(missSpanId.trim())) {
+                    request.setHeader("missSpanId", missSpanId);
+                }
+            } catch (Exception e) {
+                log.error(e.getMessage(), e);
+            }
+        };
+        return requestInterceptor;
+    }
+
+//    public String request(HttpRequestBase request) {
+//
+//        HttpEntity entity = null;
+//        try {
+//            log.info("request={}", JSONUtils.toJson(request));
+//            CloseableHttpResponse chr = request((HttpUriRequest) request);
+//            log.info("response={}", JSONUtils.toJson(chr));
+//            if (chr == null
+//                    || chr.getStatusLine() == null
+//                    || chr.getStatusLine().getStatusCode() != 200) {
+//                log.error("request failed http status exception!");
+//                return Strings.EMPTY;
+//            }
+//            entity = chr.getEntity();
+//            if (entity == null) {
+//                return Strings.EMPTY;
+//            }
+//            String content = EntityUtils.toString(entity, "UTF-8");
+//            log.info("response entity={}", JSONUtils.toJson(content));
+//        } catch (Exception e) {
+//            log.error("request error url={}", request.getURI().getPath(), e);
+//        } finally {
+//            if (request != null) {
+//                request.abort();
+//            }
+//            EntityUtils.consumeQuietly(entity);
+//        }
+//    }
+//
+//
+//    public CloseableHttpResponse request(HttpUriRequest request) {
+//        try {
+//            CloseableHttpResponse execute = closeableHttpClient.execute(request);
+//            return execute;
+//        } catch (Exception e) {
+//            log.error(String.format("http timeout request url = %s .", request.getURI().getPath()));
+//            throw new RuntimeException(e);
+//        }
+//    }
+
+    /**
+     * @param connectTimeout 连接超时时间 ms
+     * @param socketTimeout  读超时时间(等待数据超时时间)ms
+     * @param maxPerRoute    每个路由的最大连接数
+     * @param maxTotal       最大连接数
+     * @param retryCount     重试次数
+     * @return httpclient instance
+     */
+    public static CloseableHttpClient create(int connectTimeout, int socketTimeout, int maxPerRoute, int maxTotal,
+                                                int retryCount, int connectionWaitTimeout) {
+        try {
+            RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(connectTimeout).setSocketTimeout(socketTimeout).setConnectionRequestTimeout(connectionWaitTimeout).build();
+            CloseableHttpClient client = HttpClientBuilder.create()
+                    .setDefaultRequestConfig(requestConfig)
+                    .setConnectionManager(createConnectionManager(maxPerRoute, maxTotal))
+                    .setRetryHandler(new DefaultHttpRequestRetryHandler(retryCount, false)).addInterceptorFirst(getInterceptor()).build();
+            return client;
+        } catch (Throwable e) {
+            log.error("create HttpPoolClient exception", e);
+            throw new RuntimeException("create HttpPoolClient exception");
+        }
+    }
+
+    private static PoolingHttpClientConnectionManager createConnectionManager(int maxPerRoute, int maxTotal) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
+        SSLContext sslContext = SSLContexts.custom().loadTrustMaterial((chain, authType) -> true).build();
+        Registry<ConnectionSocketFactory> registry = RegistryBuilder.<ConnectionSocketFactory>create()
+                .register("http", PlainConnectionSocketFactory.getSocketFactory())
+                .register("https", new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE)).build();
+        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(registry);
+        cm.setDefaultMaxPerRoute(maxPerRoute);
+        cm.setMaxTotal(maxTotal);
+        HTTP_CLIENT_CONNECTION_MANAGERS.add(cm);
+        return cm;
+    }
+
+}