Ver código fonte

Merge branch 'vlog_merge_refactor_smz' of https://git.yishihui.com/algorithm/recommend-server into vlog_merge_refactor_smz

sunmingze 1 ano atrás
pai
commit
ab913dcd77

+ 2 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/Application.java

@@ -21,6 +21,8 @@ import org.springframework.http.converter.protobuf.ProtobufHttpMessageConverter;
 })
 @ComponentScan({
         "com.tzld.piaoquan.recommend.server.service",
+        "com.tzld.piaoquan.recommend.server.implement",
+        "com.tzld.piaoquan.recommend.server.framework.utils",
         "com.tzld.piaoquan.recommend.server.grpcservice",
         "com.tzld.piaoquan.recommend.server.remote",
         "com.tzld.piaoquan.recommend.server.config",

+ 0 - 2
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/candidiate/IndexCandidateQueue.java

@@ -3,8 +3,6 @@ package com.tzld.piaoquan.recommend.server.framework.candidiate;
 
 import com.tzld.piaoquan.recommend.server.common.base.RankItem;
 import com.tzld.piaoquan.recommend.server.framework.common.User;
-import com.tzld.piaoquan.recommend.server.framework.candidiate.Candidate;
-import com.tzld.piaoquan.recommend.server.framework.candidiate.QueueName;
 import com.tzld.piaoquan.recommend.server.framework.merger.*;
 import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
 import org.apache.commons.lang3.tuple.Pair;

+ 7 - 10
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/merger/StrategyQueue.java

@@ -2,8 +2,8 @@ package com.tzld.piaoquan.recommend.server.framework.merger;
 
 
 import com.tzld.piaoquan.recommend.server.common.base.RankItem;
-import com.tzld.piaoquan.recommend.server.framework.common.User;
 import com.tzld.piaoquan.recommend.server.framework.candidiate.Candidate;
+import com.tzld.piaoquan.recommend.server.framework.common.User;
 import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
 import lombok.Data;
 import org.apache.commons.lang3.tuple.Pair;
@@ -11,13 +11,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 @Data
 public abstract class StrategyQueue {
@@ -171,8 +165,11 @@ public abstract class StrategyQueue {
                 LOGGER.error("Rule for not exist child queue [{}]", rule.queueName);
             }
 
-            Map<String, Candidate> myCandidates = new HashMap<String, Candidate>();
-            n += children.get(rule.queueName).candidate(myCandidates, myRecallNum, user, requestData, requestIndex, expId);
+            Map<String, Candidate> myCandidates = new HashMap<>();
+            StrategyQueue strategyQueue = children.get(rule.queueName);
+            if (strategyQueue != null) {
+                n += strategyQueue.candidate(myCandidates, myRecallNum, user, requestData, requestIndex, expId);
+            }
             candidateMap.putAll(myCandidates);
         }
 

+ 13 - 33
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/BaseRecaller.java

@@ -1,18 +1,14 @@
 package com.tzld.piaoquan.recommend.server.framework.recaller;
 
 
-import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-
-
 import com.tzld.piaoquan.recommend.server.common.base.RankItem;
 import com.tzld.piaoquan.recommend.server.framework.candidiate.*;
 import com.tzld.piaoquan.recommend.server.framework.common.User;
 import com.tzld.piaoquan.recommend.server.framework.recaller.provider.QueueProvider;
 import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
-import com.tzld.piaoquan.recommend.server.model.Video;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.slf4j.Logger;
@@ -22,12 +18,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 
 public class BaseRecaller<Video> {
@@ -88,27 +79,19 @@ public class BaseRecaller<Video> {
     // 读取redis中的数据放入queue中
     public Map<Candidate, Queue<Video>> loadQueues(List<Candidate> candidates) {
         // update queueName
-        Iterable<Candidate> updateCandidates = FluentIterable.from(candidates).transform(new Function<Candidate, Candidate>() {
-            @Override
-            public Candidate apply(Candidate candidate) {
-                try {
-                    long ttl = QueueName.DEFAULT_LOCAL_CACHE_TTL;
-                    candidate.setCandidateQueueName(QueueName.fromString(candidate.getCandidateKey(), ttl));
-                } catch (Exception e) {
-                    candidate.setCandidateQueueName(null);
-                    LOGGER.error("error parse QueueName [{}]", candidate.getCandidateKey());
-                }
-                return candidate;
+        Iterable<Candidate> updateCandidates = FluentIterable.from(candidates).transform(candidate -> {
+            try {
+                long ttl = QueueName.DEFAULT_LOCAL_CACHE_TTL;
+                candidate.setCandidateQueueName(QueueName.fromString(candidate.getCandidateKey(), ttl));
+            } catch (Exception e) {
+                candidate.setCandidateQueueName(null);
+                LOGGER.error("error parse QueueName [{}]", candidate.getCandidateKey());
             }
+            return candidate;
         });
 
         // parse queues
-        Iterable<QueueName> queueNames = FluentIterable.from(updateCandidates).transform(new Function<Candidate, QueueName>() {
-            @Override
-            public QueueName apply(Candidate candidate) {
-                return candidate.getCandidateQueueName();
-            }
-        });
+        Iterable<QueueName> queueNames = FluentIterable.from(updateCandidates).transform(candidate -> candidate.getCandidateQueueName());
 
         // parallel load queues, redis 或者缓存获取index
         Map<QueueName, Queue<Video>> queues = Maps.newConcurrentMap();
@@ -144,12 +127,9 @@ public class BaseRecaller<Video> {
 
         // load from redis
         List<Callable<Map<Candidate, Queue<Video>>>> fetchQueueCalls = Lists.newArrayList();
-        fetchQueueCalls.add(new Callable<Map<Candidate, Queue<Video>>>() {
-            @Override
-            public Map<Candidate, Queue<Video>> call() throws Exception {
-                boolean isFromRedis = true;
-                return obtainQueue(recallCandidates, requestData, user, isFromRedis);
-            }
+        fetchQueueCalls.add(() -> {
+            boolean isFromRedis = true;
+            return obtainQueue(recallCandidates, requestData, user, isFromRedis);
         });
 
         // 多线程执行load

+ 15 - 16
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/recaller/provider/RedisBackedQueue.java

@@ -1,7 +1,6 @@
 package com.tzld.piaoquan.recommend.server.framework.recaller.provider;
 
 
-import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Iterables;
@@ -10,17 +9,20 @@ import com.tzld.piaoquan.recommend.server.framework.candidiate.QueueName;
 import com.tzld.piaoquan.recommend.server.framework.utils.FixedThreadPoolHelper;
 import com.tzld.piaoquan.recommend.server.framework.utils.RedisSmartClient;
 import com.tzld.piaoquan.recommend.server.model.Video;
-import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.*;
 
 
+@Service
 public class RedisBackedQueue implements QueueProvider<Video> {
     private static final Logger logger = LoggerFactory.getLogger(RedisBackedQueue.class);
     private static final int DEFAULT_TTL = 7 * 24 * 60 * 60;
@@ -127,20 +129,17 @@ public class RedisBackedQueue implements QueueProvider<Video> {
         final List<Callable<Map<QueueName, Queue<Video>>>> callables =
                 new ArrayList<Callable<Map<QueueName, Queue<Video>>>>();
         for (final List<QueueName> queueNames : namesIterable) {
-            callables.add(new Callable<Map<QueueName, Queue<Video>>>() {
-                @Override
-                public Map<QueueName, Queue<Video>> call() {
-                    Map<QueueName, Queue<Video>> result = new HashMap<QueueName, Queue<Video>>();
-                    for (final QueueName name : queueNames) {
-                        try {
-                            Queue queue = load(name);
-                            result.put(name, queue);
-                        } catch (Exception e) {
-                            logger.error("load queue error [{}] [{}]", name, ExceptionUtils.getFullStackTrace(e));
-                        }
+            callables.add(() -> {
+                Map<QueueName, Queue<Video>> result = new HashMap<>();
+                for (final QueueName name : queueNames) {
+                    try {
+                        Queue queue = load(name);
+                        result.put(name, queue);
+                    } catch (Exception e) {
+                        logger.error("load queue error [{}] [{}]", name, ExceptionUtils.getFullStackTrace(e));
                     }
-                    return result;
                 }
+                return result;
             });
         }
 

+ 4 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/framework/utils/RedisSmartClient.java

@@ -2,8 +2,10 @@ package com.tzld.piaoquan.recommend.server.framework.utils;
 
 
 import com.google.common.base.Function;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Service;
 import redis.clients.jedis.JedisCluster;
 import redis.clients.jedis.Tuple;
 
@@ -15,10 +17,12 @@ import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+@Service
 public class RedisSmartClient {
 
     private final static ExecutorService executorService = Executors.newCachedThreadPool();
     @Qualifier("redisTemplate")
+    @Autowired
     protected RedisTemplate<String, String> redisTemplate;
 
 

+ 9 - 6
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/FlowPoolRecommendPipeline.java

@@ -9,28 +9,32 @@ import com.tzld.piaoquan.recommend.server.framework.merger.SimilarityUtils;
 import com.tzld.piaoquan.recommend.server.framework.merger.StrategyQueue;
 import com.tzld.piaoquan.recommend.server.framework.recaller.BaseRecaller;
 import com.tzld.piaoquan.recommend.server.framework.recaller.provider.RedisBackedQueue;
-import com.tzld.piaoquan.recommend.server.framework.score.ScorerPipeline;
 import com.tzld.piaoquan.recommend.server.framework.userattention.UserAttentionExtractorPipeline;
 import com.tzld.piaoquan.recommend.server.framework.userattention.UserAttentionExtractorUtils;
 import com.tzld.piaoquan.recommend.server.framework.utils.RedisSmartClient;
 import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
+import org.springframework.stereotype.Service;
 
+import javax.annotation.Resource;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-
+@Service
 public class FlowPoolRecommendPipeline {
 
     public static final String MERGE_CONF = "flow_merge_config.conf";
 
     public static final String PREFIX = "";
 
+    @Resource
+    private RedisSmartClient client;
+
     private List<RankItem> feedByRec(final RecommendRequest requestData,
-                                       final int requestIndex,
-                                       final User userInfo) {
-        int recallNum =  200;
+                                     final int requestIndex,
+                                     final User userInfo) {
+        int recallNum = 200;
 
         // Step 1: Attention extraction
         long timestamp = System.currentTimeMillis();
@@ -47,7 +51,6 @@ public class FlowPoolRecommendPipeline {
 
 
         // Step 4: Recalling & Basic Scoring
-        RedisSmartClient client = new RedisSmartClient();
         RedisBackedQueue queueProvider = new RedisBackedQueue(client, 1000L);
 
 

+ 7 - 4
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/TopRecommendPipeline.java

@@ -14,7 +14,9 @@ import com.tzld.piaoquan.recommend.server.framework.userattention.UserAttentionE
 import com.tzld.piaoquan.recommend.server.framework.userattention.UserAttentionExtractorUtils;
 import com.tzld.piaoquan.recommend.server.framework.utils.RedisSmartClient;
 import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
+import org.springframework.stereotype.Service;
 
+import javax.annotation.Resource;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -22,7 +24,7 @@ import java.util.Map;
 
 import static com.tzld.piaoquan.recommend.server.service.score.ScorerUtils.getScorerPipeline;
 
-
+@Service
 public class TopRecommendPipeline {
 
     public static final String FILTER_CONF = "filter_config.conf";
@@ -30,6 +32,8 @@ public class TopRecommendPipeline {
 
     public static final String PREFIX = "";
 
+    @Resource
+    private RedisSmartClient client;
 
     public List<RankItem> feedByRec(final RecommendRequest requestData,
                                     final int requestIndex,
@@ -38,8 +42,8 @@ public class TopRecommendPipeline {
 
         // Step 1: Attention extraction
         long timestamp = System.currentTimeMillis();
-        UserAttentionExtractorPipeline attentionExtractorPipeline = UserAttentionExtractorUtils.getAtttentionPipeline(UserAttentionExtractorUtils.BASE_CONF);
-        attentionExtractorPipeline.extractAttention(requestData, userInfo);
+//        UserAttentionExtractorPipeline attentionExtractorPipeline = UserAttentionExtractorUtils.getAtttentionPipeline(UserAttentionExtractorUtils.BASE_CONF);
+//        attentionExtractorPipeline.extractAttention(requestData, userInfo);
 
         // Step 2: create top queue
         StrategyQueue topQueue = MergeUtils.createTopQueue(MERGE_CONF, "top-queue");
@@ -51,7 +55,6 @@ public class TopRecommendPipeline {
 
 
         // Step 4: Recalling & Basic Scoring
-        RedisSmartClient client = new RedisSmartClient();
         RedisBackedQueue queueProvider = new RedisBackedQueue(client, 1000L);
 
         BaseRecaller recaller = new BaseRecaller(queueProvider);

+ 25 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/implement/candidate/HotCandidateQueue.java

@@ -0,0 +1,25 @@
+package com.tzld.piaoquan.recommend.server.implement.candidate;
+
+import com.tzld.piaoquan.recommend.server.framework.candidiate.Candidate;
+import com.tzld.piaoquan.recommend.server.framework.candidiate.IndexCandidateQueue;
+import com.tzld.piaoquan.recommend.server.framework.common.User;
+import com.tzld.piaoquan.recommend.server.framework.merger.StrategyQueueConfig;
+import com.tzld.piaoquan.recommend.server.framework.merger.StrategyQueueInfo;
+import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
+
+import java.util.Map;
+
+/**
+ * @author sunxy
+ */
+public class HotCandidateQueue extends IndexCandidateQueue {
+
+    public HotCandidateQueue(StrategyQueueInfo strategyQueueInfo, StrategyQueueConfig strategyQueueConfig) {
+        super(strategyQueueInfo, strategyQueueConfig);
+    }
+
+    @Override
+    public int addCandidateKey(Map<String, Candidate> candidates, int recallNum, User user, RecommendRequest requestData, int requestIndex, int expId) {
+        return 0;
+    }
+}

+ 8 - 2
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/web/RecommendV2Controller.java

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.tzld.piaoquan.recommend.server.client.ProtobufUtils;
 import com.tzld.piaoquan.recommend.server.common.base.RankItem;
+import com.tzld.piaoquan.recommend.server.framework.common.User;
 import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendRequest;
 import com.tzld.piaoquan.recommend.server.gen.recommend.RecommendResponse;
 import com.tzld.piaoquan.recommend.server.implement.TopRecommendPipeline;
@@ -15,6 +16,7 @@ import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
+import javax.annotation.Resource;
 import java.util.List;
 
 /**
@@ -26,6 +28,9 @@ public class RecommendV2Controller {
     @Autowired
     private RecommendService recommendService;
 
+    @Resource
+    TopRecommendPipeline topRecommendPipeline;
+
     @RequestMapping("/homepage/recommend/v2")
     public String homepageRecommend(@RequestBody RecommendRequest httpRequest) {
         MDC.put("appType", String.valueOf(httpRequest.getAppType()));
@@ -57,8 +62,9 @@ public class RecommendV2Controller {
     public String feedByRec(@RequestBody RecommendRequest httpRequest) {
         MDC.put("appType", String.valueOf(httpRequest.getAppType()));
 
-        TopRecommendPipeline topRecommendPipeline = new TopRecommendPipeline();
-        List<RankItem> rankItems = topRecommendPipeline.feedByRec(httpRequest, 0, null);
+        User user = new User();
+        user.setRegion(httpRequest.getCityCode());
+        List<RankItem> rankItems = topRecommendPipeline.feedByRec(httpRequest, 0, user);
         String result = "";
         try {
             result = JSONObject.toJSONString(rankItems);

+ 9 - 1
recommend-server-service/src/main/resources/merge_config.conf

@@ -63,7 +63,15 @@ rule-config = {
   // 精选队列
   hot-queue = {
     merge-rule = {
-      hot-index = {
+      global1h-index = {
+        recall-percentage = 0.5
+        min-merge-num = 1
+      }
+      region1h-index = {
+        recall-percentage = 0.5
+        min-merge-num = 1
+      }
+      region24h-index = {
         recall-percentage = 0.5
         min-merge-num = 1
       }