|
@@ -15,7 +15,6 @@ import com.tzld.videoVector.model.po.pgVector.ChannelDemandMatchResultExample;
|
|
|
import com.tzld.videoVector.model.po.pgVector.VideoArticleMatchResult;
|
|
import com.tzld.videoVector.model.po.pgVector.VideoArticleMatchResult;
|
|
|
import com.tzld.videoVector.model.vo.recall.RecallResultVO;
|
|
import com.tzld.videoVector.model.vo.recall.RecallResultVO;
|
|
|
import com.tzld.videoVector.service.recall.VectorRecallTestService;
|
|
import com.tzld.videoVector.service.recall.VectorRecallTestService;
|
|
|
-import com.tzld.videoVector.util.Md5Util;
|
|
|
|
|
import com.tzld.videoVector.util.RedisUtils;
|
|
import com.tzld.videoVector.util.RedisUtils;
|
|
|
import com.xxl.job.core.biz.model.ReturnT;
|
|
import com.xxl.job.core.biz.model.ReturnT;
|
|
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
|
@@ -126,12 +125,6 @@ public class VideoArticleMatchJob {
|
|
|
/** 视频标题缓存过期时间(秒),默认 7 天 */
|
|
/** 视频标题缓存过期时间(秒),默认 7 天 */
|
|
|
private static final long VIDEO_TITLE_CACHE_EXPIRE = 7 * 24 * 60 * 60;
|
|
private static final long VIDEO_TITLE_CACHE_EXPIRE = 7 * 24 * 60 * 60;
|
|
|
|
|
|
|
|
- /** 长文召回结果 Redis 缓存 key 前缀 */
|
|
|
|
|
- private static final String ARTICLE_RECALL_CACHE_PREFIX = "video_article_match:article_recall:";
|
|
|
|
|
-
|
|
|
|
|
- /** 长文召回结果缓存过期时间(秒),默认 7 天 */
|
|
|
|
|
- private static final long ARTICLE_RECALL_CACHE_EXPIRE = 7 * 24 * 60 * 60;
|
|
|
|
|
-
|
|
|
|
|
/** 长文匹配并发线程数 */
|
|
/** 长文匹配并发线程数 */
|
|
|
@Value("${video.article.match.thread-pool-size:10}")
|
|
@Value("${video.article.match.thread-pool-size:10}")
|
|
|
private int matchThreadPoolSize;
|
|
private int matchThreadPoolSize;
|
|
@@ -374,10 +367,9 @@ public class VideoArticleMatchJob {
|
|
|
// =====================================================
|
|
// =====================================================
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 对每个视频标题调用 batchByText 进行长文相似度召回,带 Redis 缓存。
|
|
|
|
|
|
|
+ * 对每个视频标题调用 batchByText 进行长文相似度召回。
|
|
|
*
|
|
*
|
|
|
- * <p>缓存 key: video_article_match:article_recall:{md5(title)},过期 7 天。
|
|
|
|
|
- * 先去重标题后批量 mGet,命中直接复用;未命中并发调用 batchByText 后批量回写。
|
|
|
|
|
|
|
+ * <p>使用线程池并发执行,单条失败不影响整体流程。
|
|
|
* 每个标题使用 configCodes=[ARTICLE_TITLE, ARTICLE_SUMMARY] 进行并行 ANN 查询,
|
|
* 每个标题使用 configCodes=[ARTICLE_TITLE, ARTICLE_SUMMARY] 进行并行 ANN 查询,
|
|
|
* 结果只保留 modality=ARTICLE 的条目,按 score 降序排列。
|
|
* 结果只保留 modality=ARTICLE 的条目,按 score 降序排列。
|
|
|
* 同一视频可能属于多个账号,对每个账号独立过滤已发送文章,结果按账号分组。
|
|
* 同一视频可能属于多个账号,对每个账号独立过滤已发送文章,结果按账号分组。
|
|
@@ -394,41 +386,11 @@ public class VideoArticleMatchJob {
|
|
|
ConcurrentHashMap<Long, Map<String, List<ArticleMatchItem>>> resultMap = new ConcurrentHashMap<>();
|
|
ConcurrentHashMap<Long, Map<String, List<ArticleMatchItem>>> resultMap = new ConcurrentHashMap<>();
|
|
|
RankingSpec ranking = buildRankingSpec();
|
|
RankingSpec ranking = buildRankingSpec();
|
|
|
|
|
|
|
|
- // ---- 第一阶段:批量 Redis 缓存查找(按去重标题) ----
|
|
|
|
|
- Set<String> uniqueTitles = new LinkedHashSet<>(videoTitleMap.values());
|
|
|
|
|
- Map<String, String> titleToCacheKey = new LinkedHashMap<>((int) (uniqueTitles.size() / 0.75f) + 1);
|
|
|
|
|
- List<String> cacheKeys = new ArrayList<>(uniqueTitles.size());
|
|
|
|
|
- for (String title : uniqueTitles) {
|
|
|
|
|
- String cacheKey = ARTICLE_RECALL_CACHE_PREFIX + Md5Util.encoderByMd5(title);
|
|
|
|
|
- titleToCacheKey.put(title, cacheKey);
|
|
|
|
|
- cacheKeys.add(cacheKey);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- List<String> cachedValues = redisUtils.mGet(cacheKeys);
|
|
|
|
|
- Map<String, List<ArticleMatchItem>> titleToArticles = new ConcurrentHashMap<>();
|
|
|
|
|
- int idx = 0;
|
|
|
|
|
- int cacheHitTitles = 0;
|
|
|
|
|
- for (String title : uniqueTitles) {
|
|
|
|
|
- String cached = (idx < cachedValues.size()) ? cachedValues.get(idx) : null;
|
|
|
|
|
- if (StringUtils.hasText(cached)) {
|
|
|
|
|
- titleToArticles.put(title, JSON.parseArray(cached, ArticleMatchItem.class));
|
|
|
|
|
- cacheHitTitles++;
|
|
|
|
|
- }
|
|
|
|
|
- idx++;
|
|
|
|
|
- }
|
|
|
|
|
- log.info("长文召回缓存: {} 个去重标题, 命中 {} 个, 未命中 {} 个",
|
|
|
|
|
- uniqueTitles.size(), cacheHitTitles, uniqueTitles.size() - cacheHitTitles);
|
|
|
|
|
-
|
|
|
|
|
- // 缓存未命中的标题 → 回写集合(并发安全)
|
|
|
|
|
- Map<String, String> newCacheEntries = new ConcurrentHashMap<>();
|
|
|
|
|
-
|
|
|
|
|
int totalVideos = videoTitleMap.size();
|
|
int totalVideos = videoTitleMap.size();
|
|
|
AtomicInteger processed = new AtomicInteger(0);
|
|
AtomicInteger processed = new AtomicInteger(0);
|
|
|
AtomicInteger matchedCount = new AtomicInteger(0);
|
|
AtomicInteger matchedCount = new AtomicInteger(0);
|
|
|
AtomicInteger skippedSentCount = new AtomicInteger(0);
|
|
AtomicInteger skippedSentCount = new AtomicInteger(0);
|
|
|
- AtomicInteger cacheHitCount = new AtomicInteger(0);
|
|
|
|
|
|
|
|
|
|
- // ---- 第二阶段:并发处理每个视频 ----
|
|
|
|
|
List<CompletableFuture<Void>> futures = new ArrayList<>(totalVideos);
|
|
List<CompletableFuture<Void>> futures = new ArrayList<>(totalVideos);
|
|
|
for (Map.Entry<Long, String> entry : videoTitleMap.entrySet()) {
|
|
for (Map.Entry<Long, String> entry : videoTitleMap.entrySet()) {
|
|
|
Long videoId = entry.getKey();
|
|
Long videoId = entry.getKey();
|
|
@@ -437,17 +399,9 @@ public class VideoArticleMatchJob {
|
|
|
|
|
|
|
|
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
|
|
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
|
|
|
try {
|
|
try {
|
|
|
- // 从缓存获取或实时召回
|
|
|
|
|
- List<ArticleMatchItem> articles = titleToArticles.get(title);
|
|
|
|
|
- if (articles != null) {
|
|
|
|
|
- cacheHitCount.incrementAndGet();
|
|
|
|
|
- } else {
|
|
|
|
|
- BatchByTextParam batchParam = buildBatchByTextParam(title, ranking);
|
|
|
|
|
- RecallResultVO recallResult = vectorRecallTestService.batchByText(batchParam);
|
|
|
|
|
- articles = extractArticleItems(recallResult);
|
|
|
|
|
- titleToArticles.put(title, articles);
|
|
|
|
|
- newCacheEntries.put(titleToCacheKey.get(title), JSON.toJSONString(articles));
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ BatchByTextParam batchParam = buildBatchByTextParam(title, ranking);
|
|
|
|
|
+ RecallResultVO recallResult = vectorRecallTestService.batchByText(batchParam);
|
|
|
|
|
+ List<ArticleMatchItem> articles = extractArticleItems(recallResult);
|
|
|
|
|
|
|
|
if (!articles.isEmpty() && !channelLevel3s.isEmpty()) {
|
|
if (!articles.isEmpty() && !channelLevel3s.isEmpty()) {
|
|
|
Map<String, List<ArticleMatchItem>> perChannelResults = new LinkedHashMap<>();
|
|
Map<String, List<ArticleMatchItem>> perChannelResults = new LinkedHashMap<>();
|
|
@@ -471,8 +425,8 @@ public class VideoArticleMatchJob {
|
|
|
} finally {
|
|
} finally {
|
|
|
int done = processed.incrementAndGet();
|
|
int done = processed.incrementAndGet();
|
|
|
if (done % 10 == 0 || done == totalVideos) {
|
|
if (done % 10 == 0 || done == totalVideos) {
|
|
|
- log.info("长文匹配进度: {}/{} 视频已处理, {} 个命中, 跳过已发送 {} 个, 缓存命中 {} 个",
|
|
|
|
|
- done, totalVideos, matchedCount.get(), skippedSentCount.get(), cacheHitCount.get());
|
|
|
|
|
|
|
+ log.info("长文匹配进度: {}/{} 视频已处理, {} 个命中, 跳过已发送 {} 个",
|
|
|
|
|
+ done, totalVideos, matchedCount.get(), skippedSentCount.get());
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}, matchExecutor);
|
|
}, matchExecutor);
|
|
@@ -480,23 +434,16 @@ public class VideoArticleMatchJob {
|
|
|
futures.add(future);
|
|
futures.add(future);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 等待所有任务完成
|
|
|
|
|
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
|
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
|
|
|
|
|
|
|
- // 批量回写缓存
|
|
|
|
|
- if (!newCacheEntries.isEmpty()) {
|
|
|
|
|
- redisUtils.batchSetWithExpire(newCacheEntries, ARTICLE_RECALL_CACHE_EXPIRE);
|
|
|
|
|
- log.info("长文召回缓存回写 {} 条", newCacheEntries.size());
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
int finalProcessed = processed.get();
|
|
int finalProcessed = processed.get();
|
|
|
if (finalProcessed != totalVideos) {
|
|
if (finalProcessed != totalVideos) {
|
|
|
log.warn("长文匹配未完全完成: 预期 {} 个, 实际完成 {} 个", totalVideos, finalProcessed);
|
|
log.warn("长文匹配未完全完成: 预期 {} 个, 实际完成 {} 个", totalVideos, finalProcessed);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
Map<Long, Map<String, List<ArticleMatchItem>>> result = new LinkedHashMap<>(resultMap);
|
|
Map<Long, Map<String, List<ArticleMatchItem>>> result = new LinkedHashMap<>(resultMap);
|
|
|
- log.info("长文匹配完成: {}/{} 个视频命中长文, 跳过已发送 {} 篇, 缓存命中 {} 次",
|
|
|
|
|
- matchedCount.get(), totalVideos, skippedSentCount.get(), cacheHitCount.get());
|
|
|
|
|
|
|
+ log.info("长文匹配完成: {}/{} 个视频命中长文, 跳过已发送 {} 篇",
|
|
|
|
|
+ matchedCount.get(), totalVideos, skippedSentCount.get());
|
|
|
return result;
|
|
return result;
|
|
|
}
|
|
}
|
|
|
|
|
|