Просмотр исходного кода

queryDemandMatchResult 分页查询

wangyunpeng 6 часов назад
Родитель
Сommit
386f80e8a5

+ 2 - 0
api-module/src/main/java/com/tzld/piaoquan/api/dao/mapper/contentplatform/ext/ContentPlatformDemandVideoMapperExt.java

@@ -11,6 +11,8 @@ public interface ContentPlatformDemandVideoMapperExt {
 
     int deleteByDt(@Param("dt") String dt);
 
+    int deleteByDtAndChannelName(@Param("dt") String dt, @Param("channelName") String channelName);
+
     List<ContentPlatformDemandVideo> selectByCondition(@Param("dt") String dt,
                                                        @Param("channelName") String channelName,
                                                        @Param("pointType") String pointType,

+ 96 - 63
api-module/src/main/java/com/tzld/piaoquan/api/job/contentplatform/ContentPlatformDemandVideoJob.java

@@ -35,6 +35,11 @@ public class ContentPlatformDemandVideoJob {
     @Autowired
     private MessageAttachmentService messageAttachmentService;
 
+    private static final List<String> SYNC_CHANNEL_NAMES = Arrays.asList(
+            "公众号合作-即转-稳定",
+            "群/企微合作-稳定"
+    );
+
     @XxlJob("syncContentPlatformDemandVideoJob")
     public ReturnT<String> syncContentPlatformDemandVideoJob(String param) {
         String dt = DateUtil.getBeforeDayDateString("yyyyMMdd");
@@ -43,35 +48,65 @@ public class ContentPlatformDemandVideoJob {
         }
         log.info("syncContentPlatformDemandVideoJob start, dt={}", dt);
 
-        try {
-            // 调用接口获取需求匹配结果
+        for (String syncChannelName : SYNC_CHANNEL_NAMES) {
+            try {
+                syncByChannel(dt, syncChannelName);
+            } catch (Exception e) {
+                log.error("syncContentPlatformDemandVideoJob error, dt={}, channelName={}", dt, syncChannelName, e);
+                return ReturnT.FAIL;
+            }
+        }
+
+        log.info("syncContentPlatformDemandVideoJob finish, dt={}", dt);
+        return ReturnT.SUCCESS;
+    }
+
+    private static final int PAGE_SIZE = 10000;
+
+    private void syncByChannel(String dt, String syncChannelName) throws Exception {
+        log.info("syncByChannel start, dt={}, channelName={}", dt, syncChannelName);
+
+        Long now = System.currentTimeMillis();
+        List<ContentPlatformDemandVideo> saveList = new ArrayList<>();
+
+        int pageNum = 1;
+        int totalPages = 1;
+
+        while (pageNum <= totalPages) {
+            // 调用分页接口获取需求匹配结果
             JSONObject requestParam = new JSONObject();
             requestParam.put("dt", dt);
-            requestParam.put("channelName", "群/企微合作-稳定");
+            requestParam.put("channelName", syncChannelName);
+            requestParam.put("pageNum", pageNum);
+            requestParam.put("pageSize", PAGE_SIZE);
 
             String response = httpPoolClient.post(DEMAND_MATCH_API_URL, requestParam.toJSONString());
             if (!StringUtils.hasText(response)) {
-                log.error("syncContentPlatformDemandVideoJob response is empty, dt={}", dt);
-                return ReturnT.FAIL;
+                log.error("syncByChannel response is empty, dt={}, channelName={}, pageNum={}", dt, syncChannelName, pageNum);
+                throw new RuntimeException("syncByChannel response is empty");
             }
 
             JSONObject result = JSONObject.parseObject(response);
             if (result.getInteger("code") != 0) {
-                log.error("syncContentPlatformDemandVideoJob api error, dt={}, msg={}", dt, result.getString("msg"));
-                return ReturnT.FAIL;
+                log.error("syncByChannel api error, dt={}, channelName={}, pageNum={}, msg={}", dt, syncChannelName, pageNum, result.getString("msg"));
+                throw new RuntimeException("syncByChannel api error: " + result.getString("msg"));
             }
 
-            JSONArray dataArray = result.getJSONArray("data");
-            if (dataArray == null || dataArray.isEmpty()) {
-                log.info("syncContentPlatformDemandVideoJob no data, dt={}", dt);
-                return ReturnT.SUCCESS;
+            JSONObject dataObj = result.getJSONObject("data");
+            if (dataObj == null) {
+                log.info("syncByChannel no data, dt={}, channelName={}, pageNum={}", dt, syncChannelName, pageNum);
+                break;
             }
 
-            Long now = System.currentTimeMillis();
-            List<ContentPlatformDemandVideo> saveList = new ArrayList<>();
+            totalPages = dataObj.getIntValue("totalPages");
+            JSONArray records = dataObj.getJSONArray("records");
+            if (records == null || records.isEmpty()) {
+                log.info("syncByChannel no records, dt={}, channelName={}, pageNum={}", dt, syncChannelName, pageNum);
+                break;
+            }
 
-            for (int i = 0; i < dataArray.size(); i++) {
-                JSONObject demandItem = dataArray.getJSONObject(i);
+            for (int i = 0; i < records.size(); i++) {
+                JSONObject demandItem = records.getJSONObject(i);
                 String channelName = demandItem.getString("channelName");
                 String crowdSegment = demandItem.getString("crowdSegment");
                 String dimension = demandItem.getString("dimension");
@@ -147,61 +182,59 @@ public class ContentPlatformDemandVideoJob {
                 }
             }
 
-            if (CollectionUtils.isEmpty(saveList)) {
-                log.info("syncContentPlatformDemandVideoJob no matched videos, dt={}", dt);
-                return ReturnT.SUCCESS;
-            }
+            pageNum++;
+        }
 
-            // 获取视频详情(标题、封面、视频URL)
-            List<Long> videoIds = saveList.stream().map(ContentPlatformDemandVideo::getVideoId)
-                    .distinct().collect(Collectors.toList());
-            Map<Long, VideoDetail> videoDetailMap = new HashMap<>();
-            for (List<Long> partition : Lists.partition(videoIds, 20)) {
-                Set<Long> ids = new HashSet<>(partition);
-                videoDetailMap.putAll(messageAttachmentService.getVideoDetail(ids));
-            }
+        if (CollectionUtils.isEmpty(saveList)) {
+            log.info("syncByChannel no matched videos, dt={}, channelName={}", dt, syncChannelName);
+            return;
+        }
 
-            // 填充视频详情
-            for (ContentPlatformDemandVideo demandVideo : saveList) {
-                VideoDetail detail = videoDetailMap.get(demandVideo.getVideoId());
-                if (Objects.nonNull(detail)) {
-                    demandVideo.setTitle(detail.getTitle());
-                    String cover = detail.getCover();
-                    if (StringUtils.hasText(cover) && cover.contains("/watermark")) {
-                        cover = cover.substring(0, cover.indexOf("/watermark"));
-                    }
-                    demandVideo.setCover(cover);
-                    demandVideo.setVideo(detail.getVideoPath());
+        // 获取视频详情(标题、封面、视频URL)
+        List<Long> videoIds = saveList.stream().map(ContentPlatformDemandVideo::getVideoId)
+                .distinct().collect(Collectors.toList());
+        Map<Long, VideoDetail> videoDetailMap = new HashMap<>();
+        for (List<Long> partition : Lists.partition(videoIds, 20)) {
+            Set<Long> ids = new HashSet<>(partition);
+            videoDetailMap.putAll(messageAttachmentService.getVideoDetail(ids));
+        }
+
+        // 填充视频详情
+        for (ContentPlatformDemandVideo demandVideo : saveList) {
+            VideoDetail detail = videoDetailMap.get(demandVideo.getVideoId());
+            if (Objects.nonNull(detail)) {
+                demandVideo.setTitle(detail.getTitle());
+                String cover = detail.getCover();
+                if (StringUtils.hasText(cover) && cover.contains("/watermark")) {
+                    cover = cover.substring(0, cover.indexOf("/watermark"));
                 }
+                demandVideo.setCover(cover);
+                demandVideo.setVideo(detail.getVideoPath());
             }
+        }
 
-            // 过滤未获取到视频详情的记录
-            saveList = saveList.stream()
-                    .filter(v -> StringUtils.hasText(v.getTitle()))
-                    .collect(Collectors.toList());
-
-            // 按crowd_segment粒度去重,相同videoId保留分数最高的一条
-            saveList = saveList.stream()
-                    .collect(Collectors.groupingBy(v -> v.getCrowdSegment() + "_" + v.getVideoId()))
-                    .values().stream()
-                    .map(group -> group.stream()
-                            .max(Comparator.comparingDouble(v -> v.getScore() != null ? v.getScore() : 0.0))
-                            .orElse(null))
-                    .filter(Objects::nonNull)
-                    .collect(Collectors.toList());
-
-            // 先删除当天数据,再批量插入
-            demandVideoMapperExt.deleteByDt(dt);
-            for (List<ContentPlatformDemandVideo> partition : Lists.partition(saveList, 500)) {
-                demandVideoMapperExt.batchInsert(partition);
-            }
+        // 过滤未获取到视频详情的记录
+        saveList = saveList.stream()
+                .filter(v -> StringUtils.hasText(v.getTitle()))
+                .collect(Collectors.toList());
 
-            log.info("syncContentPlatformDemandVideoJob success, dt={}, count={}", dt, saveList.size());
-            return ReturnT.SUCCESS;
-        } catch (Exception e) {
-            log.error("syncContentPlatformDemandVideoJob error, dt={}", dt, e);
-            return ReturnT.FAIL;
+        // 按crowd_segment粒度去重,相同videoId保留分数最高的一条
+        saveList = saveList.stream()
+                .collect(Collectors.groupingBy(v -> v.getCrowdSegment() + "_" + v.getVideoId()))
+                .values().stream()
+                .map(group -> group.stream()
+                        .max(Comparator.comparingDouble(v -> v.getScore() != null ? v.getScore() : 0.0))
+                        .orElse(null))
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+
+        // 先删除当天该渠道数据,再批量插入
+        demandVideoMapperExt.deleteByDtAndChannelName(dt, syncChannelName);
+        for (List<ContentPlatformDemandVideo> partition : Lists.partition(saveList, 500)) {
+            demandVideoMapperExt.batchInsert(partition);
         }
+
+        log.info("syncByChannel success, dt={}, channelName={}, count={}", dt, syncChannelName, saveList.size());
     }
 
     @XxlJob("checkContentPlatformDemandVideoStatusJob")

+ 4 - 0
api-module/src/main/resources/mapper/contentplatform/ext/ContentPlatformDemandVideoMapperExt.xml

@@ -27,6 +27,10 @@
         DELETE FROM content_platform_demand_video WHERE dt = #{dt}
     </delete>
 
+    <delete id="deleteByDtAndChannelName">
+        DELETE FROM content_platform_demand_video WHERE dt = #{dt} AND channel_name = #{channelName}
+    </delete>
+
     <select id="selectByCondition" resultType="com.tzld.piaoquan.api.model.po.contentplatform.ContentPlatformDemandVideo">
         SELECT id, dt, channel_name, crowd_segment, dimension, point_type, standard_element,
                category_name, demand_id, crowd_package, conversion_target, partner, account, scene_value,