package com.tzld.piaoquan.api.job; import com.alibaba.fastjson.JSONObject; import com.tzld.piaoquan.api.common.enums.contentplatform.ChannelType; import com.tzld.piaoquan.api.common.enums.contentplatform.ExternalChannelStatusEnum; import com.tzld.piaoquan.api.component.AdApiService; import com.tzld.piaoquan.api.component.AigcApiService; import com.tzld.piaoquan.api.component.RecommendApiService; import com.tzld.piaoquan.api.dao.mapper.contentplatform.*; import com.tzld.piaoquan.api.model.bo.AdPutTencentCreative; import com.tzld.piaoquan.api.model.po.contentplatform.*; import com.tzld.piaoquan.api.service.CgiReplyService; import com.tzld.piaoquan.api.service.contentplatform.ContentPlatformPlanService; import com.tzld.piaoquan.growth.common.dao.mapper.CgiReplyBucketDataMapper; import com.tzld.piaoquan.growth.common.dao.mapper.ext.CgiReplyBucketDataMapperExt; import com.tzld.piaoquan.growth.common.model.po.CgiReplyBucketData; import com.tzld.piaoquan.growth.common.model.po.CgiReplyBucketDataExample; import com.tzld.piaoquan.growth.common.utils.RedisUtils; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** * 外部渠道处理定时任务 * 处理 external_channel 表中 status=0(待处理)的记录 * 根据 rootSourceId 判断渠道类型,获取合作方及其他内容 * 仅处理创建时间在 DEFAULT_MAX_RETRY_DAYS 天内的记录 */ @Slf4j @Component public class ExternalChannelProcessJob { @Autowired private ExternalChannelMapper externalChannelMapper; @Autowired private ExternalChannelMapperExt externalChannelMapperExt; @Autowired private ContentPlatformGzhAccountMapper gzhAccountMapper; @Autowired private ContentPlatformQwPlanMapper qwPlanMapper; @Autowired private ContentPlatformGzhPlanVideoMapper gzhPlanVideoMapper; @Autowired private ContentPlatformGzhPlanMapper gzhPlanMapper; @Autowired private CgiReplyBucketDataMapper cgiReplyBucketDataMapper; @Autowired private CgiReplyBucketDataMapperExt cgiReplyBucketDataMapperExt; @Autowired private CgiReplyService cgiReplyService; @Autowired private ContentPlatformPlanService contentPlatformPlanService; @Autowired private AdApiService adApiService; @Autowired private AigcApiService aigcApiService; @Autowired private RecommendApiService recommendApiService; @Autowired private RedisUtils redisUtils; // 渠道类型前缀定义 private static final String PREFIX_TOULIU_TENCENT = "touliu_tencent_"; private static final String PREFIX_DYYJS = "dyyjs_"; private static final String PREFIX_DYYQW = "dyyqw_"; private static final String PREFIX_TOULIU_TENCENTWBqw = "touliu_tencentwbqw_"; private static final String PREFIX_TOULIU_TENCENTGZH = "touliu_tencentgzh_"; private static final String PREFIX_TOULIU_TENCENTGZHARTICLE = "touliu_tencentGzhArticle_"; private static final String PREFIX_GZHTOULIU_ARTICLES = "GzhTouLiu_Articles_gh"; private static final String PREFIX_FWHHZDYy = "fwhhzdyy_"; private static final String PREFIX_FWHDYY = "fwhdyy_"; private static final String PREFIX_FWHTOULIU_TENCENTGZH = "fwhtouliu_tencentgzh"; private static final String PREFIX_FWHTOULIU = "fwhtouliu_"; private static final String PREFIX_LONGARTICLES = "longArticles_"; private static final String PREFIX_LONGARTICLES_OUTER = "longArticles_outer"; private static final String PREFIX_LONGARTICLES_INNER = "longArticles_inner_"; /** * 默认最大重试天数,超过此天数的待处理记录不再查询 */ private static final int DEFAULT_MAX_RETRY_DAYS = 3; /** * 每次查询的记录数限制 */ private static final int QUERY_LIMIT = 100; /** * 分布式锁前缀 */ private static final String LOCK_PREFIX = "external_channel:process:"; /** * 分布式锁过期时间(秒) */ private static final long LOCK_EXPIRE_SECONDS = 300; /** * 处理外部渠道待处理记录 * 1. 先将超过 DEFAULT_MAX_RETRY_DAYS 天的待处理记录标记为 FAILED * 2. 仅处理创建时间在 DEFAULT_MAX_RETRY_DAYS 天内的待处理记录 */ @XxlJob("processExternalChannelJob") public ReturnT processExternalChannel(String param) { log.info("开始处理外部渠道待处理记录, 最大重试天数={}", DEFAULT_MAX_RETRY_DAYS); try { // 提前计算时间阈值,避免在SQL中进行日期计算 Date now = new Date(); Date beforeDaysTime = new Date(now.getTime() - TimeUnit.DAYS.toMillis(DEFAULT_MAX_RETRY_DAYS)); // 0. 先将超过天数的待处理记录标记为失败 int failedCount = externalChannelMapperExt.markFailedForTimeoutRecords(beforeDaysTime); if (failedCount > 0) { log.info("已将{}条创建时间早于{}的记录标记为失败", failedCount, beforeDaysTime); } // 1. 循环分页查询并处理待处理的记录(status=0),使用id>lastId方式分页 int totalProcessed = 0; int pageNum = 0; long lastId = 0; List pendingList; do { pageNum++; pendingList = externalChannelMapperExt.selectPendingList(lastId, QUERY_LIMIT, beforeDaysTime); if (CollectionUtils.isEmpty(pendingList)) { if (pageNum == 1) { log.info("没有待处理的外部渠道记录"); } break; } // 更新lastId为本页最后一条记录的id lastId = pendingList.get(pendingList.size() - 1).getId(); log.info("第{}页, 找到{}条待处理记录, lastId={}", pageNum, pendingList.size(), lastId); // 2. 逐条处理 for (ExternalChannel record : pendingList) { try { processSingleRecord(record); totalProcessed++; } catch (Exception e) { log.error("处理记录异常, id={}, rootSourceId={}", record.getId(), record.getRootSourceId(), e); } } } while (pendingList.size() >= QUERY_LIMIT); log.info("外部渠道处理完成, 共处理{}条记录", totalProcessed); return ReturnT.SUCCESS; } catch (Exception e) { log.error("处理外部渠道记录时发生异常", e); return ReturnT.FAIL; } } /** * 处理单条记录 * 根据渠道类型路由到对应的处理方法,使用分布式锁防止并发处理 */ private void processSingleRecord(ExternalChannel record) { String rootSourceId = record.getRootSourceId(); Long recordId = record.getId(); if (StringUtils.isBlank(rootSourceId)) { log.warn("rootSourceId为空,标记为未知渠道, id={}", recordId); record.setChannel("未知渠道-rootSourceId为空"); updateRecordProcessed(record, ExternalChannelStatusEnum.UNKNOWN_CHANNEL.getVal()); return; } // 尝试获取分布式锁 String lockKey = LOCK_PREFIX + recordId; String lockValue = String.valueOf(System.currentTimeMillis()); boolean locked = false; try { locked = redisUtils.tryLock(lockKey, lockValue, LOCK_EXPIRE_SECONDS); if (!locked) { log.info("获取分布式锁失败,跳过处理, id={}, rootSourceId={}", recordId, rootSourceId); return; } // 判断渠道类型并处理 ChannelType channelType = determineChannelType(rootSourceId); if (channelType == null) { log.warn("无法识别渠道类型, id={}, rootSourceId={}", recordId, rootSourceId); record.setChannel("未知渠道-前缀不匹配"); updateRecordProcessed(record, ExternalChannelStatusEnum.UNKNOWN_CHANNEL.getVal()); return; } switch (channelType) { case MINIAPP_TOULIU: processMiniAppTouliu(record); break; case GZH_COOPERATE_JIZHUAN: processGzhCooperateJizhuan(record); break; case QW_COOPERATE: processQwCooperate(record); break; case GZH_TOULIU: processGzhTouliu(record); break; case FWH_COOPERATE_DAILY: processFwhCooperateDaily(record); break; case FWH_TOULIU_DAILY: processFwhTouliuDaily(record); break; case FWH_TOULIU_REPLY: processFwhTouliuReply(record); break; case GZH_COOPERATE_DAILY: processGzhCooperateDaily(record); break; case GZH_BUY_ACCOUNT: processGzhBuyAccount(record); break; case GZH_OPERATION_DAILY: processGzhOperationDaily(record); break; default: log.warn("未处理的渠道类型, id={}, rootSourceId={}, channelType={}", recordId, rootSourceId, channelType); record.setChannel("未知渠道-" + channelType.name()); updateRecordProcessed(record, ExternalChannelStatusEnum.UNKNOWN_CHANNEL.getVal()); } } finally { // 释放分布式锁 if (locked) { redisUtils.del(lockKey); } } } /** * 判断渠道类型 * 根据 rootSourceId 前缀判断所属渠道类型 */ private ChannelType determineChannelType(String rootSourceId) { if (rootSourceId.startsWith(PREFIX_TOULIU_TENCENT)) { return ChannelType.MINIAPP_TOULIU; } else if (rootSourceId.startsWith(PREFIX_DYYJS)) { return ChannelType.GZH_COOPERATE_JIZHUAN; } else if (rootSourceId.startsWith(PREFIX_DYYQW) || rootSourceId.startsWith(PREFIX_TOULIU_TENCENTWBqw)) { return ChannelType.QW_COOPERATE; } else if (rootSourceId.startsWith(PREFIX_TOULIU_TENCENTGZH) || rootSourceId.startsWith(PREFIX_TOULIU_TENCENTGZHARTICLE) || rootSourceId.startsWith(PREFIX_GZHTOULIU_ARTICLES)) { return ChannelType.GZH_TOULIU; } else if (rootSourceId.startsWith(PREFIX_FWHHZDYy) || rootSourceId.startsWith(PREFIX_FWHDYY)) { return ChannelType.FWH_COOPERATE_DAILY; } else if (rootSourceId.startsWith(PREFIX_FWHTOULIU_TENCENTGZH)) { return ChannelType.FWH_TOULIU_REPLY; } else if (rootSourceId.startsWith(PREFIX_FWHTOULIU)) { // 服务号投流需要通过接口判断是daily还是reply return determineFwhTouliuType(rootSourceId); } else if (rootSourceId.startsWith(PREFIX_LONGARTICLES)) { if (rootSourceId.startsWith(PREFIX_LONGARTICLES_OUTER)) { return ChannelType.GZH_COOPERATE_DAILY; } else { return determineLongArticlesType(rootSourceId); } } return null; } /** * 判断服务号投流类型(Daily或即转) * 依次调用 recommendApiService 和 cgiReplyService 判断类型 * 命中 recommend 接口返回 DAILY,命中 cgiReply 接口返回 REPLY */ private ChannelType determineFwhTouliuType(String rootSourceId) { try { Boolean dailyExists = recommendApiService.checkExistRootSourceId(rootSourceId); if (Boolean.TRUE.equals(dailyExists)) { log.info("服务号投流类型判断, rootSourceId={}, dailyExists=true", rootSourceId); return ChannelType.FWH_TOULIU_DAILY; } Boolean replyExists = cgiReplyService.checkExistRootSourceId(rootSourceId); if (Boolean.TRUE.equals(replyExists)) { log.info("服务号投流类型判断, rootSourceId={}, replyExists=true", rootSourceId); return ChannelType.FWH_TOULIU_REPLY; } log.info("服务号投流类型判断, rootSourceId={}, dailyExists=false, replyExists=false", rootSourceId); } catch (Exception e) { log.error("判断服务号投流类型失败, rootSourceId={}", rootSourceId, e); } // 默认返回daily return ChannelType.FWH_TOULIU_DAILY; } /** * 判断 longArticles 类型(inner或outer) * 命名 recommend 接口返回 inner(公众号买号/代运营) * 命名 contentPlatformPlanService 接口返回 outer(公众号合作-Daily) */ private ChannelType determineLongArticlesType(String rootSourceId) { try { Boolean innerExists = recommendApiService.checkExistRootSourceId(rootSourceId); if (Boolean.TRUE.equals(innerExists)) { log.info("longArticles类型判断, rootSourceId={}, innerExists=true", rootSourceId); // 区分 公众号买号 与 公众号代运营 if (isGzhBuyAccount(rootSourceId)) { return ChannelType.GZH_BUY_ACCOUNT; } else { return ChannelType.GZH_OPERATION_DAILY; } } Boolean outerExists = contentPlatformPlanService.gzhPushCheckExistRootSourceId(rootSourceId); if (Boolean.TRUE.equals(outerExists)) { log.info("longArticles类型判断, rootSourceId={}, outerExists=true", rootSourceId); return ChannelType.GZH_COOPERATE_DAILY; } log.info("longArticles类型判断, rootSourceId={}, innerExists=false, outerExists=false", rootSourceId); } catch (Exception e) { log.error("判断longArticles类型失败, rootSourceId={}", rootSourceId, e); } // 默认返回 公众号代运营-Daily return ChannelType.GZH_OPERATION_DAILY; } /** * 判断是否为公众号买号 * 调用 AIGC 接口根据 ghId 判断账号类型 * 返回值包含 "买号" 则为公众号买号,包含 "代运营" 则为公众号代运营 * * @param rootSourceId 根来源ID * @return true-公众号买号, false-公众号代运营 */ private boolean isGzhBuyAccount(String rootSourceId) { try { // 1. 先通过rootSourceId获取ghId JSONObject article = recommendApiService.getArticleByRootSourceId(rootSourceId); if (article == null || StringUtils.isBlank(article.getString("ghId"))) { log.warn("无法获取ghId, rootSourceId={}", rootSourceId); return false; } String ghId = article.getString("ghId"); // 2. 调用AIGC接口判断账号类型 String accountGroupSourceName = aigcApiService.getAccountGroupSourceName(ghId); log.info("判断公众号买号/代运营, rootSourceId={}, ghId={}, accountGroupSourceName={}", rootSourceId, ghId, accountGroupSourceName); if (StringUtils.isBlank(accountGroupSourceName)) { return false; } // 3. 根据返回判断账号类型 if (accountGroupSourceName.contains("买号")) { log.info("判断为公众号买号, rootSourceId={}, ghId={}, accountGroupSourceName={}", rootSourceId, ghId, accountGroupSourceName); return true; } else if (accountGroupSourceName.contains("代运营")) { log.info("判断为公众号代运营, rootSourceId={}, ghId={}, accountGroupSourceName={}", rootSourceId, ghId, accountGroupSourceName); return false; } else { log.warn("无法识别账号类型, rootSourceId={}, ghId={}, accountGroupSourceName={}", rootSourceId, ghId, accountGroupSourceName); return false; } } catch (Exception e) { log.error("判断公众号买号/代运营失败, rootSourceId={}", rootSourceId, e); // 默认返回false,即公众号代运营 return false; } } /** * 1. 小程序投流-稳定 * 渠道判断:rootSourceId 以 touliu_tencent_ 开头 * 处理逻辑:通过 adApiService 获取创意和人群包信息 */ private void processMiniAppTouliu(ExternalChannel record) { String rootSourceId = record.getRootSourceId(); Long recordId = record.getId(); log.info("处理小程序投流, id={}, rootSourceId={}", recordId, rootSourceId); record.setChannel("小程序投流-稳定"); // 调用AD API获取创意信息 try { AdPutTencentCreative creative = adApiService.getCreativeByRootSourceId(rootSourceId); if (creative != null) { record.setCreativeId(String.valueOf(creative.getCreativeId())); // 获取人群包信息 List packageIds = adApiService.getPackageIdByAdId(creative.getAdId()); if (packageIds != null && !packageIds.isEmpty()) { record.setPackageId(JSONObject.toJSONString(packageIds.get(0))); } } } catch (Exception e) { log.error("获取小程序投流信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e); } Integer status = ExternalChannelStatusEnum.PENDING.getVal(); if (StringUtils.isNotBlank(record.getCreativeId())) { status = ExternalChannelStatusEnum.PROCESSED.getVal(); } updateRecordProcessed(record, status); } /** * 2. 公众号合作-即转-稳定 * 渠道判断:rootSourceId 以 dyyjs_ 开头 * 处理逻辑:查询 cgi_reply_bucket_data 获取 ghId,再查询 content_platform_gzh_account 获取合作方 */ private void processGzhCooperateJizhuan(ExternalChannel record) { String rootSourceId = record.getRootSourceId(); Long recordId = record.getId(); log.info("处理公众号合作-即转, id={}, rootSourceId={}", recordId, rootSourceId); record.setChannel("公众号合作-即转-稳定"); record.setCardId(rootSourceId); try { // 查询cgi_reply_bucket_data获取ghId CgiReplyBucketDataExample example = new CgiReplyBucketDataExample(); example.createCriteria().andRootSourceIdEqualTo(rootSourceId).andIsDeleteEqualTo(0); example.setOrderByClause("id desc"); List bucketDataList = cgiReplyBucketDataMapper.selectByExample(example); if (!CollectionUtils.isEmpty(bucketDataList)) { String ghId = bucketDataList.get(0).getGhId(); record.setAccountId(ghId); // 查询content_platform_gzh_account获取合作方 ContentPlatformGzhAccountExample accountExample = new ContentPlatformGzhAccountExample(); accountExample.createCriteria().andGhIdEqualTo(ghId).andStatusEqualTo(1); List accountList = gzhAccountMapper.selectByExample(accountExample); if (!CollectionUtils.isEmpty(accountList)) { record.setPartnerId(String.valueOf(accountList.get(0).getCreateAccountId())); } } } catch (Exception e) { log.error("获取公众号合作信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e); } Integer status = ExternalChannelStatusEnum.PENDING.getVal(); if (StringUtils.isNotBlank(record.getPartnerId())) { status = ExternalChannelStatusEnum.PROCESSED.getVal(); } updateRecordProcessed(record, status); } /** * 3. 群/企微合作-稳定 * 渠道判断:rootSourceId 以 dyyqw_ 或 touliu_tencentwbqw_ 开头 * 处理逻辑:查询 content_platform_qw_plan 获取合作方 */ private void processQwCooperate(ExternalChannel record) { String rootSourceId = record.getRootSourceId(); Long recordId = record.getId(); log.info("处理群/企微合作, id={}, rootSourceId={}", recordId, rootSourceId); record.setChannel("群/企微合作-稳定"); record.setCardId(rootSourceId); try { // 查询content_platform_qw_plan获取合作方 ContentPlatformQwPlanExample example = new ContentPlatformQwPlanExample(); example.createCriteria().andRootSourceIdEqualTo(rootSourceId).andStatusEqualTo(1); List planList = qwPlanMapper.selectByExample(example); if (!CollectionUtils.isEmpty(planList)) { ContentPlatformQwPlan plan = planList.get(0); record.setPartnerId(String.valueOf(plan.getCreateAccountId())); } } catch (Exception e) { log.error("获取群/企微合作信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e); } Integer status = ExternalChannelStatusEnum.PENDING.getVal(); if (StringUtils.isNotBlank(record.getPartnerId())) { status = ExternalChannelStatusEnum.PROCESSED.getVal(); } updateRecordProcessed(record, status); } /** * 4. 公众号投流-稳定 * 渠道判断:rootSourceId 以 touliu_tencentgzh_、touliu_tencentGzhArticle_ 或 GzhTouLiu_Articles_gh 开头 * 处理逻辑:查询 cgi_reply_bucket_data 获取 ghId,通过 aigcApiService 获取 creativeId 和人群包信息 */ private void processGzhTouliu(ExternalChannel record) { String rootSourceId = record.getRootSourceId(); Long recordId = record.getId(); log.info("处理公众号投流, id={}, rootSourceId={}", recordId, rootSourceId); record.setChannel("公众号投流-稳定"); record.setCardId(rootSourceId); try { // 查询cgi_reply_bucket_data获取ghId和pagePath CgiReplyBucketDataExample example = new CgiReplyBucketDataExample(); example.createCriteria().andRootSourceIdEqualTo(rootSourceId).andIsDeleteEqualTo(0); example.setOrderByClause("id desc"); List bucketDataList = cgiReplyBucketDataMapper.selectByExample(example); if (!CollectionUtils.isEmpty(bucketDataList)) { CgiReplyBucketData bucketData = bucketDataList.get(0); record.setAccountId(bucketData.getGhId()); // 投流即转 rootSourceId 查询 creativeId String creativeId = aigcApiService.getCreativeIdByRootSourceIdAndGhId(rootSourceId, bucketData.getGhId()); if (StringUtils.isNotBlank(creativeId)) { record.setCreativeId(creativeId); // 获取人群包信息:先根据creativeId获取创意信息,再用其中的adId查询人群包 AdPutTencentCreative creativeInfo = adApiService.getCreative(Long.valueOf(creativeId)); if (creativeInfo != null && creativeInfo.getAdId() != null) { List packageIds = adApiService.getPackageIdByAdId(creativeInfo.getAdId()); if (packageIds != null && !packageIds.isEmpty()) { record.setPackageId(JSONObject.toJSONString(packageIds.get(0))); } } } } } catch (Exception e) { log.error("获取公众号投流信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e); } Integer status = ExternalChannelStatusEnum.PENDING.getVal(); if (StringUtils.isNotBlank(record.getCreativeId())) { status = ExternalChannelStatusEnum.PROCESSED.getVal(); } updateRecordProcessed(record, status); } /** * 5. 服务号合作-Daily-自选 * 渠道判断:rootSourceId 以 fwhhzdyy_ 或 fwhdyy_ 开头 * 处理逻辑:查询 content_platform_gzh_plan_video 获取合作方和公众号信息 */ private void processFwhCooperateDaily(ExternalChannel record) { String rootSourceId = record.getRootSourceId(); Long recordId = record.getId(); log.info("处理服务号合作-Daily, id={}, rootSourceId={}", recordId, rootSourceId); record.setChannel("服务号合作-Daily-自选"); try { // 查询content_platform_gzh_plan_video ContentPlatformGzhPlanVideoExample videoExample = new ContentPlatformGzhPlanVideoExample(); videoExample.createCriteria().andRootSourceIdEqualTo(rootSourceId).andStatusEqualTo(1); List videoList = gzhPlanVideoMapper.selectByExample(videoExample); if (!CollectionUtils.isEmpty(videoList)) { ContentPlatformGzhPlanVideo video = videoList.get(0); record.setPartnerId(String.valueOf(video.getCreateAccountId())); // 关联content_platform_gzh_plan获取公众号 ContentPlatformGzhPlan plan = gzhPlanMapper.selectByPrimaryKey(video.getPlanId()); if (plan != null) { ContentPlatformGzhAccount account = gzhAccountMapper.selectByPrimaryKey(plan.getAccountId()); if (account != null) { record.setAccountId(account.getGhId()); } } // 爬虫获取文章ID String articleId = recommendApiService.getCooperateArticleIdByRootSourceId(rootSourceId); if (StringUtils.isNotBlank(articleId)) { record.setArticleId(articleId); } } } catch (Exception e) { log.error("获取服务号合作信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e); } Integer status = ExternalChannelStatusEnum.PENDING.getVal(); if (StringUtils.isNotBlank(record.getAccountId()) && StringUtils.isNotBlank(record.getArticleId())) { status = ExternalChannelStatusEnum.PROCESSED.getVal(); } updateRecordProcessed(record, status); } /** * 6a. 服务号投流-Daily * 渠道判断:rootSourceId 以 fwhtouliu_tencentgzh 开头且命中 recommend 接口 * 处理逻辑:通过 recommendApiService 获取文章信息 */ private void processFwhTouliuDaily(ExternalChannel record) { String rootSourceId = record.getRootSourceId(); Long recordId = record.getId(); log.info("处理服务号投流-Daily, id={}, rootSourceId={}", recordId, rootSourceId); record.setChannel("服务号投流-Daily"); record.setCardId(rootSourceId); try { // 通过 recommendApiService 获取文章信息 JSONObject article = recommendApiService.getArticleByRootSourceId(rootSourceId); if (article != null) { String contentId = article.getString("wxSn"); if (StringUtils.isNotBlank(contentId)) { record.setArticleId(contentId); record.setAccountId(article.getString("ghId")); } } } catch (Exception e) { log.error("获取服务号投流-Daily信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e); } Integer status = ExternalChannelStatusEnum.PENDING.getVal(); if (StringUtils.isNotBlank(record.getAccountId())) { status = ExternalChannelStatusEnum.PROCESSED.getVal(); } updateRecordProcessed(record, status); } /** * 6b. 服务号投流-即转 * 渠道判断:rootSourceId 以 fwhtouliu_tencentgzh 开头且命中 cgiReply 接口 * 处理逻辑:查询 cgi_reply_bucket_data 获取 ghId,通过 aigcApiService 获取 creativeId */ private void processFwhTouliuReply(ExternalChannel record) { String rootSourceId = record.getRootSourceId(); Long recordId = record.getId(); log.info("处理服务号投流-即转, id={}, rootSourceId={}", recordId, rootSourceId); record.setChannel("服务号投流-即转"); record.setCardId(rootSourceId); try { // 查询cgi_reply_bucket_data获取ghId和pagePath CgiReplyBucketDataExample example = new CgiReplyBucketDataExample(); example.createCriteria().andRootSourceIdEqualTo(rootSourceId).andIsDeleteEqualTo(0); example.setOrderByClause("id desc"); List bucketDataList = cgiReplyBucketDataMapper.selectByExample(example); if (!CollectionUtils.isEmpty(bucketDataList)) { CgiReplyBucketData bucketData = bucketDataList.get(0); record.setAccountId(bucketData.getGhId()); // 投流即转 rootSourceId 查询 creativeId String creativeId = aigcApiService.getCreativeIdByRootSourceIdAndGhId(rootSourceId, bucketData.getGhId()); if (StringUtils.isNotBlank(creativeId)) { record.setCreativeId(creativeId); // 获取人群包信息:先根据creativeId获取创意信息,再用其中的adId查询人群包 AdPutTencentCreative creativeInfo = adApiService.getCreative(Long.valueOf(creativeId)); if (creativeInfo != null && creativeInfo.getAdId() != null) { List packageIds = adApiService.getPackageIdByAdId(creativeInfo.getAdId()); if (packageIds != null && !packageIds.isEmpty()) { record.setPackageId(JSONObject.toJSONString(packageIds.get(0))); } } } } } catch (Exception e) { log.error("获取服务号投流-即转信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e); } Integer status = ExternalChannelStatusEnum.PENDING.getVal(); if (StringUtils.isNotBlank(record.getCreativeId())) { status = ExternalChannelStatusEnum.PROCESSED.getVal(); } updateRecordProcessed(record, status); } /** * 7. 公众号合作-Daily-自选 * 渠道判断:rootSourceId 以 longArticles_ 或 longArticles_outer 开头 * 处理逻辑:查询 content_platform_gzh_plan_video 获取合作方和公众号信息 */ private void processGzhCooperateDaily(ExternalChannel record) { String rootSourceId = record.getRootSourceId(); Long recordId = record.getId(); log.info("处理公众号合作-Daily, id={}, rootSourceId={}", recordId, rootSourceId); record.setChannel("公众号合作-Daily-自选"); try { // 查询content_platform_gzh_plan_video ContentPlatformGzhPlanVideoExample videoExample = new ContentPlatformGzhPlanVideoExample(); videoExample.createCriteria().andRootSourceIdEqualTo(rootSourceId).andStatusEqualTo(1); List videoList = gzhPlanVideoMapper.selectByExample(videoExample); if (!CollectionUtils.isEmpty(videoList)) { ContentPlatformGzhPlanVideo video = videoList.get(0); record.setPartnerId(String.valueOf(video.getCreateAccountId())); record.setArticleId(String.valueOf(video.getVideoId())); // 关联content_platform_gzh_plan获取公众号 ContentPlatformGzhPlan plan = gzhPlanMapper.selectByPrimaryKey(video.getPlanId()); if (plan != null) { ContentPlatformGzhAccount account = gzhAccountMapper.selectByPrimaryKey(plan.getAccountId()); if (account != null) { record.setAccountId(account.getGhId()); } } // 爬虫获取文章ID String articleId = recommendApiService.getCooperateArticleIdByRootSourceId(rootSourceId); if (StringUtils.isNotBlank(articleId)) { record.setArticleId(articleId); } } } catch (Exception e) { log.error("获取公众号合作-Daily信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e); } Integer status = ExternalChannelStatusEnum.PENDING.getVal(); if (StringUtils.isNotBlank(record.getAccountId()) && StringUtils.isNotBlank(record.getArticleId())) { status = ExternalChannelStatusEnum.PROCESSED.getVal(); } updateRecordProcessed(record, status); } /** * 8. 公众号买号 * 渠道判断:rootSourceId 以 longArticles_inner_ 开头且 AIGC 接口返回 "买号" * 处理逻辑:通过 recommendApiService 获取文章和公众号信息 */ private void processGzhBuyAccount(ExternalChannel record) { String rootSourceId = record.getRootSourceId(); Long recordId = record.getId(); log.info("处理公众号买号, id={}, rootSourceId={}", recordId, rootSourceId); record.setChannel("公众号买号"); record.setCardId(rootSourceId); try { // 解析发布内容ID JSONObject article = recommendApiService.getArticleByRootSourceId(rootSourceId); if (article != null) { String contentId = article.getString("wxSn"); if (StringUtils.isNotBlank(contentId)) { record.setArticleId(contentId); record.setAccountId(article.getString("ghId")); } } } catch (Exception e) { log.error("获取公众号买号信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e); } Integer status = ExternalChannelStatusEnum.PENDING.getVal(); if (StringUtils.isNotBlank(record.getAccountId())) { status = ExternalChannelStatusEnum.PROCESSED.getVal(); } updateRecordProcessed(record, status); } /** * 9. 公众号代运营-Daily-系统 * 渠道判断:rootSourceId 以 longArticles_inner_ 开头且 AIGC 接口返回 "代运营" * 处理逻辑:通过 recommendApiService 获取文章和公众号信息 */ private void processGzhOperationDaily(ExternalChannel record) { String rootSourceId = record.getRootSourceId(); Long recordId = record.getId(); log.info("处理公众号代运营-Daily, id={}, rootSourceId={}", recordId, rootSourceId); record.setChannel("公众号代运营-Daily-系统"); record.setCardId(rootSourceId); try { // 解析发布内容ID JSONObject article = recommendApiService.getArticleByRootSourceId(rootSourceId); if (article != null) { String contentId = article.getString("wxSn"); if (StringUtils.isNotBlank(contentId)) { record.setArticleId(contentId); record.setAccountId(article.getString("ghId")); } } } catch (Exception e) { log.error("获取公众号代运营-Daily信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e); } Integer status = ExternalChannelStatusEnum.PENDING.getVal(); if (StringUtils.isNotBlank(record.getAccountId())) { status = ExternalChannelStatusEnum.PROCESSED.getVal(); } updateRecordProcessed(record, status); } /** * 更新记录状态 * * @param record 待更新记录 * @param status 目标状态 */ private void updateRecordProcessed(ExternalChannel record, Integer status) { record.setStatus(status); record.setUpdateTime(new Date()); externalChannelMapper.updateByPrimaryKeySelective(record); log.info("记录处理完成, id={}, channel={}, status={}", record.getId(), record.getChannel(), status); } /** * 批量插入的批次大小 */ private static final int BATCH_INSERT_SIZE = 500; /** * 默认历史数据天数 */ private static final int DEFAULT_HISTORY_DAYS = 30; /** * 初始化历史数据 * 参数格式:days=30 或 channel=GZH_TOULIU,days=30 * days 表示距当日的间隔天数,不传默认30天 * channel 可选,不传则初始化所有已支持的渠道 * * @param param 参数字符串 * @return 执行结果 */ @XxlJob("initExternalChannelHistory") public ReturnT initExternalChannelHistory(String param) { log.info("开始初始化外部渠道历史数据, param={}", param); // 解析参数 int days = DEFAULT_HISTORY_DAYS; if (StringUtils.isNotBlank(param)) { days = Integer.parseInt(param); } // 根据天数计算起始时间 Date now = new Date(); Date startDate = new Date(now.getTime() - TimeUnit.DAYS.toMillis(days)); log.info("初始化历史数据, days={}, startDate={}", days, startDate); try { int totalCount = 0; // 初始化所有已支持的渠道 totalCount += initChannelHistory(ChannelType.MINIAPP_TOULIU, startDate, days); totalCount += initChannelHistory(ChannelType.GZH_COOPERATE_JIZHUAN, startDate, days); totalCount += initChannelHistory(ChannelType.QW_COOPERATE, startDate, days); totalCount += initChannelHistory(ChannelType.GZH_TOULIU, startDate, days); totalCount += initChannelHistory(ChannelType.FWH_COOPERATE_DAILY, startDate, days); totalCount += initChannelHistory(ChannelType.FWH_TOULIU_DAILY, startDate, days); totalCount += initChannelHistory(ChannelType.FWH_TOULIU_REPLY, startDate, days); totalCount += initChannelHistory(ChannelType.GZH_COOPERATE_DAILY, startDate, days); totalCount += initChannelHistory(ChannelType.GZH_BUY_ACCOUNT, startDate, days); totalCount += initChannelHistory(ChannelType.GZH_OPERATION_DAILY, startDate, days); log.info("历史数据初始化完成, 共插入{}条记录", totalCount); return new ReturnT<>(ReturnT.SUCCESS_CODE, "初始化完成,共插入" + totalCount + "条记录"); } catch (Exception e) { log.error("初始化历史数据异常", e); return new ReturnT<>(ReturnT.FAIL_CODE, "初始化失败:" + e.getMessage()); } } /** * 初始化指定渠道的历史数据 * * @param channelType 渠道类型 * @param startDate 起始日期 * @return 插入的记录数 */ private int initChannelHistory(ChannelType channelType, Date startDate, int days) { log.info("初始化渠道历史数据, channelType={}, startDate={}", channelType, startDate); List rootSourceIds = new ArrayList<>(); Date now = new Date(); switch (channelType) { case MINIAPP_TOULIU: // 小程序投流: 历史已手动导入一年 break; case GZH_COOPERATE_JIZHUAN: // 公众号合作-即转:从cgi_reply_bucket_data查询dyyjs_开头的rootSourceId rootSourceIds = queryRootSourceIdsFromCgiReply(PREFIX_DYYJS, startDate); break; case QW_COOPERATE: // 群/企微合作:从content_platform_qw_plan查询 rootSourceIds = queryRootSourceIdsFromQwPlan(startDate); break; case GZH_TOULIU: // 公众号投流:从cgi_reply_bucket_data查询多种前缀 rootSourceIds.addAll(queryRootSourceIdsFromCgiReply(PREFIX_TOULIU_TENCENTGZH, startDate)); rootSourceIds.addAll(queryRootSourceIdsFromCgiReply(PREFIX_TOULIU_TENCENTGZHARTICLE, startDate)); rootSourceIds.addAll(queryRootSourceIdsFromCgiReply(PREFIX_GZHTOULIU_ARTICLES, startDate)); break; case FWH_COOPERATE_DAILY: // 服务号合作-Daily:从content_platform_gzh_plan_video查询 rootSourceIds = queryRootSourceIdsFromGzhPlanVideo(PREFIX_FWHHZDYy, startDate); rootSourceIds.addAll(queryRootSourceIdsFromGzhPlanVideo(PREFIX_FWHDYY, startDate)); break; case FWH_TOULIU_REPLY: // 服务号投流-即转:从cgi_reply_bucket_data查询fwhtouliu_tencentgzh开头 rootSourceIds = queryRootSourceIdsFromCgiReply(PREFIX_FWHTOULIU_TENCENTGZH, startDate); break; case FWH_TOULIU_DAILY: // 服务号投流-Daily:从aigc getNearDaysRootSourceIds接口查询fwhtouliu_开头 rootSourceIds = queryRootSourceIdsFromAigc(PREFIX_FWHTOULIU, days); break; case GZH_COOPERATE_DAILY: // 公众号合作-Daily:从content_platform_gzh_plan_video查询longArticles_开头 rootSourceIds = queryRootSourceIdsFromGzhPlanVideo(PREFIX_LONGARTICLES, startDate); break; case GZH_BUY_ACCOUNT: // 公众号买号:从aigc getNearDaysRootSourceIds接口查询longArticles_inner_开头 rootSourceIds = queryRootSourceIdsFromAigc(PREFIX_LONGARTICLES, days); break; case GZH_OPERATION_DAILY: // 公众号代运营-Daily:从aigc getNearDaysRootSourceIds接口查询longArticles_inner_开头 rootSourceIds = queryRootSourceIdsFromAigc(PREFIX_LONGARTICLES, days); break; default: return 0; } if (CollectionUtils.isEmpty(rootSourceIds)) { log.info("渠道{}未找到历史数据", channelType); return 0; } log.info("渠道{}找到{}条历史数据记录", channelType, rootSourceIds.size()); // 构建待插入记录列表 List records = new ArrayList<>(); for (String rootSourceId : rootSourceIds) { ExternalChannel record = new ExternalChannel(); record.setRootSourceId(rootSourceId); record.setStatus(ExternalChannelStatusEnum.PENDING.getVal()); record.setIsDelete(0); record.setCreateTime(now); record.setUpdateTime(now); records.add(record); } // 分批插入 int insertedCount = 0; for (int i = 0; i < records.size(); i += BATCH_INSERT_SIZE) { int end = Math.min(i + BATCH_INSERT_SIZE, records.size()); List batch = records.subList(i, end); // 批量查询已存在的rootSourceId,过滤掉已存在的记录 List batchRootSourceIds = batch.stream() .map(ExternalChannel::getRootSourceId) .collect(Collectors.toList()); List existingIds = externalChannelMapperExt.selectExistingRootSourceIds(batchRootSourceIds); List filteredBatch = batch; if (!CollectionUtils.isEmpty(existingIds)) { filteredBatch = batch.stream() .filter(r -> !existingIds.contains(r.getRootSourceId())) .collect(Collectors.toList()); } if (CollectionUtils.isEmpty(filteredBatch)) { log.info("渠道{}本批全部已存在, 跳过", channelType); continue; } int count = externalChannelMapperExt.batchInsertIgnore(filteredBatch); insertedCount += count; log.info("渠道{}批次插入完成, 本批{}, 过滤已存在{}, 累计插入{}", channelType, filteredBatch.size(), existingIds != null ? existingIds.size() : 0, insertedCount); } log.info("渠道{}历史数据初始化完成, 插入{}条", channelType, insertedCount); return insertedCount; } /** * 从aigc getNearDaysRootSourceIds接口查询指定前缀的rootSourceId列表 * * @param prefix 前缀 * @param days 天数 * @return rootSourceId列表 */ private List queryRootSourceIdsFromAigc(String prefix, int days) { List result = new ArrayList<>(); try { List list = recommendApiService.getArticleByRootSourceId(prefix, days); if (!CollectionUtils.isEmpty(list)) { result = list; } } catch (Exception e) { log.error("从aigc查询历史数据失败, prefix={}, days={}", prefix, days, e); } return result; } /** * 从cgi_reply_bucket_data表查询指定前缀的rootSourceId列表 * * @param prefix 前缀 * @param startDate 起始日期 * @return rootSourceId列表 */ private List queryRootSourceIdsFromCgiReply(String prefix, Date startDate) { List result = new ArrayList<>(); try { //CgiReplyBucketDataExample example = new CgiReplyBucketDataExample(); //example.createCriteria() // .andCreateTimeGreaterThanOrEqualTo(startDate) // .andRootSourceIdLike(prefix + "%"); //List list = cgiReplyBucketDataMapper.selectByExample(example); //if (!CollectionUtils.isEmpty(list)) { // result = list.stream().map(CgiReplyBucketData::getRootSourceId).collect(Collectors.toList()); //} result = cgiReplyBucketDataMapperExt.getRootSourceIdByPrefix(prefix, startDate); } catch (Exception e) { log.error("从cgi_reply_bucket_data查询历史数据失败, prefix={}", prefix, e); } return result; } /** * 从content_platform_qw_plan表查询rootSourceId列表 * * @param startDate 起始日期 * @return rootSourceId列表 */ private List queryRootSourceIdsFromQwPlan(Date startDate) { List result = new ArrayList<>(); try { ContentPlatformQwPlanExample example = new ContentPlatformQwPlanExample(); example.createCriteria() .andStatusEqualTo(1) .andRootSourceIdIsNotNull() .andCreateTimestampGreaterThanOrEqualTo(startDate.getTime()); List list = qwPlanMapper.selectByExample(example); if (!CollectionUtils.isEmpty(list)) { for (ContentPlatformQwPlan plan : list) { if (StringUtils.isNotBlank(plan.getRootSourceId())) { result.add(plan.getRootSourceId()); } } } } catch (Exception e) { log.error("从content_platform_qw_plan查询历史数据失败", e); } return result; } /** * 从content_platform_gzh_plan_video表查询指定前缀的rootSourceId列表 * * @param prefix 前缀 * @param startDate 起始日期 * @return rootSourceId列表 */ private List queryRootSourceIdsFromGzhPlanVideo(String prefix, Date startDate) { List result = new ArrayList<>(); try { ContentPlatformGzhPlanVideoExample example = new ContentPlatformGzhPlanVideoExample(); example.createCriteria() .andRootSourceIdLike(prefix + "%") .andCreateTimestampGreaterThanOrEqualTo(startDate.getTime()); List list = gzhPlanVideoMapper.selectByExample(example); if (!CollectionUtils.isEmpty(list)) { for (ContentPlatformGzhPlanVideo video : list) { if (StringUtils.isNotBlank(video.getRootSourceId())) { result.add(video.getRootSourceId()); } } } } catch (Exception e) { log.error("从content_platform_gzh_plan_video查询历史数据失败, prefix={}", prefix, e); } return result; } }