Просмотр исходного кода

syncContentPlatformQwDatastatJob 优化

wangyunpeng 4 часов назад
Родитель
Commit
4457103f3c

+ 99 - 123
api-module/src/main/java/com/tzld/piaoquan/api/job/contentplatform/ContentPlatformDatastatJob.java

@@ -13,7 +13,6 @@ import com.tzld.piaoquan.api.model.po.contentplatform.*;
 import com.tzld.piaoquan.api.model.vo.WxAccountDatastatVO;
 import com.tzld.piaoquan.api.service.contentplatform.ContentPlatformPlanService;
 import com.tzld.piaoquan.growth.common.utils.DateUtil;
-import com.tzld.piaoquan.growth.common.utils.page.Page;
 import com.tzld.piaoquan.growth.common.utils.OdpsUtil;
 import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.handler.annotation.XxlJob;
@@ -639,134 +638,129 @@ public class ContentPlatformDatastatJob {
             dt = param;
         }
         Long now = System.currentTimeMillis();
-        int qwPlanPageSize = 500;
-        int qwPlanPageNum = 1;
-        // 跨批次记录已处理的 rootSourceId,避免重复入库
-        Set<String> existRootSourceIds = new HashSet<>();
-        boolean firstBatch = true;
+
+        // 1. 一次性查询两张 ODPS 表,收集 rootSourceId → firstLevelCount
+        // qw_out 优先,同一个 rootSourceId 在 qw_out 中存在则跳过 qw_out2
+        Map<String, Integer> outFirstLevelMap = new HashMap<>();
+        Map<String, Integer> out2FirstLevelMap = new HashMap<>();
+        int odpsPageSize = 5000;
+
+        int odpsPageNum = 1;
         while (true) {
-            // 分批查询 qwPlan
-            List<ContentPlatformQwPlan> qwPlanPage = getQwPlanByPage(qwPlanPageNum, qwPlanPageSize);
-            if (CollectionUtils.isEmpty(qwPlanPage)) {
+            Integer offset = (odpsPageNum - 1) * odpsPageSize;
+            String outSql = String.format("SELECT rootsourceid, 首层访问人数 " +
+                    "FROM loghubods.qw_out_touliu_behavior_detail " +
+                    "WHERE dt=%s and 首层访问人数 > 0 limit %s,%s;", dt, offset, odpsPageSize);
+            List<Record> outDataList = OdpsUtil.getOdpsData(outSql);
+            if (CollectionUtils.isEmpty(outDataList)) {
+                break;
+            }
+            for (Record record : outDataList) {
+                outFirstLevelMap.put((String) record.get(0), Integer.parseInt((String) record.get(1)));
+            }
+            if (outDataList.size() < odpsPageSize) {
+                break;
+            }
+            odpsPageNum++;
+        }
+
+        odpsPageNum = 1;
+        while (true) {
+            Integer offset = (odpsPageNum - 1) * odpsPageSize;
+            String out2Sql = String.format("SELECT rootsourceid, 首层访问人数 " +
+                    "FROM loghubods.qw_out2_touliu_behavior_detail " +
+                    "WHERE dt=%s and 首层访问人数 > 0 limit %s,%s;", dt, offset, odpsPageSize);
+            List<Record> outDataList = OdpsUtil.getOdpsData(out2Sql);
+            if (CollectionUtils.isEmpty(outDataList)) {
+                break;
+            }
+            for (Record record : outDataList) {
+                String rootSourceId = (String) record.get(0);
+                // qw_out 优先,已存在的跳过
+                if (!outFirstLevelMap.containsKey(rootSourceId)) {
+                    out2FirstLevelMap.put(rootSourceId, Integer.parseInt((String) record.get(1)));
+                }
+            }
+            if (outDataList.size() < odpsPageSize) {
                 break;
             }
-            // 构建当前批次的 map
+            odpsPageNum++;
+        }
+
+        if (outFirstLevelMap.isEmpty() && out2FirstLevelMap.isEmpty()) {
+            return ReturnT.SUCCESS;
+        }
+
+        // 2. 根据 ODPS 查到的 rootSourceId,只查询匹配的 qwPlan(不再查询全量 qwPlan)
+        Set<String> allRootSourceIds = new HashSet<>();
+        allRootSourceIds.addAll(outFirstLevelMap.keySet());
+        allRootSourceIds.addAll(out2FirstLevelMap.keySet());
+        List<String> rootSourceIdList = new ArrayList<>(allRootSourceIds);
+        List<List<String>> rootSourceIdBatches = Lists.partition(rootSourceIdList, 5000);
+
+        boolean firstBatch = true;
+        for (List<String> batchRootSourceIds : rootSourceIdBatches) {
+            List<ContentPlatformQwPlan> qwPlanList = planService.getQwPlanListByRootSourceIds(batchRootSourceIds);
+            if (CollectionUtils.isEmpty(qwPlanList)) {
+                continue;
+            }
+            // 构建 plan 相关 map
             Map<Long, ContentPlatformQwPlan> planMap = new HashMap<>();
-            Map<String, Long> rootSourceIdMap = new HashMap<>();
-            for (ContentPlatformQwPlan plan : qwPlanPage) {
+            Map<String, Long> rootSourceIdToPlanId = new HashMap<>();
+            for (ContentPlatformQwPlan plan : qwPlanList) {
                 planMap.put(plan.getId(), plan);
-                rootSourceIdMap.put(plan.getRootSourceId(), plan.getId());
+                rootSourceIdToPlanId.put(plan.getRootSourceId(), plan.getId());
             }
-            List<String> rootSourceIds = new ArrayList<>(rootSourceIdMap.keySet());
-            // 查询当前批次的 planVideo
+            // 查询 planVideo
             List<Long> planIds = new ArrayList<>(planMap.keySet());
             List<ContentPlatformQwPlanVideo> planVideoList = planService.getQwPlanVideoList(planIds);
             Map<Long, Long> planVideoMap = planVideoList.stream()
                     .collect(Collectors.toMap(ContentPlatformQwPlanVideo::getPlanId, ContentPlatformQwPlanVideo::getVideoId));
-            // 查询当前批次的 videoScore
+            // 查询 videoScore
             List<Long> videoIds = planVideoList.stream().map(ContentPlatformQwPlanVideo::getVideoId).collect(Collectors.toList());
             Map<Long, Double> videoScoreMap = new HashMap<>();
             if (CollectionUtils.isNotEmpty(videoIds)) {
                 List<List<Long>> videoIdPartitions = Lists.partition(videoIds, 2000);
-                List<ContentPlatformVideoAgg> videoList = new ArrayList<>();
                 for (List<Long> partition : videoIdPartitions) {
-                    videoList.addAll(planService.getVideoContentAggListByVideoIds(partition));
+                    List<ContentPlatformVideoAgg> videoList = planService.getVideoContentAggListByVideoIds(partition);
+                    for (ContentPlatformVideoAgg video : videoList) {
+                        videoScoreMap.putIfAbsent(video.getVideoId(), video.getScore());
+                    }
                 }
-                videoScoreMap = videoList.stream()
-                        .collect(Collectors.toMap(ContentPlatformVideoAgg::getVideoId, ContentPlatformVideoAgg::getScore, (a, b) -> a));
             }
-            // 对两张 ODPS 表分页查询,匹配当前批次数据
+            // 遍历当前批次的 rootSourceId,从 ODPS map 中匹配数据
             List<ContentPlatformQwDataStat> saveList = new ArrayList<>();
-            int odpsPageSize = 5000;
-            int odpsPageNum = 1;
-            while (true) {
-                Integer offset = (odpsPageNum - 1) * odpsPageSize;
-                String outSql = String.format("SELECT rootsourceid, 首层访问人数 " +
-                        "FROM loghubods.qw_out_touliu_behavior_detail " +
-                        "WHERE dt=%s and 首层访问人数 > 0 limit %s,%s;", dt, offset, odpsPageSize);
-                List<Record> outDataList = OdpsUtil.getOdpsData(outSql);
-                if (CollectionUtils.isEmpty(outDataList)) {
-                    break;
-                }
-                for (Record record : outDataList) {
-                    String rootSourceId = (String) record.get(0);
-                    if (!rootSourceIds.contains(rootSourceId)) {
-                        continue;
-                    }
-                    if (existRootSourceIds.contains(rootSourceId)) {
-                        continue;
-                    }
-                    int firstLevelCount = Integer.parseInt((String) record.get(1));
-                    if (firstLevelCount == 0) {
-                        continue;
-                    }
-                    ContentPlatformQwDataStat item = new ContentPlatformQwDataStat();
-                    item.setDateStr(dt);
-                    Long planId = rootSourceIdMap.get(rootSourceId);
-                    ContentPlatformQwPlan qwPlan = planMap.get(planId);
-                    item.setSubChannel(StringUtils.hasText(qwPlan.getSubChannel()) ? qwPlan.getSubChannel() : "未知");
-                    Long videoId = planVideoMap.get(planId);
-                    Double score = videoScoreMap.get(videoId);
-                    if (Objects.nonNull(score)) {
-                        BigDecimal num = BigDecimal.valueOf(score);
-                        BigDecimal rounded = num.setScale(2, RoundingMode.HALF_UP);
-                        item.setScore(rounded.doubleValue());
-                    }
-                    item.setRootSourceId(rootSourceId);
-                    item.setFirstLevelCount(firstLevelCount);
-                    item.setCreateTimestamp(now);
-                    saveList.add(item);
-                    existRootSourceIds.add(rootSourceId);
+            for (String rootSourceId : batchRootSourceIds) {
+                Long planId = rootSourceIdToPlanId.get(rootSourceId);
+                if (planId == null) {
+                    continue;
                 }
-                if (outDataList.size() < odpsPageSize) {
-                    break;
+                // qw_out 优先,其次 qw_out2
+                Integer firstLevelCount = outFirstLevelMap.get(rootSourceId);
+                boolean isFromOut = (firstLevelCount != null);
+                if (!isFromOut) {
+                    firstLevelCount = out2FirstLevelMap.get(rootSourceId);
                 }
-                odpsPageNum++;
-            }
-            odpsPageNum = 1;
-            while (true) {
-                Integer offset = (odpsPageNum - 1) * odpsPageSize;
-                String out2Sql = String.format("SELECT rootsourceid, 首层访问人数 " +
-                        "FROM loghubods.qw_out2_touliu_behavior_detail " +
-                        "WHERE dt=%s and 首层访问人数 > 0 limit %s,%s;", dt, offset, odpsPageSize);
-                List<Record> outDataList = OdpsUtil.getOdpsData(out2Sql);
-                if (CollectionUtils.isEmpty(outDataList)) {
-                    break;
+                if (firstLevelCount == null) {
+                    continue;
                 }
-                for (Record record : outDataList) {
-                    String rootSourceId = (String) record.get(0);
-                    if (!rootSourceIds.contains(rootSourceId)) {
-                        continue;
-                    }
-                    if (existRootSourceIds.contains(rootSourceId)) {
-                        continue;
-                    }
-                    int firstLevelCount = Integer.parseInt((String) record.get(1));
-                    if (firstLevelCount == 0) {
-                        continue;
-                    }
-                    ContentPlatformQwDataStat item = new ContentPlatformQwDataStat();
-                    item.setDateStr(dt);
-                    Long planId = rootSourceIdMap.get(rootSourceId);
-                    Long videoId = planVideoMap.get(planId);
-                    Double score = videoScoreMap.get(videoId);
-                    if (Objects.nonNull(score)) {
-                        BigDecimal num = BigDecimal.valueOf(score);
-                        BigDecimal rounded = num.setScale(2, RoundingMode.HALF_UP);
-                        item.setScore(rounded.doubleValue());
-                    }
-                    item.setRootSourceId(rootSourceId);
-                    item.setFirstLevelCount(firstLevelCount);
-                    item.setCreateTimestamp(now);
-                    saveList.add(item);
-                    existRootSourceIds.add(rootSourceId);
+                ContentPlatformQwDataStat item = new ContentPlatformQwDataStat();
+                item.setDateStr(dt);
+                item.setRootSourceId(rootSourceId);
+                item.setFirstLevelCount(firstLevelCount);
+                if (isFromOut) {
+                    ContentPlatformQwPlan qwPlan = planMap.get(planId);
+                    item.setSubChannel(StringUtils.hasText(qwPlan.getSubChannel()) ? qwPlan.getSubChannel() : "未知");
                 }
-                if (outDataList.size() < odpsPageSize) {
-                    break;
+                Long videoId = planVideoMap.get(planId);
+                Double score = videoScoreMap.get(videoId);
+                if (Objects.nonNull(score)) {
+                    item.setScore(BigDecimal.valueOf(score).setScale(2, RoundingMode.HALF_UP).doubleValue());
                 }
-                odpsPageNum++;
+                item.setCreateTimestamp(now);
+                saveList.add(item);
             }
-            // 当前批次处理完后立即入库,第一批先删再插,后续批次直接追加
+            // 入库:第一批先删再插,后续批次直接追加
             if (CollectionUtils.isNotEmpty(saveList)) {
                 if (firstBatch) {
                     dataStatMapperExt.deleteQwDatastat(dt);
@@ -774,28 +768,10 @@ public class ContentPlatformDatastatJob {
                 }
                 dataStatMapperExt.batchInsertQwDatastat(saveList);
             }
-            // 清除当前批次数据,释放内存
-            planMap.clear();
-            rootSourceIdMap.clear();
-            planVideoList.clear();
-            planVideoMap.clear();
-            videoScoreMap.clear();
-            saveList.clear();
-            if (qwPlanPage.size() < qwPlanPageSize) {
-                break;
-            }
-            qwPlanPageNum++;
         }
         return ReturnT.SUCCESS;
     }
 
-    private List<ContentPlatformQwPlan> getQwPlanByPage(int pageNum, int pageSize) {
-        ContentPlatformQwPlanExample example = new ContentPlatformQwPlanExample();
-        Page page = new Page(pageNum, pageSize);
-        example.setPage(page);
-        return qwPlanMapper.selectByExample(example);
-    }
-
     @XxlJob("syncContentPlatformQwDatastatTotalJob")
     public ReturnT<String> syncContentPlatformQwDatastatTotalJob(String param) {
         String dt = DateUtil.getBeforeDayDateString("yyyyMMdd");