wangyunpeng 1 месяц назад
Родитель
Сommit
a21fd1e04a

+ 7 - 0
api-module/src/main/java/com/tzld/piaoquan/api/dao/mapper/contentplatform/ExternalChannelMapperExt.java

@@ -37,6 +37,13 @@ public interface ExternalChannelMapperExt {
      */
     int checkRootSourceIdExists(@Param("rootSourceId") String rootSourceId);
 
+    /**
+     * 批量查询已存在的rootSourceId
+     * @param rootSourceIds rootSourceId列表
+     * @return 已存在的rootSourceId列表
+     */
+    List<String> selectExistingRootSourceIds(@Param("rootSourceIds") List<String> rootSourceIds);
+
     /**
      * 批量插入待处理记录(忽略重复)
      * 使用 INSERT IGNORE 实现去重

+ 20 - 3
api-module/src/main/java/com/tzld/piaoquan/api/job/ExternalChannelProcessJob.java

@@ -949,9 +949,27 @@ public class ExternalChannelProcessJob {
         for (int i = 0; i < records.size(); i += BATCH_INSERT_SIZE) {
             int end = Math.min(i + BATCH_INSERT_SIZE, records.size());
             List<ExternalChannel> batch = records.subList(i, end);
-            int count = externalChannelMapperExt.batchInsertIgnore(batch);
+
+            // 批量查询已存在的rootSourceId,过滤掉已存在的记录
+            List<String> batchRootSourceIds = batch.stream()
+                    .map(ExternalChannel::getRootSourceId)
+                    .collect(Collectors.toList());
+            List<String> existingIds = externalChannelMapperExt.selectExistingRootSourceIds(batchRootSourceIds);
+            List<ExternalChannel> filteredBatch = batch;
+            if (!CollectionUtils.isEmpty(existingIds)) {
+                filteredBatch = batch.stream()
+                        .filter(r -> !existingIds.contains(r.getRootSourceId()))
+                        .collect(Collectors.toList());
+            }
+
+            if (CollectionUtils.isEmpty(filteredBatch)) {
+                log.info("渠道{}本批全部已存在, 跳过", channelType);
+                continue;
+            }
+
+            int count = externalChannelMapperExt.batchInsertIgnore(filteredBatch);
             insertedCount += count;
-            log.info("渠道{}批次插入完成, 本批{}, 累计插入{}", channelType, count, insertedCount);
+            log.info("渠道{}批次插入完成, 本批{}, 过滤已存在{}, 累计插入{}", channelType, filteredBatch.size(), existingIds != null ? existingIds.size() : 0, insertedCount);
         }
 
         log.info("渠道{}历史数据初始化完成, 插入{}条", channelType, insertedCount);
@@ -1044,7 +1062,6 @@ public class ExternalChannelProcessJob {
             ContentPlatformGzhPlanVideoExample example = new ContentPlatformGzhPlanVideoExample();
             example.createCriteria()
                     .andRootSourceIdLike(prefix + "%")
-                    .andStatusEqualTo(1)
                     .andCreateTimestampGreaterThanOrEqualTo(startDate.getTime());
             List<ContentPlatformGzhPlanVideo> list = gzhPlanVideoMapper.selectByExample(example);
             if (!CollectionUtils.isEmpty(list)) {

+ 11 - 0
api-module/src/main/resources/mapper/contentplatform/ext/ExternalChannelMapperExt.xml

@@ -53,6 +53,17 @@
           and is_delete = 0
     </select>
 
+    <!-- 批量查询已存在的rootSourceId -->
+    <select id="selectExistingRootSourceIds" resultType="java.lang.String">
+        select root_source_id
+        from external_channel
+        where is_delete = 0
+        and root_source_id in
+        <foreach collection="rootSourceIds" item="item" open="(" separator="," close=")">
+            #{item}
+        </foreach>
+    </select>
+
     <!-- 批量插入待处理记录(忽略重复) -->
     <insert id="batchInsertIgnore" parameterType="java.util.List">
         insert ignore into external_channel (