Parcourir la source

Merge branch 'feature-20240118-sunxy-addNewContentCallback' of algorithm/recommend-server into master

增加一路新内容召回(基于流量池)
sunxiaoyi il y a 1 an
Parent
commit
3886014249

+ 5 - 1
recommend-server-service/pom.xml

@@ -149,7 +149,7 @@
         <dependency>
             <groupId>com.tzld.piaoquan</groupId>
             <artifactId>recommend-feature-client</artifactId>
-            <version>1.0.1</version>
+            <version>1.1.16</version>
         </dependency>
         <dependency>
             <groupId>com.google.protobuf</groupId>
@@ -186,6 +186,10 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-web</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.xuxueli</groupId>
+            <artifactId>xxl-job-core</artifactId>
+        </dependency>
     </dependencies>
 
 

+ 2 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/Application.java

@@ -21,7 +21,8 @@ import org.springframework.http.converter.protobuf.ProtobufHttpMessageConverter;
         "com.tzld.piaoquan.recommend.server.grpcservice",
         "com.tzld.piaoquan.recommend.server.remote",
         "com.tzld.piaoquan.recommend.server.config",
-        "com.tzld.piaoquan.recommend.server.web"
+        "com.tzld.piaoquan.recommend.server.web",
+        "com.tzld.piaoquan.recommend.server.xxl",
 })
 @EnableEurekaClient
 @EnableAspectJAutoProxy

+ 4 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/common/base/Constant.java

@@ -11,4 +11,8 @@ public class Constant {
      * traceID
      */
     public static final String LOG_TRACE_ID = "logTraceId";
+    /**
+     * 流量池头部视频redis key
+     */
+    public static final String FLOW_POOL_LAST_24_HOUR_VIDEO_REDIS_KEY = "flowpool:last:24:hour:video";
 }

+ 60 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/config/XxlJobConfig.java

@@ -0,0 +1,60 @@
+package com.tzld.piaoquan.recommend.server.config;
+
+import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * <p>
+ * xxl-job config
+ * </p>
+ *
+ * @author ZhouYang
+ * @date 2021/08/17
+ */
+@Slf4j
+@Configuration
+public class XxlJobConfig {
+
+    @Value("${xxl.job.admin.addresses}")
+    private String adminAddresses;
+
+    @Value("${xxl.job.accessToken}")
+    private String accessToken;
+
+    @Value("${xxl.job.executor.appname}")
+    private String appname;
+
+    @Value("${xxl.job.executor.address}")
+    private String address;
+
+    @Value("${xxl.job.executor.ip}")
+    private String ip;
+
+    @Value("${xxl.job.executor.port}")
+    private int port;
+
+    @Value("${xxl.job.executor.logpath}")
+    private String logPath;
+
+    @Value("${xxl.job.executor.logretentiondays}")
+    private int logRetentionDays;
+
+    @Bean
+    public XxlJobSpringExecutor xxlJobExecutor() {
+        log.info("xxl-job config init");
+        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
+        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
+        xxlJobSpringExecutor.setAppname(appname);
+        xxlJobSpringExecutor.setAddress(address);
+        xxlJobSpringExecutor.setIp(ip);
+        xxlJobSpringExecutor.setPort(port);
+        xxlJobSpringExecutor.setAccessToken(accessToken);
+        xxlJobSpringExecutor.setLogPath(logPath);
+        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
+        return xxlJobSpringExecutor;
+    }
+
+}

+ 76 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/odps/ODPSManager.java

@@ -0,0 +1,76 @@
+package com.tzld.piaoquan.recommend.server.service.odps;
+
+import com.aliyun.odps.Instance;
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.task.SQLTask;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.text.SimpleDateFormat;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
+
+/**
+ * @author sunxy
+ */
+@Slf4j
+@Component
+public class ODPSManager {
+    private final static String ACCESSID = "LTAIWYUujJAm7CbH";
+    private final static String ACCESSKEY = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
+    private final static String ENDPOINT = "http://service.cn.maxcompute.aliyun.com/api";
+
+    public List<Record> query(String sql) {
+        Account account = new AliyunAccount(ACCESSID, ACCESSKEY);
+        Odps odps = new Odps(account);
+        odps.setEndpoint(ENDPOINT);
+        odps.setDefaultProject("loghubods");
+        Instance i;
+        try {
+            i = SQLTask.run(odps, sql);
+            i.waitForSuccess();
+            List<Record> records = SQLTask.getResultByInstanceTunnel(i);
+            if (Objects.nonNull(records) && records.size() != 0) {
+                return records;
+            }
+        } catch (Exception e) {
+            log.error("odps query error", e);
+        }
+        return Collections.emptyList();
+    }
+
+    public String getNowHour() {
+        return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
+    }
+
+    public static void main(String[] args) {
+        Calendar cal = Calendar.getInstance();
+        cal.setTime(new Date());
+        cal.add(Calendar.DATE, -1);
+        Date date2 = cal.getTime();
+        SimpleDateFormat format2 = new SimpleDateFormat("yyyyMMdd");
+
+        Account account = new AliyunAccount(ACCESSID, ACCESSKEY);
+        Odps odps = new Odps(account);
+        odps.setEndpoint(ENDPOINT);
+        odps.setDefaultProject("loghubods");
+        String sql = "select * from yesterday_return_top1000 where dt='" + format2.format(date2) + "';";
+        Instance i;
+        try {
+            i = SQLTask.run(odps, sql);
+            i.waitForSuccess();
+            List<Record> records = SQLTask.getResult(i);
+            for (Record r : records) {
+                System.out.println(Integer.parseInt(r.get(0).toString()));
+                System.out.println(r.get(1).toString());
+            }
+        } catch (OdpsException e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 3 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/rank/RankService.java

@@ -129,7 +129,9 @@ public class RankService {
                 || param.getAbCode().equals("60103")
                 || param.getAbCode().equals("60104")
                 || param.getAbCode().equals("60105")
-                || param.getAbCode().equals("60107")) {
+                || param.getAbCode().equals("60107")
+                || param.getAbCode().equals("60110")
+        ) {
             // 地域召回要做截取,再做融合排序
             removeDuplicate(rovRecallRank);
             rovRecallRank = rovRecallRank.size() <= sizeReturn

+ 5 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/RecallService.java

@@ -132,6 +132,11 @@ public class RecallService implements ApplicationContextAware {
                 case "60104": // 去掉sim的对比实验
                     strategies.add(strategyMap.get(ReturnVideoRecallStrategy.class.getSimpleName()));
                     break;
+                case "60110": // 新内容的召回(流量池的Top内容)
+                    strategies.add(strategyMap.get(SimHotVideoRecallStrategy.class.getSimpleName()));
+                    strategies.add(strategyMap.get(ReturnVideoRecallStrategy.class.getSimpleName()));
+                    strategies.add(strategyMap.get(FlowPoolLastDayTopRecallStrategy.class.getSimpleName()));
+                    break;
                 default:
                     // todo 做兜底吗?
                     break;

+ 91 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/AbstractFlowPoolTopRecallStrategy.java

@@ -0,0 +1,91 @@
+package com.tzld.piaoquan.recommend.server.service.recall.strategy;
+
+import com.alibaba.fastjson.JSONObject;
+import com.tzld.piaoquan.recommend.server.model.Video;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallStrategy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.DateFormatUtils;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.data.redis.core.RedisTemplate;
+
+import javax.annotation.Resource;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * @author sunxy
+ */
+@Slf4j
+public abstract class AbstractFlowPoolTopRecallStrategy implements RecallStrategy {
+
+    @Resource
+    protected RedisTemplate<String, String> redisTemplate;
+
+    @Value("${flow.pool.recent.top.video.daily.time.range:}")
+    private String timeRangeJson;
+
+    @Override
+    public List<Video> recall(RecallParam param) {
+        boolean checkIfInTimeRange = checkIfInTimeRange();
+        if (!checkIfInTimeRange) {
+            return Collections.emptyList();
+        }
+        String key = recallKey(param);
+        if (StringUtils.isBlank(key)) {
+            return Collections.emptyList();
+        }
+        Object result = redisTemplate.opsForValue().get(key);
+        if (result == null) {
+            return Collections.emptyList();
+        }
+        try {
+            List<Long> videoIdList = JSONObject.parseArray(result.toString(), Long.class);
+            return videoIdList.stream().map(vid -> {
+                Video recallData = new Video();
+                recallData.setVideoId(vid);
+                recallData.setAbCode(param.getAbCode());
+                recallData.setPushFrom(pushFrom());
+                return recallData;
+            }).limit(5).collect(Collectors.toList());
+        } catch (Exception e) {
+            log.error("recall error, key={}, result={}", key, result, e);
+        }
+        return Collections.emptyList();
+    }
+
+    protected boolean checkIfInTimeRange() {
+        if (StringUtils.isBlank(timeRangeJson)) {
+            return false;
+        }
+        try {
+            List<String> timeRangeStrList = JSONObject.parseArray(timeRangeJson, String.class);
+            if (CollectionUtils.isEmpty(timeRangeStrList)) {
+                return false;
+            }
+            for (String timeRangeStr : timeRangeStrList) {
+                String[] timeRange = timeRangeStr.split("~");
+                if (timeRange.length != 2) {
+                    continue;
+                }
+                // 4:00~15:00
+                String startTime = timeRange[0];
+                String endTime = timeRange[1];
+                String nowTime = DateFormatUtils.format(new Date(), "HH:mm");
+                if (nowTime.compareTo(startTime) >= 0 && nowTime.compareTo(endTime) <= 0) {
+                    return true;
+                }
+            }
+
+        } catch (Exception e) {
+            log.error("checkIfInTimeRange error, timeRangeJson={}", timeRangeJson, e);
+        }
+        return false;
+    }
+
+    protected abstract String recallKey(RecallParam param);
+}

+ 24 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/recall/strategy/FlowPoolLastDayTopRecallStrategy.java

@@ -0,0 +1,24 @@
+package com.tzld.piaoquan.recommend.server.service.recall.strategy;
+
+import com.tzld.piaoquan.recommend.server.common.base.Constant;
+import com.tzld.piaoquan.recommend.server.service.recall.RecallParam;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author sunxy
+ */
+@Service
+public class FlowPoolLastDayTopRecallStrategy extends AbstractFlowPoolTopRecallStrategy {
+
+    private static final String PUSH_FORM = "flow_pool_top_video_recall";
+
+    @Override
+    public String pushFrom() {
+        return PUSH_FORM;
+    }
+
+    @Override
+    public String recallKey(RecallParam param) {
+        return Constant.FLOW_POOL_LAST_24_HOUR_VIDEO_REDIS_KEY;
+    }
+}

+ 60 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/xxl/SyncFlowPoolRecentTopVideoJob.java

@@ -0,0 +1,60 @@
+package com.tzld.piaoquan.recommend.server.xxl;
+
+import com.alibaba.fastjson.JSONObject;
+import com.aliyun.odps.data.Record;
+import com.tzld.piaoquan.recommend.server.common.base.Constant;
+import com.tzld.piaoquan.recommend.server.service.odps.ODPSManager;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import com.xxl.job.core.log.XxlJobLogger;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * @author sunxy
+ */
+@Component
+@Slf4j
+public class SyncFlowPoolRecentTopVideoJob {
+
+    @Autowired
+    private ODPSManager odpsManager;
+
+    @Autowired
+    private RedisTemplate<String, String> redisTemplate;
+
+    @XxlJob("syncFlowPoolRecentTopVideo")
+    public ReturnT<String> syncFlowPoolRecentTopVideo(String params) {
+        // 执行 ODPS Sql,获取数据
+        List<Long> videoIds = queryTopVideoIdsByLast6Hour(100);
+        if (CollectionUtils.isEmpty(videoIds)) {
+            XxlJobLogger.log("syncFlowPoolRecentTopVideo, videoIds is empty");
+            return ReturnT.SUCCESS;
+        }
+        XxlJobLogger.log("syncFlowPoolRecentTopVideo, videoIds:{}", videoIds);
+        // 将数据写入 Redis
+        redisTemplate.opsForValue().set(Constant.FLOW_POOL_LAST_24_HOUR_VIDEO_REDIS_KEY,
+                JSONObject.toJSONString(videoIds), 2, TimeUnit.DAYS);
+        return ReturnT.SUCCESS;
+    }
+
+    private List<Long> queryTopVideoIdsByLast6Hour(Integer topNum) {
+        String sql = "SELECT videoid FROM loghubods.flow_pool_video_info_per_hour " +
+                "WHERE dt = '" + odpsManager.getNowHour() + "' " +
+                "ORDER BY 1d_return_cnt DESC LIMIT " + topNum + ";";
+        List<Record> records = odpsManager.query(sql);
+        if (records == null || records.size() == 0) {
+            return null;
+        }
+        return records.stream().map(record -> record.getBigint(0)).collect(Collectors.toList());
+    }
+
+
+}

+ 1 - 1
recommend-server-service/src/main/resources/application-dev.yml

@@ -28,7 +28,7 @@ spring:
 xxl:
   job:
     admin:
-      addresses: http://127.0.0.1/xxl-job-admin
+      addresses: http://xxl-job-test-internal.piaoquantv.com/xxl-job-admin
     accessToken:
     executor:
       appname: ${spring.application.name}