ExternalChannelProcessJob.java 48 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087
  1. package com.tzld.piaoquan.api.job;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.tzld.piaoquan.api.common.enums.contentplatform.ChannelType;
  4. import com.tzld.piaoquan.api.common.enums.contentplatform.ExternalChannelStatusEnum;
  5. import com.tzld.piaoquan.api.component.AdApiService;
  6. import com.tzld.piaoquan.api.component.AigcApiService;
  7. import com.tzld.piaoquan.api.component.RecommendApiService;
  8. import com.tzld.piaoquan.api.dao.mapper.contentplatform.*;
  9. import com.tzld.piaoquan.api.model.bo.AdPutTencentCreative;
  10. import com.tzld.piaoquan.api.model.po.contentplatform.*;
  11. import com.tzld.piaoquan.api.service.CgiReplyService;
  12. import com.tzld.piaoquan.api.service.contentplatform.ContentPlatformPlanService;
  13. import com.tzld.piaoquan.growth.common.dao.mapper.CgiReplyBucketDataMapper;
  14. import com.tzld.piaoquan.growth.common.dao.mapper.ext.CgiReplyBucketDataMapperExt;
  15. import com.tzld.piaoquan.growth.common.model.po.CgiReplyBucketData;
  16. import com.tzld.piaoquan.growth.common.model.po.CgiReplyBucketDataExample;
  17. import com.tzld.piaoquan.growth.common.utils.RedisUtils;
  18. import com.xxl.job.core.biz.model.ReturnT;
  19. import com.xxl.job.core.handler.annotation.XxlJob;
  20. import lombok.extern.slf4j.Slf4j;
  21. import org.apache.commons.lang3.StringUtils;
  22. import org.springframework.beans.factory.annotation.Autowired;
  23. import org.springframework.stereotype.Component;
  24. import org.springframework.util.CollectionUtils;
  25. import java.util.ArrayList;
  26. import java.util.Date;
  27. import java.util.List;
  28. import java.util.concurrent.TimeUnit;
  29. import java.util.stream.Collectors;
  30. /**
  31. * 外部渠道处理定时任务
  32. * 处理 external_channel 表中 status=0(待处理)的记录
  33. * 根据 rootSourceId 判断渠道类型,获取合作方及其他内容
  34. * 仅处理创建时间在 DEFAULT_MAX_RETRY_DAYS 天内的记录
  35. */
  36. @Slf4j
  37. @Component
  38. public class ExternalChannelProcessJob {
  39. @Autowired
  40. private ExternalChannelMapper externalChannelMapper;
  41. @Autowired
  42. private ExternalChannelMapperExt externalChannelMapperExt;
  43. @Autowired
  44. private ContentPlatformGzhAccountMapper gzhAccountMapper;
  45. @Autowired
  46. private ContentPlatformQwPlanMapper qwPlanMapper;
  47. @Autowired
  48. private ContentPlatformGzhPlanVideoMapper gzhPlanVideoMapper;
  49. @Autowired
  50. private ContentPlatformGzhPlanMapper gzhPlanMapper;
  51. @Autowired
  52. private CgiReplyBucketDataMapper cgiReplyBucketDataMapper;
  53. @Autowired
  54. private CgiReplyBucketDataMapperExt cgiReplyBucketDataMapperExt;
  55. @Autowired
  56. private CgiReplyService cgiReplyService;
  57. @Autowired
  58. private ContentPlatformPlanService contentPlatformPlanService;
  59. @Autowired
  60. private AdApiService adApiService;
  61. @Autowired
  62. private AigcApiService aigcApiService;
  63. @Autowired
  64. private RecommendApiService recommendApiService;
  65. @Autowired
  66. private RedisUtils redisUtils;
  67. // 渠道类型前缀定义
  68. private static final String PREFIX_TOULIU_TENCENT = "touliu_tencent_";
  69. private static final String PREFIX_DYYJS = "dyyjs_";
  70. private static final String PREFIX_DYYQW = "dyyqw_";
  71. private static final String PREFIX_TOULIU_TENCENTWBqw = "touliu_tencentwbqw_";
  72. private static final String PREFIX_TOULIU_TENCENTGZH = "touliu_tencentgzh_";
  73. private static final String PREFIX_TOULIU_TENCENTGZHARTICLE = "touliu_tencentGzhArticle_";
  74. private static final String PREFIX_GZHTOULIU_ARTICLES = "GzhTouLiu_Articles_gh";
  75. private static final String PREFIX_FWHHZDYy = "fwhhzdyy_";
  76. private static final String PREFIX_FWHDYY = "fwhdyy_";
  77. private static final String PREFIX_FWHTOULIU_TENCENTGZH = "fwhtouliu_tencentgzh";
  78. private static final String PREFIX_FWHTOULIU = "fwhtouliu_";
  79. private static final String PREFIX_LONGARTICLES = "longArticles_";
  80. private static final String PREFIX_LONGARTICLES_OUTER = "longArticles_outer";
  81. private static final String PREFIX_LONGARTICLES_INNER = "longArticles_inner_";
  82. /**
  83. * 默认最大重试天数,超过此天数的待处理记录不再查询
  84. */
  85. private static final int DEFAULT_MAX_RETRY_DAYS = 3;
  86. /**
  87. * 每次查询的记录数限制
  88. */
  89. private static final int QUERY_LIMIT = 100;
  90. /**
  91. * 分布式锁前缀
  92. */
  93. private static final String LOCK_PREFIX = "external_channel:process:";
  94. /**
  95. * 分布式锁过期时间(秒)
  96. */
  97. private static final long LOCK_EXPIRE_SECONDS = 300;
  98. /**
  99. * 处理外部渠道待处理记录
  100. * 1. 先将超过 DEFAULT_MAX_RETRY_DAYS 天的待处理记录标记为 FAILED
  101. * 2. 仅处理创建时间在 DEFAULT_MAX_RETRY_DAYS 天内的待处理记录
  102. */
  103. @XxlJob("processExternalChannelJob")
  104. public ReturnT<String> processExternalChannel(String param) {
  105. log.info("开始处理外部渠道待处理记录, 最大重试天数={}", DEFAULT_MAX_RETRY_DAYS);
  106. try {
  107. // 提前计算时间阈值,避免在SQL中进行日期计算
  108. Date now = new Date();
  109. Date beforeDaysTime = new Date(now.getTime() - TimeUnit.DAYS.toMillis(DEFAULT_MAX_RETRY_DAYS));
  110. // 0. 先将超过天数的待处理记录标记为失败
  111. int failedCount = externalChannelMapperExt.markFailedForTimeoutRecords(beforeDaysTime);
  112. if (failedCount > 0) {
  113. log.info("已将{}条创建时间早于{}的记录标记为失败", failedCount, beforeDaysTime);
  114. }
  115. // 1. 循环分页查询并处理待处理的记录(status=0),使用id>lastId方式分页
  116. int totalProcessed = 0;
  117. int pageNum = 0;
  118. long lastId = 0;
  119. List<ExternalChannel> pendingList;
  120. do {
  121. pageNum++;
  122. pendingList = externalChannelMapperExt.selectPendingList(lastId, QUERY_LIMIT, beforeDaysTime);
  123. if (CollectionUtils.isEmpty(pendingList)) {
  124. if (pageNum == 1) {
  125. log.info("没有待处理的外部渠道记录");
  126. }
  127. break;
  128. }
  129. // 更新lastId为本页最后一条记录的id
  130. lastId = pendingList.get(pendingList.size() - 1).getId();
  131. log.info("第{}页, 找到{}条待处理记录, lastId={}", pageNum, pendingList.size(), lastId);
  132. // 2. 逐条处理
  133. for (ExternalChannel record : pendingList) {
  134. try {
  135. processSingleRecord(record);
  136. totalProcessed++;
  137. } catch (Exception e) {
  138. log.error("处理记录异常, id={}, rootSourceId={}", record.getId(), record.getRootSourceId(), e);
  139. }
  140. }
  141. } while (pendingList.size() >= QUERY_LIMIT);
  142. log.info("外部渠道处理完成, 共处理{}条记录", totalProcessed);
  143. return ReturnT.SUCCESS;
  144. } catch (Exception e) {
  145. log.error("处理外部渠道记录时发生异常", e);
  146. return ReturnT.FAIL;
  147. }
  148. }
  149. /**
  150. * 处理单条记录
  151. * 根据渠道类型路由到对应的处理方法,使用分布式锁防止并发处理
  152. */
  153. private void processSingleRecord(ExternalChannel record) {
  154. String rootSourceId = record.getRootSourceId();
  155. Long recordId = record.getId();
  156. if (StringUtils.isBlank(rootSourceId)) {
  157. log.warn("rootSourceId为空,标记为未知渠道, id={}", recordId);
  158. record.setChannel("未知渠道-rootSourceId为空");
  159. updateRecordProcessed(record, ExternalChannelStatusEnum.UNKNOWN_CHANNEL.getVal());
  160. return;
  161. }
  162. // 尝试获取分布式锁
  163. String lockKey = LOCK_PREFIX + recordId;
  164. String lockValue = String.valueOf(System.currentTimeMillis());
  165. boolean locked = false;
  166. try {
  167. locked = redisUtils.tryLock(lockKey, lockValue, LOCK_EXPIRE_SECONDS);
  168. if (!locked) {
  169. log.info("获取分布式锁失败,跳过处理, id={}, rootSourceId={}", recordId, rootSourceId);
  170. return;
  171. }
  172. // 判断渠道类型并处理
  173. ChannelType channelType = determineChannelType(rootSourceId);
  174. if (channelType == null) {
  175. log.warn("无法识别渠道类型, id={}, rootSourceId={}", recordId, rootSourceId);
  176. record.setChannel("未知渠道-前缀不匹配");
  177. updateRecordProcessed(record, ExternalChannelStatusEnum.UNKNOWN_CHANNEL.getVal());
  178. return;
  179. }
  180. switch (channelType) {
  181. case MINIAPP_TOULIU:
  182. processMiniAppTouliu(record);
  183. break;
  184. case GZH_COOPERATE_JIZHUAN:
  185. processGzhCooperateJizhuan(record);
  186. break;
  187. case QW_COOPERATE:
  188. processQwCooperate(record);
  189. break;
  190. case GZH_TOULIU:
  191. processGzhTouliu(record);
  192. break;
  193. case FWH_COOPERATE_DAILY:
  194. processFwhCooperateDaily(record);
  195. break;
  196. case FWH_TOULIU_DAILY:
  197. processFwhTouliuDaily(record);
  198. break;
  199. case FWH_TOULIU_REPLY:
  200. processFwhTouliuReply(record);
  201. break;
  202. case GZH_COOPERATE_DAILY:
  203. processGzhCooperateDaily(record);
  204. break;
  205. case GZH_BUY_ACCOUNT:
  206. processGzhBuyAccount(record);
  207. break;
  208. case GZH_OPERATION_DAILY:
  209. processGzhOperationDaily(record);
  210. break;
  211. default:
  212. log.warn("未处理的渠道类型, id={}, rootSourceId={}, channelType={}", recordId, rootSourceId, channelType);
  213. record.setChannel("未知渠道-" + channelType.name());
  214. updateRecordProcessed(record, ExternalChannelStatusEnum.UNKNOWN_CHANNEL.getVal());
  215. }
  216. } finally {
  217. // 释放分布式锁
  218. if (locked) {
  219. redisUtils.del(lockKey);
  220. }
  221. }
  222. }
  223. /**
  224. * 判断渠道类型
  225. * 根据 rootSourceId 前缀判断所属渠道类型
  226. */
  227. private ChannelType determineChannelType(String rootSourceId) {
  228. if (rootSourceId.startsWith(PREFIX_TOULIU_TENCENT)) {
  229. return ChannelType.MINIAPP_TOULIU;
  230. } else if (rootSourceId.startsWith(PREFIX_DYYJS)) {
  231. return ChannelType.GZH_COOPERATE_JIZHUAN;
  232. } else if (rootSourceId.startsWith(PREFIX_DYYQW) || rootSourceId.startsWith(PREFIX_TOULIU_TENCENTWBqw)) {
  233. return ChannelType.QW_COOPERATE;
  234. } else if (rootSourceId.startsWith(PREFIX_TOULIU_TENCENTGZH)
  235. || rootSourceId.startsWith(PREFIX_TOULIU_TENCENTGZHARTICLE)
  236. || rootSourceId.startsWith(PREFIX_GZHTOULIU_ARTICLES)) {
  237. return ChannelType.GZH_TOULIU;
  238. } else if (rootSourceId.startsWith(PREFIX_FWHHZDYy) || rootSourceId.startsWith(PREFIX_FWHDYY)) {
  239. return ChannelType.FWH_COOPERATE_DAILY;
  240. } else if (rootSourceId.startsWith(PREFIX_FWHTOULIU_TENCENTGZH)) {
  241. return ChannelType.FWH_TOULIU_REPLY;
  242. } else if (rootSourceId.startsWith(PREFIX_FWHTOULIU)) {
  243. // 服务号投流需要通过接口判断是daily还是reply
  244. return determineFwhTouliuType(rootSourceId);
  245. } else if (rootSourceId.startsWith(PREFIX_LONGARTICLES)) {
  246. if (rootSourceId.startsWith(PREFIX_LONGARTICLES_OUTER)) {
  247. return ChannelType.GZH_COOPERATE_DAILY;
  248. } else {
  249. return determineLongArticlesType(rootSourceId);
  250. }
  251. }
  252. return null;
  253. }
  254. /**
  255. * 判断服务号投流类型(Daily或即转)
  256. * 依次调用 recommendApiService 和 cgiReplyService 判断类型
  257. * 命中 recommend 接口返回 DAILY,命中 cgiReply 接口返回 REPLY
  258. */
  259. private ChannelType determineFwhTouliuType(String rootSourceId) {
  260. try {
  261. Boolean dailyExists = recommendApiService.checkExistRootSourceId(rootSourceId);
  262. if (Boolean.TRUE.equals(dailyExists)) {
  263. log.info("服务号投流类型判断, rootSourceId={}, dailyExists=true", rootSourceId);
  264. return ChannelType.FWH_TOULIU_DAILY;
  265. }
  266. Boolean replyExists = cgiReplyService.checkExistRootSourceId(rootSourceId);
  267. if (Boolean.TRUE.equals(replyExists)) {
  268. log.info("服务号投流类型判断, rootSourceId={}, replyExists=true", rootSourceId);
  269. return ChannelType.FWH_TOULIU_REPLY;
  270. }
  271. log.info("服务号投流类型判断, rootSourceId={}, dailyExists=false, replyExists=false", rootSourceId);
  272. } catch (Exception e) {
  273. log.error("判断服务号投流类型失败, rootSourceId={}", rootSourceId, e);
  274. }
  275. // 默认返回daily
  276. return ChannelType.FWH_TOULIU_DAILY;
  277. }
  278. /**
  279. * 判断 longArticles 类型(inner或outer)
  280. * 命名 recommend 接口返回 inner(公众号买号/代运营)
  281. * 命名 contentPlatformPlanService 接口返回 outer(公众号合作-Daily)
  282. */
  283. private ChannelType determineLongArticlesType(String rootSourceId) {
  284. try {
  285. Boolean innerExists = recommendApiService.checkExistRootSourceId(rootSourceId);
  286. if (Boolean.TRUE.equals(innerExists)) {
  287. log.info("longArticles类型判断, rootSourceId={}, innerExists=true", rootSourceId);
  288. // 区分 公众号买号 与 公众号代运营
  289. if (isGzhBuyAccount(rootSourceId)) {
  290. return ChannelType.GZH_BUY_ACCOUNT;
  291. } else {
  292. return ChannelType.GZH_OPERATION_DAILY;
  293. }
  294. }
  295. Boolean outerExists = contentPlatformPlanService.gzhPushCheckExistRootSourceId(rootSourceId);
  296. if (Boolean.TRUE.equals(outerExists)) {
  297. log.info("longArticles类型判断, rootSourceId={}, outerExists=true", rootSourceId);
  298. return ChannelType.GZH_COOPERATE_DAILY;
  299. }
  300. log.info("longArticles类型判断, rootSourceId={}, innerExists=false, outerExists=false", rootSourceId);
  301. } catch (Exception e) {
  302. log.error("判断longArticles类型失败, rootSourceId={}", rootSourceId, e);
  303. }
  304. // 默认返回 公众号代运营-Daily
  305. return ChannelType.GZH_OPERATION_DAILY;
  306. }
  307. /**
  308. * 判断是否为公众号买号
  309. * 调用 AIGC 接口根据 ghId 判断账号类型
  310. * 返回值包含 "买号" 则为公众号买号,包含 "代运营" 则为公众号代运营
  311. *
  312. * @param rootSourceId 根来源ID
  313. * @return true-公众号买号, false-公众号代运营
  314. */
  315. private boolean isGzhBuyAccount(String rootSourceId) {
  316. try {
  317. // 1. 先通过rootSourceId获取ghId
  318. JSONObject article = recommendApiService.getArticleByRootSourceId(rootSourceId);
  319. if (article == null || StringUtils.isBlank(article.getString("ghId"))) {
  320. log.warn("无法获取ghId, rootSourceId={}", rootSourceId);
  321. return false;
  322. }
  323. String ghId = article.getString("ghId");
  324. // 2. 调用AIGC接口判断账号类型
  325. String accountGroupSourceName = aigcApiService.getAccountGroupSourceName(ghId);
  326. log.info("判断公众号买号/代运营, rootSourceId={}, ghId={}, accountGroupSourceName={}", rootSourceId, ghId, accountGroupSourceName);
  327. if (StringUtils.isBlank(accountGroupSourceName)) {
  328. return false;
  329. }
  330. // 3. 根据返回判断账号类型
  331. if (accountGroupSourceName.contains("买号")) {
  332. log.info("判断为公众号买号, rootSourceId={}, ghId={}, accountGroupSourceName={}", rootSourceId, ghId, accountGroupSourceName);
  333. return true;
  334. } else if (accountGroupSourceName.contains("代运营")) {
  335. log.info("判断为公众号代运营, rootSourceId={}, ghId={}, accountGroupSourceName={}", rootSourceId, ghId, accountGroupSourceName);
  336. return false;
  337. } else {
  338. log.warn("无法识别账号类型, rootSourceId={}, ghId={}, accountGroupSourceName={}", rootSourceId, ghId, accountGroupSourceName);
  339. return false;
  340. }
  341. } catch (Exception e) {
  342. log.error("判断公众号买号/代运营失败, rootSourceId={}", rootSourceId, e);
  343. // 默认返回false,即公众号代运营
  344. return false;
  345. }
  346. }
  347. /**
  348. * 1. 小程序投流-稳定
  349. * 渠道判断:rootSourceId 以 touliu_tencent_ 开头
  350. * 处理逻辑:通过 adApiService 获取创意和人群包信息
  351. */
  352. private void processMiniAppTouliu(ExternalChannel record) {
  353. String rootSourceId = record.getRootSourceId();
  354. Long recordId = record.getId();
  355. log.info("处理小程序投流, id={}, rootSourceId={}", recordId, rootSourceId);
  356. record.setChannel("小程序投流-稳定");
  357. // 调用AD API获取创意信息
  358. try {
  359. AdPutTencentCreative creative = adApiService.getCreativeByRootSourceId(rootSourceId);
  360. if (creative != null) {
  361. record.setCreativeId(String.valueOf(creative.getCreativeId()));
  362. // 获取人群包信息
  363. List<Long> packageIds = adApiService.getPackageIdByAdId(creative.getAdId());
  364. if (packageIds != null && !packageIds.isEmpty()) {
  365. record.setPackageId(JSONObject.toJSONString(packageIds.get(0)));
  366. }
  367. }
  368. } catch (Exception e) {
  369. log.error("获取小程序投流信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e);
  370. }
  371. Integer status = ExternalChannelStatusEnum.PENDING.getVal();
  372. if (StringUtils.isNotBlank(record.getCreativeId())) {
  373. status = ExternalChannelStatusEnum.PROCESSED.getVal();
  374. }
  375. updateRecordProcessed(record, status);
  376. }
  377. /**
  378. * 2. 公众号合作-即转-稳定
  379. * 渠道判断:rootSourceId 以 dyyjs_ 开头
  380. * 处理逻辑:查询 cgi_reply_bucket_data 获取 ghId,再查询 content_platform_gzh_account 获取合作方
  381. */
  382. private void processGzhCooperateJizhuan(ExternalChannel record) {
  383. String rootSourceId = record.getRootSourceId();
  384. Long recordId = record.getId();
  385. log.info("处理公众号合作-即转, id={}, rootSourceId={}", recordId, rootSourceId);
  386. record.setChannel("公众号合作-即转-稳定");
  387. record.setCardId(rootSourceId);
  388. try {
  389. // 查询cgi_reply_bucket_data获取ghId
  390. CgiReplyBucketDataExample example = new CgiReplyBucketDataExample();
  391. example.createCriteria().andRootSourceIdEqualTo(rootSourceId).andIsDeleteEqualTo(0);
  392. example.setOrderByClause("id desc");
  393. List<CgiReplyBucketData> bucketDataList = cgiReplyBucketDataMapper.selectByExample(example);
  394. if (!CollectionUtils.isEmpty(bucketDataList)) {
  395. String ghId = bucketDataList.get(0).getGhId();
  396. record.setAccountId(ghId);
  397. // 查询content_platform_gzh_account获取合作方
  398. ContentPlatformGzhAccountExample accountExample = new ContentPlatformGzhAccountExample();
  399. accountExample.createCriteria().andGhIdEqualTo(ghId).andStatusEqualTo(1);
  400. List<ContentPlatformGzhAccount> accountList = gzhAccountMapper.selectByExample(accountExample);
  401. if (!CollectionUtils.isEmpty(accountList)) {
  402. record.setPartnerId(String.valueOf(accountList.get(0).getCreateAccountId()));
  403. }
  404. }
  405. } catch (Exception e) {
  406. log.error("获取公众号合作信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e);
  407. }
  408. Integer status = ExternalChannelStatusEnum.PENDING.getVal();
  409. if (StringUtils.isNotBlank(record.getPartnerId())) {
  410. status = ExternalChannelStatusEnum.PROCESSED.getVal();
  411. }
  412. updateRecordProcessed(record, status);
  413. }
  414. /**
  415. * 3. 群/企微合作-稳定
  416. * 渠道判断:rootSourceId 以 dyyqw_ 或 touliu_tencentwbqw_ 开头
  417. * 处理逻辑:查询 content_platform_qw_plan 获取合作方
  418. */
  419. private void processQwCooperate(ExternalChannel record) {
  420. String rootSourceId = record.getRootSourceId();
  421. Long recordId = record.getId();
  422. log.info("处理群/企微合作, id={}, rootSourceId={}", recordId, rootSourceId);
  423. record.setChannel("群/企微合作-稳定");
  424. record.setCardId(rootSourceId);
  425. try {
  426. // 查询content_platform_qw_plan获取合作方
  427. ContentPlatformQwPlanExample example = new ContentPlatformQwPlanExample();
  428. example.createCriteria().andRootSourceIdEqualTo(rootSourceId).andStatusEqualTo(1);
  429. List<ContentPlatformQwPlan> planList = qwPlanMapper.selectByExample(example);
  430. if (!CollectionUtils.isEmpty(planList)) {
  431. ContentPlatformQwPlan plan = planList.get(0);
  432. record.setPartnerId(String.valueOf(plan.getCreateAccountId()));
  433. }
  434. } catch (Exception e) {
  435. log.error("获取群/企微合作信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e);
  436. }
  437. Integer status = ExternalChannelStatusEnum.PENDING.getVal();
  438. if (StringUtils.isNotBlank(record.getPartnerId())) {
  439. status = ExternalChannelStatusEnum.PROCESSED.getVal();
  440. }
  441. updateRecordProcessed(record, status);
  442. }
  443. /**
  444. * 4. 公众号投流-稳定
  445. * 渠道判断:rootSourceId 以 touliu_tencentgzh_、touliu_tencentGzhArticle_ 或 GzhTouLiu_Articles_gh 开头
  446. * 处理逻辑:查询 cgi_reply_bucket_data 获取 ghId,通过 aigcApiService 获取 creativeId 和人群包信息
  447. */
  448. private void processGzhTouliu(ExternalChannel record) {
  449. String rootSourceId = record.getRootSourceId();
  450. Long recordId = record.getId();
  451. log.info("处理公众号投流, id={}, rootSourceId={}", recordId, rootSourceId);
  452. record.setChannel("公众号投流-稳定");
  453. record.setCardId(rootSourceId);
  454. try {
  455. // 查询cgi_reply_bucket_data获取ghId和pagePath
  456. CgiReplyBucketDataExample example = new CgiReplyBucketDataExample();
  457. example.createCriteria().andRootSourceIdEqualTo(rootSourceId).andIsDeleteEqualTo(0);
  458. example.setOrderByClause("id desc");
  459. List<CgiReplyBucketData> bucketDataList = cgiReplyBucketDataMapper.selectByExample(example);
  460. if (!CollectionUtils.isEmpty(bucketDataList)) {
  461. CgiReplyBucketData bucketData = bucketDataList.get(0);
  462. record.setAccountId(bucketData.getGhId());
  463. // 投流即转 rootSourceId 查询 creativeId
  464. String creativeId = aigcApiService.getCreativeIdByRootSourceIdAndGhId(rootSourceId, bucketData.getGhId());
  465. if (StringUtils.isNotBlank(creativeId)) {
  466. record.setCreativeId(creativeId);
  467. // 获取人群包信息:先根据creativeId获取创意信息,再用其中的adId查询人群包
  468. AdPutTencentCreative creativeInfo = adApiService.getCreative(Long.valueOf(creativeId));
  469. if (creativeInfo != null && creativeInfo.getAdId() != null) {
  470. List<Long> packageIds = adApiService.getPackageIdByAdId(creativeInfo.getAdId());
  471. if (packageIds != null && !packageIds.isEmpty()) {
  472. record.setPackageId(JSONObject.toJSONString(packageIds.get(0)));
  473. }
  474. }
  475. }
  476. }
  477. } catch (Exception e) {
  478. log.error("获取公众号投流信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e);
  479. }
  480. Integer status = ExternalChannelStatusEnum.PENDING.getVal();
  481. if (StringUtils.isNotBlank(record.getCreativeId())) {
  482. status = ExternalChannelStatusEnum.PROCESSED.getVal();
  483. }
  484. updateRecordProcessed(record, status);
  485. }
  486. /**
  487. * 5. 服务号合作-Daily-自选
  488. * 渠道判断:rootSourceId 以 fwhhzdyy_ 或 fwhdyy_ 开头
  489. * 处理逻辑:查询 content_platform_gzh_plan_video 获取合作方和公众号信息
  490. */
  491. private void processFwhCooperateDaily(ExternalChannel record) {
  492. String rootSourceId = record.getRootSourceId();
  493. Long recordId = record.getId();
  494. log.info("处理服务号合作-Daily, id={}, rootSourceId={}", recordId, rootSourceId);
  495. record.setChannel("服务号合作-Daily-自选");
  496. try {
  497. // 查询content_platform_gzh_plan_video
  498. ContentPlatformGzhPlanVideoExample videoExample = new ContentPlatformGzhPlanVideoExample();
  499. videoExample.createCriteria().andRootSourceIdEqualTo(rootSourceId).andStatusEqualTo(1);
  500. List<ContentPlatformGzhPlanVideo> videoList = gzhPlanVideoMapper.selectByExample(videoExample);
  501. if (!CollectionUtils.isEmpty(videoList)) {
  502. ContentPlatformGzhPlanVideo video = videoList.get(0);
  503. record.setPartnerId(String.valueOf(video.getCreateAccountId()));
  504. // 关联content_platform_gzh_plan获取公众号
  505. ContentPlatformGzhPlan plan = gzhPlanMapper.selectByPrimaryKey(video.getPlanId());
  506. if (plan != null) {
  507. ContentPlatformGzhAccount account = gzhAccountMapper.selectByPrimaryKey(plan.getAccountId());
  508. if (account != null) {
  509. record.setAccountId(account.getGhId());
  510. }
  511. }
  512. // 爬虫获取文章ID
  513. String articleId = recommendApiService.getCooperateArticleIdByRootSourceId(rootSourceId);
  514. if (StringUtils.isNotBlank(articleId)) {
  515. record.setArticleId(articleId);
  516. }
  517. }
  518. } catch (Exception e) {
  519. log.error("获取服务号合作信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e);
  520. }
  521. Integer status = ExternalChannelStatusEnum.PENDING.getVal();
  522. if (StringUtils.isNotBlank(record.getAccountId()) && StringUtils.isNotBlank(record.getArticleId())) {
  523. status = ExternalChannelStatusEnum.PROCESSED.getVal();
  524. }
  525. updateRecordProcessed(record, status);
  526. }
  527. /**
  528. * 6a. 服务号投流-Daily
  529. * 渠道判断:rootSourceId 以 fwhtouliu_tencentgzh 开头且命中 recommend 接口
  530. * 处理逻辑:通过 recommendApiService 获取文章信息
  531. */
  532. private void processFwhTouliuDaily(ExternalChannel record) {
  533. String rootSourceId = record.getRootSourceId();
  534. Long recordId = record.getId();
  535. log.info("处理服务号投流-Daily, id={}, rootSourceId={}", recordId, rootSourceId);
  536. record.setChannel("服务号投流-Daily");
  537. record.setCardId(rootSourceId);
  538. try {
  539. // 通过 recommendApiService 获取文章信息
  540. JSONObject article = recommendApiService.getArticleByRootSourceId(rootSourceId);
  541. if (article != null) {
  542. String contentId = article.getString("wxSn");
  543. if (StringUtils.isNotBlank(contentId)) {
  544. record.setArticleId(contentId);
  545. record.setAccountId(article.getString("ghId"));
  546. }
  547. }
  548. } catch (Exception e) {
  549. log.error("获取服务号投流-Daily信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e);
  550. }
  551. Integer status = ExternalChannelStatusEnum.PENDING.getVal();
  552. if (StringUtils.isNotBlank(record.getAccountId())) {
  553. status = ExternalChannelStatusEnum.PROCESSED.getVal();
  554. }
  555. updateRecordProcessed(record, status);
  556. }
  557. /**
  558. * 6b. 服务号投流-即转
  559. * 渠道判断:rootSourceId 以 fwhtouliu_tencentgzh 开头且命中 cgiReply 接口
  560. * 处理逻辑:查询 cgi_reply_bucket_data 获取 ghId,通过 aigcApiService 获取 creativeId
  561. */
  562. private void processFwhTouliuReply(ExternalChannel record) {
  563. String rootSourceId = record.getRootSourceId();
  564. Long recordId = record.getId();
  565. log.info("处理服务号投流-即转, id={}, rootSourceId={}", recordId, rootSourceId);
  566. record.setChannel("服务号投流-即转");
  567. record.setCardId(rootSourceId);
  568. try {
  569. // 查询cgi_reply_bucket_data获取ghId和pagePath
  570. CgiReplyBucketDataExample example = new CgiReplyBucketDataExample();
  571. example.createCriteria().andRootSourceIdEqualTo(rootSourceId).andIsDeleteEqualTo(0);
  572. example.setOrderByClause("id desc");
  573. List<CgiReplyBucketData> bucketDataList = cgiReplyBucketDataMapper.selectByExample(example);
  574. if (!CollectionUtils.isEmpty(bucketDataList)) {
  575. CgiReplyBucketData bucketData = bucketDataList.get(0);
  576. record.setAccountId(bucketData.getGhId());
  577. // 投流即转 rootSourceId 查询 creativeId
  578. String creativeId = aigcApiService.getCreativeIdByRootSourceIdAndGhId(rootSourceId, bucketData.getGhId());
  579. if (StringUtils.isNotBlank(creativeId)) {
  580. record.setCreativeId(creativeId);
  581. // 获取人群包信息:先根据creativeId获取创意信息,再用其中的adId查询人群包
  582. AdPutTencentCreative creativeInfo = adApiService.getCreative(Long.valueOf(creativeId));
  583. if (creativeInfo != null && creativeInfo.getAdId() != null) {
  584. List<Long> packageIds = adApiService.getPackageIdByAdId(creativeInfo.getAdId());
  585. if (packageIds != null && !packageIds.isEmpty()) {
  586. record.setPackageId(JSONObject.toJSONString(packageIds.get(0)));
  587. }
  588. }
  589. }
  590. }
  591. } catch (Exception e) {
  592. log.error("获取服务号投流-即转信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e);
  593. }
  594. Integer status = ExternalChannelStatusEnum.PENDING.getVal();
  595. if (StringUtils.isNotBlank(record.getCreativeId())) {
  596. status = ExternalChannelStatusEnum.PROCESSED.getVal();
  597. }
  598. updateRecordProcessed(record, status);
  599. }
  600. /**
  601. * 7. 公众号合作-Daily-自选
  602. * 渠道判断:rootSourceId 以 longArticles_ 或 longArticles_outer 开头
  603. * 处理逻辑:查询 content_platform_gzh_plan_video 获取合作方和公众号信息
  604. */
  605. private void processGzhCooperateDaily(ExternalChannel record) {
  606. String rootSourceId = record.getRootSourceId();
  607. Long recordId = record.getId();
  608. log.info("处理公众号合作-Daily, id={}, rootSourceId={}", recordId, rootSourceId);
  609. record.setChannel("公众号合作-Daily-自选");
  610. try {
  611. // 查询content_platform_gzh_plan_video
  612. ContentPlatformGzhPlanVideoExample videoExample = new ContentPlatformGzhPlanVideoExample();
  613. videoExample.createCriteria().andRootSourceIdEqualTo(rootSourceId).andStatusEqualTo(1);
  614. List<ContentPlatformGzhPlanVideo> videoList = gzhPlanVideoMapper.selectByExample(videoExample);
  615. if (!CollectionUtils.isEmpty(videoList)) {
  616. ContentPlatformGzhPlanVideo video = videoList.get(0);
  617. record.setPartnerId(String.valueOf(video.getCreateAccountId()));
  618. record.setArticleId(String.valueOf(video.getVideoId()));
  619. // 关联content_platform_gzh_plan获取公众号
  620. ContentPlatformGzhPlan plan = gzhPlanMapper.selectByPrimaryKey(video.getPlanId());
  621. if (plan != null) {
  622. ContentPlatformGzhAccount account = gzhAccountMapper.selectByPrimaryKey(plan.getAccountId());
  623. if (account != null) {
  624. record.setAccountId(account.getGhId());
  625. }
  626. }
  627. // 爬虫获取文章ID
  628. String articleId = recommendApiService.getCooperateArticleIdByRootSourceId(rootSourceId);
  629. if (StringUtils.isNotBlank(articleId)) {
  630. record.setArticleId(articleId);
  631. }
  632. }
  633. } catch (Exception e) {
  634. log.error("获取公众号合作-Daily信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e);
  635. }
  636. Integer status = ExternalChannelStatusEnum.PENDING.getVal();
  637. if (StringUtils.isNotBlank(record.getAccountId()) && StringUtils.isNotBlank(record.getArticleId())) {
  638. status = ExternalChannelStatusEnum.PROCESSED.getVal();
  639. }
  640. updateRecordProcessed(record, status);
  641. }
  642. /**
  643. * 8. 公众号买号
  644. * 渠道判断:rootSourceId 以 longArticles_inner_ 开头且 AIGC 接口返回 "买号"
  645. * 处理逻辑:通过 recommendApiService 获取文章和公众号信息
  646. */
  647. private void processGzhBuyAccount(ExternalChannel record) {
  648. String rootSourceId = record.getRootSourceId();
  649. Long recordId = record.getId();
  650. log.info("处理公众号买号, id={}, rootSourceId={}", recordId, rootSourceId);
  651. record.setChannel("公众号买号");
  652. record.setCardId(rootSourceId);
  653. try {
  654. // 解析发布内容ID
  655. JSONObject article = recommendApiService.getArticleByRootSourceId(rootSourceId);
  656. if (article != null) {
  657. String contentId = article.getString("wxSn");
  658. if (StringUtils.isNotBlank(contentId)) {
  659. record.setArticleId(contentId);
  660. record.setAccountId(article.getString("ghId"));
  661. }
  662. }
  663. } catch (Exception e) {
  664. log.error("获取公众号买号信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e);
  665. }
  666. Integer status = ExternalChannelStatusEnum.PENDING.getVal();
  667. if (StringUtils.isNotBlank(record.getAccountId())) {
  668. status = ExternalChannelStatusEnum.PROCESSED.getVal();
  669. }
  670. updateRecordProcessed(record, status);
  671. }
  672. /**
  673. * 9. 公众号代运营-Daily-系统
  674. * 渠道判断:rootSourceId 以 longArticles_inner_ 开头且 AIGC 接口返回 "代运营"
  675. * 处理逻辑:通过 recommendApiService 获取文章和公众号信息
  676. */
  677. private void processGzhOperationDaily(ExternalChannel record) {
  678. String rootSourceId = record.getRootSourceId();
  679. Long recordId = record.getId();
  680. log.info("处理公众号代运营-Daily, id={}, rootSourceId={}", recordId, rootSourceId);
  681. record.setChannel("公众号代运营-Daily-系统");
  682. record.setCardId(rootSourceId);
  683. try {
  684. // 解析发布内容ID
  685. JSONObject article = recommendApiService.getArticleByRootSourceId(rootSourceId);
  686. if (article != null) {
  687. String contentId = article.getString("wxSn");
  688. if (StringUtils.isNotBlank(contentId)) {
  689. record.setArticleId(contentId);
  690. record.setAccountId(article.getString("ghId"));
  691. }
  692. }
  693. } catch (Exception e) {
  694. log.error("获取公众号代运营-Daily信息异常, id={}, rootSourceId={}", recordId, rootSourceId, e);
  695. }
  696. Integer status = ExternalChannelStatusEnum.PENDING.getVal();
  697. if (StringUtils.isNotBlank(record.getAccountId())) {
  698. status = ExternalChannelStatusEnum.PROCESSED.getVal();
  699. }
  700. updateRecordProcessed(record, status);
  701. }
  702. /**
  703. * 更新记录状态
  704. *
  705. * @param record 待更新记录
  706. * @param status 目标状态
  707. */
  708. private void updateRecordProcessed(ExternalChannel record, Integer status) {
  709. record.setStatus(status);
  710. record.setUpdateTime(new Date());
  711. externalChannelMapper.updateByPrimaryKeySelective(record);
  712. log.info("记录处理完成, id={}, channel={}, status={}", record.getId(), record.getChannel(), status);
  713. }
  714. /**
  715. * 批量插入的批次大小
  716. */
  717. private static final int BATCH_INSERT_SIZE = 500;
  718. /**
  719. * 默认历史数据天数
  720. */
  721. private static final int DEFAULT_HISTORY_DAYS = 30;
  722. /**
  723. * 初始化历史数据
  724. * 参数格式:days=30 或 channel=GZH_TOULIU,days=30
  725. * days 表示距当日的间隔天数,不传默认30天
  726. * channel 可选,不传则初始化所有已支持的渠道
  727. *
  728. * @param param 参数字符串
  729. * @return 执行结果
  730. */
  731. @XxlJob("initExternalChannelHistory")
  732. public ReturnT<String> initExternalChannelHistory(String param) {
  733. log.info("开始初始化外部渠道历史数据, param={}", param);
  734. // 解析参数
  735. int days = DEFAULT_HISTORY_DAYS;
  736. if (StringUtils.isNotBlank(param)) {
  737. days = Integer.parseInt(param);
  738. }
  739. // 根据天数计算起始时间
  740. Date now = new Date();
  741. Date startDate = new Date(now.getTime() - TimeUnit.DAYS.toMillis(days));
  742. log.info("初始化历史数据, days={}, startDate={}", days, startDate);
  743. try {
  744. int totalCount = 0;
  745. // 初始化所有已支持的渠道
  746. totalCount += initChannelHistory(ChannelType.MINIAPP_TOULIU, startDate, days);
  747. totalCount += initChannelHistory(ChannelType.GZH_COOPERATE_JIZHUAN, startDate, days);
  748. totalCount += initChannelHistory(ChannelType.QW_COOPERATE, startDate, days);
  749. totalCount += initChannelHistory(ChannelType.GZH_TOULIU, startDate, days);
  750. totalCount += initChannelHistory(ChannelType.FWH_COOPERATE_DAILY, startDate, days);
  751. totalCount += initChannelHistory(ChannelType.FWH_TOULIU_DAILY, startDate, days);
  752. totalCount += initChannelHistory(ChannelType.FWH_TOULIU_REPLY, startDate, days);
  753. totalCount += initChannelHistory(ChannelType.GZH_COOPERATE_DAILY, startDate, days);
  754. totalCount += initChannelHistory(ChannelType.GZH_BUY_ACCOUNT, startDate, days);
  755. totalCount += initChannelHistory(ChannelType.GZH_OPERATION_DAILY, startDate, days);
  756. log.info("历史数据初始化完成, 共插入{}条记录", totalCount);
  757. return new ReturnT<>(ReturnT.SUCCESS_CODE, "初始化完成,共插入" + totalCount + "条记录");
  758. } catch (Exception e) {
  759. log.error("初始化历史数据异常", e);
  760. return new ReturnT<>(ReturnT.FAIL_CODE, "初始化失败:" + e.getMessage());
  761. }
  762. }
  763. /**
  764. * 初始化指定渠道的历史数据
  765. *
  766. * @param channelType 渠道类型
  767. * @param startDate 起始日期
  768. * @return 插入的记录数
  769. */
  770. private int initChannelHistory(ChannelType channelType, Date startDate, int days) {
  771. log.info("初始化渠道历史数据, channelType={}, startDate={}", channelType, startDate);
  772. List<String> rootSourceIds = new ArrayList<>();
  773. Date now = new Date();
  774. switch (channelType) {
  775. case MINIAPP_TOULIU:
  776. // 小程序投流: 历史已手动导入一年
  777. break;
  778. case GZH_COOPERATE_JIZHUAN:
  779. // 公众号合作-即转:从cgi_reply_bucket_data查询dyyjs_开头的rootSourceId
  780. rootSourceIds = queryRootSourceIdsFromCgiReply(PREFIX_DYYJS, startDate);
  781. break;
  782. case QW_COOPERATE:
  783. // 群/企微合作:从content_platform_qw_plan查询
  784. rootSourceIds = queryRootSourceIdsFromQwPlan(startDate);
  785. break;
  786. case GZH_TOULIU:
  787. // 公众号投流:从cgi_reply_bucket_data查询多种前缀
  788. rootSourceIds.addAll(queryRootSourceIdsFromCgiReply(PREFIX_TOULIU_TENCENTGZH, startDate));
  789. rootSourceIds.addAll(queryRootSourceIdsFromCgiReply(PREFIX_TOULIU_TENCENTGZHARTICLE, startDate));
  790. rootSourceIds.addAll(queryRootSourceIdsFromCgiReply(PREFIX_GZHTOULIU_ARTICLES, startDate));
  791. break;
  792. case FWH_COOPERATE_DAILY:
  793. // 服务号合作-Daily:从content_platform_gzh_plan_video查询
  794. rootSourceIds = queryRootSourceIdsFromGzhPlanVideo(PREFIX_FWHHZDYy, startDate);
  795. rootSourceIds.addAll(queryRootSourceIdsFromGzhPlanVideo(PREFIX_FWHDYY, startDate));
  796. break;
  797. case FWH_TOULIU_REPLY:
  798. // 服务号投流-即转:从cgi_reply_bucket_data查询fwhtouliu_tencentgzh开头
  799. rootSourceIds = queryRootSourceIdsFromCgiReply(PREFIX_FWHTOULIU_TENCENTGZH, startDate);
  800. break;
  801. case FWH_TOULIU_DAILY:
  802. // 服务号投流-Daily:从aigc getNearDaysRootSourceIds接口查询fwhtouliu_开头
  803. rootSourceIds = queryRootSourceIdsFromAigc(PREFIX_FWHTOULIU, days);
  804. break;
  805. case GZH_COOPERATE_DAILY:
  806. // 公众号合作-Daily:从content_platform_gzh_plan_video查询longArticles_开头
  807. rootSourceIds = queryRootSourceIdsFromGzhPlanVideo(PREFIX_LONGARTICLES, startDate);
  808. break;
  809. case GZH_BUY_ACCOUNT:
  810. // 公众号买号:从aigc getNearDaysRootSourceIds接口查询longArticles_inner_开头
  811. rootSourceIds = queryRootSourceIdsFromAigc(PREFIX_LONGARTICLES, days);
  812. break;
  813. case GZH_OPERATION_DAILY:
  814. // 公众号代运营-Daily:从aigc getNearDaysRootSourceIds接口查询longArticles_inner_开头
  815. rootSourceIds = queryRootSourceIdsFromAigc(PREFIX_LONGARTICLES, days);
  816. break;
  817. default:
  818. return 0;
  819. }
  820. if (CollectionUtils.isEmpty(rootSourceIds)) {
  821. log.info("渠道{}未找到历史数据", channelType);
  822. return 0;
  823. }
  824. log.info("渠道{}找到{}条历史数据记录", channelType, rootSourceIds.size());
  825. // 构建待插入记录列表
  826. List<ExternalChannel> records = new ArrayList<>();
  827. for (String rootSourceId : rootSourceIds) {
  828. ExternalChannel record = new ExternalChannel();
  829. record.setRootSourceId(rootSourceId);
  830. record.setStatus(ExternalChannelStatusEnum.PENDING.getVal());
  831. record.setIsDelete(0);
  832. record.setCreateTime(now);
  833. record.setUpdateTime(now);
  834. records.add(record);
  835. }
  836. // 分批插入
  837. int insertedCount = 0;
  838. for (int i = 0; i < records.size(); i += BATCH_INSERT_SIZE) {
  839. int end = Math.min(i + BATCH_INSERT_SIZE, records.size());
  840. List<ExternalChannel> batch = records.subList(i, end);
  841. // 批量查询已存在的rootSourceId,过滤掉已存在的记录
  842. List<String> batchRootSourceIds = batch.stream()
  843. .map(ExternalChannel::getRootSourceId)
  844. .collect(Collectors.toList());
  845. List<String> existingIds = externalChannelMapperExt.selectExistingRootSourceIds(batchRootSourceIds);
  846. List<ExternalChannel> filteredBatch = batch;
  847. if (!CollectionUtils.isEmpty(existingIds)) {
  848. filteredBatch = batch.stream()
  849. .filter(r -> !existingIds.contains(r.getRootSourceId()))
  850. .collect(Collectors.toList());
  851. }
  852. if (CollectionUtils.isEmpty(filteredBatch)) {
  853. log.info("渠道{}本批全部已存在, 跳过", channelType);
  854. continue;
  855. }
  856. int count = externalChannelMapperExt.batchInsertIgnore(filteredBatch);
  857. insertedCount += count;
  858. log.info("渠道{}批次插入完成, 本批{}, 过滤已存在{}, 累计插入{}", channelType, filteredBatch.size(), existingIds != null ? existingIds.size() : 0, insertedCount);
  859. }
  860. log.info("渠道{}历史数据初始化完成, 插入{}条", channelType, insertedCount);
  861. return insertedCount;
  862. }
  863. /**
  864. * 从aigc getNearDaysRootSourceIds接口查询指定前缀的rootSourceId列表
  865. *
  866. * @param prefix 前缀
  867. * @param days 天数
  868. * @return rootSourceId列表
  869. */
  870. private List<String> queryRootSourceIdsFromAigc(String prefix, int days) {
  871. List<String> result = new ArrayList<>();
  872. try {
  873. List<String> list = recommendApiService.getArticleByRootSourceId(prefix, days);
  874. if (!CollectionUtils.isEmpty(list)) {
  875. result = list;
  876. }
  877. } catch (Exception e) {
  878. log.error("从aigc查询历史数据失败, prefix={}, days={}", prefix, days, e);
  879. }
  880. return result;
  881. }
  882. /**
  883. * 从cgi_reply_bucket_data表查询指定前缀的rootSourceId列表
  884. *
  885. * @param prefix 前缀
  886. * @param startDate 起始日期
  887. * @return rootSourceId列表
  888. */
  889. private List<String> queryRootSourceIdsFromCgiReply(String prefix, Date startDate) {
  890. List<String> result = new ArrayList<>();
  891. try {
  892. //CgiReplyBucketDataExample example = new CgiReplyBucketDataExample();
  893. //example.createCriteria()
  894. // .andCreateTimeGreaterThanOrEqualTo(startDate)
  895. // .andRootSourceIdLike(prefix + "%");
  896. //List<CgiReplyBucketData> list = cgiReplyBucketDataMapper.selectByExample(example);
  897. //if (!CollectionUtils.isEmpty(list)) {
  898. // result = list.stream().map(CgiReplyBucketData::getRootSourceId).collect(Collectors.toList());
  899. //}
  900. result = cgiReplyBucketDataMapperExt.getRootSourceIdByPrefix(prefix, startDate);
  901. } catch (Exception e) {
  902. log.error("从cgi_reply_bucket_data查询历史数据失败, prefix={}", prefix, e);
  903. }
  904. return result;
  905. }
  906. /**
  907. * 从content_platform_qw_plan表查询rootSourceId列表
  908. *
  909. * @param startDate 起始日期
  910. * @return rootSourceId列表
  911. */
  912. private List<String> queryRootSourceIdsFromQwPlan(Date startDate) {
  913. List<String> result = new ArrayList<>();
  914. try {
  915. ContentPlatformQwPlanExample example = new ContentPlatformQwPlanExample();
  916. example.createCriteria()
  917. .andStatusEqualTo(1)
  918. .andRootSourceIdIsNotNull()
  919. .andCreateTimestampGreaterThanOrEqualTo(startDate.getTime());
  920. List<ContentPlatformQwPlan> list = qwPlanMapper.selectByExample(example);
  921. if (!CollectionUtils.isEmpty(list)) {
  922. for (ContentPlatformQwPlan plan : list) {
  923. if (StringUtils.isNotBlank(plan.getRootSourceId())) {
  924. result.add(plan.getRootSourceId());
  925. }
  926. }
  927. }
  928. } catch (Exception e) {
  929. log.error("从content_platform_qw_plan查询历史数据失败", e);
  930. }
  931. return result;
  932. }
  933. /**
  934. * 从content_platform_gzh_plan_video表查询指定前缀的rootSourceId列表
  935. *
  936. * @param prefix 前缀
  937. * @param startDate 起始日期
  938. * @return rootSourceId列表
  939. */
  940. private List<String> queryRootSourceIdsFromGzhPlanVideo(String prefix, Date startDate) {
  941. List<String> result = new ArrayList<>();
  942. try {
  943. ContentPlatformGzhPlanVideoExample example = new ContentPlatformGzhPlanVideoExample();
  944. example.createCriteria()
  945. .andRootSourceIdLike(prefix + "%")
  946. .andCreateTimestampGreaterThanOrEqualTo(startDate.getTime());
  947. List<ContentPlatformGzhPlanVideo> list = gzhPlanVideoMapper.selectByExample(example);
  948. if (!CollectionUtils.isEmpty(list)) {
  949. for (ContentPlatformGzhPlanVideo video : list) {
  950. if (StringUtils.isNotBlank(video.getRootSourceId())) {
  951. result.add(video.getRootSourceId());
  952. }
  953. }
  954. }
  955. } catch (Exception e) {
  956. log.error("从content_platform_gzh_plan_video查询历史数据失败, prefix={}", prefix, e);
  957. }
  958. return result;
  959. }
  960. }