Pārlūkot izejas kodu

ADD:新内容召回(基于流量池)

sunxy 1 gadu atpakaļ
vecāks
revīzija
66668478b1

+ 1 - 1
recommend-server-service/pom.xml

@@ -142,7 +142,7 @@
         <dependency>
             <groupId>com.tzld.piaoquan</groupId>
             <artifactId>recommend-feature-client</artifactId>
-            <version>1.0.1</version>
+            <version>1.1.16</version>
         </dependency>
         <dependency>
             <groupId>com.google.protobuf</groupId>

+ 4 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/common/base/Constant.java

@@ -11,4 +11,8 @@ public class Constant {
      * traceID
      */
     public static final String LOG_TRACE_ID = "logTraceId";
+    /**
+     * 流量池头部视频redis key
+     */
+    public static final String FLOW_POOL_RECENT_TOP_VIDEO_REDIS_KEY = "flowpool:recent:top:video";
 }

+ 49 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/SyncFlowPoolRecentTopVideoJob.java

@@ -0,0 +1,49 @@
+package com.tzld.piaoquan.recommend.server.service;
+
+import com.alibaba.fastjson.JSONObject;
+import com.aliyun.odps.data.Record;
+import com.tzld.piaoquan.recommend.server.common.base.Constant;
+import com.tzld.piaoquan.recommend.server.util.ODPSManager;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * @author sunxy
+ */
+@Component
+public class SyncFlowPoolRecentTopVideoJob {
+
+    @Resource
+    private ODPSManager odpsManager;
+
+    @Resource
+    private RedisTemplate<String, Object> redisTemplate;
+
+    public void syncFlowPoolRecentTopVideo() {
+        // 执行 ODPS Sql,获取数据
+        List<Long> videoIds = queryTopVideoIdsByLast6Hour(100);
+        if (CollectionUtils.isEmpty(videoIds)) {
+            return;
+        }
+        // 将数据写入 Redis
+        redisTemplate.opsForValue().set(Constant.FLOW_POOL_RECENT_TOP_VIDEO_REDIS_KEY,
+                JSONObject.toJSONString(videoIds));
+    }
+
+    private List<Long> queryTopVideoIdsByLast6Hour(Integer topNum) {
+        String sql = "SELECT videoid FROM loghubods.flow_pool_video_info_per_hour " +
+                "ORDER BY 1d_return_cnt DESC LIMIT " + topNum + ";";
+        List<Record> records = odpsManager.query(sql);
+        if (records == null || records.size() == 0) {
+            return null;
+        }
+        return records.stream().map(record -> record.getBigint(0)).collect(Collectors.toList());
+    }
+
+
+}

+ 2 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallService.java

@@ -131,6 +131,8 @@ public class RecallService implements ApplicationContextAware {
                 case "60104": // 去掉sim的对比实验
                     strategies.add(strategyMap.get(ReturnVideoRecallStrategy.class.getSimpleName()));
                     break;
+                case "60110": // 新内容的召回(流量池的Top内容)
+                    strategies.add(strategyMap.get(FlowPoolLastDayTopRecallStrategy.class.getSimpleName()));
                 default:
                     // todo 做兜底吗?
                     break;

+ 91 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractFlowPoolTopRecallStrategy.java

@@ -0,0 +1,91 @@
+package com.tzld.piaoquan.recommend.server.service.recall.strategy;
+
+import com.alibaba.fastjson.JSONObject;
+import com.tzld.piaoquan.recommend.server.model.Video;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.DateFormatUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.data.redis.core.RedisTemplate;
+
+import javax.annotation.Resource;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * @author sunxy
+ */
+@Slf4j
+public abstract class AbstractFlowPoolTopRecallStrategy implements RecallStrategy {
+
+    @Resource
+    protected RedisTemplate<String, Object> redisTemplate;
+
+    @Value("${flow.pool.recent.top.video.daily.time.range:}")
+    private String timeRangeJson;
+
+    @Override
+    public List<Video> recall(RecallParam param) {
+        boolean checkIfInTimeRange = checkIfInTimeRange();
+        if (!checkIfInTimeRange) {
+            return Collections.emptyList();
+        }
+        String key = recallKey(param);
+        if (StringUtils.isBlank(key)) {
+            return Collections.emptyList();
+        }
+        Object result = redisTemplate.opsForValue().get(key);
+        if (result == null) {
+            return Collections.emptyList();
+        }
+        try {
+            List<Long> videoIdList = JSONObject.parseArray(result.toString(), Long.class);
+            return videoIdList.stream().map(vid -> {
+                Video recallData = new Video();
+                recallData.setVideoId(vid);
+                recallData.setAbCode(param.getAbCode());
+                recallData.setPushFrom(pushFrom());
+                return recallData;
+            }).limit(5).collect(Collectors.toList());
+        } catch (Exception e) {
+            log.error("recall error, key={}, result={}", key, result, e);
+        }
+        return Collections.emptyList();
+    }
+
+    protected boolean checkIfInTimeRange() {
+        if (StringUtils.isBlank(timeRangeJson)) {
+            return false;
+        }
+        try {
+            List<String> timeRangeStrList = JSONObject.parseArray(timeRangeJson, String.class);
+            if (CollectionUtils.isEmpty(timeRangeStrList)) {
+                return false;
+            }
+            for (String timeRangeStr : timeRangeStrList) {
+                String[] timeRange = timeRangeStr.split("~");
+                if (timeRange.length != 2) {
+                    continue;
+                }
+                // 4:00~15:00
+                String startTime = timeRange[0];
+                String endTime = timeRange[1];
+                String nowTime = DateFormatUtils.format(new Date(), "HH:mm");
+                if (nowTime.compareTo(startTime) >= 0 && nowTime.compareTo(endTime) <= 0) {
+                    return true;
+                }
+            }
+
+        } catch (Exception e) {
+            log.error("checkIfInTimeRange error, timeRangeJson={}", timeRangeJson, e);
+        }
+        return false;
+    }
+
+    protected abstract String recallKey(RecallParam param);
+}

+ 24 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/FlowPoolLastDayTopRecallStrategy.java

@@ -0,0 +1,24 @@
+package com.tzld.piaoquan.recommend.server.service.recall.strategy;
+
+import com.tzld.piaoquan.recommend.server.common.base.Constant;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author sunxy
+ */
+@Service
+public class FlowPoolLastDayTopRecallStrategy extends AbstractFlowPoolTopRecallStrategy {
+
+    private static final String PUSH_FORM = "flow_pool_top_video_recall";
+
+    @Override
+    public String pushFrom() {
+        return PUSH_FORM;
+    }
+
+    @Override
+    public String recallKey(RecallParam param) {
+        return Constant.FLOW_POOL_RECENT_TOP_VIDEO_REDIS_KEY;
+    }
+}

+ 70 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/util/ODPSManager.java

@@ -0,0 +1,70 @@
+package com.tzld.piaoquan.recommend.server.util;
+
+import com.aliyun.odps.Instance;
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.task.SQLTask;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+/**
+ * @author sunxy
+ */
+@Slf4j
+@Component
+public class ODPSManager {
+    private final static String ACCESSID = "LTAIWYUujJAm7CbH";
+    private final static String ACCESSKEY = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
+    private final static String ENDPOINT = "http://service.cn.maxcompute.aliyun.com/api";
+
+    public List<Record> query(String sql) {
+        Account account = new AliyunAccount(ACCESSID, ACCESSKEY);
+        Odps odps = new Odps(account);
+        odps.setEndpoint(ENDPOINT);
+        odps.setDefaultProject("loghubods");
+        Instance i;
+        try {
+            i = SQLTask.run(odps, sql);
+            i.waitForSuccess();
+            List<Record> records = SQLTask.getResultByInstanceTunnel(i);
+            if (Objects.nonNull(records) && records.size() != 0) {
+                return records;
+            }
+        } catch (Exception e) {
+            log.error("odps query error", e);
+        }
+        return Collections.emptyList();
+    }
+
+    public static void main(String[] args) {
+        Calendar cal = Calendar.getInstance();
+        cal.setTime(new Date());
+        cal.add(Calendar.DATE, -1);
+        Date date2 = cal.getTime();
+        SimpleDateFormat format2 = new SimpleDateFormat("yyyyMMdd");
+
+        Account account = new AliyunAccount(ACCESSID, ACCESSKEY);
+        Odps odps = new Odps(account);
+        odps.setEndpoint(ENDPOINT);
+        odps.setDefaultProject("loghubods");
+        String sql = "select * from yesterday_return_top1000 where dt='" + format2.format(date2) + "';";
+        Instance i;
+        try {
+            i = SQLTask.run(odps, sql);
+            i.waitForSuccess();
+            List<Record> records = SQLTask.getResult(i);
+            for (Record r : records) {
+                System.out.println(Integer.parseInt(r.get(0).toString()));
+                System.out.println(r.get(1).toString());
+            }
+        } catch (OdpsException e) {
+            e.printStackTrace();
+        }
+    }
+}