Просмотр исходного кода

长文推送场景值、外部变更检查注释

wangyunpeng 9 часов назад
Родитель
Сommit
3796f3f84d

+ 135 - 111
api-module/src/main/java/com/tzld/piaoquan/api/job/contentplatform/ContentPlatformDatastatJob.java

@@ -12,6 +12,7 @@ import com.tzld.piaoquan.api.model.po.contentplatform.*;
 import com.tzld.piaoquan.api.model.vo.WxAccountDatastatVO;
 import com.tzld.piaoquan.api.service.contentplatform.ContentPlatformPlanService;
 import com.tzld.piaoquan.growth.common.utils.DateUtil;
+import com.tzld.piaoquan.growth.common.utils.page.Page;
 import com.tzld.piaoquan.growth.common.utils.OdpsUtil;
 import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.handler.annotation.XxlJob;
@@ -630,144 +631,167 @@ public class ContentPlatformDatastatJob {
         return gzhAccountMapper.selectByExample(example);
     }
 
-    private List<ContentPlatformQwDataStat> getQwDatastatCount(String dt) {
-        ContentPlatformQwDataStatExample example = new ContentPlatformQwDataStatExample();
-        example.createCriteria().andDateStrEqualTo(dt);
-        return qwDataStatMapper.selectByExample(example);
-    }
-
     @XxlJob("syncContentPlatformQwDatastatJob")
     public ReturnT<String> syncContentPlatformQwDatastatJob(String param) {
         String dt = DateUtil.getBeforeDayDateString("yyyyMMdd");
         if (StringUtils.hasText(param)) {
             dt = param;
         }
-        List<ContentPlatformQwPlan> qwPlanList = getAllQwPlan();
-        if (CollectionUtils.isEmpty(qwPlanList)) {
-            return ReturnT.SUCCESS;
-        }
-        Map<Long, ContentPlatformQwPlan> planMap = qwPlanList.stream()
-                .collect(Collectors.toMap(ContentPlatformQwPlan::getId, plan -> plan));
-        Map<String, Long> rootSourceIdMap = qwPlanList.stream()
-                .collect(Collectors.toMap(ContentPlatformQwPlan::getRootSourceId, ContentPlatformQwPlan::getId));
-        List<Long> planIds = qwPlanList.stream().map(ContentPlatformQwPlan::getId).collect(Collectors.toList());
-        List<ContentPlatformQwPlanVideo> planVideoList = new ArrayList<>();
-        List<List<Long>> partitionList = Lists.partition(planIds, 2000);
-        for (List<Long> partition : partitionList) {
-            planVideoList.addAll(planService.getQwPlanVideoList(partition));
-        }
-        Map<Long, Long> planVideoMap = planVideoList.stream()
-                .collect(Collectors.toMap(ContentPlatformQwPlanVideo::getPlanId, ContentPlatformQwPlanVideo::getVideoId));
-        List<Long> videoIds = planVideoList.stream().map(ContentPlatformQwPlanVideo::getVideoId).collect(Collectors.toList());
-        List<ContentPlatformVideoAgg> videoList = new ArrayList<>();
-        List<List<Long>> videoIdPartitionList = Lists.partition(videoIds, 2000);
-        for (List<Long> partition : videoIdPartitionList) {
-            videoList.addAll(planService.getVideoContentAggListByVideoIds(partition));
-        }
-        Map<Long, Double> videoScoreMap = videoList.stream()
-                .collect(Collectors.toMap(ContentPlatformVideoAgg::getVideoId, ContentPlatformVideoAgg::getScore, (a, b) -> a));
-        List<ContentPlatformQwDataStat> saveList = new ArrayList<>();
-        List<String> rootSourceIds = qwPlanList.stream().map(ContentPlatformQwPlan::getRootSourceId).collect(Collectors.toList());
-        List<String> existRootSourceIds = new ArrayList<>();
         Long now = System.currentTimeMillis();
-        int pageSize = 5000;
-        int pageNum = 1;
+        int qwPlanPageSize = 500;
+        int qwPlanPageNum = 1;
+        // 跨批次记录已处理的 rootSourceId,避免重复入库
+        Set<String> existRootSourceIds = new HashSet<>();
+        boolean firstBatch = true;
         while (true) {
-            Integer offset = (pageNum - 1) * pageSize;
-            String outSql = String.format("SELECT rootsourceid, 首层访问人数 " +
-                    "FROM loghubods.qw_out_touliu_behavior_detail " +
-                    "WHERE dt=%s and 首层访问人数 > 0 limit %s,%s;", dt, offset, pageSize);
-            List<Record> outDataList = OdpsUtil.getOdpsData(outSql);
-            if (CollectionUtils.isEmpty(outDataList)) {
+            // 分批查询 qwPlan
+            List<ContentPlatformQwPlan> qwPlanPage = getQwPlanByPage(qwPlanPageNum, qwPlanPageSize);
+            if (CollectionUtils.isEmpty(qwPlanPage)) {
                 break;
             }
-            for (Record record : outDataList) {
-                ContentPlatformQwDataStat item = new ContentPlatformQwDataStat();
-                String rootSourceId = (String) record.get(0);
-                if (!rootSourceIds.contains(rootSourceId)) {
-                    continue;
+            // 构建当前批次的 map
+            Map<Long, ContentPlatformQwPlan> planMap = new HashMap<>();
+            Map<String, Long> rootSourceIdMap = new HashMap<>();
+            for (ContentPlatformQwPlan plan : qwPlanPage) {
+                planMap.put(plan.getId(), plan);
+                rootSourceIdMap.put(plan.getRootSourceId(), plan.getId());
+            }
+            List<String> rootSourceIds = new ArrayList<>(rootSourceIdMap.keySet());
+            // 查询当前批次的 planVideo
+            List<Long> planIds = new ArrayList<>(planMap.keySet());
+            List<ContentPlatformQwPlanVideo> planVideoList = planService.getQwPlanVideoList(planIds);
+            Map<Long, Long> planVideoMap = planVideoList.stream()
+                    .collect(Collectors.toMap(ContentPlatformQwPlanVideo::getPlanId, ContentPlatformQwPlanVideo::getVideoId));
+            // 查询当前批次的 videoScore
+            List<Long> videoIds = planVideoList.stream().map(ContentPlatformQwPlanVideo::getVideoId).collect(Collectors.toList());
+            Map<Long, Double> videoScoreMap = new HashMap<>();
+            if (CollectionUtils.isNotEmpty(videoIds)) {
+                List<List<Long>> videoIdPartitions = Lists.partition(videoIds, 2000);
+                List<ContentPlatformVideoAgg> videoList = new ArrayList<>();
+                for (List<Long> partition : videoIdPartitions) {
+                    videoList.addAll(planService.getVideoContentAggListByVideoIds(partition));
                 }
-                if (existRootSourceIds.contains(rootSourceId)) {
-                    continue;
+                videoScoreMap = videoList.stream()
+                        .collect(Collectors.toMap(ContentPlatformVideoAgg::getVideoId, ContentPlatformVideoAgg::getScore, (a, b) -> a));
+            }
+            // 对两张 ODPS 表分页查询,匹配当前批次数据
+            List<ContentPlatformQwDataStat> saveList = new ArrayList<>();
+            int odpsPageSize = 5000;
+            int odpsPageNum = 1;
+            while (true) {
+                Integer offset = (odpsPageNum - 1) * odpsPageSize;
+                String outSql = String.format("SELECT rootsourceid, 首层访问人数 " +
+                        "FROM loghubods.qw_out_touliu_behavior_detail " +
+                        "WHERE dt=%s and 首层访问人数 > 0 limit %s,%s;", dt, offset, odpsPageSize);
+                List<Record> outDataList = OdpsUtil.getOdpsData(outSql);
+                if (CollectionUtils.isEmpty(outDataList)) {
+                    break;
                 }
-                int firstLevelCount = Integer.parseInt((String) record.get(1));
-                if (firstLevelCount == 0) {
-                    continue;
+                for (Record record : outDataList) {
+                    String rootSourceId = (String) record.get(0);
+                    if (!rootSourceIds.contains(rootSourceId)) {
+                        continue;
+                    }
+                    if (existRootSourceIds.contains(rootSourceId)) {
+                        continue;
+                    }
+                    int firstLevelCount = Integer.parseInt((String) record.get(1));
+                    if (firstLevelCount == 0) {
+                        continue;
+                    }
+                    ContentPlatformQwDataStat item = new ContentPlatformQwDataStat();
+                    item.setDateStr(dt);
+                    Long planId = rootSourceIdMap.get(rootSourceId);
+                    ContentPlatformQwPlan qwPlan = planMap.get(planId);
+                    item.setSubChannel(StringUtils.hasText(qwPlan.getSubChannel()) ? qwPlan.getSubChannel() : "未知");
+                    Long videoId = planVideoMap.get(planId);
+                    Double score = videoScoreMap.get(videoId);
+                    if (Objects.nonNull(score)) {
+                        BigDecimal num = BigDecimal.valueOf(score);
+                        BigDecimal rounded = num.setScale(2, RoundingMode.HALF_UP);
+                        item.setScore(rounded.doubleValue());
+                    }
+                    item.setRootSourceId(rootSourceId);
+                    item.setFirstLevelCount(firstLevelCount);
+                    item.setCreateTimestamp(now);
+                    saveList.add(item);
+                    existRootSourceIds.add(rootSourceId);
                 }
-                item.setDateStr(dt);
-                Long planId = rootSourceIdMap.get(rootSourceId);
-                ContentPlatformQwPlan qwPlan = planMap.get(planId);
-                item.setSubChannel(StringUtils.hasText(qwPlan.getSubChannel()) ? qwPlan.getSubChannel() : "未知");
-                Long videoId = planVideoMap.get(planId);
-                Double score = videoScoreMap.get(videoId);
-                if (Objects.nonNull(score)) {
-                    BigDecimal num = BigDecimal.valueOf(score);
-                    BigDecimal rounded = num.setScale(2, RoundingMode.HALF_UP);
-                    item.setScore(rounded.doubleValue());
+                if (outDataList.size() < odpsPageSize) {
+                    break;
                 }
-                item.setRootSourceId(rootSourceId);
-                item.setFirstLevelCount(firstLevelCount);
-                item.setCreateTimestamp(now);
-                saveList.add(item);
-                existRootSourceIds.add(rootSourceId);
-            }
-            if (outDataList.size() < pageSize) {
-                break;
+                odpsPageNum++;
             }
-            pageNum++;
-        }
-        pageNum = 1;
-        while (true) {
-            Integer offset = (pageNum - 1) * pageSize;
-            String out2Sql = String.format("SELECT rootsourceid, 首层访问人数 " +
-                    "FROM loghubods.qw_out2_touliu_behavior_detail " +
-                    "WHERE dt=%s and 首层访问人数 > 0 limit %s,%s;", dt, offset, pageSize);
-            List<Record> outDataList = OdpsUtil.getOdpsData(out2Sql);
-            if (CollectionUtils.isEmpty(outDataList)) {
-                break;
-            }
-            for (Record record : outDataList) {
-                ContentPlatformQwDataStat item = new ContentPlatformQwDataStat();
-                String rootSourceId = (String) record.get(0);
-                if (!rootSourceIds.contains(rootSourceId)) {
-                    continue;
+            odpsPageNum = 1;
+            while (true) {
+                Integer offset = (odpsPageNum - 1) * odpsPageSize;
+                String out2Sql = String.format("SELECT rootsourceid, 首层访问人数 " +
+                        "FROM loghubods.qw_out2_touliu_behavior_detail " +
+                        "WHERE dt=%s and 首层访问人数 > 0 limit %s,%s;", dt, offset, odpsPageSize);
+                List<Record> outDataList = OdpsUtil.getOdpsData(out2Sql);
+                if (CollectionUtils.isEmpty(outDataList)) {
+                    break;
                 }
-                if (existRootSourceIds.contains(rootSourceId)) {
-                    continue;
+                for (Record record : outDataList) {
+                    String rootSourceId = (String) record.get(0);
+                    if (!rootSourceIds.contains(rootSourceId)) {
+                        continue;
+                    }
+                    if (existRootSourceIds.contains(rootSourceId)) {
+                        continue;
+                    }
+                    int firstLevelCount = Integer.parseInt((String) record.get(1));
+                    if (firstLevelCount == 0) {
+                        continue;
+                    }
+                    ContentPlatformQwDataStat item = new ContentPlatformQwDataStat();
+                    item.setDateStr(dt);
+                    Long planId = rootSourceIdMap.get(rootSourceId);
+                    Long videoId = planVideoMap.get(planId);
+                    Double score = videoScoreMap.get(videoId);
+                    if (Objects.nonNull(score)) {
+                        BigDecimal num = BigDecimal.valueOf(score);
+                        BigDecimal rounded = num.setScale(2, RoundingMode.HALF_UP);
+                        item.setScore(rounded.doubleValue());
+                    }
+                    item.setRootSourceId(rootSourceId);
+                    item.setFirstLevelCount(firstLevelCount);
+                    item.setCreateTimestamp(now);
+                    saveList.add(item);
+                    existRootSourceIds.add(rootSourceId);
                 }
-                int firstLevelCount = Integer.parseInt((String) record.get(1));
-                if (firstLevelCount == 0) {
-                    continue;
+                if (outDataList.size() < odpsPageSize) {
+                    break;
                 }
-                item.setDateStr(dt);
-                Long planId = rootSourceIdMap.get(rootSourceId);
-                Long videoId = planVideoMap.get(planId);
-                Double score = videoScoreMap.get(videoId);
-                if (Objects.nonNull(score)) {
-                    BigDecimal num = BigDecimal.valueOf(score);
-                    BigDecimal rounded = num.setScale(2, RoundingMode.HALF_UP);
-                    item.setScore(rounded.doubleValue());
+                odpsPageNum++;
+            }
+            // 当前批次处理完后立即入库,第一批先删再插,后续批次直接追加
+            if (CollectionUtils.isNotEmpty(saveList)) {
+                if (firstBatch) {
+                    dataStatMapperExt.deleteQwDatastat(dt);
+                    firstBatch = false;
                 }
-                item.setRootSourceId(rootSourceId);
-                item.setFirstLevelCount(firstLevelCount);
-                item.setCreateTimestamp(now);
-                saveList.add(item);
-                existRootSourceIds.add(rootSourceId);
+                dataStatMapperExt.batchInsertQwDatastat(saveList);
             }
-            if (outDataList.size() < pageSize) {
+            // 清除当前批次数据,释放内存
+            planMap.clear();
+            rootSourceIdMap.clear();
+            planVideoList.clear();
+            planVideoMap.clear();
+            videoScoreMap.clear();
+            saveList.clear();
+            if (qwPlanPage.size() < qwPlanPageSize) {
                 break;
             }
-            pageNum++;
-        }
-        if (CollectionUtils.isNotEmpty(saveList)) {
-            dataStatMapperExt.deleteQwDatastat(dt);
-            dataStatMapperExt.batchInsertQwDatastat(saveList);
+            qwPlanPageNum++;
         }
         return ReturnT.SUCCESS;
     }
 
-    private List<ContentPlatformQwPlan> getAllQwPlan() {
+    private List<ContentPlatformQwPlan> getQwPlanByPage(int pageNum, int pageSize) {
         ContentPlatformQwPlanExample example = new ContentPlatformQwPlanExample();
+        Page page = new Page(pageNum, pageSize);
+        example.setPage(page);
         return qwPlanMapper.selectByExample(example);
     }