|
@@ -2,6 +2,7 @@ package com.tzld.videoVector.job;
|
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
import com.alibaba.fastjson.JSON;
|
|
|
import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
|
|
import com.ctrip.framework.apollo.spring.annotation.ApolloJsonValue;
|
|
|
|
|
+import com.tzld.videoVector.api.RecommendApiService;
|
|
|
import com.tzld.videoVector.api.VideoApiService;
|
|
import com.tzld.videoVector.api.VideoApiService;
|
|
|
import com.tzld.videoVector.common.constant.VectorConstants;
|
|
import com.tzld.videoVector.common.constant.VectorConstants;
|
|
|
import com.tzld.videoVector.dao.mapper.pgVector.ChannelDemandMatchResultMapper;
|
|
import com.tzld.videoVector.dao.mapper.pgVector.ChannelDemandMatchResultMapper;
|
|
@@ -69,6 +70,9 @@ public class VideoArticleMatchJob {
|
|
|
@Resource
|
|
@Resource
|
|
|
private VideoApiService videoApiService;
|
|
private VideoApiService videoApiService;
|
|
|
|
|
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private RecommendApiService recommendApiService;
|
|
|
|
|
+
|
|
|
@Resource
|
|
@Resource
|
|
|
private RedisUtils redisUtils;
|
|
private RedisUtils redisUtils;
|
|
|
|
|
|
|
@@ -173,11 +177,18 @@ public class VideoArticleMatchJob {
|
|
|
return ReturnT.SUCCESS;
|
|
return ReturnT.SUCCESS;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // 2.1 获取各账号已发送素材 ID(用于过滤已投文章)
|
|
|
|
|
+ Map<String, Set<String>> sentSourceIds = fetchSentSourceIds(records);
|
|
|
|
|
+
|
|
|
// 3. 批量获取视频标题
|
|
// 3. 批量获取视频标题
|
|
|
Map<Long, String> videoTitleMap = fetchVideoTitles(records);
|
|
Map<Long, String> videoTitleMap = fetchVideoTitles(records);
|
|
|
|
|
|
|
|
- // 4. 视频标题 → 长文向量召回
|
|
|
|
|
- Map<Long, List<ArticleMatchItem>> videoArticleMatches = matchArticlesByTitles(videoTitleMap);
|
|
|
|
|
|
|
+ // 3.1 构建 videoId → channelLevel3 映射(用于匹配时过滤已发送文章)
|
|
|
|
|
+ Map<Long, String> videoChannelMap = buildVideoChannelMap(records);
|
|
|
|
|
+
|
|
|
|
|
+ // 4. 视频标题 → 长文向量召回(过滤已发送文章)
|
|
|
|
|
+ Map<Long, List<ArticleMatchItem>> videoArticleMatches =
|
|
|
|
|
+ matchArticlesByTitles(videoTitleMap, videoChannelMap, sentSourceIds);
|
|
|
|
|
|
|
|
// 5. 1v1 去重配对
|
|
// 5. 1v1 去重配对
|
|
|
Map<Long, ArticleMatchItem> finalPairs = dedupOneToOne(videoArticleMatches);
|
|
Map<Long, ArticleMatchItem> finalPairs = dedupOneToOne(videoArticleMatches);
|
|
@@ -233,6 +244,30 @@ public class VideoArticleMatchJob {
|
|
|
return records;
|
|
return records;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // =====================================================
|
|
|
|
|
+ // 步骤 2.1: 获取各账号已发送素材 ID
|
|
|
|
|
+ // =====================================================
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 调用外部 API 获取各账号已发送的素材 ID 集合,用于过滤已投放内容。
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param records 需求匹配记录列表
|
|
|
|
|
+ * @return channelLevel3 → 已发送素材 ID 集合
|
|
|
|
|
+ */
|
|
|
|
|
+ private Map<String, Set<String>> fetchSentSourceIds(List<ChannelDemandMatchResult> records) {
|
|
|
|
|
+ // 提取所有唯一的 channelLevel3
|
|
|
|
|
+ Set<String> uniqueAccounts = records.stream()
|
|
|
|
|
+ .map(ChannelDemandMatchResult::getChannelLevel3)
|
|
|
|
|
+ .filter(StringUtils::hasText)
|
|
|
|
|
+ .collect(Collectors.toSet());
|
|
|
|
|
+
|
|
|
|
|
+ Map<String, Set<String>> result = recommendApiService.getAllSentSourceIds(uniqueAccounts);
|
|
|
|
|
+
|
|
|
|
|
+ int totalSent = result.values().stream().mapToInt(Set::size).sum();
|
|
|
|
|
+ log.info("获取各账号已发送素材: {} 个账号, 共 {} 条", result.size(), totalSent);
|
|
|
|
|
+ return result;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
// =====================================================
|
|
// =====================================================
|
|
|
// 步骤 3: 批量获取视频标题
|
|
// 步骤 3: 批量获取视频标题
|
|
|
// =====================================================
|
|
// =====================================================
|
|
@@ -309,6 +344,22 @@ public class VideoArticleMatchJob {
|
|
|
return videoTitleMap;
|
|
return videoTitleMap;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // =====================================================
|
|
|
|
|
+ // 步骤 3.1: 构建 videoId → channelLevel3 映射
|
|
|
|
|
+ // =====================================================
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 从匹配记录中提取 videoId → channelLevel3 映射。
|
|
|
|
|
+ * <p>同一视频可能对应多条记录,取第一条的 channelLevel3。
|
|
|
|
|
+ */
|
|
|
|
|
+ private Map<Long, String> buildVideoChannelMap(List<ChannelDemandMatchResult> records) {
|
|
|
|
|
+ Map<Long, String> map = new LinkedHashMap<>();
|
|
|
|
|
+ for (ChannelDemandMatchResult r : records) {
|
|
|
|
|
+ map.putIfAbsent(r.getMatchVideoId(), r.getChannelLevel3());
|
|
|
|
|
+ }
|
|
|
|
|
+ return map;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
// =====================================================
|
|
// =====================================================
|
|
|
// 步骤 4: 视频标题 → 长文向量召回
|
|
// 步骤 4: 视频标题 → 长文向量召回
|
|
|
// =====================================================
|
|
// =====================================================
|
|
@@ -319,23 +370,31 @@ public class VideoArticleMatchJob {
|
|
|
* <p>使用线程池并发执行,单条失败不影响整体流程。
|
|
* <p>使用线程池并发执行,单条失败不影响整体流程。
|
|
|
* 每个标题使用 configCodes=[ARTICLE_TITLE, ARTICLE_SUMMARY] 进行并行 ANN 查询,
|
|
* 每个标题使用 configCodes=[ARTICLE_TITLE, ARTICLE_SUMMARY] 进行并行 ANN 查询,
|
|
|
* 结果只保留 modality=ARTICLE 的条目,按 score 降序排列。
|
|
* 结果只保留 modality=ARTICLE 的条目,按 score 降序排列。
|
|
|
|
|
+ * 召回结果从上往下(按 score 降序)排除该账号已发送的文章,取剩余的第一条。
|
|
|
*
|
|
*
|
|
|
- * @param videoTitleMap videoId → title 映射
|
|
|
|
|
- * @return videoId → 文章匹配列表 映射(按 score 降序)
|
|
|
|
|
|
|
+ * @param videoTitleMap videoId → title 映射
|
|
|
|
|
+ * @param videoChannelMap videoId → channelLevel3 映射
|
|
|
|
|
+ * @param sentSourceIds channelLevel3 → 已发送素材 ID 集合
|
|
|
|
|
+ * @return videoId → 文章匹配列表 映射(已过滤已发送,按 score 降序)
|
|
|
*/
|
|
*/
|
|
|
- private Map<Long, List<ArticleMatchItem>> matchArticlesByTitles(Map<Long, String> videoTitleMap) {
|
|
|
|
|
|
|
+ private Map<Long, List<ArticleMatchItem>> matchArticlesByTitles(
|
|
|
|
|
+ Map<Long, String> videoTitleMap,
|
|
|
|
|
+ Map<Long, String> videoChannelMap,
|
|
|
|
|
+ Map<String, Set<String>> sentSourceIds) {
|
|
|
ConcurrentHashMap<Long, List<ArticleMatchItem>> resultMap = new ConcurrentHashMap<>();
|
|
ConcurrentHashMap<Long, List<ArticleMatchItem>> resultMap = new ConcurrentHashMap<>();
|
|
|
RankingSpec ranking = buildRankingSpec();
|
|
RankingSpec ranking = buildRankingSpec();
|
|
|
|
|
|
|
|
int totalVideos = videoTitleMap.size();
|
|
int totalVideos = videoTitleMap.size();
|
|
|
AtomicInteger processed = new AtomicInteger(0);
|
|
AtomicInteger processed = new AtomicInteger(0);
|
|
|
AtomicInteger matchedCount = new AtomicInteger(0);
|
|
AtomicInteger matchedCount = new AtomicInteger(0);
|
|
|
|
|
+ AtomicInteger skippedSentCount = new AtomicInteger(0);
|
|
|
|
|
|
|
|
// 构建并发任务
|
|
// 构建并发任务
|
|
|
List<CompletableFuture<Void>> futures = new ArrayList<>(totalVideos);
|
|
List<CompletableFuture<Void>> futures = new ArrayList<>(totalVideos);
|
|
|
for (Map.Entry<Long, String> entry : videoTitleMap.entrySet()) {
|
|
for (Map.Entry<Long, String> entry : videoTitleMap.entrySet()) {
|
|
|
Long videoId = entry.getKey();
|
|
Long videoId = entry.getKey();
|
|
|
String title = entry.getValue();
|
|
String title = entry.getValue();
|
|
|
|
|
+ String channelLevel3 = videoChannelMap.get(videoId);
|
|
|
|
|
|
|
|
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
|
|
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
|
|
|
try {
|
|
try {
|
|
@@ -344,17 +403,24 @@ public class VideoArticleMatchJob {
|
|
|
List<ArticleMatchItem> articles = extractArticleItems(recallResult);
|
|
List<ArticleMatchItem> articles = extractArticleItems(recallResult);
|
|
|
|
|
|
|
|
if (!articles.isEmpty()) {
|
|
if (!articles.isEmpty()) {
|
|
|
- resultMap.put(videoId, articles);
|
|
|
|
|
- matchedCount.incrementAndGet();
|
|
|
|
|
|
|
+ // 从上往下(score 降序)排除已发送的文章,保留剩余列表
|
|
|
|
|
+ List<ArticleMatchItem> filtered = filterSentArticles(articles, channelLevel3, sentSourceIds);
|
|
|
|
|
+ int skipped = articles.size() - filtered.size();
|
|
|
|
|
+ if (skipped > 0) {
|
|
|
|
|
+ skippedSentCount.addAndGet(skipped);
|
|
|
|
|
+ }
|
|
|
|
|
+ if (!filtered.isEmpty()) {
|
|
|
|
|
+ resultMap.put(videoId, filtered);
|
|
|
|
|
+ matchedCount.incrementAndGet();
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
log.error("视频 {} (标题: {}) 长文匹配失败: {}", videoId, title, e.getMessage());
|
|
log.error("视频 {} (标题: {}) 长文匹配失败: {}", videoId, title, e.getMessage());
|
|
|
} finally {
|
|
} finally {
|
|
|
int done = processed.incrementAndGet();
|
|
int done = processed.incrementAndGet();
|
|
|
- // 每 10 条或每 50 条倍数的进度输出一次
|
|
|
|
|
if (done % 10 == 0 || done == totalVideos) {
|
|
if (done % 10 == 0 || done == totalVideos) {
|
|
|
- log.info("长文匹配进度: {}/{} 视频已处理, {} 个命中",
|
|
|
|
|
- done, totalVideos, matchedCount.get());
|
|
|
|
|
|
|
+ log.info("长文匹配进度: {}/{} 视频已处理, {} 个命中, 跳过已发送 {} 个",
|
|
|
|
|
+ done, totalVideos, matchedCount.get(), skippedSentCount.get());
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}, matchExecutor);
|
|
}, matchExecutor);
|
|
@@ -362,21 +428,48 @@ public class VideoArticleMatchJob {
|
|
|
futures.add(future);
|
|
futures.add(future);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 等待所有任务完成(每个 future 内部已 catch 异常,不会失败)
|
|
|
|
|
|
|
+ // 等待所有任务完成
|
|
|
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
|
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
|
|
|
|
|
|
|
- // 二次校验:确保所有任务都已执行完毕
|
|
|
|
|
int finalProcessed = processed.get();
|
|
int finalProcessed = processed.get();
|
|
|
if (finalProcessed != totalVideos) {
|
|
if (finalProcessed != totalVideos) {
|
|
|
log.warn("长文匹配未完全完成: 预期 {} 个, 实际完成 {} 个", totalVideos, finalProcessed);
|
|
log.warn("长文匹配未完全完成: 预期 {} 个, 实际完成 {} 个", totalVideos, finalProcessed);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 转换回 LinkedHashMap 保持顺序
|
|
|
|
|
Map<Long, List<ArticleMatchItem>> result = new LinkedHashMap<>(resultMap);
|
|
Map<Long, List<ArticleMatchItem>> result = new LinkedHashMap<>(resultMap);
|
|
|
- log.info("长文匹配完成: {}/{} 个视频命中长文", matchedCount.get(), totalVideos);
|
|
|
|
|
|
|
+ log.info("长文匹配完成: {}/{} 个视频命中长文, 跳过已发送 {} 篇",
|
|
|
|
|
+ matchedCount.get(), totalVideos, skippedSentCount.get());
|
|
|
return result;
|
|
return result;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 过滤已发送的文章:从上往下(按原始 score 降序)遍历,跳过在已发送集合中的文章。
|
|
|
|
|
+ * <p>注意:batchByText 结果已按 score 降序排列,跳过已发送后自动取下一个最优的。
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param articles 原始匹配文章列表(已按 score 降序)
|
|
|
|
|
+ * @param channelLevel3 该视频所属账号
|
|
|
|
|
+ * @param sentSourceIds channelLevel3 → 已发送素材 ID 集合
|
|
|
|
|
+ * @return 过滤后的文章列表(仍按 score 降序)
|
|
|
|
|
+ */
|
|
|
|
|
+ private List<ArticleMatchItem> filterSentArticles(
|
|
|
|
|
+ List<ArticleMatchItem> articles,
|
|
|
|
|
+ String channelLevel3,
|
|
|
|
|
+ Map<String, Set<String>> sentSourceIds) {
|
|
|
|
|
+ Set<String> sentIds = (channelLevel3 != null) ? sentSourceIds.get(channelLevel3) : null;
|
|
|
|
|
+ if (sentIds == null || sentIds.isEmpty()) {
|
|
|
|
|
+ return articles;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ List<ArticleMatchItem> filtered = new ArrayList<>(articles.size());
|
|
|
|
|
+ for (ArticleMatchItem item : articles) {
|
|
|
|
|
+ if (item.articleId == null || sentIds.contains(item.articleId)) {
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
+ filtered.add(item);
|
|
|
|
|
+ }
|
|
|
|
|
+ return filtered;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* 从 batchByText 返回结果中提取 Article 模态的匹配条目。
|
|
* 从 batchByText 返回结果中提取 Article 模态的匹配条目。
|
|
|
*
|
|
*
|
|
@@ -467,7 +560,7 @@ public class VideoArticleMatchJob {
|
|
|
*
|
|
*
|
|
|
* <p>每对 (video, article) 只产生一条记录(不关联需求维度),
|
|
* <p>每对 (video, article) 只产生一条记录(不关联需求维度),
|
|
|
* channelLevel3 / account 取自原始匹配记录。
|
|
* channelLevel3 / account 取自原始匹配记录。
|
|
|
- * 先清理同日 dt 旧数据(幂等重跑),再批量插入新结果。
|
|
|
|
|
|
|
+ * 先清理同日 dt 旧数据(幂等重跑),再分批插入新结果。
|
|
|
*/
|
|
*/
|
|
|
private void saveResults(String dt,
|
|
private void saveResults(String dt,
|
|
|
List<ChannelDemandMatchResult> records,
|
|
List<ChannelDemandMatchResult> records,
|