Przeglądaj źródła

ExternalChannel 视频ID 写入

wangyunpeng 1 miesiąc temu
rodzic
commit
209dcb8113

+ 1 - 1
api-module/src/main/java/com/tzld/piaoquan/api/component/AdApiService.java

@@ -157,7 +157,7 @@ public class AdApiService {
                 return data.getLong("videoId");
             }
         } catch (Exception e) {
-            log.error("获取人群包信息失败, rootSourceId={}", rootSourceId, e);
+            log.error("获取tencentFlow videoId失败, rootSourceId={}", rootSourceId, e);
         }
         return null;
     }

+ 7 - 0
api-module/src/main/java/com/tzld/piaoquan/api/controller/ExternalController.java

@@ -43,4 +43,11 @@ public class ExternalController {
         return CommonResponse.success();
     }
 
+    @ApiOperation(value = "历史数据导入")
+    @GetMapping("/job/fillExternalChannelVideoId")
+    public CommonResponse<Void> fillExternalChannelVideoId() {
+        job.fillExternalChannelVideoId(null);
+        return CommonResponse.success();
+    }
+
 }

+ 16 - 0
api-module/src/main/java/com/tzld/piaoquan/api/dao/mapper/contentplatform/ExternalChannelMapperExt.java

@@ -52,4 +52,20 @@ public interface ExternalChannelMapperExt {
      * @return 插入成功的记录数
      */
     int batchInsertIgnore(@Param("list") List<ExternalChannel> list);
+
+    /**
+     * 查询 video_id 为空的记录列表
+     * 用于历史数据补全 video_id
+     * @param lastId 上一页最后一条记录的id,首次传0
+     * @param limit 查询条数限制
+     * @return video_id 为空的记录列表
+     */
+    List<ExternalChannel> selectEmptyVideoIdList(@Param("lastId") long lastId, @Param("limit") int limit);
+
+    /**
+     * 批量更新记录的 video_id
+     * @param list 待更新记录列表(需包含 id 和 videoId)
+     * @return 更新成功的记录数
+     */
+    int batchUpdateVideoId(@Param("list") List<ExternalChannel> list);
 } 

+ 323 - 2
api-module/src/main/java/com/tzld/piaoquan/api/job/ExternalChannelProcessJob.java

@@ -671,7 +671,7 @@ public class ExternalChannelProcessJob {
         try {
             // 查询content_platform_gzh_plan_video
             ContentPlatformGzhPlanVideoExample videoExample = new ContentPlatformGzhPlanVideoExample();
-            videoExample.createCriteria().andRootSourceIdEqualTo(rootSourceId).andStatusEqualTo(1);
+            videoExample.createCriteria().andRootSourceIdEqualTo(rootSourceId);
             List<ContentPlatformGzhPlanVideo> videoList = gzhPlanVideoMapper.selectByExample(videoExample);
 
             if (!CollectionUtils.isEmpty(videoList)) {
@@ -807,7 +807,7 @@ public class ExternalChannelProcessJob {
         try {
             // 查询content_platform_gzh_plan_video
             ContentPlatformGzhPlanVideoExample videoExample = new ContentPlatformGzhPlanVideoExample();
-            videoExample.createCriteria().andRootSourceIdEqualTo(rootSourceId).andStatusEqualTo(1);
+            videoExample.createCriteria().andRootSourceIdEqualTo(rootSourceId);
             List<ContentPlatformGzhPlanVideo> videoList = gzhPlanVideoMapper.selectByExample(videoExample);
 
             if (!CollectionUtils.isEmpty(videoList)) {
@@ -1202,4 +1202,325 @@ public class ExternalChannelProcessJob {
         return result;
     }
 
+    /**
+     * 批量更新批次大小
+     */
+    private static final int BATCH_UPDATE_SIZE = 100;
+
+    /**
+     * video_id 更新结果封装类
+     */
+    private static class VideoIdUpdateResult {
+        final Long recordId;
+        final Long videoId;
+        final ExternalChannel record;
+
+        VideoIdUpdateResult(Long recordId, Long videoId, ExternalChannel record) {
+            this.recordId = recordId;
+            this.videoId = videoId;
+            this.record = record;
+        }
+    }
+
+    /**
+     * 补全历史数据的 video_id
+     * 遍历 external_channel 表中 video_id 为空的记录,根据渠道类型获取 video_id 并更新
+     *
+     * @return 执行结果
+     */
+    @XxlJob("fillExternalChannelVideoId")
+    public ReturnT<String> fillExternalChannelVideoId(String param) {
+        log.info("开始补全 external_channel 历史数据的 video_id, param={}", param);
+
+        // 解析参数
+
+        int totalProcessed = 0;
+        int totalUpdated = 0;
+        int totalFailed = 0;
+        long lastId = 0;
+        List<ExternalChannel> emptyVideoIdList;
+
+        try {
+            do {
+                // 查询 video_id 为空的记录
+                emptyVideoIdList = externalChannelMapperExt.selectEmptyVideoIdList(lastId, QUERY_LIMIT);
+
+                if (CollectionUtils.isEmpty(emptyVideoIdList)) {
+                    log.info("没有更多 video_id 为空的记录");
+                    break;
+                }
+
+                // 更新 lastId
+                lastId = emptyVideoIdList.get(emptyVideoIdList.size() - 1).getId();
+                log.info("本批查询到{}条 video_id 为空的记录, lastId={}", emptyVideoIdList.size(), lastId);
+
+
+                // 并发处理本批记录
+                List<Future<VideoIdUpdateResult>> futures = new ArrayList<>();
+                for (ExternalChannel record : emptyVideoIdList) {
+                    Future<VideoIdUpdateResult> future = executor.submit(() -> {
+                        Long videoId = fetchVideoIdByChannelType(record);
+                        return new VideoIdUpdateResult(record.getId(), videoId, record);
+                    });
+                    futures.add(future);
+                }
+
+                // 等待所有任务完成并处理结果
+                for (Future<VideoIdUpdateResult> future : futures) {
+                    try {
+                        VideoIdUpdateResult result = future.get(SINGLE_RECORD_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+                        totalProcessed++;
+
+                        if (result.videoId != null && result.videoId > 0) {
+                            ExternalChannel record = result.record;
+                            record.setVideoId(result.videoId);
+                            record.setUpdateTime(new Date());
+                            externalChannelMapper.updateByPrimaryKeySelective(record);
+                            totalUpdated++;
+                            log.debug("获取到 video_id={}, id={}, rootSourceId={}",
+                                    result.videoId, result.recordId, result.record.getRootSourceId());
+                        } else {
+                            totalFailed++;
+                            log.warn("无法获取 video_id, id={}, rootSourceId={}, channel={}",
+                                    result.recordId, result.record.getRootSourceId(), result.record.getChannel());
+                        }
+                    } catch (TimeoutException e) {
+                        log.warn("获取 video_id 超时,取消任务");
+                        future.cancel(true);
+                        totalFailed++;
+                    } catch (InterruptedException e) {
+                        log.warn("处理被中断");
+                        Thread.currentThread().interrupt();
+                        totalFailed++;
+                    } catch (ExecutionException e) {
+                        log.error("任务执行异常", e.getCause());
+                        totalFailed++;
+                    }
+                }
+
+            } while (emptyVideoIdList.size() >= QUERY_LIMIT);
+
+            log.info("video_id 补全完成, 共处理{}条记录, 成功更新{}条, 失败{}条",
+                    totalProcessed, totalUpdated, totalFailed);
+            return new ReturnT<>(ReturnT.SUCCESS_CODE,
+                    String.format("处理完成,共处理%d条记录,成功更新%d条,失败%d条", totalProcessed, totalUpdated, totalFailed));
+
+        } catch (Exception e) {
+            log.error("补全 video_id 时发生异常", e);
+            return new ReturnT<>(ReturnT.FAIL_CODE, "处理失败:" + e.getMessage());
+        }
+    }
+
+    /**
+     * 根据渠道类型获取 video_id
+     *
+     * @param record external_channel 记录
+     * @return video_id,如果无法获取则返回 null
+     */
+    private Long fetchVideoIdByChannelType(ExternalChannel record) {
+        String rootSourceId = record.getRootSourceId();
+        String channel = record.getChannel();
+
+        if (StringUtils.isBlank(rootSourceId)) {
+            return null;
+        }
+
+        try {
+            // 根据渠道类型获取 video_id
+            if (StringUtils.isNotBlank(channel)) {
+                // 根据 channel 字段判断渠道类型
+                if (channel.contains("小程序投流")) {
+                    return fetchVideoIdForMiniAppTouliu(rootSourceId);
+                } else if (channel.contains("公众号合作-即转")) {
+                    return fetchVideoIdForGzhCooperateReply(rootSourceId);
+                } else if (channel.contains("群/企微合作")) {
+                    return fetchVideoIdForQwCooperate(rootSourceId);
+                } else if (channel.contains("公众号投流")) {
+                    return fetchVideoIdForGzhTouliu(rootSourceId);
+                } else if (channel.contains("服务号合作-Daily")) {
+                    return fetchVideoIdForFwhCooperateDaily(rootSourceId);
+                } else if (channel.contains("服务号投流-Daily")) {
+                    return fetchVideoIdForFwhTouliuDaily(rootSourceId);
+                } else if (channel.contains("服务号投流-即转")) {
+                    return fetchVideoIdForFwhTouliuReply(rootSourceId);
+                } else if (channel.contains("公众号合作-Daily")) {
+                    return fetchVideoIdForGzhCooperateDaily(rootSourceId);
+                } else if (channel.contains("公众号买号")) {
+                    return fetchVideoIdForGzhBuyAccount(rootSourceId);
+                } else if (channel.contains("公众号代运营")) {
+                    return fetchVideoIdForGzhOperationDaily(rootSourceId);
+                }
+            }
+        } catch (Exception e) {
+            log.error("获取 video_id 异常, id={}, rootSourceId={}", record.getId(), rootSourceId, e);
+            return null;
+        }
+        return null;
+    }
+
+    /**
+     * 小程序投流 - 获取 video_id
+     */
+    private Long fetchVideoIdForMiniAppTouliu(String rootSourceId) {
+        try {
+            return adApiService.getVideoIdByRootSourceId(rootSourceId);
+        } catch (Exception e) {
+            log.error("小程序投流获取 video_id 失败, rootSourceId={}", rootSourceId, e);
+            return null;
+        }
+    }
+
+    /**
+     * 公众号合作-即转 - 获取 video_id
+     */
+    private Long fetchVideoIdForGzhCooperateReply(String rootSourceId) {
+        try {
+            CgiReplyBucketDataExample example = new CgiReplyBucketDataExample();
+            example.createCriteria().andRootSourceIdEqualTo(rootSourceId);
+            example.setOrderByClause("id desc");
+            List<CgiReplyBucketData> bucketDataList = cgiReplyBucketDataMapper.selectByExample(example);
+
+            if (!CollectionUtils.isEmpty(bucketDataList)) {
+                return bucketDataList.get(0).getMiniVideoId();
+            }
+        } catch (Exception e) {
+            log.error("公众号合作-即转获取 video_id 失败, rootSourceId={}", rootSourceId, e);
+        }
+        return null;
+    }
+
+    /**
+     * 群/企微合作 - 获取 video_id
+     */
+    private Long fetchVideoIdForQwCooperate(String rootSourceId) {
+        try {
+            ContentPlatformQwPlanExample example = new ContentPlatformQwPlanExample();
+            example.createCriteria().andRootSourceIdEqualTo(rootSourceId).andStatusEqualTo(1);
+            List<ContentPlatformQwPlan> planList = qwPlanMapper.selectByExample(example);
+
+            if (!CollectionUtils.isEmpty(planList)) {
+                ContentPlatformQwPlan plan = planList.get(0);
+                ContentPlatformQwPlanVideoExample videoExample = new ContentPlatformQwPlanVideoExample();
+                videoExample.createCriteria().andPlanIdEqualTo(plan.getId());
+                List<ContentPlatformQwPlanVideo> videoList = qwPlanVideoMapper.selectByExample(videoExample);
+                if (!CollectionUtils.isEmpty(videoList)) {
+                    return videoList.get(0).getVideoId();
+                }
+            }
+        } catch (Exception e) {
+            log.error("群/企微合作获取 video_id 失败, rootSourceId={}", rootSourceId, e);
+        }
+        return null;
+    }
+
+    /**
+     * 公众号投流 - 获取 video_id
+     */
+    private Long fetchVideoIdForGzhTouliu(String rootSourceId) {
+        try {
+            CgiReplyBucketDataExample example = new CgiReplyBucketDataExample();
+            example.createCriteria().andRootSourceIdEqualTo(rootSourceId);
+            example.setOrderByClause("id desc");
+            List<CgiReplyBucketData> bucketDataList = cgiReplyBucketDataMapper.selectByExample(example);
+
+            if (!CollectionUtils.isEmpty(bucketDataList)) {
+                return bucketDataList.get(0).getMiniVideoId();
+            }
+        } catch (Exception e) {
+            log.error("公众号投流获取 video_id 失败, rootSourceId={}", rootSourceId, e);
+        }
+        return null;
+    }
+
+    /**
+     * 服务号合作-Daily - 获取 video_id
+     */
+    private Long fetchVideoIdForFwhCooperateDaily(String rootSourceId) {
+        try {
+            ContentPlatformGzhPlanVideoExample videoExample = new ContentPlatformGzhPlanVideoExample();
+            videoExample.createCriteria().andRootSourceIdEqualTo(rootSourceId);
+            List<ContentPlatformGzhPlanVideo> videoList = gzhPlanVideoMapper.selectByExample(videoExample);
+
+            if (!CollectionUtils.isEmpty(videoList)) {
+                return videoList.get(0).getVideoId();
+            }
+        } catch (Exception e) {
+            log.error("服务号合作-Daily获取 video_id 失败, rootSourceId={}", rootSourceId, e);
+        }
+        return null;
+    }
+
+    /**
+     * 服务号投流-Daily - 获取 video_id
+     */
+    private Long fetchVideoIdForFwhTouliuDaily(String rootSourceId) {
+        try {
+            return recommendApiService.getVideoIdByRootSourceId(rootSourceId);
+        } catch (Exception e) {
+            log.error("服务号投流-Daily获取 video_id 失败, rootSourceId={}", rootSourceId, e);
+            return null;
+        }
+    }
+
+    /**
+     * 服务号投流-即转 - 获取 video_id
+     */
+    private Long fetchVideoIdForFwhTouliuReply(String rootSourceId) {
+        try {
+            CgiReplyBucketDataExample example = new CgiReplyBucketDataExample();
+            example.createCriteria().andRootSourceIdEqualTo(rootSourceId);
+            example.setOrderByClause("id desc");
+            List<CgiReplyBucketData> bucketDataList = cgiReplyBucketDataMapper.selectByExample(example);
+
+            if (!CollectionUtils.isEmpty(bucketDataList)) {
+                return bucketDataList.get(0).getMiniVideoId();
+            }
+        } catch (Exception e) {
+            log.error("服务号投流-即转获取 video_id 失败, rootSourceId={}", rootSourceId, e);
+        }
+        return null;
+    }
+
+    /**
+     * 公众号合作-Daily - 获取 video_id
+     */
+    private Long fetchVideoIdForGzhCooperateDaily(String rootSourceId) {
+        try {
+            ContentPlatformGzhPlanVideoExample videoExample = new ContentPlatformGzhPlanVideoExample();
+            videoExample.createCriteria().andRootSourceIdEqualTo(rootSourceId);
+            List<ContentPlatformGzhPlanVideo> videoList = gzhPlanVideoMapper.selectByExample(videoExample);
+
+            if (!CollectionUtils.isEmpty(videoList)) {
+                return videoList.get(0).getVideoId();
+            }
+        } catch (Exception e) {
+            log.error("公众号合作-Daily获取 video_id 失败, rootSourceId={}", rootSourceId, e);
+        }
+        return null;
+    }
+
+    /**
+     * 公众号买号 - 获取 video_id
+     */
+    private Long fetchVideoIdForGzhBuyAccount(String rootSourceId) {
+        try {
+            return recommendApiService.getVideoIdByRootSourceId(rootSourceId);
+        } catch (Exception e) {
+            log.error("公众号买号获取 video_id 失败, rootSourceId={}", rootSourceId, e);
+            return null;
+        }
+    }
+
+    /**
+     * 公众号代运营-Daily - 获取 video_id
+     */
+    private Long fetchVideoIdForGzhOperationDaily(String rootSourceId) {
+        try {
+            return recommendApiService.getVideoIdByRootSourceId(rootSourceId);
+        } catch (Exception e) {
+            log.error("公众号代运营-Daily获取 video_id 失败, rootSourceId={}", rootSourceId, e);
+            return null;
+        }
+    }
+
 }

+ 22 - 0
api-module/src/main/resources/mapper/contentplatform/ext/ExternalChannelMapperExt.xml

@@ -85,4 +85,26 @@
         </foreach>
     </insert>
 
+    <!-- 查询 video_id 为空的记录列表 -->
+    <select id="selectEmptyVideoIdList" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List" />
+        from external_channel
+        where is_delete = 0
+        and (video_id is null or video_id = 0)
+        and id > #{lastId}
+        order by id asc
+        limit #{limit}
+    </select>
+
+    <!-- 批量更新记录的 video_id -->
+    <update id="batchUpdateVideoId" parameterType="java.util.List">
+        <foreach collection="list" item="item" separator=";">
+            update external_channel
+            set video_id = #{item.videoId,jdbcType=BIGINT},
+                update_time = NOW()
+            where id = #{item.id,jdbcType=BIGINT}
+        </foreach>
+    </update>
+
 </mapper>