|
|
@@ -0,0 +1,395 @@
|
|
|
+package com.tzld.videoVector.job;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.tzld.videoVector.common.constant.VectorConstants;
|
|
|
+import com.tzld.videoVector.dao.mapper.pgVector.DeconstructVectorConfigMapper;
|
|
|
+import com.tzld.videoVector.dao.mapper.pgVector.ext.ArticleDeconstructResultMapperExt;
|
|
|
+import com.tzld.videoVector.model.po.pgVector.ArticleDeconstructResult;
|
|
|
+import com.tzld.videoVector.model.po.pgVector.DeconstructVectorConfig;
|
|
|
+import com.tzld.videoVector.model.po.pgVector.DeconstructVectorConfigExample;
|
|
|
+import com.tzld.videoVector.service.ArticleVectorStoreService;
|
|
|
+import com.tzld.videoVector.service.EmbeddingService;
|
|
|
+import com.tzld.videoVector.util.Md5Util;
|
|
|
+import com.tzld.videoVector.util.OdpsUtil;
|
|
|
+import com.xxl.job.core.biz.model.ReturnT;
|
|
|
+import com.xxl.job.core.handler.annotation.XxlJob;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.HashSet;
|
|
|
+import java.util.LinkedHashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 文章标题/摘要向量化定时任务(独立于 AIGC 解构流水线)
|
|
|
+ * <p>
|
|
|
+ * 数据源:ODPS 大数据表(通过 content_id 关联文章标题和摘要)
|
|
|
+ * 存储目标:article_vectors 表(configCode = ARTICLE_TITLE / ARTICLE_SUMMARY)
|
|
|
+ * <p>
|
|
|
+ * 设计参考 {@link VideoTitleVectorJob}:独立 Job → 外部数据源获取文本 → embedding → 写入向量表
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class ArticleTextVectorJob {
|
|
|
+
|
|
|
+ private static final String SOURCE_AIGC = "aigc_deconstruct"; // source value used when paging article IDs in loadAllArticleIds
|
|
|
+ private static final String SOURCE_ODPS_TEXT = "odps_text"; // source value set on rows written by persistOdpsTextToDb
|
|
|
+
|
|
|
+ /** Number of rows sent to the DB per upsert batch. */
|
|
|
+ private static final int DB_BATCH_SIZE = 200;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private DeconstructVectorConfigMapper vectorConfigMapper;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private ArticleVectorStoreService articleVectorStoreService;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private EmbeddingService embeddingService;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private ArticleDeconstructResultMapperExt articleDeconstructResultMapperExt;
|
|
|
+
|
|
|
+ // ==================== ODPS configuration (table and column names spliced into the ODPS query) ====================
|
|
|
+
|
|
|
+ @Value("${odps.article.text.table:loghubods.long_articles_text_hour}")
|
|
|
+ private String odpsTable;
|
|
|
+
|
|
|
+ @Value("${odps.article.text.id.column:content_id}")
|
|
|
+ private String idColumn;
|
|
|
+
|
|
|
+ @Value("${odps.article.text.title.column:article_title}")
|
|
|
+ private String titleColumn;
|
|
|
+
|
|
|
+ @Value("${odps.article.text.summary.column:kimi_summary}")
|
|
|
+ private String summaryColumn;
|
|
|
+
|
|
|
+ // ==================== Processing parameters ====================
|
|
|
+
|
|
|
+ /** Pause before each embedding call, in milliseconds, to stay under the API rate limit. */
|
|
|
+ @Value("${article.text.vector.embedding.interval.ms:100}")
|
|
|
+ private long embeddingIntervalMs;
|
|
|
+
|
|
|
+ // ====================================================================
|
|
|
+ // XXL-Job 入口
|
|
|
+ // ====================================================================
|
|
|
+
|
|
|
+ @XxlJob("articleTextVectorJob")
|
|
|
+ public ReturnT<String> articleTextVectorJob(String param) {
|
|
|
+ log.info("开始执行文章标题/摘要向量化任务, param: {}", param);
|
|
|
+
|
|
|
+ String dt = parseParamString(param, "dt", null);
|
|
|
+
|
|
|
+ try {
|
|
|
+ // 1. 加载配置
|
|
|
+ DeconstructVectorConfig titleConfig = getConfigByCode(VectorConstants.ARTICLE_TITLE_CONFIG_CODE);
|
|
|
+ DeconstructVectorConfig summaryConfig = getConfigByCode(VectorConstants.ARTICLE_SUMMARY_CONFIG_CODE);
|
|
|
+ if (titleConfig == null && summaryConfig == null) {
|
|
|
+ log.error("未找到 ARTICLE_TITLE / ARTICLE_SUMMARY 向量化配置,请先插入 deconstruct_vector_config 记录");
|
|
|
+ return new ReturnT<>(ReturnT.FAIL_CODE, "未找到配置");
|
|
|
+ }
|
|
|
+ logConfig(titleConfig, "ARTICLE_TITLE");
|
|
|
+ logConfig(summaryConfig, "ARTICLE_SUMMARY");
|
|
|
+
|
|
|
+ // 2. 加载已入库的 articleId 集合(从 article_deconstruct_result 分页扫描)
|
|
|
+ Set<String> allArticleIds = loadAllArticleIds();
|
|
|
+ log.info("article_deconstruct_result 中共 {} 个 articleId", allArticleIds.size());
|
|
|
+ if (allArticleIds.isEmpty()) {
|
|
|
+ log.info("无文章数据,任务结束");
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 3. 从 ODPS 流式读取标题/摘要,仅保留已入库文章
|
|
|
+ Map<String, String> titleMap = new LinkedHashMap<>();
|
|
|
+ Map<String, String> summaryMap = new LinkedHashMap<>();
|
|
|
+ long odpsTotal = loadTextFromOdps(dt, allArticleIds, titleMap, summaryMap);
|
|
|
+ log.info("ODPS 流式读取完成, 总行数={}, 匹配标题={}, 匹配摘要={}",
|
|
|
+ odpsTotal, titleMap.size(), summaryMap.size());
|
|
|
+
|
|
|
+ // 3.5 持久化 ODPS 标题/摘要到 article_deconstruct_result(source=odps_text)
|
|
|
+ int persisted = persistOdpsTextToDb(titleMap, summaryMap);
|
|
|
+ log.info("ODPS 文本持久化完成, 写入 {} 条", persisted);
|
|
|
+
|
|
|
+ // 4. 向量化标题
|
|
|
+ if (titleConfig != null) {
|
|
|
+ processTextVector(titleConfig, titleMap, "标题");
|
|
|
+ }
|
|
|
+
|
|
|
+ // 5. 向量化摘要
|
|
|
+ if (summaryConfig != null) {
|
|
|
+ processTextVector(summaryConfig, summaryMap, "摘要");
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("文章标题/摘要向量化任务完成");
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("文章标题/摘要向量化任务失败: {}", e.getMessage(), e);
|
|
|
+ return new ReturnT<>(ReturnT.FAIL_CODE, "任务执行失败: " + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // ====================================================================
|
|
|
+ // 文章 ID 加载
|
|
|
+ // ====================================================================
|
|
|
+
|
|
|
+ private Set<String> loadAllArticleIds() {
|
|
|
+ Set<String> ids = new HashSet<>();
|
|
|
+ int pageNum = 0;
|
|
|
+ while (true) {
|
|
|
+ int offset = pageNum * VectorConstants.PAGE_SIZE;
|
|
|
+ List<String> page = articleDeconstructResultMapperExt
|
|
|
+ .selectArticleIdsBySourcePaged(SOURCE_AIGC, offset, VectorConstants.PAGE_SIZE);
|
|
|
+ if (CollectionUtils.isEmpty(page)) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ ids.addAll(page);
|
|
|
+ if (page.size() < VectorConstants.PAGE_SIZE) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ pageNum++;
|
|
|
+ }
|
|
|
+ return ids;
|
|
|
+ }
|
|
|
+
|
|
|
+ // ====================================================================
|
|
|
+ // ODPS 数据加载
|
|
|
+ // ====================================================================
|
|
|
+
|
|
|
+ private long loadTextFromOdps(String dt, Set<String> allArticleIds,
|
|
|
+ Map<String, String> titleMap, Map<String, String> summaryMap) {
|
|
|
+ String safeDt = dt != null ? dt.replace("'", "''") : null;
|
|
|
+ String safeIdCol = idColumn.replace("'", "''");
|
|
|
+ String safeTitleCol = titleColumn.replace("'", "''");
|
|
|
+ String safeSummaryCol = summaryColumn.replace("'", "''");
|
|
|
+ String safeTable = odpsTable.replace("'", "''");
|
|
|
+
|
|
|
+ StringBuilder sql = new StringBuilder();
|
|
|
+ sql.append("SELECT ").append(safeIdCol)
|
|
|
+ .append(", ").append(safeTitleCol)
|
|
|
+ .append(", ").append(safeSummaryCol)
|
|
|
+ .append(" FROM ").append(safeTable);
|
|
|
+ if (safeDt != null) {
|
|
|
+ sql.append(" WHERE dt = '").append(safeDt).append("'");
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("ODPS SQL: {}", sql);
|
|
|
+
|
|
|
+ long[] totalRows = {0};
|
|
|
+ long[] matchedRows = {0};
|
|
|
+
|
|
|
+ OdpsUtil.getOdpsDataStream(sql.toString(), record -> {
|
|
|
+ String contentId = record.getString(safeIdCol);
|
|
|
+ if (contentId == null || contentId.isEmpty()) {
|
|
|
+ totalRows[0]++;
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ contentId = contentId.trim();
|
|
|
+
|
|
|
+ // 采样日志
|
|
|
+ if (totalRows[0] < 3) {
|
|
|
+ String t = record.getString(safeTitleCol);
|
|
|
+ String s = record.getString(safeSummaryCol);
|
|
|
+ log.info("[ODPS采样{}] contentId={}, title={}, summary={}",
|
|
|
+ totalRows[0], contentId,
|
|
|
+ t != null && t.length() > 100 ? t.substring(0, 100) + "..." : t,
|
|
|
+ s != null && s.length() > 100 ? s.substring(0, 100) + "..." : s);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (allArticleIds.contains(contentId)) {
|
|
|
+ String title = record.getString(safeTitleCol);
|
|
|
+ if (StringUtils.hasText(title)) {
|
|
|
+ titleMap.put(contentId, title.trim());
|
|
|
+ }
|
|
|
+ String summary = record.getString(safeSummaryCol);
|
|
|
+ if (StringUtils.hasText(summary)) {
|
|
|
+ summaryMap.put(contentId, summary.trim());
|
|
|
+ }
|
|
|
+ matchedRows[0]++;
|
|
|
+ }
|
|
|
+ totalRows[0]++;
|
|
|
+
|
|
|
+ if (totalRows[0] % 10000 == 0) {
|
|
|
+ log.info("[ODPS进度] {} 行, 匹配={}", totalRows[0], matchedRows[0]);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ return totalRows[0];
|
|
|
+ }
|
|
|
+
|
|
|
+ // ====================================================================
|
|
|
+ // 持久化 ODPS 文本到 article_deconstruct_result
|
|
|
+ // ====================================================================
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 将 ODPS 标题/摘要以 source=odps_text 写入 article_deconstruct_result。
|
|
|
+ * result 字段存储精简 JSON:{"title":"...", "summary":"..."}
|
|
|
+ */
|
|
|
+ private int persistOdpsTextToDb(Map<String, String> titleMap, Map<String, String> summaryMap) {
|
|
|
+ Set<String> allIds = new LinkedHashSet<>();
|
|
|
+ allIds.addAll(titleMap.keySet());
|
|
|
+ allIds.addAll(summaryMap.keySet());
|
|
|
+ if (allIds.isEmpty()) {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ List<ArticleDeconstructResult> batch = new ArrayList<>(DB_BATCH_SIZE);
|
|
|
+ int totalPersisted = 0;
|
|
|
+
|
|
|
+ for (String articleId : allIds) {
|
|
|
+ JSONObject json = new JSONObject();
|
|
|
+ String title = titleMap.get(articleId);
|
|
|
+ if (StringUtils.hasText(title)) {
|
|
|
+ json.put("title", title);
|
|
|
+ }
|
|
|
+ String summary = summaryMap.get(articleId);
|
|
|
+ if (StringUtils.hasText(summary)) {
|
|
|
+ json.put("summary", summary);
|
|
|
+ }
|
|
|
+ if (json.isEmpty()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ ArticleDeconstructResult row = new ArticleDeconstructResult();
|
|
|
+ row.setArticleId(articleId);
|
|
|
+ row.setSource(SOURCE_ODPS_TEXT);
|
|
|
+ row.setResult(json.toJSONString());
|
|
|
+ batch.add(row);
|
|
|
+
|
|
|
+ if (batch.size() >= DB_BATCH_SIZE) {
|
|
|
+ totalPersisted += articleDeconstructResultMapperExt.upsertOdpsText(batch);
|
|
|
+ batch.clear();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!batch.isEmpty()) {
|
|
|
+ totalPersisted += articleDeconstructResultMapperExt.upsertOdpsText(batch);
|
|
|
+ }
|
|
|
+
|
|
|
+ return totalPersisted;
|
|
|
+ }
|
|
|
+
|
|
|
+ // ====================================================================
|
|
|
+ // 向量化处理
|
|
|
+ // ====================================================================
|
|
|
+
|
|
|
+ private void processTextVector(DeconstructVectorConfig config, Map<String, String> textMap, String label) {
|
|
|
+ String configCode = config.getConfigCode();
|
|
|
+ Set<String> existingIds = articleVectorStoreService.getAllArticleIds(configCode);
|
|
|
+ log.info("{}向量化开始, 待处理={}, 已有向量={}", label, textMap.size(), existingIds.size());
|
|
|
+
|
|
|
+ Integer maxLength = config.getMaxLength();
|
|
|
+ int success = 0;
|
|
|
+ int fail = 0;
|
|
|
+ int skip = 0;
|
|
|
+ int processed = 0;
|
|
|
+
|
|
|
+ for (Map.Entry<String, String> entry : textMap.entrySet()) {
|
|
|
+ String articleId = entry.getKey();
|
|
|
+ String text = entry.getValue();
|
|
|
+
|
|
|
+ if (existingIds.contains(articleId)) {
|
|
|
+ skip++;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (!StringUtils.hasText(text)) {
|
|
|
+ skip++;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 截断
|
|
|
+ if (maxLength != null && maxLength > 0 && text.length() > maxLength) {
|
|
|
+ text = text.substring(0, maxLength);
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ List<Float> vector = getOrEmbed(text, config);
|
|
|
+ if (vector == null || vector.isEmpty()) {
|
|
|
+ log.error("{}向量化失败, articleId={}, text={}", label, articleId,
|
|
|
+ text.length() > 50 ? text.substring(0, 50) + "..." : text);
|
|
|
+ fail++;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!articleVectorStoreService.save(configCode, articleId, vector, text)) {
|
|
|
+ log.error("{}向量存储失败, articleId={}", label, articleId);
|
|
|
+ fail++;
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ success++;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("{}向量化异常, articleId={}: {}", label, articleId, e.getMessage(), e);
|
|
|
+ fail++;
|
|
|
+ }
|
|
|
+
|
|
|
+ processed++;
|
|
|
+ if (processed % 100 == 0) {
|
|
|
+ log.info("{}向量化进度: 成功={}, 失败={}, 跳过={}", label, success, fail, skip);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("{}向量化完成: 成功={}, 失败={}, 跳过={}", label, success, fail, skip);
|
|
|
+ }
|
|
|
+
|
|
|
+ // ====================================================================
|
|
|
+ // Embedding(含 text_hash 缓存 + 限流)
|
|
|
+ // ====================================================================
|
|
|
+
|
|
|
+ private List<Float> getOrEmbed(String text, DeconstructVectorConfig config) {
|
|
|
+ String configCode = config.getConfigCode();
|
|
|
+ String textHash = Md5Util.encoderByMd5(text);
|
|
|
+ if (StringUtils.hasText(textHash)) {
|
|
|
+ List<Float> cached = articleVectorStoreService.getVectorByTextHash(textHash, configCode);
|
|
|
+ if (cached != null && !cached.isEmpty()) {
|
|
|
+ log.debug("命中 text_hash 缓存, hash={}, configCode={}", textHash, configCode);
|
|
|
+ return cached;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 限流
|
|
|
+ try {
|
|
|
+ Thread.sleep(embeddingIntervalMs);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ }
|
|
|
+ return embeddingService.embed(text, config);
|
|
|
+ }
|
|
|
+
|
|
|
+ // ====================================================================
|
|
|
+ // 工具方法
|
|
|
+ // ====================================================================
|
|
|
+
|
|
|
+ private DeconstructVectorConfig getConfigByCode(String configCode) {
|
|
|
+ DeconstructVectorConfigExample example = new DeconstructVectorConfigExample();
|
|
|
+ example.createCriteria()
|
|
|
+ .andEnabledEqualTo((short) 1)
|
|
|
+ .andConfigCodeEqualTo(configCode);
|
|
|
+ List<DeconstructVectorConfig> configs = vectorConfigMapper.selectByExample(example);
|
|
|
+ return CollectionUtils.isEmpty(configs) ? null : configs.get(0);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void logConfig(DeconstructVectorConfig config, String label) {
|
|
|
+ if (config == null) {
|
|
|
+ log.info("{} 配置未找到,跳过", label);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ log.info("{} 配置: model={}, dimension={}, maxLength={}",
|
|
|
+ label, config.getEmbeddingModel(), config.getDimension(), config.getMaxLength());
|
|
|
+ }
|
|
|
+
|
|
|
+ private static String parseParamString(String param, String key, String defaultValue) {
|
|
|
+ if (param == null || param.isEmpty()) return defaultValue;
|
|
|
+ for (String part : param.split(",")) {
|
|
|
+ String[] kv = part.trim().split("=", 2);
|
|
|
+ if (kv.length == 2 && kv[0].trim().equals(key)) {
|
|
|
+ return kv[1].trim();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return defaultValue;
|
|
|
+ }
|
|
|
+}
|