Przeglądaj źródła

fix qw datastat

wangyunpeng 2 tygodni temu
rodzic
commit
451bdb3f63

+ 34 - 10
api-module/src/main/java/com/tzld/piaoquan/api/job/contentplatform/ContentPlatformDatastatJob.java

@@ -606,11 +606,19 @@ public class ContentPlatformDatastatJob {
                 .collect(Collectors.toMap(ContentPlatformVideoAgg::getVideoId, ContentPlatformVideoAgg::getScore, (a, b) -> a));
         List<ContentPlatformQwDataStat> saveList = new ArrayList<>();
         List<String> rootSourceIds = qwPlanList.stream().map(ContentPlatformQwPlan::getRootSourceId).collect(Collectors.toList());
-        String outSql = String.format("SELECT * FROM loghubods.qw_out_touliu_behavior_detail WHERE dt=%s;", dt);
-        List<Record> outDataList = OdpsUtil.getOdpsData(outSql);
-        Long now = System.currentTimeMillis();
         List<String> existRootSourceIds = new ArrayList<>();
-        if (CollectionUtils.isNotEmpty(outDataList)) {
+        Long now = System.currentTimeMillis();
+        int pageSize = 5000;
+        int pageNum = 1;
+        while (true) {
+            Integer offset = (pageNum - 1) * pageSize;
+            String outSql = String.format("SELECT rootsourceid, 首层访问人数 " +
+                    "FROM loghubods.qw_out_touliu_behavior_detail " +
+                    "WHERE dt=%s and 首层访问人数 > 0 limit %s,%s;", dt, offset, pageSize);
+            List<Record> outDataList = OdpsUtil.getOdpsData(outSql);
+            if (CollectionUtils.isEmpty(outDataList)) {
+                break;
+            }
             for (Record record : outDataList) {
                 ContentPlatformQwDataStat item = new ContentPlatformQwDataStat();
                 String rootSourceId = (String) record.get(0);
@@ -620,7 +628,7 @@ public class ContentPlatformDatastatJob {
                 if (existRootSourceIds.contains(rootSourceId)) {
                     continue;
                 }
-                int firstLevelCount = Integer.parseInt((String) record.get(8));
+                int firstLevelCount = Integer.parseInt((String) record.get(1));
                 if (firstLevelCount == 0) {
                     continue;
                 }
@@ -641,11 +649,22 @@ public class ContentPlatformDatastatJob {
                 saveList.add(item);
                 existRootSourceIds.add(rootSourceId);
             }
+            if (outDataList.size() < pageSize) {
+                break;
+            }
+            pageNum++;
         }
-        String out2Sql = String.format("SELECT * FROM loghubods.qw_out2_touliu_behavior_detail WHERE dt=%s;", dt);
-        List<Record> out2DataList = OdpsUtil.getOdpsData(out2Sql);
-        if (CollectionUtils.isNotEmpty(out2DataList)) {
-            for (Record record : out2DataList) {
+        pageNum = 1;
+        while (true) {
+            Integer offset = (pageNum - 1) * pageSize;
+            String out2Sql = String.format("SELECT rootsourceid, 首层访问人数 " +
+                    "FROM loghubods.qw_out2_touliu_behavior_detail " +
+                    "WHERE dt=%s and 首层访问人数 > 0 limit %s,%s;", dt, offset, pageSize);
+            List<Record> outDataList = OdpsUtil.getOdpsData(out2Sql);
+            if (CollectionUtils.isEmpty(outDataList)) {
+                break;
+            }
+            for (Record record : outDataList) {
                 ContentPlatformQwDataStat item = new ContentPlatformQwDataStat();
                 String rootSourceId = (String) record.get(0);
                 if (!rootSourceIds.contains(rootSourceId)) {
@@ -654,7 +673,7 @@ public class ContentPlatformDatastatJob {
                 if (existRootSourceIds.contains(rootSourceId)) {
                     continue;
                 }
-                int firstLevelCount = Integer.parseInt((String) record.get(8));
+                int firstLevelCount = Integer.parseInt((String) record.get(1));
                 if (firstLevelCount == 0) {
                     continue;
                 }
@@ -671,7 +690,12 @@ public class ContentPlatformDatastatJob {
                 item.setFirstLevelCount(firstLevelCount);
                 item.setCreateTimestamp(now);
                 saveList.add(item);
+                existRootSourceIds.add(rootSourceId);
+            }
+            if (outDataList.size() < pageSize) {
+                break;
             }
+            pageNum++;
         }
         if (CollectionUtils.isNotEmpty(saveList)) {
             dataStatMapperExt.deleteQwDatastat(dt);