zhangbo пре 1 година
родитељ
комит
aff75f4baf

+ 1 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/common/base/RankItem.java

@@ -12,7 +12,7 @@ import java.util.Map;
 public class RankItem implements Comparable<RankItem> {
 
     // featureMap中保存所有的特征
-    public Map<String, Object> featureMap = new HashMap<>();
+    public Map<String, String> featureMap = new HashMap<>();
     public long videoId;
     private double score; // 记录最终的score
     private Video video;

+ 80 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/OfflineVlogFeatureGroup.java

@@ -0,0 +1,80 @@
+package com.tzld.piaoquan.recommend.server.service.rank.strategy;
+
+public enum OfflineVlogFeatureGroup {
+
+    machineinfo_brand,
+    machineinfo_model,
+    machineinfo_platform,
+    machineinfo_system,
+    u_1day_exp_cnt,
+    u_1day_click_cnt,
+    u_1day_share_cnt,
+    u_1day_return_cnt,
+    u_ctr_1day,
+    u_str_1day,
+    u_rov_1day,
+    u_ros_1day,
+
+    u_3day_exp_cnt,
+    u_3day_click_cnt,
+    u_3day_share_cnt,
+    u_3day_return_cnt,
+    u_ctr_3day,
+    u_str_3day,
+    u_rov_3day,
+    u_ros_3day,
+
+
+    total_time,
+
+    play_count_total,
+    i_1day_exp_cnt,
+    i_1day_click_cnt,
+    i_1day_share_cnt,
+    i_1day_return_cnt,
+    i_ctr_1day,
+    i_str_1day,
+    i_rov_1day,
+    i_ros_1day,
+
+    i_3day_exp_cnt,
+    i_3day_click_cnt,
+    i_3day_share_cnt,
+    i_3day_return_cnt,
+    i_ctr_3day,
+    i_str_3day,
+    i_rov_3day,
+    i_ros_3day,
+
+    ctx_week,
+    ctx_hour,
+    ctx_region,
+    ctx_city,
+    ;
+
+
+    private final byte[] idBytes;
+    private final byte[] nameBytes;
+
+    OfflineVlogFeatureGroup() {
+        this.idBytes = String.valueOf(ordinal()).getBytes();
+        this.nameBytes = name().toLowerCase().getBytes();
+    }
+
+    public final int getId() {
+        return ordinal();
+    }
+
+    public final String getGroupName() {
+        return name().toLowerCase();
+    }
+
+    public final byte[] getGroupNameBytes() {
+        return getGroupName().getBytes();
+    }
+
+    public final byte[] getIdBytes() {
+        return idBytes;
+    }
+
+}

+ 92 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/OfflineVlogShareLRFeatureExtractor.java

@@ -0,0 +1,92 @@
+package com.tzld.piaoquan.recommend.server.service.rank.strategy;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.tzld.piaoquan.recommend.feature.domain.video.feature.BytesGroup;
+import com.tzld.piaoquan.recommend.feature.domain.video.feature.BytesUtils;
+import com.tzld.piaoquan.recommend.feature.model.sample.BaseFeature;
+import com.tzld.piaoquan.recommend.feature.model.sample.FeatureGroup;
+import com.tzld.piaoquan.recommend.feature.model.sample.LRSamples;
+import java.util.Map;
+import java.util.*;
+import com.tzld.piaoquan.recommend.feature.model.sample.GroupedFeature;
+
+public class OfflineVlogShareLRFeatureExtractor {
+
+    public ListMultimap<FeatureGroup, BaseFeature> featureMap = ArrayListMultimap.create();
+
+    final private BytesUtils utils;
+    final private int groupCount = OfflineVlogFeatureGroup.values().length;
+    public OfflineVlogShareLRFeatureExtractor() {
+        BytesGroup[] groups = new BytesGroup[OfflineVlogFeatureGroup.values().length];
+        OfflineVlogFeatureGroup[] var2 = OfflineVlogFeatureGroup.values();
+        int var3 = var2.length;
+
+        for(int var4 = 0; var4 < var3; ++var4) {
+            OfflineVlogFeatureGroup g = var2[var4];
+            groups[g.ordinal()] = new BytesGroup(g.ordinal(), g.getGroupName(), g.getGroupNameBytes());
+        }
+        this.utils = new BytesUtils(groups);
+    }
+    public void makeFeature(Map<String, Object> maps){
+        for (Map.Entry<String, Object> entry : maps.entrySet()){
+            OfflineVlogFeatureGroup ovf = OfflineVlogFeatureGroup.valueOf(entry.getKey());
+            Object value = entry.getValue();
+            if (value instanceof String){
+                this.makeFea(ovf, ((String)value).getBytes());
+            }else if (value instanceof Double){
+                this.makeFea(ovf, String.valueOf((Double)value).getBytes());
+            }else if (value instanceof Integer){
+                //todo
+            }else{
+                //todo
+                this.makeFea(ovf, ((String)value).getBytes());
+            }
+        }
+    }
+
+    private FeatureGroup makeGroup(OfflineVlogFeatureGroup group) {
+        FeatureGroup.Builder g = FeatureGroup.newBuilder();
+        g.setType("1");
+        g.setName(group.getGroupName());
+        g.setId(group.ordinal());
+        return g.build();
+    }
+    void makeFea(OfflineVlogFeatureGroup group, byte[] value) {
+        FeatureGroup featureGroup = this.makeGroup(group);
+        BaseFeature feature = this.utils.makeFea(group.ordinal(), value);
+        this.featureMap.put(featureGroup, feature);
+    }
+
+    public synchronized LRSamples single(Map<String, byte[]> userBytesFeature,
+                                         Map<String, byte[]> videoBytesFeature,
+                                         Map<String, byte[]> sceneFeatureMapByte) {
+        featureMap.clear();
+        // extract features todo zhangbo
+        for (Map.Entry<String, byte[]> entry : userBytesFeature.entrySet()){
+            makeFea(OfflineVlogFeatureGroup.valueOf(entry.getKey()), entry.getValue());
+        }
+        for (Map.Entry<String, byte[]> entry : videoBytesFeature.entrySet()){
+            makeFea(OfflineVlogFeatureGroup.valueOf(entry.getKey()), entry.getValue());
+        }
+        for (Map.Entry<String, byte[]> entry : sceneFeatureMapByte.entrySet()){
+            makeFea(OfflineVlogFeatureGroup.valueOf(entry.getKey()), entry.getValue());
+        }
+
+        LRSamples.Builder lr = LRSamples.newBuilder();
+        lr.setGroupNum(groupCount);
+        List<FeatureGroup> keys = new ArrayList<>(featureMap.keySet());
+        int count = 0;
+        for(FeatureGroup group : keys) {
+            List<BaseFeature> fea = featureMap.get(group);
+            GroupedFeature.Builder gf = GroupedFeature.newBuilder();
+            gf.setGroup(group);
+            gf.setCount(fea.size());
+            gf.addAllFeatures(fea);
+            count += fea.size();
+            lr.addFeatures(gf);
+        }
+        lr.setCount(count);
+        return lr.build();
+    }
+}

+ 36 - 4
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/strategy/RankStrategy4RankModel.java

@@ -95,14 +95,14 @@ public class RankStrategy4RankModel extends RankService {
             return videos;
         }
 
-        Map<String, Object> userFeatureMap = new HashMap<>();
+        Map<String, String> userFeatureMap = new HashMap<>();
         if (param.getMid() != null && !param.getMid().isEmpty()){
             String midKey = "user_info_4video_" + param.getMid();
             String userFeatureStr = this.redisTemplate.opsForValue().get(midKey);
             if (userFeatureStr != null){
                 try{
                     userFeatureMap = JSONUtils.fromJson(userFeatureStr,
-                            new TypeToken<Map<String, Object>>() {},
+                            new TypeToken<Map<String, String>>() {},
                             userFeatureMap);
                 }catch (Exception e){
                     log.error(String.format("parse user json is wrong in {} with {}",
@@ -110,8 +110,32 @@ public class RankStrategy4RankModel extends RankService {
                 }
             }
         }
+        final Set<String> userFeatureSet = new HashSet<>(Arrays.asList(
+                "machineinfo_brand", "machineinfo_model", "machineinfo_platform", "machineinfo_system",
+                "u_1day_exp_cnt", "u_1day_click_cnt", "u_1day_share_cnt", "u_1day_return_cnt",
+                "u_ctr_1day","u_str_1day","u_rov_1day","u_ros_1day",
+                "u_3day_exp_cnt","u_3day_click_cnt","u_3day_share_cnt","u_3day_return_cnt",
+                "u_ctr_3day","u_str_3day","u_rov_3day","u_ros_3day"
+        ));
+        Iterator<Map.Entry<String, String>> iterator = userFeatureMap.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<String, String> entry = iterator.next();
+            if (!userFeatureSet.contains(entry.getKey())) {
+                // 删除键值对
+                iterator.remove();
+            }
+        }
+
         log.info("userFeature in model = {}", JSONUtils.toJson(userFeatureMap));
 
+        final Set<String> itemFeatureSet = new HashSet<>(Arrays.asList(
+                "total_time", "play_count_total",
+                "i_1day_exp_cnt", "i_1day_click_cnt", "i_1day_share_cnt", "i_1day_return_cnt",
+                "i_ctr_1day", "i_str_1day", "i_rov_1day", "i_ros_1day",
+                "i_3day_exp_cnt", "i_3day_click_cnt", "i_3day_share_cnt", "i_3day_return_cnt",
+                "i_ctr_3day", "i_str_3day", "i_rov_3day", "i_ros_3day"
+        ));
+
         List<RankItem> rankItems = CommonCollectionUtils.toList(videos, RankItem::new);
         List<Long> videoIds = CommonCollectionUtils.toListDistinct(videos, Video::getVideoId);
         List<String> videoFeatureKeys = videoIds.stream().map(r-> "video_info_" + r)
@@ -120,12 +144,20 @@ public class RankStrategy4RankModel extends RankService {
         if (videoFeatures != null){
             for (int i=0; i<videoFeatures.size(); ++i){
                 String vF = videoFeatures.get(i);
-                Map<String, Object> vfMap = new HashMap<>();
+                Map<String, String> vfMap = new HashMap<>();
                 if (vF == null){
                     continue;
                 }
                 try{
-                    vfMap = JSONUtils.fromJson(vF, new TypeToken<Map<String, Object>>() {}, vfMap);
+                    vfMap = JSONUtils.fromJson(vF, new TypeToken<Map<String, String>>() {}, vfMap);
+                    Iterator<Map.Entry<String, String>> iteratorIn = vfMap.entrySet().iterator();
+                    while (iteratorIn.hasNext()) {
+                        Map.Entry<String, String> entry = iteratorIn.next();
+                        if (!itemFeatureSet.contains(entry.getKey())) {
+                            // 删除键值对
+                            iteratorIn.remove();
+                        }
+                    }
                     rankItems.get(i).setFeatureMap(vfMap);
                 }catch (Exception e){
                     log.error(String.format("parse video json is wrong in {} with {}",

+ 5 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/score/AbstractScorer.java

@@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 
 public abstract class AbstractScorer {
@@ -63,4 +64,8 @@ public abstract class AbstractScorer {
                                            final UserFeature userFeature,
                                            final List<RankItem> rankItems);
 
+    public abstract List<RankItem> scoring(final Map<String, String> sceneFeatureMap,
+                                           final Map<String, String> userFeatureMap,
+                                           final List<RankItem> rankItems);
+
 }

+ 76 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/score/ScorerPipeline.java

@@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.*;
 
 
@@ -106,4 +107,79 @@ public class ScorerPipeline {
 
         return items;
     }
+
+    public List<RankItem> scoring(final Map<String, String> sceneFeatureMap,
+                                  final Map<String, String> userFeatureMap,
+                                  final List<RankItem> rankItems) {
+
+        if (CollectionUtils.isEmpty(scorers)) {
+            log.error("scorers is empty");
+            return rankItems;
+        }
+        List<RankItem> items = rankItems;
+
+        for (final AbstractScorer scorer : scorers) {
+            if (!scorer.isEnable()) {
+                continue;
+            }
+
+            final int beforeSize = rankItems.size();
+            final long startTime = System.currentTimeMillis();
+
+            String fullScorerName = scorer.getScorerConfigInfo().getScorerName();
+            String[] scorerNames = fullScorerName.split("\\.");
+            final String scorerName = scorerNames.length > 0 ? scorerNames[scorerNames.length - 1] : fullScorerName;
+
+            final List<RankItem> scoreRankerItems = items;
+            Callable<List<RankItem>> callable = () -> scorer.scoring(sceneFeatureMap, userFeatureMap, scoreRankerItems);
+
+            // execute score use thread to protected score worst time
+            List<RankItem> scoredItems = new ArrayList<RankItem>();
+            try {
+                List<Future<List<RankItem>>> futures = executorService.invokeAll(Arrays.asList(callable), SCORE_TIME_OUT, TimeUnit.MILLISECONDS);
+                for (Future<List<RankItem>> future : futures) {
+                    try {
+                        if (future.isDone() && !future.isCancelled() && future.get() != null) {
+                            scoredItems.addAll(future.get());
+                        } else {
+                            LOGGER.error("score task is cancelled, scorename [{}] fail items [{}]",
+                                    new Object[]{scorerName, scoreRankerItems.size()});
+                        }
+                    } catch (Exception e) {
+                        LOGGER.error("thread pool exception scorename [{}], exception [{}]",
+                                new Object[]{scorerName, ExceptionUtils.getFullStackTrace(e)});
+                    }
+                }
+            } catch (Exception e) {
+                LOGGER.error("thread pool exception uid [{}] scorename [{}], exception [{}]",
+                        new Object[]{scorerName, ExceptionUtils.getFullStackTrace(e)});
+            }
+
+            //  变更item
+            if (CollectionUtils.isNotEmpty(scoreRankerItems)) {
+                items = scoreRankerItems;
+            } else {
+                items = new ArrayList<>(items);
+            }
+
+            int position = 0;
+            for (RankItem item : items) {
+                item.getRankerIndex().put(scorerName, position++);
+                item.getRankerScore().put(scorerName, item.getScore());
+            }
+
+            //
+            long spentTime = System.currentTimeMillis() - startTime;
+            LOGGER.debug("after scorer [{}], spentTime [{}], before size [{}], remaining size [{}]",
+                    new Object[]{scorerName, spentTime, beforeSize, scoreRankerItems.size()});
+        }
+
+        int position = 0;
+        for (RankItem item : items) {
+            item.getRankerIndex().put("finalScore", position++);
+            item.getRankerScore().put("finalScore", item.getScore());
+        }
+
+        return items;
+    }
 }

+ 142 - 3
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/score/VlogShareLRScorer.java

@@ -5,6 +5,7 @@ import com.tzld.piaoquan.recommend.feature.domain.video.base.*;
 import com.tzld.piaoquan.recommend.server.common.base.*;
 import com.tzld.piaoquan.recommend.feature.model.sample.*;
 import com.tzld.piaoquan.recommend.feature.domain.video.feature.VlogShareLRFeatureExtractor;
+import com.tzld.piaoquan.recommend.server.service.rank.strategy.OfflineVlogShareLRFeatureExtractor;
 import com.tzld.piaoquan.recommend.server.service.score.model.LRModel;
 
 import org.apache.commons.collections4.CollectionUtils;
@@ -12,9 +13,7 @@ import org.apache.commons.lang.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.*;
 
 
@@ -187,4 +186,144 @@ public class VlogShareLRScorer extends BaseLRModelScorer {
         }
         LOGGER.debug("Ctr Score {}, Total: {}, Cancel: {}", new Object[]{requestContext.getRequest_id(), items.size(), cancel});
     }
+    @Override
+    public List<RankItem> scoring(final Map<String, String> sceneFeatureMap,
+                                           final Map<String, String> userFeatureMap,
+                                           final List<RankItem> rankItems){
+        if (CollectionUtils.isEmpty(rankItems)) {
+            return rankItems;
+        }
+
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+
+        List<RankItem> result = rankItems;
+        result = rankByJava(
+                sceneFeatureMap, userFeatureMap, rankItems
+        );
+
+        LOGGER.debug("ctr ranker time java items size={}, time={} ", result != null ? result.size() : 0,
+                System.currentTimeMillis() - startTime);
+
+        return result;
+    }
+
+    private List<RankItem> rankByJava(final Map<String, String> sceneFeatureMap,
+                                      final Map<String, String> userFeatureMap,
+                                      final List<RankItem> items) {
+        long startTime = System.currentTimeMillis();
+        LRModel model = (LRModel) this.getModel();
+        LOGGER.debug("model size: [{}]", model.getModelSize());
+        // userBytes
+        Map<String, byte[]> userFeatureMapByte = new HashMap<>();
+        for(Map.Entry<String, String> entry: userFeatureMap.entrySet()){
+            userFeatureMapByte.put(entry.getKey(), entry.getValue().getBytes());
+        }
+        //sceneBytes
+        Map<String, byte[]> sceneFeatureMapByte = new HashMap<>();
+        for(Map.Entry<String, String> entry: sceneFeatureMap.entrySet()){
+            sceneFeatureMapByte.put(entry.getKey(), entry.getValue().getBytes());
+        }
+
+        // 所有都参与打分,按照ctr排序
+        multipleCtrScore(items, userFeatureMapByte, sceneFeatureMapByte, model);
+
+        // debug log
+        if (LOGGER.isDebugEnabled()) {
+            for (int i = 0; i < items.size(); i++) {
+                LOGGER.debug("before enter feeds model predict ctr score [{}] [{}]", items.get(i), items.get(i));
+            }
+        }
+
+        Collections.sort(items);
+
+        LOGGER.debug("ctr ranker java execute time: [{}]", System.currentTimeMillis() - startTime);
+        LOGGER.debug("[ctr ranker time java] items size={}, cost={} ", items != null ? items.size() : 0,
+                System.currentTimeMillis() - startTime);
+        return items;
+    }
+
+    private void multipleCtrScore(final List<RankItem> items,
+                                  final Map<String, byte[]> userFeatureMapByte,
+                                  final Map<String, byte[]> sceneFeatureMapByte,
+                                  final LRModel model) {
+
+        List<Callable<Object>> calls = new ArrayList<Callable<Object>>();
+        for (int index = 0; index < items.size(); index++) {
+            final int fIndex = index;
+            items.get(fIndex).setScore(0.0);   //原始分为 cube中的粗打分,如果超时,为原始值存在问题, 需要置0
+            calls.add(new Callable<Object>() {
+                @Override
+                public Object call() throws Exception {
+                    try {
+                        calcScore(model, items.get(fIndex), userFeatureMapByte, sceneFeatureMapByte);
+                    } catch (Exception e) {
+                        LOGGER.error("ctr exception: [{}] [{}]", items.get(fIndex).videoId, ExceptionUtils.getFullStackTrace(e));
+                    }
+                    return new Object();
+                }
+            });
+        }
+
+        List<Future<Object>> futures = null;
+        try {
+            futures = executorService.invokeAll(calls, LOCAL_TIME_OUT, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            LOGGER.error("execute invoke fail: {}", ExceptionUtils.getFullStackTrace(e));
+        }
+
+        //等待所有请求的结果返回, 超时也返回
+        int cancel = 0;
+        if (futures != null) {
+            for (Future<Object> future : futures) {
+                try {
+                    if (!future.isDone() || future.isCancelled() || future.get() == null) {
+                        cancel++;
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.error("InterruptedException {},{}", ExceptionUtils.getFullStackTrace(e));
+                } catch (ExecutionException e) {
+                    LOGGER.error("ExecutionException {},{}", sceneFeatureMapByte.size(),
+                            ExceptionUtils.getFullStackTrace(e));
+                }
+            }
+        }
+        LOGGER.debug("Ctr Score {}, Total: {}, Cancel: {}", new Object[]{sceneFeatureMapByte.size(), items.size(), cancel});
+    }
+
+    public double calcScore(final LRModel lrModel,
+                            final RankItem item,
+                            final Map<String, byte[]> userFeatureMapByte,
+                            final Map<String, byte[]> sceneFeatureMapByte) {
+
+        LRSamples lrSamples = null;
+        OfflineVlogShareLRFeatureExtractor bytesFeatureExtractor;
+        bytesFeatureExtractor = new OfflineVlogShareLRFeatureExtractor();
+
+        try {
+
+            Map<String, byte[]> itemFeatureByte = new HashMap<>();
+            for (Map.Entry<String, String> entry: item.getFeatureMap().entrySet()){
+                itemFeatureByte.put(entry.getKey(), entry.getValue().getBytes());
+            }
+            lrSamples = bytesFeatureExtractor.single(userFeatureMapByte, itemFeatureByte, sceneFeatureMapByte);
+        } catch (Exception e) {
+            LOGGER.error("extract feature error for imei={}, doc={}, [{}]", new Object[]{"", item.getVideoId(),
+                    ExceptionUtils.getFullStackTrace(e)});
+        }
+
+
+        double pro = 0.0;
+        if (lrSamples != null && lrSamples.getFeaturesList() != null) {
+            try {
+                pro = lrModel.score(lrSamples);
+            } catch (Exception e) {
+                LOGGER.error("score error for doc={} exception={}", new Object[]{
+                        item.getVideoId(), ExceptionUtils.getFullStackTrace(e)});
+            }
+        }
+        item.setScore(pro);
+        return pro;
+    }
 }