|
|
@@ -13,9 +13,11 @@ import com.tzld.videoVector.model.po.pgVector.ChannelDemandMatchResultExample;
|
|
|
import com.tzld.videoVector.model.vo.RecallVideoScoreVO;
|
|
|
import com.tzld.videoVector.service.VideoSearchService;
|
|
|
import com.tzld.videoVector.util.OdpsUtil;
|
|
|
+import com.tzld.videoVector.util.VectorUtils;
|
|
|
import com.xxl.job.core.biz.model.ReturnT;
|
|
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import org.springframework.util.CollectionUtils;
|
|
|
import org.springframework.util.StringUtils;
|
|
|
@@ -58,6 +60,23 @@ public class ChannelDemandMatchJob {
|
|
|
*/
|
|
|
private static final int MATCH_THREAD_POOL_SIZE = 5;
|
|
|
|
|
|
+ /**
|
|
|
+ * Strategy identifier of the "prior demand - scene" data source; a config whose demandStrategy equals this value is routed to processDimensionStatSource instead of the vector-recall flow.
|
|
|
+ */
|
|
|
+ private static final String DIMENSION_STAT_STRATEGY = "先验需求-场景";
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Lower bound (exclusive) applied to the rov column when querying the "prior demand - scene" data source; injected from channel.demand.dimension-stat.min-rov, default 0.03 (3%).
|
|
|
+ */
|
|
|
+ @Value("${channel.demand.dimension-stat.min-rov:0.03}")
|
|
|
+ private double dimensionStatMinRov;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Lower bound (exclusive) applied to the UV-ratio column when querying the "prior demand - scene" data source; injected from channel.demand.dimension-stat.min-uv-ratio, default 0.002 (0.2%).
|
|
|
+ */
|
|
|
+ @Value("${channel.demand.dimension-stat.min-uv-ratio:0.002}")
|
|
|
+ private double dimensionStatMinUvRatio;
|
|
|
+
|
|
|
/**
|
|
|
* 多渠道配置并发执行线程池
|
|
|
*/
|
|
|
@@ -144,6 +163,12 @@ public class ChannelDemandMatchJob {
|
|
|
// 1. 先清理该渠道+日期的历史数据(支持重跑)
|
|
|
deleteExistingResults(config.getId(), dt);
|
|
|
|
|
|
+ // 如果是先验需求-场景数据源,走独立的处理逻辑(不需要向量召回)
|
|
|
+ if (DIMENSION_STAT_STRATEGY.equals(config.getDemandStrategy())) {
|
|
|
+ processDimensionStatSource(config, dt, totalDemands, totalMatched, totalFailed);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
// 2. 构造ODPS SQL并查询需求数据
|
|
|
String sql = buildDemandSql(config, dt, minUv, minRov);
|
|
|
log.info("查询ODPS需求, 渠道: {}, sql长度: {}", channelName, sql.length());
|
|
|
@@ -396,6 +421,139 @@ public class ChannelDemandMatchJob {
|
|
|
return StringUtils.hasText(value) && !"-".equals(value.trim());
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 处理先验需求-场景数据源:从dwd_channel_element_dimension_stat查询,直接入库(不需向量召回)
|
|
|
+ */
|
|
|
+ private void processDimensionStatSource(ChannelDemandMatchConfig config, String dt,
|
|
|
+ AtomicInteger totalDemands, AtomicInteger totalMatched, AtomicInteger totalFailed) {
|
|
|
+ log.info("开始处理先验需求-场景数据源, configId={}, dt={}", config.getId(), dt);
|
|
|
+
|
|
|
+ String sql = buildDimensionStatSql(config, dt);
|
|
|
+ log.info("先验需求-场景ODPS SQL长度: {}", sql.length());
|
|
|
+
|
|
|
+ List<ChannelDemandMatchResult> results = new ArrayList<>();
|
|
|
+
|
|
|
+ OdpsUtil.getOdpsDataStream(sql, record -> {
|
|
|
+ try {
|
|
|
+ ChannelDemandMatchResult result = new ChannelDemandMatchResult();
|
|
|
+ result.setConfigId(config.getId());
|
|
|
+ result.setDt(dt);
|
|
|
+ result.setDemandStrategy(DIMENSION_STAT_STRATEGY);
|
|
|
+
|
|
|
+ // 需求维度字段
|
|
|
+ result.setChannelName(record.getString("渠道类"));
|
|
|
+ result.setCrowdSegment(record.getString("人群细分"));
|
|
|
+ result.setChannelLevel3(record.getString("三级渠道"));
|
|
|
+ result.setDimension(record.getString("维度"));
|
|
|
+ result.setStandardElement(record.getString("标准化元素"));
|
|
|
+ result.setCategoryName(record.getString("分类名称"));
|
|
|
+
|
|
|
+ // 统计指标(总访问uv/总uv占比 被 访问uv/uv占比 覆盖,最终取行级指标)
|
|
|
+ result.setVisitUv(safeGetLong(record, "访问uv"));
|
|
|
+ result.setUvRatio(safeGetDouble(record, "uv占比"));
|
|
|
+ result.setTotalRov(safeGetDouble(record, "总rov"));
|
|
|
+ result.setMatchRov(safeGetDouble(record, "rov"));
|
|
|
+
|
|
|
+ // 计算综合评分:sim默认1,rov取totalRov
|
|
|
+ double rov = result.getMatchRov() != null ? result.getMatchRov() : 0.0;
|
|
|
+ result.setMatchScore(VectorUtils.calculateScore(1.0, rov));
|
|
|
+ result.setMatchSim(1.0);
|
|
|
+ result.setMatchRov(rov);
|
|
|
+
|
|
|
+ // 视频信息:该表已有videoid,直接作为匹配结果
|
|
|
+ String videoIdStr = record.getString("videoid");
|
|
|
+ if (StringUtils.hasText(videoIdStr)) {
|
|
|
+ try {
|
|
|
+ result.setMatchVideoId(Long.parseLong(videoIdStr.trim()));
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
+ log.warn("先验需求-场景 videoid解析失败: {}", videoIdStr);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ return; // 无videoid则跳过
|
|
|
+ }
|
|
|
+
|
|
|
+ // 标题同时映射到matchText和demandContentTitle
|
|
|
+ String title = record.getString("标题");
|
|
|
+ result.setMatchText(title);
|
|
|
+ result.setDemandContentTitle(title);
|
|
|
+
|
|
|
+ result.setMatchConfigCode("DIMENSION_STAT");
|
|
|
+ result.setMatchStatus((short) 1); // 已匹配(无需向量召回)
|
|
|
+
|
|
|
+ // 生成确定性实验ID
|
|
|
+ result.setExperimentId(generateExperimentId(result, result.getMatchVideoId(), result.getMatchConfigCode()));
|
|
|
+
|
|
|
+ synchronized (results) {
|
|
|
+ results.add(result);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("解析先验需求-场景ODPS记录失败: {}", e.getMessage());
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ log.info("先验需求-场景数据源查询到 {} 条记录", results.size());
|
|
|
+ totalDemands.addAndGet(results.size());
|
|
|
+
|
|
|
+ if (results.isEmpty()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 批量写入
|
|
|
+ for (List<ChannelDemandMatchResult> partition : Lists.partition(results, 1000)) {
|
|
|
+ resultMapperExt.batchInsert(partition);
|
|
|
+ }
|
|
|
+ totalMatched.addAndGet(results.size());
|
|
|
+ log.info("先验需求-场景数据源写入完成, 共 {} 条", results.size());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 构造先验需求-场景数据源的ODPS SQL
|
|
|
+ * 查询loghubods.dwd_channel_element_dimension_stat,过滤rov>3%且uv占比>0.2%
|
|
|
+ */
|
|
|
+ private String buildDimensionStatSql(ChannelDemandMatchConfig config, String dt) {
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
+ sb.append("SELECT DISTINCT dt");
|
|
|
+ sb.append(",渠道类");
|
|
|
+ sb.append(",人群细分");
|
|
|
+ sb.append(",三级渠道");
|
|
|
+ sb.append(",维度");
|
|
|
+ sb.append(",标准化元素");
|
|
|
+ sb.append(",分类名称");
|
|
|
+ sb.append(",总访问uv");
|
|
|
+ sb.append(",总uv占比");
|
|
|
+ sb.append(",总str");
|
|
|
+ sb.append(",总rov");
|
|
|
+ sb.append(",videoid");
|
|
|
+ sb.append(",标题");
|
|
|
+ sb.append(",`merge二级品类`");
|
|
|
+ sb.append(",访问uv");
|
|
|
+ sb.append(",访问pv");
|
|
|
+ sb.append(",单层分享pv");
|
|
|
+ sb.append(",拉回uv");
|
|
|
+ sb.append(",uv占比");
|
|
|
+ sb.append(",str");
|
|
|
+ sb.append(",rov");
|
|
|
+ sb.append(",全局分发pv");
|
|
|
+ sb.append(",全局分发回流uv");
|
|
|
+ sb.append(",全局总回流uv");
|
|
|
+ sb.append(",全局rov");
|
|
|
+ sb.append(",rank");
|
|
|
+ sb.append(" FROM loghubods.dwd_channel_element_dimension_stat");
|
|
|
+ sb.append(" WHERE dt = '").append(dt).append("'");
|
|
|
+ // 过滤条件:rov > 3% 且 uv占比 > 0.2%
|
|
|
+ sb.append(" AND rov > ").append(dimensionStatMinRov);
|
|
|
+ sb.append(" AND uv占比 > ").append(dimensionStatMinUvRatio);
|
|
|
+ sb.append(" AND `merge二级品类` not in ('早中晚好','节日祝福') ");
|
|
|
+ // 渠道筛选
|
|
|
+ if (StringUtils.hasText(config.getChannelName())) {
|
|
|
+ sb.append(" AND 渠道类 = '").append(config.getChannelName().replace("'", "''")).append("'");
|
|
|
+ }
|
|
|
+ sb.append(" ORDER BY 总rov DESC,全局rov DESC");
|
|
|
+ sb.append(";");
|
|
|
+ return sb.toString();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* 构造ODPS查询需求SQL(返回所有字段)
|
|
|
*/
|