|
@@ -43,8 +43,8 @@ public class VideoDetailSyncJob {
|
|
|
/** ODPS SQL IN 子句批次大小 */
|
|
/** ODPS SQL IN 子句批次大小 */
|
|
|
private static final int ODPS_BATCH_SIZE = 1000;
|
|
private static final int ODPS_BATCH_SIZE = 1000;
|
|
|
|
|
|
|
|
- /** 查询近一年数据的 dt 偏移天数 */
|
|
|
|
|
- private static final int DT_RANGE_DAYS = 365;
|
|
|
|
|
|
|
+ /** 支持的日期维度列表(同步时查询所有维度并存储,使用时通过Apollo配置选择) */
|
|
|
|
|
+ private static final List<Integer> SUPPORTED_DATE_RANGES = Arrays.asList(3, 7, 15, 365);
|
|
|
|
|
|
|
|
// ========================== 维度字段(取最新日期) ==========================
|
|
// ========================== 维度字段(取最新日期) ==========================
|
|
|
|
|
|
|
@@ -62,8 +62,8 @@ public class VideoDetailSyncJob {
|
|
|
|
|
|
|
|
// ========================== 聚合指标 ==========================
|
|
// ========================== 聚合指标 ==========================
|
|
|
|
|
|
|
|
- /** 聚合指标 SQL 片段(参照 query_video_dimension_metrics.py 的核心指标) */
|
|
|
|
|
- private static final String METRICS_SQL = String.join(",\n",
|
|
|
|
|
|
|
+ /** 聚合指标 SQL 片段模板(参照 query_video_dimension_metrics.py 的核心指标) */
|
|
|
|
|
+ private static final List<String> METRICS_EXPRESSIONS = Arrays.asList(
|
|
|
"sum(当日分发曝光pv) as 分发曝光pv",
|
|
"sum(当日分发曝光pv) as 分发曝光pv",
|
|
|
"sum(累计分享回流uv) AS 总回流",
|
|
"sum(累计分享回流uv) AS 总回流",
|
|
|
"sum(当日分发回流uv)/(sum(当日分发曝光pv)+100) as rov",
|
|
"sum(当日分发回流uv)/(sum(当日分发曝光pv)+100) as rov",
|
|
@@ -89,6 +89,7 @@ public class VideoDetailSyncJob {
|
|
|
/**
|
|
/**
|
|
|
* 同步视频基础信息到 Redis
|
|
* 同步视频基础信息到 Redis
|
|
|
* XxlJob handler: syncVideoDetailJob
|
|
* XxlJob handler: syncVideoDetailJob
|
|
|
|
|
+ * 查询所有日期维度(3天、7天、15天、365天)的指标数据并全部存储
|
|
|
*/
|
|
*/
|
|
|
@XxlJob("syncVideoDetailJob")
|
|
@XxlJob("syncVideoDetailJob")
|
|
|
public ReturnT<String> syncVideoDetailJob(String param) {
|
|
public ReturnT<String> syncVideoDetailJob(String param) {
|
|
@@ -103,19 +104,19 @@ public class VideoDetailSyncJob {
|
|
|
}
|
|
}
|
|
|
log.info("查询到 {} 个不重复的 video_id", allVideoIds.size());
|
|
log.info("查询到 {} 个不重复的 video_id", allVideoIds.size());
|
|
|
|
|
|
|
|
- // 计算 dt 范围(近一年)
|
|
|
|
|
String dtMax = LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE);
|
|
String dtMax = LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE);
|
|
|
- String dtMin = LocalDate.now().minusDays(DT_RANGE_DAYS).format(DateTimeFormatter.BASIC_ISO_DATE);
|
|
|
|
|
- log.info("ODPS 查询 dt 范围: {} ~ {}", dtMin, dtMax);
|
|
|
|
|
|
|
+ // 维度查询使用最大范围(365天)
|
|
|
|
|
+ String dtMin = LocalDate.now().minusDays(365).format(DateTimeFormatter.BASIC_ISO_DATE);
|
|
|
|
|
+ log.info("ODPS 查询 dt 范围: {} ~ {}, 指标日期维度: {}", dtMin, dtMax, SUPPORTED_DATE_RANGES);
|
|
|
|
|
|
|
|
- // 2. 分批查询大数据表并流式写入 Redis(拆分为维度查询 + 指标查询)
|
|
|
|
|
|
|
+ // 2. 分批查询大数据表并流式写入 Redis(拆分为维度查询 + 多维度指标查询)
|
|
|
AtomicInteger totalSuccess = new AtomicInteger(0);
|
|
AtomicInteger totalSuccess = new AtomicInteger(0);
|
|
|
AtomicInteger totalFail = new AtomicInteger(0);
|
|
AtomicInteger totalFail = new AtomicInteger(0);
|
|
|
|
|
|
|
|
for (int i = 0; i < allVideoIds.size(); i += ODPS_BATCH_SIZE) {
|
|
for (int i = 0; i < allVideoIds.size(); i += ODPS_BATCH_SIZE) {
|
|
|
int end = Math.min(i + ODPS_BATCH_SIZE, allVideoIds.size());
|
|
int end = Math.min(i + ODPS_BATCH_SIZE, allVideoIds.size());
|
|
|
List<Long> batchIds = allVideoIds.subList(i, end);
|
|
List<Long> batchIds = allVideoIds.subList(i, end);
|
|
|
- processBatchVideoDetail(batchIds, dtMin, dtMax, totalSuccess, totalFail, i, end);
|
|
|
|
|
|
|
+ processBatchVideoDetail(batchIds, dtMax, totalSuccess, totalFail, i, end);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
log.info("视频基础信息同步任务完成,总成功: {}, 总失败: {}", totalSuccess.get(), totalFail.get());
|
|
log.info("视频基础信息同步任务完成,总成功: {}, 总失败: {}", totalSuccess.get(), totalFail.get());
|
|
@@ -128,8 +129,9 @@ public class VideoDetailSyncJob {
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* 处理单批次视频详情的 ODPS 查询与 Redis 写入
|
|
* 处理单批次视频详情的 ODPS 查询与 Redis 写入
|
|
|
|
|
+ * DIMENSION_FIELDS 仅在 365d 时查询一次,写入所有日期维度的 Redis key
|
|
|
*/
|
|
*/
|
|
|
- private void processBatchVideoDetail(List<Long> batchIds, String dtMin, String dtMax,
|
|
|
|
|
|
|
+ private void processBatchVideoDetail(List<Long> batchIds, String dtMax,
|
|
|
AtomicInteger totalSuccess, AtomicInteger totalFail,
|
|
AtomicInteger totalSuccess, AtomicInteger totalFail,
|
|
|
int batchStart, int batchEnd) {
|
|
int batchStart, int batchEnd) {
|
|
|
try {
|
|
try {
|
|
@@ -137,17 +139,62 @@ public class VideoDetailSyncJob {
|
|
|
.map(String::valueOf)
|
|
.map(String::valueOf)
|
|
|
.collect(Collectors.joining(","));
|
|
.collect(Collectors.joining(","));
|
|
|
|
|
|
|
|
- // 1. 查询维度字段(按日期取最大那天的数据)
|
|
|
|
|
- Map<Long, JSONObject> dimensionMap = queryDimensionData(idsStr, dtMin, dtMax);
|
|
|
|
|
- log.info("批次 {}-{} 维度查询完成,获取 {} 条记录", batchStart, batchEnd, dimensionMap.size());
|
|
|
|
|
|
|
+ // 1. 查询维度字段(仅用 365 天范围查询一次,结果写入所有 key)
|
|
|
|
|
+ String dtMinFor365 = LocalDate.now().minusDays(365).format(DateTimeFormatter.BASIC_ISO_DATE);
|
|
|
|
|
+ Map<Long, JSONObject> dimensionMap = queryDimensionData(idsStr, dtMinFor365, dtMax);
|
|
|
|
|
+ log.info("批次 {}-{} 维度查询完成(365d范围),获取 {} 条记录", batchStart, batchEnd, dimensionMap.size());
|
|
|
|
|
|
|
|
- // 2. 查询聚合指标并合并写入 Redis
|
|
|
|
|
- String metricsSql = buildMetricsSql(idsStr, dtMin, dtMax);
|
|
|
|
|
- long processed = OdpsUtil.getOdpsDataStream(metricsSql, record ->
|
|
|
|
|
- mergeMetricsAndWriteRedis(record, dimensionMap, totalSuccess, totalFail)
|
|
|
|
|
- );
|
|
|
|
|
|
|
+ // 2. 每个日期维度单独查询指标,合并维度字段后写入对应 Redis key
|
|
|
|
|
+ for (int days : SUPPORTED_DATE_RANGES) {
|
|
|
|
|
+ String dtMinForMetrics = LocalDate.now().minusDays(days).format(DateTimeFormatter.BASIC_ISO_DATE);
|
|
|
|
|
+ String metricsSql = buildMetricsSql(idsStr, dtMinForMetrics, dtMax);
|
|
|
|
|
+
|
|
|
|
|
+ // 收集该维度的指标数据
|
|
|
|
|
+ Map<Long, JSONObject> metricsMap = new HashMap<>();
|
|
|
|
|
+ OdpsUtil.getOdpsDataStream(metricsSql, record -> {
|
|
|
|
|
+ try {
|
|
|
|
|
+ Long videoId = record.getBigint("视频id");
|
|
|
|
|
+ if (videoId != null) {
|
|
|
|
|
+ metricsMap.put(videoId, buildVideoDetail(record));
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("处理{}天指标记录失败: {}", days, e.getMessage());
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ // 合并维度字段(来自365d查询) + 该维度指标,写入 Redis key
|
|
|
|
|
+ Set<Long> videoIdsForDay = new HashSet<>();
|
|
|
|
|
+ videoIdsForDay.addAll(dimensionMap.keySet());
|
|
|
|
|
+ videoIdsForDay.addAll(metricsMap.keySet());
|
|
|
|
|
+
|
|
|
|
|
+ for (Long videoId : videoIdsForDay) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ JSONObject detail = new JSONObject();
|
|
|
|
|
+ // 写入维度字段(365d查询结果复用到所有key)
|
|
|
|
|
+ JSONObject dimension = dimensionMap.get(videoId);
|
|
|
|
|
+ if (dimension != null) {
|
|
|
|
|
+ detail.putAll(dimension);
|
|
|
|
|
+ }
|
|
|
|
|
+ // 写入该日期维度的指标数据
|
|
|
|
|
+ JSONObject metrics = metricsMap.get(videoId);
|
|
|
|
|
+ if (metrics != null) {
|
|
|
|
|
+ detail.putAll(metrics);
|
|
|
|
|
+ }
|
|
|
|
|
+ if (!detail.isEmpty()) {
|
|
|
|
|
+ // key格式: video:detail:3d:12345
|
|
|
|
|
+ String redisKey = VectorConstants.VIDEO_DETAIL_DAYS_KEY_PREFIX + days + "d:" + videoId;
|
|
|
|
|
+ redisUtils.set(redisKey, detail.toJSONString(),
|
|
|
|
|
+ VectorConstants.VIDEO_DETAIL_EXPIRE_SECONDS);
|
|
|
|
|
+ totalSuccess.incrementAndGet();
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("写入Redis失败,days={}, videoId={}: {}", days, videoId, e.getMessage());
|
|
|
|
|
+ totalFail.incrementAndGet();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ log.info("批次 {}-{} {}天指标查询并写入完成,处理 {} 条", batchStart, batchEnd, days, videoIdsForDay.size());
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- log.info("批次 {}-{} 指标查询完成,处理 {} 条记录", batchStart, batchEnd, processed);
|
|
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
log.error("批次 {}-{} 查询ODPS失败: {}", batchStart, batchEnd, e.getMessage(), e);
|
|
log.error("批次 {}-{} 查询ODPS失败: {}", batchStart, batchEnd, e.getMessage(), e);
|
|
|
totalFail.addAndGet(batchIds.size());
|
|
totalFail.addAndGet(batchIds.size());
|
|
@@ -173,33 +220,7 @@ public class VideoDetailSyncJob {
|
|
|
return dimensionMap;
|
|
return dimensionMap;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- /**
|
|
|
|
|
- * 合并指标数据与维度数据,写入 Redis
|
|
|
|
|
- */
|
|
|
|
|
- private void mergeMetricsAndWriteRedis(com.aliyun.odps.data.Record record,
|
|
|
|
|
- Map<Long, JSONObject> dimensionMap,
|
|
|
|
|
- AtomicInteger totalSuccess, AtomicInteger totalFail) {
|
|
|
|
|
- try {
|
|
|
|
|
- Long videoId = record.getBigint("视频id");
|
|
|
|
|
- if (videoId == null) {
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
- JSONObject detail = dimensionMap.getOrDefault(videoId, new JSONObject());
|
|
|
|
|
- JSONObject metrics = buildVideoDetail(record);
|
|
|
|
|
- if (metrics != null) {
|
|
|
|
|
- detail.putAll(metrics);
|
|
|
|
|
- }
|
|
|
|
|
- if (!detail.isEmpty()) {
|
|
|
|
|
- String redisKey = VectorConstants.VIDEO_DETAIL_KEY_PREFIX + videoId;
|
|
|
|
|
- redisUtils.set(redisKey, detail.toJSONString(),
|
|
|
|
|
- VectorConstants.VIDEO_DETAIL_EXPIRE_SECONDS);
|
|
|
|
|
- totalSuccess.incrementAndGet();
|
|
|
|
|
- }
|
|
|
|
|
- } catch (Exception e) {
|
|
|
|
|
- log.error("处理指标记录失败: {}", e.getMessage());
|
|
|
|
|
- totalFail.incrementAndGet();
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
|
|
+
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* 构建维度字段查询 SQL
|
|
* 构建维度字段查询 SQL
|
|
@@ -224,15 +245,16 @@ public class VideoDetailSyncJob {
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* 构建指标聚合查询 SQL
|
|
* 构建指标聚合查询 SQL
|
|
|
- * 只按视频id分组,聚合核心指标
|
|
|
|
|
|
|
+ * 只按视频id分组,聚合核心指标(不带日期后缀,每个维度单独存储)
|
|
|
*/
|
|
*/
|
|
|
private String buildMetricsSql(String idsStr, String dtMin, String dtMax) {
|
|
private String buildMetricsSql(String idsStr, String dtMin, String dtMax) {
|
|
|
|
|
+ String metricsSql = String.join(",\n", METRICS_EXPRESSIONS);
|
|
|
return String.format(
|
|
return String.format(
|
|
|
"SELECT 视频id, %s " +
|
|
"SELECT 视频id, %s " +
|
|
|
"FROM loghubods.video_dimension_detail_add_column " +
|
|
"FROM loghubods.video_dimension_detail_add_column " +
|
|
|
"WHERE dt >= '%s' AND dt <= '%s' AND 视频id IN (%s) " +
|
|
"WHERE dt >= '%s' AND dt <= '%s' AND 视频id IN (%s) " +
|
|
|
"GROUP BY 视频id;",
|
|
"GROUP BY 视频id;",
|
|
|
- METRICS_SQL,
|
|
|
|
|
|
|
+ metricsSql,
|
|
|
dtMin, dtMax, idsStr);
|
|
dtMin, dtMax, idsStr);
|
|
|
}
|
|
}
|
|
|
|
|
|