Просмотр исходного кода

Merge branch '20260416-wyp-externalChannel'

wangyunpeng 1 месяц назад
Родитель
Сommit
4fedeba12d

+ 18 - 0
api-module/src/main/java/com/tzld/piaoquan/api/component/RecommendApiService.java

@@ -7,6 +7,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
+
 /**
  * 推荐API服务
  */
@@ -36,6 +38,22 @@ public class RecommendApiService {
         }
         return null;
     }
+    /**
+     * 获取最近N天 rootSourceId
+     */
+    public List<String> getArticleByRootSourceId(String rootSourceIdPrefix, Integer days) {
+        try {
+            String url = recommendApiHost + "/article/getNearDaysRootSourceIds?rootSourceIdPrefix=" + rootSourceIdPrefix + "&days=" + days;
+            String response = httpPoolClient.get(url);
+            JSONObject res = JSONObject.parseObject(response);
+            if (res.getInteger("code") == 0 && res.getJSONArray("data") != null) {
+                return res.getJSONArray("data").toJavaList(String.class);
+            }
+        } catch (Exception e) {
+            log.error("获取文章信息失败, rootSourceIdPrefix={} days={}", rootSourceIdPrefix, days, e);
+        }
+        return null;
+    }
     /**
      * 根据rootSourceId获取合作方文章信息
      */

+ 7 - 0
api-module/src/main/java/com/tzld/piaoquan/api/controller/ExternalController.java

@@ -36,4 +36,11 @@ public class ExternalController {
         return CommonResponse.success();
     }
 
+    @ApiOperation(value = "历史数据导入")
+    @GetMapping("/job/initExternalChannelHistory")
+    public CommonResponse<Null> initExternalChannelHistory(@RequestParam String days) {
+        job.initExternalChannelHistory(days);
+        return CommonResponse.success();
+    }
+
 }

+ 16 - 1
api-module/src/main/java/com/tzld/piaoquan/api/dao/mapper/contentplatform/ExternalChannelMapperExt.java

@@ -29,4 +29,19 @@ public interface ExternalChannelMapperExt {
      * @return 更新的记录数
      */
     int markFailedForTimeoutRecords(@Param("timeoutTime") Date timeoutTime);
-}
+
+    /**
+     * 检查 rootSourceId 是否已存在
+     * @param rootSourceId 根来源ID
+     * @return 存在返回1,不存在返回0
+     */
+    int checkRootSourceIdExists(@Param("rootSourceId") String rootSourceId);
+
+    /**
+     * 批量插入待处理记录(忽略重复)
+     * 使用 INSERT IGNORE 实现去重
+     * @param list 待插入记录列表
+     * @return 插入成功的记录数
+     */
+    int batchInsertIgnore(@Param("list") List<ExternalChannel> list);
+} 

+ 328 - 54
api-module/src/main/java/com/tzld/piaoquan/api/job/ExternalChannelProcessJob.java

@@ -23,9 +23,11 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * 外部渠道处理定时任务
@@ -92,16 +94,24 @@ public class ExternalChannelProcessJob {
     private static final String PREFIX_LONGARTICLES_OUTER = "longArticles_outer";
     private static final String PREFIX_LONGARTICLES_INNER = "longArticles_inner_";
 
-    /** 默认最大重试天数,超过此天数的待处理记录不再查询 */
+    /**
+     * 默认最大重试天数,超过此天数的待处理记录不再查询
+     */
     private static final int DEFAULT_MAX_RETRY_DAYS = 3;
 
-    /** 每次查询的记录数限制 */
+    /**
+     * 每次查询的记录数限制
+     */
     private static final int QUERY_LIMIT = 100;
 
-    /** 分布式锁前缀 */
+    /**
+     * 分布式锁前缀
+     */
     private static final String LOCK_PREFIX = "external_channel:process:";
 
-    /** 分布式锁过期时间(秒) */
+    /**
+     * 分布式锁过期时间(秒)
+     */
     private static final long LOCK_EXPIRE_SECONDS = 300;
 
     /**
@@ -112,7 +122,7 @@ public class ExternalChannelProcessJob {
     @XxlJob("processExternalChannelJob")
     public ReturnT<String> processExternalChannel(String param) {
         log.info("开始处理外部渠道待处理记录, 最大重试天数={}", DEFAULT_MAX_RETRY_DAYS);
-        
+
         try {
             // 提前计算时间阈值,避免在SQL中进行日期计算
             Date now = new Date();
@@ -123,27 +133,38 @@ public class ExternalChannelProcessJob {
             if (failedCount > 0) {
                 log.info("已将{}条创建时间早于{}的记录标记为失败", failedCount, beforeDaysTime);
             }
-            
-            // 1. 查询所有待处理的记录(status=0),仅查询指定时间之后的记录
-            List<ExternalChannel> pendingList = externalChannelMapperExt.selectPendingList(QUERY_LIMIT, beforeDaysTime);
-            
-            if (CollectionUtils.isEmpty(pendingList)) {
-                log.info("没有待处理的外部渠道记录");
-                return ReturnT.SUCCESS;
-            }
-            
-            log.info("找到{}条待处理记录", pendingList.size());
-            
-            // 2. 逐条处理
-            for (ExternalChannel record : pendingList) {
-                try {
-                    processSingleRecord(record);
-                } catch (Exception e) {
-                    log.error("处理记录异常, id={}, rootSourceId={}", record.getId(), record.getRootSourceId(), e);
+
+            // 1. 循环分页查询并处理待处理的记录(status=0),仅查询指定时间之后的记录
+            int totalProcessed = 0;
+            int pageNum = 0;
+            List<ExternalChannel> pendingList;
+
+            do {
+                pageNum++;
+                pendingList = externalChannelMapperExt.selectPendingList(QUERY_LIMIT, beforeDaysTime);
+
+                if (CollectionUtils.isEmpty(pendingList)) {
+                    if (pageNum == 1) {
+                        log.info("没有待处理的外部渠道记录");
+                    }
+                    break;
                 }
-            }
-            
-            log.info("外部渠道处理完成");
+
+                log.info("第{}页, 找到{}条待处理记录", pageNum, pendingList.size());
+
+                // 2. 逐条处理
+                for (ExternalChannel record : pendingList) {
+                    try {
+                        processSingleRecord(record);
+                        totalProcessed++;
+                    } catch (Exception e) {
+                        log.error("处理记录异常, id={}, rootSourceId={}", record.getId(), record.getRootSourceId(), e);
+                    }
+                }
+
+            } while (pendingList.size() >= QUERY_LIMIT);
+
+            log.info("外部渠道处理完成, 共处理{}条记录", totalProcessed);
             return ReturnT.SUCCESS;
         } catch (Exception e) {
             log.error("处理外部渠道记录时发生异常", e);
@@ -158,7 +179,7 @@ public class ExternalChannelProcessJob {
     private void processSingleRecord(ExternalChannel record) {
         String rootSourceId = record.getRootSourceId();
         Long recordId = record.getId();
-        
+
         if (StringUtils.isBlank(rootSourceId)) {
             log.warn("rootSourceId为空,标记为未知渠道, id={}", recordId);
             record.setChannel("未知渠道-rootSourceId为空");
@@ -170,24 +191,24 @@ public class ExternalChannelProcessJob {
         String lockKey = LOCK_PREFIX + recordId;
         String lockValue = String.valueOf(System.currentTimeMillis());
         boolean locked = false;
-        
+
         try {
             locked = redisUtils.tryLock(lockKey, lockValue, LOCK_EXPIRE_SECONDS);
             if (!locked) {
                 log.info("获取分布式锁失败,跳过处理, id={}, rootSourceId={}", recordId, rootSourceId);
                 return;
             }
-            
+
             // 判断渠道类型并处理
             ChannelType channelType = determineChannelType(rootSourceId);
-            
+
             if (channelType == null) {
                 log.warn("无法识别渠道类型, id={}, rootSourceId={}", recordId, rootSourceId);
                 record.setChannel("未知渠道-前缀不匹配");
                 updateRecordProcessed(record, ExternalChannelStatusEnum.UNKNOWN_CHANNEL.getVal());
                 return;
             }
-            
+
             switch (channelType) {
                 case MINIAPP_TOULIU:
                     processMiniAppTouliu(record);
@@ -243,7 +264,7 @@ public class ExternalChannelProcessJob {
             return ChannelType.GZH_COOPERATE_JIZHUAN;
         } else if (rootSourceId.startsWith(PREFIX_DYYQW) || rootSourceId.startsWith(PREFIX_TOULIU_TENCENTWBqw)) {
             return ChannelType.QW_COOPERATE;
-        } else if (rootSourceId.startsWith(PREFIX_TOULIU_TENCENTGZH) 
+        } else if (rootSourceId.startsWith(PREFIX_TOULIU_TENCENTGZH)
                 || rootSourceId.startsWith(PREFIX_TOULIU_TENCENTGZHARTICLE)
                 || rootSourceId.startsWith(PREFIX_GZHTOULIU_ARTICLES)) {
             return ChannelType.GZH_TOULIU;
@@ -376,9 +397,9 @@ public class ExternalChannelProcessJob {
         String rootSourceId = record.getRootSourceId();
         Long recordId = record.getId();
         log.info("处理小程序投流, id={}, rootSourceId={}", recordId, rootSourceId);
-        
+
         record.setChannel("小程序投流-稳定");
-        
+
         // 调用AD API获取创意信息
         try {
             AdPutTencentCreative creative = adApiService.getCreativeByRootSourceId(rootSourceId);
@@ -410,26 +431,26 @@ public class ExternalChannelProcessJob {
         String rootSourceId = record.getRootSourceId();
         Long recordId = record.getId();
         log.info("处理公众号合作-即转, id={}, rootSourceId={}", recordId, rootSourceId);
-        
+
         record.setChannel("公众号合作-即转-稳定");
         record.setCardId(rootSourceId);
-        
+
         try {
             // 查询cgi_reply_bucket_data获取ghId
             CgiReplyBucketDataExample example = new CgiReplyBucketDataExample();
             example.createCriteria().andRootSourceIdEqualTo(rootSourceId).andIsDeleteEqualTo(0);
             example.setOrderByClause("id desc");
             List<CgiReplyBucketData> bucketDataList = cgiReplyBucketDataMapper.selectByExample(example);
-            
+
             if (!CollectionUtils.isEmpty(bucketDataList)) {
                 String ghId = bucketDataList.get(0).getGhId();
                 record.setAccountId(ghId);
-                
+
                 // 查询content_platform_gzh_account获取合作方
                 ContentPlatformGzhAccountExample accountExample = new ContentPlatformGzhAccountExample();
                 accountExample.createCriteria().andGhIdEqualTo(ghId).andStatusEqualTo(1);
                 List<ContentPlatformGzhAccount> accountList = gzhAccountMapper.selectByExample(accountExample);
-                
+
                 if (!CollectionUtils.isEmpty(accountList)) {
                     record.setPartnerId(String.valueOf(accountList.get(0).getCreateAccountId()));
                 }
@@ -453,7 +474,7 @@ public class ExternalChannelProcessJob {
         String rootSourceId = record.getRootSourceId();
         Long recordId = record.getId();
         log.info("处理群/企微合作, id={}, rootSourceId={}", recordId, rootSourceId);
-        
+
         record.setChannel("群/企微合作-稳定");
         record.setCardId(rootSourceId);
 
@@ -462,7 +483,7 @@ public class ExternalChannelProcessJob {
             ContentPlatformQwPlanExample example = new ContentPlatformQwPlanExample();
             example.createCriteria().andRootSourceIdEqualTo(rootSourceId).andStatusEqualTo(1);
             List<ContentPlatformQwPlan> planList = qwPlanMapper.selectByExample(example);
-            
+
             if (!CollectionUtils.isEmpty(planList)) {
                 ContentPlatformQwPlan plan = planList.get(0);
                 record.setPartnerId(String.valueOf(plan.getCreateAccountId()));
@@ -487,7 +508,7 @@ public class ExternalChannelProcessJob {
         String rootSourceId = record.getRootSourceId();
         Long recordId = record.getId();
         log.info("处理公众号投流, id={}, rootSourceId={}", recordId, rootSourceId);
-        
+
         record.setChannel("公众号投流-稳定");
         record.setCardId(rootSourceId);
 
@@ -497,7 +518,7 @@ public class ExternalChannelProcessJob {
             example.createCriteria().andRootSourceIdEqualTo(rootSourceId).andIsDeleteEqualTo(0);
             example.setOrderByClause("id desc");
             List<CgiReplyBucketData> bucketDataList = cgiReplyBucketDataMapper.selectByExample(example);
-            
+
             if (!CollectionUtils.isEmpty(bucketDataList)) {
                 CgiReplyBucketData bucketData = bucketDataList.get(0);
                 record.setAccountId(bucketData.getGhId());
@@ -537,19 +558,19 @@ public class ExternalChannelProcessJob {
         String rootSourceId = record.getRootSourceId();
         Long recordId = record.getId();
         log.info("处理服务号合作-Daily, id={}, rootSourceId={}", recordId, rootSourceId);
-        
+
         record.setChannel("服务号合作-Daily-自选");
-        
+
         try {
             // 查询content_platform_gzh_plan_video
             ContentPlatformGzhPlanVideoExample videoExample = new ContentPlatformGzhPlanVideoExample();
             videoExample.createCriteria().andRootSourceIdEqualTo(rootSourceId).andStatusEqualTo(1);
             List<ContentPlatformGzhPlanVideo> videoList = gzhPlanVideoMapper.selectByExample(videoExample);
-            
+
             if (!CollectionUtils.isEmpty(videoList)) {
                 ContentPlatformGzhPlanVideo video = videoList.get(0);
                 record.setPartnerId(String.valueOf(video.getCreateAccountId()));
-                
+
                 // 关联content_platform_gzh_plan获取公众号
                 ContentPlatformGzhPlan plan = gzhPlanMapper.selectByPrimaryKey(video.getPlanId());
                 if (plan != null) {
@@ -586,7 +607,7 @@ public class ExternalChannelProcessJob {
         log.info("处理服务号投流-Daily, id={}, rootSourceId={}", recordId, rootSourceId);
         record.setChannel("服务号投流-Daily");
         record.setCardId(rootSourceId);
-        
+
         try {
             // 通过 recommendApiService 获取文章信息
             JSONObject article = recommendApiService.getArticleByRootSourceId(rootSourceId);
@@ -619,7 +640,7 @@ public class ExternalChannelProcessJob {
         log.info("处理服务号投流-即转, id={}, rootSourceId={}", recordId, rootSourceId);
         record.setChannel("服务号投流-即转");
         record.setCardId(rootSourceId);
-        
+
         try {
             // 查询cgi_reply_bucket_data获取ghId和pagePath
             CgiReplyBucketDataExample example = new CgiReplyBucketDataExample();
@@ -666,20 +687,20 @@ public class ExternalChannelProcessJob {
         String rootSourceId = record.getRootSourceId();
         Long recordId = record.getId();
         log.info("处理公众号合作-Daily, id={}, rootSourceId={}", recordId, rootSourceId);
-        
+
         record.setChannel("公众号合作-Daily-自选");
-        
+
         try {
             // 查询content_platform_gzh_plan_video
             ContentPlatformGzhPlanVideoExample videoExample = new ContentPlatformGzhPlanVideoExample();
             videoExample.createCriteria().andRootSourceIdEqualTo(rootSourceId).andStatusEqualTo(1);
             List<ContentPlatformGzhPlanVideo> videoList = gzhPlanVideoMapper.selectByExample(videoExample);
-            
+
             if (!CollectionUtils.isEmpty(videoList)) {
                 ContentPlatformGzhPlanVideo video = videoList.get(0);
                 record.setPartnerId(String.valueOf(video.getCreateAccountId()));
                 record.setArticleId(String.valueOf(video.getVideoId()));
-                
+
                 // 关联content_platform_gzh_plan获取公众号
                 ContentPlatformGzhPlan plan = gzhPlanMapper.selectByPrimaryKey(video.getPlanId());
                 if (plan != null) {
@@ -714,7 +735,7 @@ public class ExternalChannelProcessJob {
         String rootSourceId = record.getRootSourceId();
         Long recordId = record.getId();
         log.info("处理公众号买号, id={}, rootSourceId={}", recordId, rootSourceId);
-        
+
         record.setChannel("公众号买号");
         record.setCardId(rootSourceId);
 
@@ -748,7 +769,7 @@ public class ExternalChannelProcessJob {
         String rootSourceId = record.getRootSourceId();
         Long recordId = record.getId();
         log.info("处理公众号代运营-Daily, id={}, rootSourceId={}", recordId, rootSourceId);
-        
+
         record.setChannel("公众号代运营-Daily-系统");
         record.setCardId(rootSourceId);
 
@@ -786,4 +807,257 @@ public class ExternalChannelProcessJob {
         log.info("记录处理完成, id={}, channel={}, status={}", record.getId(), record.getChannel(), status);
     }
 
+    /**
+     * 批量插入的批次大小
+     */
+    private static final int BATCH_INSERT_SIZE = 500;
+
+    /**
+     * 默认历史数据天数
+     */
+    private static final int DEFAULT_HISTORY_DAYS = 30;
+
+    /**
+     * 初始化历史数据
+     * 参数格式:days=30 或 channel=GZH_TOULIU,days=30
+     * days 表示距当日的间隔天数,不传默认30天
+     * channel 可选,不传则初始化所有已支持的渠道
+     *
+     * @param param 参数字符串
+     * @return 执行结果
+     */
+    @XxlJob("initExternalChannelHistory")
+    public ReturnT<String> initExternalChannelHistory(String param) {
+        log.info("开始初始化外部渠道历史数据, param={}", param);
+
+        // 解析参数
+        int days = DEFAULT_HISTORY_DAYS;
+        if (StringUtils.isNotBlank(param)) {
+            days = Integer.parseInt(param);
+        }
+
+        // 根据天数计算起始时间
+        Date now = new Date();
+        Date startDate = new Date(now.getTime() - TimeUnit.DAYS.toMillis(days));
+
+        log.info("初始化历史数据, days={}, startDate={}", days, startDate);
+
+        try {
+            int totalCount = 0;
+
+            // 初始化所有已支持的渠道
+            totalCount += initChannelHistory(ChannelType.MINIAPP_TOULIU, startDate, days);
+            totalCount += initChannelHistory(ChannelType.GZH_COOPERATE_JIZHUAN, startDate, days);
+            totalCount += initChannelHistory(ChannelType.QW_COOPERATE, startDate, days);
+            totalCount += initChannelHistory(ChannelType.GZH_TOULIU, startDate, days);
+            totalCount += initChannelHistory(ChannelType.FWH_COOPERATE_DAILY, startDate, days);
+            totalCount += initChannelHistory(ChannelType.FWH_TOULIU_DAILY, startDate, days);
+            totalCount += initChannelHistory(ChannelType.FWH_TOULIU_REPLY, startDate, days);
+            totalCount += initChannelHistory(ChannelType.GZH_COOPERATE_DAILY, startDate, days);
+            totalCount += initChannelHistory(ChannelType.GZH_BUY_ACCOUNT, startDate, days);
+            totalCount += initChannelHistory(ChannelType.GZH_OPERATION_DAILY, startDate, days);
+
+            log.info("历史数据初始化完成, 共插入{}条记录", totalCount);
+            return new ReturnT<>(ReturnT.SUCCESS_CODE, "初始化完成,共插入" + totalCount + "条记录");
+        } catch (Exception e) {
+            log.error("初始化历史数据异常", e);
+            return new ReturnT<>(ReturnT.FAIL_CODE, "初始化失败:" + e.getMessage());
+        }
+    }
+
+    /**
+     * 初始化指定渠道的历史数据
+     *
+     * @param channelType 渠道类型
+     * @param startDate   起始日期
+     * @return 插入的记录数
+     */
+    private int initChannelHistory(ChannelType channelType, Date startDate, int days) {
+        log.info("初始化渠道历史数据, channelType={}, startDate={}", channelType, startDate);
+
+        List<String> rootSourceIds = new ArrayList<>();
+        Date now = new Date();
+
+        switch (channelType) {
+            case MINIAPP_TOULIU:
+                // 小程序投流: 历史已手动导入一年
+                break;
+            case GZH_COOPERATE_JIZHUAN:
+                // 公众号合作-即转:从cgi_reply_bucket_data查询dyyjs_开头的rootSourceId
+                //rootSourceIds = queryRootSourceIdsFromCgiReply(PREFIX_DYYJS, startDate);
+                break;
+            case QW_COOPERATE:
+                // 群/企微合作:从content_platform_qw_plan查询
+                rootSourceIds = queryRootSourceIdsFromQwPlan(startDate);
+                break;
+            case GZH_TOULIU:
+                // 公众号投流:从cgi_reply_bucket_data查询多种前缀
+                rootSourceIds.addAll(queryRootSourceIdsFromCgiReply(PREFIX_TOULIU_TENCENTGZH, startDate));
+                rootSourceIds.addAll(queryRootSourceIdsFromCgiReply(PREFIX_TOULIU_TENCENTGZHARTICLE, startDate));
+                rootSourceIds.addAll(queryRootSourceIdsFromCgiReply(PREFIX_GZHTOULIU_ARTICLES, startDate));
+                break;
+            case FWH_COOPERATE_DAILY:
+                // 服务号合作-Daily:从content_platform_gzh_plan_video查询
+                rootSourceIds = queryRootSourceIdsFromGzhPlanVideo(PREFIX_FWHHZDYy, startDate);
+                rootSourceIds.addAll(queryRootSourceIdsFromGzhPlanVideo(PREFIX_FWHDYY, startDate));
+                break;
+            case FWH_TOULIU_REPLY:
+                // 服务号投流-即转:从cgi_reply_bucket_data查询fwhtouliu_tencentgzh开头
+                rootSourceIds = queryRootSourceIdsFromCgiReply(PREFIX_FWHTOULIU_TENCENTGZH, startDate);
+                break;
+            case FWH_TOULIU_DAILY:
+                // 服务号投流-Daily:从aigc getNearDaysRootSourceIds接口查询fwhtouliu_开头
+                rootSourceIds = queryRootSourceIdsFromAigc(PREFIX_FWHTOULIU, days);
+                break;
+            case GZH_COOPERATE_DAILY:
+                // 公众号合作-Daily:从content_platform_gzh_plan_video查询longArticles_开头
+                rootSourceIds = queryRootSourceIdsFromGzhPlanVideo(PREFIX_LONGARTICLES, startDate);
+                break;
+            case GZH_BUY_ACCOUNT:
+                // 公众号买号:从aigc getNearDaysRootSourceIds接口查询longArticles_inner_开头
+                rootSourceIds = queryRootSourceIdsFromAigc(PREFIX_LONGARTICLES, days);
+                break;
+            case GZH_OPERATION_DAILY:
+                // 公众号代运营-Daily:从aigc getNearDaysRootSourceIds接口查询longArticles_inner_开头
+                rootSourceIds = queryRootSourceIdsFromAigc(PREFIX_LONGARTICLES, days);
+                break;
+            default:
+                return 0;
+        }
+
+        if (CollectionUtils.isEmpty(rootSourceIds)) {
+            log.info("渠道{}未找到历史数据", channelType);
+            return 0;
+        }
+
+        log.info("渠道{}找到{}条历史数据记录", channelType, rootSourceIds.size());
+
+        // 构建待插入记录列表
+        List<ExternalChannel> records = new ArrayList<>();
+        for (String rootSourceId : rootSourceIds) {
+            ExternalChannel record = new ExternalChannel();
+            record.setRootSourceId(rootSourceId);
+            record.setStatus(ExternalChannelStatusEnum.PENDING.getVal());
+            record.setIsDelete(0);
+            record.setCreateTime(now);
+            record.setUpdateTime(now);
+            records.add(record);
+        }
+
+        // 分批插入
+        int insertedCount = 0;
+        for (int i = 0; i < records.size(); i += BATCH_INSERT_SIZE) {
+            int end = Math.min(i + BATCH_INSERT_SIZE, records.size());
+            List<ExternalChannel> batch = records.subList(i, end);
+            int count = externalChannelMapperExt.batchInsertIgnore(batch);
+            insertedCount += count;
+            log.info("渠道{}批次插入完成, 本批{}, 累计插入{}", channelType, count, insertedCount);
+        }
+
+        log.info("渠道{}历史数据初始化完成, 插入{}条", channelType, insertedCount);
+        return insertedCount;
+    }
+
+    /**
+     * 从aigc getNearDaysRootSourceIds接口查询指定前缀的rootSourceId列表
+     *
+     * @param prefix 前缀
+     * @param days   天数
+     * @return rootSourceId列表
+     */
+    private List<String> queryRootSourceIdsFromAigc(String prefix, int days) {
+        List<String> result = new ArrayList<>();
+        try {
+            List<String> list = recommendApiService.getArticleByRootSourceId(prefix, days);
+            if (!CollectionUtils.isEmpty(list)) {
+                result = list;
+            }
+        } catch (Exception e) {
+            log.error("从aigc查询历史数据失败, prefix={}, days={}", prefix, days, e);
+        }
+        return result;
+    }
+
+    /**
+     * 从cgi_reply_bucket_data表查询指定前缀的rootSourceId列表
+     *
+     * @param prefix    前缀
+     * @param startDate 起始日期
+     * @return rootSourceId列表
+     */
+    private List<String> queryRootSourceIdsFromCgiReply(String prefix, Date startDate) {
+        List<String> result = new ArrayList<>();
+        try {
+            CgiReplyBucketDataExample example = new CgiReplyBucketDataExample();
+            example.createCriteria()
+                    .andRootSourceIdLike(prefix + "%")
+                    .andCreateTimeGreaterThanOrEqualTo(startDate);
+            example.setDistinct(true);
+            List<CgiReplyBucketData> list = cgiReplyBucketDataMapper.selectByExample(example);
+            if (!CollectionUtils.isEmpty(list)) {
+                result = list.stream().map(CgiReplyBucketData::getRootSourceId).collect(Collectors.toList());
+            }
+        } catch (Exception e) {
+            log.error("从cgi_reply_bucket_data查询历史数据失败, prefix={}", prefix, e);
+        }
+        return result;
+    }
+
+    /**
+     * 从content_platform_qw_plan表查询rootSourceId列表
+     *
+     * @param startDate 起始日期
+     * @return rootSourceId列表
+     */
+    private List<String> queryRootSourceIdsFromQwPlan(Date startDate) {
+        List<String> result = new ArrayList<>();
+        try {
+            ContentPlatformQwPlanExample example = new ContentPlatformQwPlanExample();
+            example.createCriteria()
+                    .andStatusEqualTo(1)
+                    .andRootSourceIdIsNotNull()
+                    .andCreateTimestampGreaterThanOrEqualTo(startDate.getTime());
+            List<ContentPlatformQwPlan> list = qwPlanMapper.selectByExample(example);
+            if (!CollectionUtils.isEmpty(list)) {
+                for (ContentPlatformQwPlan plan : list) {
+                    if (StringUtils.isNotBlank(plan.getRootSourceId())) {
+                        result.add(plan.getRootSourceId());
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("从content_platform_qw_plan查询历史数据失败", e);
+        }
+        return result;
+    }
+
+    /**
+     * 从content_platform_gzh_plan_video表查询指定前缀的rootSourceId列表
+     *
+     * @param prefix    前缀
+     * @param startDate 起始日期
+     * @return rootSourceId列表
+     */
+    private List<String> queryRootSourceIdsFromGzhPlanVideo(String prefix, Date startDate) {
+        List<String> result = new ArrayList<>();
+        try {
+            ContentPlatformGzhPlanVideoExample example = new ContentPlatformGzhPlanVideoExample();
+            example.createCriteria()
+                    .andRootSourceIdLike(prefix + "%")
+                    .andStatusEqualTo(1)
+                    .andCreateTimestampGreaterThanOrEqualTo(startDate.getTime());
+            List<ContentPlatformGzhPlanVideo> list = gzhPlanVideoMapper.selectByExample(example);
+            if (!CollectionUtils.isEmpty(list)) {
+                for (ContentPlatformGzhPlanVideo video : list) {
+                    if (StringUtils.isNotBlank(video.getRootSourceId())) {
+                        result.add(video.getRootSourceId());
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("从content_platform_gzh_plan_video查询历史数据失败, prefix={}", prefix, e);
+        }
+        return result;
+    }
+
 }

+ 28 - 0
api-module/src/main/resources/mapper/contentplatform/ext/ExternalChannelMapperExt.xml

@@ -45,4 +45,32 @@
           and create_time &lt; #{timeoutTime}
     </update>
 
+    <!-- 检查 rootSourceId 是否已存在 -->
+    <select id="checkRootSourceIdExists" resultType="int">
+        select count(1)
+        from external_channel
+        where root_source_id = #{rootSourceId}
+          and is_delete = 0
+    </select>
+
+    <!-- 批量插入待处理记录(忽略重复) -->
+    <insert id="batchInsertIgnore" parameterType="java.util.List">
+        insert ignore into external_channel (
+            root_source_id,
+            `status`,
+            is_delete,
+            create_time,
+            update_time
+        ) values
+        <foreach collection="list" item="item" separator=",">
+            (
+                #{item.rootSourceId,jdbcType=VARCHAR},
+                0,
+                0,
+                #{item.createTime,jdbcType=TIMESTAMP},
+                #{item.updateTime,jdbcType=TIMESTAMP}
+            )
+        </foreach>
+    </insert>
+
 </mapper>