Browse Source

feat:添加定时同步热点宝流量池视频任务

zhaohaipeng 2 weeks ago
parent
commit
28f3045d89

+ 5 - 1
recommend-server-service/pom.xml

@@ -252,7 +252,7 @@
         <dependency>
             <groupId>ml.dmlc</groupId>
             <artifactId>xgboost4j-spark_2.12</artifactId>
-            <version>1.7.6</version>
+            <version>2.0.1</version>
             <exclusions>
                 <exclusion>
                     <artifactId>scala-library</artifactId>
@@ -270,6 +270,10 @@
             <artifactId>recommend-similarity</artifactId>
             <version>1.0.0</version>
         </dependency>
+        <dependency>
+            <groupId>org.springframework.cloud</groupId>
+            <artifactId>spring-cloud-starter-openfeign</artifactId>
+        </dependency>
     </dependencies>
 
 

+ 2 - 2
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/Application.java

@@ -1,13 +1,12 @@
 package com.tzld.piaoquan.recommend.server;
 
 
-import com.tzld.piaoquan.abtest.client.ABTestClient;
-import com.tzld.piaoquan.recommend.feature.client.FeatureClient;
 import com.tzld.piaoquan.recommend.feature.client.FeatureV2Client;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.data.redis.RedisReactiveAutoConfiguration;
 import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
+import org.springframework.cloud.openfeign.EnableFeignClients;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.EnableAspectJAutoProxy;
@@ -33,6 +32,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
 @EnableEurekaClient
 @EnableAspectJAutoProxy
 @EnableScheduling
+@EnableFeignClients
 public class Application {
     public static void main(String[] args) {
         SpringApplication.run(Application.class, args);

+ 10 - 1
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/common/RedisKeyConstants.java

@@ -5,8 +5,17 @@ package com.tzld.piaoquan.recommend.server.common;
  */
 public class RedisKeyConstants {
 
-    public static class Recommend{
+    public static class Recommend {
         public static String riskUserMid = "risk:user:mid";
         public static String riskUserUid = "risk:user:uid";
     }
+
+    public static class DouHot {
+
+        // dou:hot:flow:pool:level:item:{省份}:{level}
+        public static String ITEM_REDIS_KEY_FORMAT = "dou:hot:flow:pool:level:item:%s:%s";
+
+        // dou:hot:flow:pool:local:distribute:count:{video}:{flowPool}
+        public static String LOCAL_DISTRIBUTE_KEY_FORMAT = "dou:hot:flow:pool:local:distribute:count:%s:%s";
+    }
 }

+ 17 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/feign/FlowPoolFeign.java

@@ -0,0 +1,17 @@
+package com.tzld.piaoquan.recommend.server.feign;
+
+import com.alibaba.fastjson.JSONObject;
+import com.tzld.piaoquan.recommend.server.feign.model.FlowPoolResponse;
+import com.tzld.piaoquan.recommend.server.feign.model.FlowPoolVideoInfo;
+import org.springframework.cloud.openfeign.FeignClient;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+
+import java.util.List;
+
+@FeignClient(value = "flowpool")
+public interface FlowPoolFeign {
+
+    @PostMapping("/flowpool/video/getFlowPoolVideo")
+    FlowPoolResponse<List<FlowPoolVideoInfo>> getFlowPoolVideo(@RequestBody JSONObject param);
+}

+ 18 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/feign/model/FlowPoolResponse.java

@@ -0,0 +1,18 @@
+package com.tzld.piaoquan.recommend.server.feign.model;
+
+import lombok.Data;
+import lombok.ToString;
+
+@Data
+@ToString
+public class FlowPoolResponse<T> {
+    private int code;
+
+    private String msg;
+
+    private T data;
+
+    private String redirect;
+
+    private String success;
+}

+ 25 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/feign/model/FlowPoolVideoInfo.java

@@ -0,0 +1,25 @@
+package com.tzld.piaoquan.recommend.server.feign.model;
+
+import lombok.Data;
+import lombok.ToString;
+
+@Data
+@ToString
+public class FlowPoolVideoInfo {
+
+    private Long videoId;
+
+    private String flowPool;
+
+    private Integer distributeCount;
+
+    private Double score;
+
+    private Integer flowPoolId;
+
+    private Integer flowPoolLevelId;
+
+    private Integer level;
+
+    private String province;
+}

+ 27 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/repository/DouHotVideoMapping.java

@@ -0,0 +1,27 @@
+package com.tzld.piaoquan.recommend.server.repository;
+
+import lombok.Data;
+
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import java.util.Date;
+
+@Data
+@Entity
+@Table(name = "douhot_video_mapping")
+public class DouHotVideoMapping {
+
+    @Id
+    private Long id;
+
+    private Long videoId;
+
+    private String vid;
+
+    private Integer type;
+
+    private Date createTime;
+
+    private Date updateTime;
+}

+ 13 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/repository/DouHotVideoMappingRepository.java

@@ -0,0 +1,13 @@
+package com.tzld.piaoquan.recommend.server.repository;
+
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+
+@Repository
+public interface DouHotVideoMappingRepository extends JpaRepository<DouHotVideoMapping, Long> {
+
+    List<DouHotVideoMapping> findAllByVideoIdIn(List<Long> videoIdList);
+
+}

+ 33 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/repository/DouHotVideoPortraitData.java

@@ -0,0 +1,33 @@
+package com.tzld.piaoquan.recommend.server.repository;
+
+
+import lombok.Data;
+
+import javax.persistence.Entity;
+import javax.persistence.Id;
+import javax.persistence.Table;
+import java.util.Date;
+
+@Data
+@Entity
+@Table(name = "douhot_video_portrait_data")
+public class DouHotVideoPortraitData {
+
+    @Id
+    private Long id;
+
+    private String vid;
+
+    private String name;
+
+    private Double value;
+
+    private String option;
+
+    private Integer type;
+
+    private Date createTime;
+
+    private Date updateTime;
+
+}

+ 13 - 0
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/repository/DouHotVideoPortraitDataRepository.java

@@ -0,0 +1,13 @@
+package com.tzld.piaoquan.recommend.server.repository;
+
+import org.springframework.data.jpa.repository.JpaRepository;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+
+@Repository
+public interface DouHotVideoPortraitDataRepository extends JpaRepository<DouHotVideoPortraitData, Long> {
+
+    List<DouHotVideoPortraitData> findAllByVidInAndType(List<String> videoIds, Integer type);
+
+}

+ 124 - 3
recommend-server-service/src/main/java/com/tzld/piaoquan/recommend/server/service/flowpool/FlowPoolService.java

@@ -1,22 +1,39 @@
 package com.tzld.piaoquan.recommend.server.service.flowpool;
 
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.collect.Lists;
+import com.tzld.piaoquan.recommend.server.common.RedisKeyConstants;
+import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
+import com.tzld.piaoquan.recommend.server.feign.FlowPoolFeign;
+import com.tzld.piaoquan.recommend.server.feign.model.FlowPoolResponse;
+import com.tzld.piaoquan.recommend.server.feign.model.FlowPoolVideoInfo;
 import com.tzld.piaoquan.recommend.server.model.TripleConsumer;
 import com.tzld.piaoquan.recommend.server.model.Video;
-import com.tzld.piaoquan.recommend.server.common.ThreadPoolFactory;
+import com.tzld.piaoquan.recommend.server.repository.DouHotVideoMapping;
+import com.tzld.piaoquan.recommend.server.repository.DouHotVideoMappingRepository;
+import com.tzld.piaoquan.recommend.server.repository.DouHotVideoPortraitData;
+import com.tzld.piaoquan.recommend.server.repository.DouHotVideoPortraitDataRepository;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Service;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static com.tzld.piaoquan.recommend.server.common.enums.AppTypeEnum.*;
-import static com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConstants.*;
+import static com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConstants.KEY_WITH_LEVEL_FORMAT;
+import static com.tzld.piaoquan.recommend.server.service.flowpool.FlowPoolConstants.KEY_WITH_LEVEL_FORMAT_V2;
 
 /**
  * @author dyp
@@ -30,6 +47,17 @@ public class FlowPoolService {
     @Autowired
     private FlowPoolConfigService flowPoolConfigService;
 
+    @Autowired
+    private DouHotVideoMappingRepository douHotVideoMappingRepository;
+    @Autowired
+    private DouHotVideoPortraitDataRepository douHotVideoPortraitDataRepository;
+
+    @Value("${dou.hot.flow.pool.id:18}")
+    private Integer douHotFlowPoolId;
+
+    @Autowired
+    private FlowPoolFeign flowPoolFeign;
+
     private final String localDistributeCountFormat = "flow:pool:local:distribute:count:%s:%s";
 
     public final String valueFormat = "%s-%s";
@@ -124,5 +152,98 @@ public class FlowPoolService {
             }
         });
     }
+
+    public void syncDouHotFlowPoolVideo() {
+        List<FlowPoolVideoInfo> allDouHotVideo = this.findAllDouHotVideoFromFlowPool();
+        log.info("[DouHot video size]: {}", allDouHotVideo.size());
+
+        List<Long> allVideoId = allDouHotVideo.stream()
+                .map(FlowPoolVideoInfo::getVideoId)
+                .collect(Collectors.toList());
+
+        // 补充省份信息
+        Map<Long, String> allVideoAndProvinceMap = this.findAllVideoAndProvinceMap(allVideoId);
+        for (FlowPoolVideoInfo flowPoolVideoInfo : allDouHotVideo) {
+            if (allVideoAndProvinceMap.containsKey(flowPoolVideoInfo.getVideoId())) {
+                flowPoolVideoInfo.setProvince(allVideoAndProvinceMap.get(flowPoolVideoInfo.getVideoId()));
+            }
+        }
+
+        allDouHotVideo = allDouHotVideo.stream()
+                .filter(i -> StringUtils.isNotBlank(i.getProvince()))
+                .collect(Collectors.toList());
+        log.info("[DouHot filter empty province after video size]: {}", allDouHotVideo.size());
+
+
+        Map<String, List<FlowPoolVideoInfo>> provinceAndVideoListMap = allDouHotVideo.stream()
+                .collect(Collectors.groupingBy(FlowPoolVideoInfo::getProvince, Collectors.toList()));
+
+        for (Map.Entry<String, List<FlowPoolVideoInfo>> entry : provinceAndVideoListMap.entrySet()) {
+            String province = entry.getKey();
+            List<String> items = entry.getValue().stream()
+                    .map(i -> String.format("%d-%s", i.getVideoId(), i.getFlowPool()))
+                    .collect(Collectors.toList());
+
+            log.info("[DouHot province video size]. province: {}, video size: {}", entry.getKey(), items.size());
+
+            String redisKey = String.format(RedisKeyConstants.DouHot.ITEM_REDIS_KEY_FORMAT, province, "1");
+            log.info("[DouHot item redis key]: {}", redisKey);
+            redisTemplate.delete(redisKey);
+            redisTemplate.opsForSet().add(redisKey, items.toArray(new String[0]));
+            redisTemplate.expire(redisKey, 24 * 60 * 60, TimeUnit.SECONDS);
+        }
+
+    }
+
+    private Map<Long, String> findAllVideoAndProvinceMap(List<Long> videoIds) {
+        // 获取票圈视频ID与热点宝vid的映射
+        List<List<Long>> videoIdPartition = Lists.partition(videoIds, 500);
+        Map<Long, String> videoIdAndVidMap = videoIdPartition.stream().map(douHotVideoMappingRepository::findAllByVideoIdIn)
+                .flatMap(List::stream)
+                .collect(Collectors.toMap(DouHotVideoMapping::getVideoId, DouHotVideoMapping::getVid, (o1, o2) -> o1));
+
+        // 获取热点宝vid与地域的映射
+        List<List<String>> vidPartition = Lists.partition(new ArrayList<>(videoIdAndVidMap.values()), 500);
+        Map<String, String> vidAndProvinceMap = vidPartition.stream().map(i -> douHotVideoPortraitDataRepository.findAllByVidInAndType(i, 4))
+                .flatMap(List::stream)
+                .collect(Collectors.toMap(DouHotVideoPortraitData::getVid, DouHotVideoPortraitData::getName, (o1, o2) -> o1));
+
+        Map<Long, String> resultMap = new HashMap<>(videoIdAndVidMap.size());
+        for (Map.Entry<Long, String> entry : videoIdAndVidMap.entrySet()) {
+            Long videoId = entry.getKey();
+            String vid = entry.getValue();
+            if (vidAndProvinceMap.containsKey(vid)) {
+                resultMap.put(videoId, vidAndProvinceMap.get(vid));
+            }
+        }
+
+        return resultMap;
+    }
+
+    private List<FlowPoolVideoInfo> findAllDouHotVideoFromFlowPool() {
+        List<FlowPoolVideoInfo> result = new ArrayList<>();
+        int pageNum = 0;
+        while (true) {
+            JSONObject paramJson = new JSONObject();
+            paramJson.put("flowPoolId", douHotFlowPoolId);
+            paramJson.put("appType", 0);
+            paramJson.put("pageSize", 1000);
+            paramJson.put("pageNum", pageNum++);
+
+            log.info("[get DouHot flow pool video] paramJson:{}", paramJson);
+            FlowPoolResponse<List<FlowPoolVideoInfo>> response = flowPoolFeign.getFlowPoolVideo(paramJson);
+            if (0 != response.getCode()) {
+                log.error("[get DouHot flow pool video request error] responseJson: {}", response);
+                break;
+            }
+
+            if (CollectionUtils.isEmpty(response.getData())) {
+                log.error("[get DouHot flow pool video data is empty] responseJson: {}", response);
+                break;
+            }
+            result.addAll(response.getData());
+        }
+        return result;
+    }
 }