|
|
@@ -0,0 +1,689 @@
|
|
|
+package com.tzld.videoVector.job;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.google.common.collect.Lists;
|
|
|
+import com.tzld.videoVector.api.AigcApiService;
|
|
|
+import com.tzld.videoVector.common.constant.VectorConstants;
|
|
|
+import com.tzld.videoVector.dao.mapper.pgVector.DeconstructVectorConfigMapper;
|
|
|
+import com.tzld.videoVector.dao.mapper.pgVector.ext.ArticleDeconstructResultMapperExt;
|
|
|
+import com.tzld.videoVector.model.po.pgVector.ArticleDeconstructResult;
|
|
|
+import com.tzld.videoVector.model.po.pgVector.DeconstructVectorConfig;
|
|
|
+import com.tzld.videoVector.model.po.pgVector.DeconstructVectorConfigExample;
|
|
|
+import com.tzld.videoVector.service.ArticleVectorStoreService;
|
|
|
+import com.tzld.videoVector.service.EmbeddingService;
|
|
|
+import com.tzld.videoVector.util.Md5Util;
|
|
|
+import com.tzld.videoVector.util.VectorUtils;
|
|
|
+import com.xxl.job.core.biz.model.ReturnT;
|
|
|
+import com.xxl.job.core.handler.annotation.XxlJob;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.Executors;
|
|
|
+import java.util.concurrent.Future;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 文章向量化定时任务(对称 MaterialVectorJob)
|
|
|
+ * <p>
|
|
|
+ * 数据流:
|
|
|
+ * <ul>
|
|
|
+ * <li>{@link #syncArticleDeconstructJob(String)}:从 AIGC API 拉取文章解构结果,写入 article_deconstruct_result</li>
|
|
|
+ * <li>{@link #vectorArticleJob(String)}:扫描 article_deconstruct_result,按配置提取文本并向量化,写入 article_vectors</li>
|
|
|
+ * <li>{@link #articleJob(String)}:编排前两步串行执行</li>
|
|
|
+ * </ul>
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class ArticleVectorJob {
|
|
|
+
|
|
|
+ private static final String SOURCE_AIGC = "aigc_deconstruct";
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private DeconstructVectorConfigMapper vectorConfigMapper;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private ArticleDeconstructResultMapperExt articleDeconstructResultMapperExt;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private ArticleVectorStoreService articleVectorStoreService;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private EmbeddingService embeddingService;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private AigcApiService aigcApiService;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 文章 AIGC 任务 ID
|
|
|
+ */
|
|
|
+ @Value("${aigc.article.task.id:66}")
|
|
|
+ private int articleTaskId;
|
|
|
+
|
|
|
+ // ====================================================================
|
|
|
+ // 入口 1:同步文章解构结果
|
|
|
+ // ====================================================================
|
|
|
+
|
|
|
+ @XxlJob("syncArticleDeconstructJob")
|
|
|
+ public ReturnT<String> syncArticleDeconstructJob(String param) {
|
|
|
+ log.info("开始执行文章解构同步任务, param: {}", param);
|
|
|
+ try {
|
|
|
+ AtomicInteger insertCount = new AtomicInteger(0);
|
|
|
+ AtomicInteger skipCount = new AtomicInteger(0);
|
|
|
+ syncAigcArticleSource(insertCount, skipCount);
|
|
|
+ log.info("文章解构同步完成 新增={}, 已存在跳过={}", insertCount.get(), skipCount.get());
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("文章解构同步任务失败: {}", e.getMessage(), e);
|
|
|
+ return new ReturnT<>(ReturnT.FAIL_CODE, "任务执行失败: " + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void syncAigcArticleSource(AtomicInteger insertCount, AtomicInteger skipCount) {
|
|
|
+ log.info("开始从 AIGC taskId={} 拉取文章数据", articleTaskId);
|
|
|
+
|
|
|
+ List<AigcApiService.AigcTaskInput> taskInputList = aigcApiService.getTaskInputList(articleTaskId);
|
|
|
+ if (CollectionUtils.isEmpty(taskInputList)) {
|
|
|
+ log.info("AIGC taskId={} 无文章数据", articleTaskId);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<String, Long> articleIdToTaskInstanceId = new HashMap<>();
|
|
|
+ for (AigcApiService.AigcTaskInput input : taskInputList) {
|
|
|
+ String articleId = normalizeArticleId(input.getBizUniqueId());
|
|
|
+ if (articleId == null) {
|
|
|
+ log.info("跳过空 bizUniqueId, taskId={}", articleTaskId);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ articleIdToTaskInstanceId.put(articleId, input.getTaskInstanceId());
|
|
|
+ }
|
|
|
+ log.info("taskId={} 拉到 {} 篇文章", articleTaskId, articleIdToTaskInstanceId.size());
|
|
|
+
|
|
|
+ if (articleIdToTaskInstanceId.isEmpty()) {
|
|
|
+ log.info("AIGC 任务无有效文章数据");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ List<String> allArticleIds = new ArrayList<>(articleIdToTaskInstanceId.keySet());
|
|
|
+ for (List<String> batchIds : Lists.partition(allArticleIds, VectorConstants.ODPS_IN_BATCH_SIZE)) {
|
|
|
+ Set<String> existingIds = new HashSet<>(
|
|
|
+ articleDeconstructResultMapperExt.selectExistingArticleIds(SOURCE_AIGC, batchIds));
|
|
|
+ skipCount.addAndGet(existingIds.size());
|
|
|
+
|
|
|
+ List<String> needSyncIds = batchIds.stream()
|
|
|
+ .filter(id -> !existingIds.contains(id))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ if (needSyncIds.isEmpty()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ ExecutorService executor = Executors.newFixedThreadPool(VectorConstants.AIGC_DETAIL_PARALLELISM);
|
|
|
+ try {
|
|
|
+ List<Future<?>> futures = new ArrayList<>();
|
|
|
+ List<ArticleDeconstructResult> batch = Collections.synchronizedList(new ArrayList<>());
|
|
|
+
|
|
|
+ for (String articleId : needSyncIds) {
|
|
|
+ futures.add(executor.submit(() -> {
|
|
|
+ try {
|
|
|
+ Long taskInstanceId = articleIdToTaskInstanceId.get(articleId);
|
|
|
+ if (taskInstanceId == null) return;
|
|
|
+ JSONObject dataContent = aigcApiService.getTaskCallbackDetail(taskInstanceId);
|
|
|
+ if (dataContent != null) {
|
|
|
+ ArticleDeconstructResult r = new ArticleDeconstructResult();
|
|
|
+ r.setArticleId(articleId);
|
|
|
+ r.setSource(SOURCE_AIGC);
|
|
|
+ r.setResult(dataContent.toJSONString());
|
|
|
+ batch.add(r);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("同步 articleId={} 失败: {}", articleId, e.getMessage());
|
|
|
+ }
|
|
|
+ }));
|
|
|
+ }
|
|
|
+ awaitAndShutdown(futures, executor, 30, "文章同步");
|
|
|
+
|
|
|
+ if (!batch.isEmpty()) {
|
|
|
+ for (List<ArticleDeconstructResult> subBatch : Lists.partition(batch, 200)) {
|
|
|
+ insertCount.addAndGet(articleDeconstructResultMapperExt.batchInsertIgnore(subBatch));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ executor.shutdownNow();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // ====================================================================
|
|
|
+ // 入口 2:文章向量化
|
|
|
+ // ====================================================================
|
|
|
+
|
|
|
+ @XxlJob("vectorArticleJob")
|
|
|
+ public ReturnT<String> vectorArticleJob(String param) {
|
|
|
+ log.info("开始执行文章向量化任务, param: {}", param);
|
|
|
+ Integer maxArticleCount = parseMaxCount(param);
|
|
|
+ return doVectorize(maxArticleCount);
|
|
|
+ }
|
|
|
+
|
|
|
+ private ReturnT<String> doVectorize(Integer maxArticleCount) {
|
|
|
+ try {
|
|
|
+ List<DeconstructVectorConfig> configs = getEnabledConfigsBySourceField(SOURCE_AIGC);
|
|
|
+ if (CollectionUtils.isEmpty(configs)) {
|
|
|
+ log.info("未找到 source_field={} 的向量化配置", SOURCE_AIGC);
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
+ }
|
|
|
+ log.info("加载 {} 个文章向量化配置: {}", configs.size(),
|
|
|
+ configs.stream().map(DeconstructVectorConfig::getConfigCode).collect(Collectors.toList()));
|
|
|
+
|
|
|
+ AtomicInteger totalSuccessCount = new AtomicInteger(0);
|
|
|
+ AtomicInteger totalFailCount = new AtomicInteger(0);
|
|
|
+ AtomicInteger totalProcessed = new AtomicInteger(0);
|
|
|
+ int pageNum = 0;
|
|
|
+
|
|
|
+ while (true) {
|
|
|
+ int offset = pageNum * VectorConstants.PAGE_SIZE;
|
|
|
+ int limit = VectorConstants.PAGE_SIZE;
|
|
|
+ if (maxArticleCount != null && maxArticleCount > 0) {
|
|
|
+ int remaining = maxArticleCount - totalProcessed.get();
|
|
|
+ if (remaining <= 0) break;
|
|
|
+ limit = Math.min(limit, remaining);
|
|
|
+ }
|
|
|
+
|
|
|
+ List<String> articleIds = articleDeconstructResultMapperExt
|
|
|
+ .selectArticleIdsBySourcePaged(SOURCE_AIGC, offset, limit);
|
|
|
+ if (CollectionUtils.isEmpty(articleIds)) {
|
|
|
+ log.info("第 {} 页没有查询到数据,分页查询结束", pageNum);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ log.info("第 {} 页查询到 {} 个 articleId", pageNum, articleIds.size());
|
|
|
+
|
|
|
+ Map<String, ParsedArticle> parsedById = loadParsedArticles(articleIds);
|
|
|
+
|
|
|
+ ExecutorService configExecutor = Executors.newFixedThreadPool(configs.size());
|
|
|
+ try {
|
|
|
+ List<Future<?>> configFutures = new ArrayList<>();
|
|
|
+ for (DeconstructVectorConfig config : configs) {
|
|
|
+ configFutures.add(configExecutor.submit(() ->
|
|
|
+ processConfigForArticle(config, articleIds, parsedById, totalSuccessCount, totalFailCount)
|
|
|
+ ));
|
|
|
+ }
|
|
|
+ awaitAndShutdown(configFutures, configExecutor, 30, "文章向量化配置并发");
|
|
|
+ } finally {
|
|
|
+ configExecutor.shutdownNow();
|
|
|
+ }
|
|
|
+
|
|
|
+ totalProcessed.addAndGet(articleIds.size());
|
|
|
+
|
|
|
+ if (maxArticleCount != null && maxArticleCount > 0
|
|
|
+ && totalProcessed.get() >= maxArticleCount) {
|
|
|
+ log.info("已达到 maxArticleCount={} 限制,结束扫描", maxArticleCount);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (articleIds.size() < limit) {
|
|
|
+ log.info("第 {} 页数据量 {} 小于 limit {},分页结束", pageNum, articleIds.size(), limit);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ pageNum++;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("文章向量化任务完成 总处理文章={}, 成功={}, 失败={}",
|
|
|
+ totalProcessed.get(), totalSuccessCount.get(), totalFailCount.get());
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("文章向量化任务失败: {}", e.getMessage(), e);
|
|
|
+ return new ReturnT<>(ReturnT.FAIL_CODE, "任务执行失败: " + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<String, ParsedArticle> loadParsedArticles(List<String> articleIds) {
|
|
|
+ List<ArticleDeconstructResult> results = articleDeconstructResultMapperExt
|
|
|
+ .selectResultsByArticleIds(SOURCE_AIGC, articleIds);
|
|
|
+ Map<String, ParsedArticle> map = new HashMap<>(articleIds.size());
|
|
|
+ for (ArticleDeconstructResult r : results) {
|
|
|
+ if (r == null || !StringUtils.hasText(r.getResult())) continue;
|
|
|
+ JSONObject dataContent;
|
|
|
+ try {
|
|
|
+ dataContent = JSON.parseObject(r.getResult());
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("articleId={} result JSON 解析失败: {}", r.getArticleId(), e.getMessage());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (dataContent == null) continue;
|
|
|
+ map.put(r.getArticleId(), new ParsedArticle(dataContent));
|
|
|
+ }
|
|
|
+ return map;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void processConfigForArticle(DeconstructVectorConfig config, List<String> articleIds,
|
|
|
+ Map<String, ParsedArticle> parsedById,
|
|
|
+ AtomicInteger totalSuccessCount, AtomicInteger totalFailCount) {
|
|
|
+ String configCode = config.getConfigCode();
|
|
|
+ try {
|
|
|
+ Set<String> existingIds = articleVectorStoreService.existsByIds(configCode, articleIds);
|
|
|
+ List<String> needProcessIds = articleIds.stream()
|
|
|
+ .filter(id -> !existingIds.contains(id))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ if (needProcessIds.isEmpty()) {
|
|
|
+ log.info("配置 {} 下所有文章已有向量,跳过", configCode);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ log.info("配置 {} 需要处理 {} 篇文章", configCode, needProcessIds.size());
|
|
|
+
|
|
|
+ for (String articleId : needProcessIds) {
|
|
|
+ ParsedArticle parsed = parsedById.get(articleId);
|
|
|
+ if (parsed == null) {
|
|
|
+ log.info("articleId={} 配置 {} 无解构结果,跳过", articleId, configCode);
|
|
|
+ totalFailCount.incrementAndGet();
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ List<String> texts = extractTextsFromDataContent(parsed.dataContent, config);
|
|
|
+ if (CollectionUtils.isEmpty(texts)) {
|
|
|
+ log.info("articleId={} 配置 {} 未提取到文本,跳过", articleId, configCode);
|
|
|
+ totalFailCount.incrementAndGet();
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ boolean ok = vectorizeAndStoreArticle(config, articleId, texts);
|
|
|
+ if (ok) {
|
|
|
+ totalSuccessCount.incrementAndGet();
|
|
|
+ } else {
|
|
|
+ totalFailCount.incrementAndGet();
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("处理 articleId={} 配置 {} 时发生异常: {}", articleId, configCode, e.getMessage(), e);
|
|
|
+ totalFailCount.incrementAndGet();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("配置 {} 处理异常: {}", configCode, e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean vectorizeAndStoreArticle(DeconstructVectorConfig config, String articleId,
|
|
|
+ List<String> texts) {
|
|
|
+ String configCode = config.getConfigCode();
|
|
|
+ Integer maxLength = config.getMaxLength();
|
|
|
+ boolean multiPoint = VectorUtils.isMultiPointConfig(config);
|
|
|
+
|
|
|
+ if (multiPoint) {
|
|
|
+ List<String> validTexts = new ArrayList<>(texts.size());
|
|
|
+ for (String raw : texts) {
|
|
|
+ if (StringUtils.hasText(raw)) validTexts.add(raw);
|
|
|
+ }
|
|
|
+ if (validTexts.isEmpty()) {
|
|
|
+ log.info("articleId={} 配置 {} 无有效文本", articleId, configCode);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ List<List<Float>> vectors = new ArrayList<>(validTexts.size());
|
|
|
+ List<String> truncated = new ArrayList<>(validTexts.size());
|
|
|
+ for (int i = 0; i < validTexts.size(); i++) {
|
|
|
+ String text = validTexts.get(i);
|
|
|
+ if (maxLength != null && maxLength > 0 && text.length() > maxLength) {
|
|
|
+ text = text.substring(0, maxLength);
|
|
|
+ }
|
|
|
+ List<Float> vector = getOrEmbed(text, config);
|
|
|
+ if (vector == null || vector.isEmpty()) {
|
|
|
+ log.error("articleId={} 配置 {} 第{}个文本向量化失败,本文章本轮放弃",
|
|
|
+ articleId, configCode, i);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ vectors.add(vector);
|
|
|
+ truncated.add(text);
|
|
|
+ }
|
|
|
+ for (int i = 0; i < vectors.size(); i++) {
|
|
|
+ if (!articleVectorStoreService.save(configCode, articleId, i, vectors.get(i), truncated.get(i))) {
|
|
|
+ log.error("articleId={} 配置 {} 第{}个点 save 返回 false", articleId, configCode, i);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ articleVectorStoreService.deleteAbovePointIndex(configCode, articleId, vectors.size());
|
|
|
+ log.debug("articleId={} 配置 {} 多点向量化存储成功,共 {} 个点", articleId, configCode, vectors.size());
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ String text = null;
|
|
|
+ for (String t : texts) {
|
|
|
+ if (StringUtils.hasText(t)) {
|
|
|
+ text = t;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (text == null) {
|
|
|
+ log.info("articleId={} 配置 {} 无有效文本,跳过", articleId, configCode);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ if (maxLength != null && maxLength > 0 && text.length() > maxLength) {
|
|
|
+ text = text.substring(0, maxLength);
|
|
|
+ }
|
|
|
+ List<Float> vector = getOrEmbed(text, config);
|
|
|
+ if (vector == null || vector.isEmpty()) {
|
|
|
+ log.error("articleId={} 配置 {} 文本向量化失败", articleId, configCode);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ if (!articleVectorStoreService.save(configCode, articleId, vector, text)) {
|
|
|
+ log.error("articleId={} 配置 {} save 返回 false", articleId, configCode);
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ log.debug("articleId={} 配置 {} 向量化存储成功", articleId, configCode);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private List<Float> getOrEmbed(String text, DeconstructVectorConfig config) {
|
|
|
+ String configCode = config.getConfigCode();
|
|
|
+ String textHash = Md5Util.encoderByMd5(text);
|
|
|
+ if (StringUtils.hasText(textHash)) {
|
|
|
+ List<Float> cached = articleVectorStoreService.getVectorByTextHash(textHash, configCode);
|
|
|
+ if (cached != null && !cached.isEmpty()) {
|
|
|
+ log.debug("命中 text_hash 缓存(article),hash={}, configCode={}", textHash, configCode);
|
|
|
+ return cached;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return embeddingService.embed(text, config);
|
|
|
+ }
|
|
|
+
|
|
|
+ // ====================================================================
|
|
|
+ // 入口 3:编排
|
|
|
+ // ====================================================================
|
|
|
+
|
|
|
+ @XxlJob("articleJob")
|
|
|
+ public ReturnT<String> articleJob(String param) {
|
|
|
+ log.info("开始执行文章完整链路, param: {}", param);
|
|
|
+ ReturnT<String> syncResult = syncArticleDeconstructJob(param);
|
|
|
+ if (syncResult.getCode() != ReturnT.SUCCESS_CODE) {
|
|
|
+ log.error("文章同步阶段失败: {}", syncResult.getMsg());
|
|
|
+ return syncResult;
|
|
|
+ }
|
|
|
+ return vectorArticleJob(param);
|
|
|
+ }
|
|
|
+
|
|
|
+ // ====================================================================
|
|
|
+ // 文本提取(与 MaterialVectorJob 共用同一套逻辑)
|
|
|
+ // ====================================================================
|
|
|
+
|
|
|
+ private List<String> extractTextsFromDataContent(JSONObject dataContent, DeconstructVectorConfig config) {
|
|
|
+ if (dataContent == null) {
|
|
|
+ return Collections.emptyList();
|
|
|
+ }
|
|
|
+ String extractRule = config.getExtractRule();
|
|
|
+ if (StringUtils.hasText(extractRule)) {
|
|
|
+ try {
|
|
|
+ JSONObject rule = JSON.parseObject(extractRule);
|
|
|
+ if ("point_decomposition".equals(rule.getString("type"))) {
|
|
|
+ return extractTextsFromPointDecomposition(dataContent, rule);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ // 不是 JSON 或无 type 字段,走原有逻辑
|
|
|
+ }
|
|
|
+ return extractTextsWithConfidence(dataContent, config.getSourcePath(), extractRule);
|
|
|
+ } else {
|
|
|
+ return VectorUtils.extractFromJson(dataContent, config.getSourcePath());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private List<String> extractTextsWithConfidence(JSONObject json, String sourcePath, String extractRule) {
|
|
|
+ List<String> texts = new ArrayList<>();
|
|
|
+ try {
|
|
|
+ JSONObject rule = JSON.parseObject(extractRule);
|
|
|
+ String textField = rule.getString("text_field");
|
|
|
+ String confidenceField = rule.getString("confidence_field");
|
|
|
+ double confidenceThreshold = rule.getDoubleValue("confidence_threshold");
|
|
|
+ if (!StringUtils.hasText(textField) || !StringUtils.hasText(confidenceField)) {
|
|
|
+ log.error("extract_rule 缺少必要字段: text_field={}, confidence_field={}", textField, confidenceField);
|
|
|
+ return texts;
|
|
|
+ }
|
|
|
+ if (sourcePath.endsWith("[*]")) {
|
|
|
+ List<JSONObject> items = VectorUtils.extractArrayItemsFromJson(json, sourcePath);
|
|
|
+ for (JSONObject item : items) {
|
|
|
+ if (isConfidenceQualified(item, confidenceField, confidenceThreshold)) {
|
|
|
+ String text = item.getString(textField);
|
|
|
+ if (StringUtils.hasText(text)) {
|
|
|
+ texts.add(text);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ List<String> pathValues = VectorUtils.extractFromJson(json, sourcePath);
|
|
|
+ if (!pathValues.isEmpty()) {
|
|
|
+ JSONObject targetObj = navigateToObject(json, sourcePath);
|
|
|
+ if (targetObj != null && isConfidenceQualified(targetObj, confidenceField, confidenceThreshold)) {
|
|
|
+ String text = targetObj.getString(textField);
|
|
|
+ if (StringUtils.hasText(text)) {
|
|
|
+ texts.add(text);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("置信度过滤提取失败: path={}, error={}", sourcePath, e.getMessage());
|
|
|
+ }
|
|
|
+ return texts;
|
|
|
+ }
|
|
|
+
|
|
|
+ private List<String> extractTextsFromPointDecomposition(JSONObject dataContent, JSONObject rule) {
|
|
|
+ List<String> texts = new ArrayList<>();
|
|
|
+ try {
|
|
|
+ String pointArrayPath = rule.getString("point_array_path");
|
|
|
+ String finalResultPath = rule.getString("final_result_path");
|
|
|
+ String pointNameField = rule.getString("point_name_field");
|
|
|
+ String confidenceField = rule.getString("confidence_field");
|
|
|
+ double confidenceThreshold = rule.getDoubleValue("confidence_threshold");
|
|
|
+ String target = rule.getString("target");
|
|
|
+ String contributionPath = rule.getString("contribution_path");
|
|
|
+ double contributionThreshold = rule.getDoubleValue("contribution_threshold");
|
|
|
+
|
|
|
+ List<JSONObject> finalPoints = VectorUtils.extractArrayItemsFromJson(dataContent, finalResultPath + "[*]");
|
|
|
+ List<String> qualifiedPointNames = new ArrayList<>();
|
|
|
+ for (JSONObject fp : finalPoints) {
|
|
|
+ if (isConfidenceQualified(fp, confidenceField, confidenceThreshold)) {
|
|
|
+ String pointName = fp.getString(pointNameField);
|
|
|
+ if (StringUtils.hasText(pointName)) {
|
|
|
+ qualifiedPointNames.add(pointName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (qualifiedPointNames.isEmpty()) return texts;
|
|
|
+
|
|
|
+ List<JSONObject> pointDetails = VectorUtils.extractArrayItemsFromJson(dataContent, pointArrayPath + "[*]");
|
|
|
+ Map<String, Double> contributionMap = buildContributionMap(dataContent, contributionPath);
|
|
|
+
|
|
|
+ for (String pointName : qualifiedPointNames) {
|
|
|
+ try {
|
|
|
+ JSONObject matchedPoint = null;
|
|
|
+ for (JSONObject detail : pointDetails) {
|
|
|
+ if (pointName.equals(detail.getString("点"))) {
|
|
|
+ matchedPoint = detail;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (matchedPoint == null) continue;
|
|
|
+
|
|
|
+ List<String> itemNames = "substance".equals(target)
|
|
|
+ ? extractSubstanceNames(matchedPoint)
|
|
|
+ : extractFormNames(matchedPoint);
|
|
|
+ for (String name : itemNames) {
|
|
|
+ Double contribution = contributionMap.get(name);
|
|
|
+ if (contribution != null && contribution >= contributionThreshold) {
|
|
|
+ texts.add(name);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.debug("extractTextsFromPointDecomposition 单点处理异常 pointName={}: {}", pointName, e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("extractTextsFromPointDecomposition 失败: {}", e.getMessage(), e);
|
|
|
+ }
|
|
|
+ return texts;
|
|
|
+ }
|
|
|
+
|
|
|
+ private List<String> extractSubstanceNames(JSONObject point) {
|
|
|
+ List<String> names = new ArrayList<>();
|
|
|
+ JSONObject substance = point.getJSONObject("实质");
|
|
|
+ if (substance == null) return names;
|
|
|
+ for (String key : new String[]{"具体元素", "具象概念", "抽象概念"}) {
|
|
|
+ try {
|
|
|
+ collectNamesFromArray(substance.getJSONArray(key), names);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.debug("extractSubstanceNames key={} 异常: {}", key, e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return names;
|
|
|
+ }
|
|
|
+
|
|
|
+ private List<String> extractFormNames(JSONObject point) {
|
|
|
+ List<String> names = new ArrayList<>();
|
|
|
+ JSONObject form = point.getJSONObject("形式");
|
|
|
+ if (form == null) return names;
|
|
|
+ for (String key : new String[]{"具体元素形式", "具象概念形式", "整体形式"}) {
|
|
|
+ try {
|
|
|
+ collectNamesFromArray(form.getJSONArray(key), names);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.debug("extractFormNames key={} 异常: {}", key, e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return names;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void collectNamesFromArray(JSONArray array, List<String> names) {
|
|
|
+ if (array == null || array.isEmpty()) return;
|
|
|
+ for (int i = 0; i < array.size(); i++) {
|
|
|
+ try {
|
|
|
+ JSONObject item = array.getJSONObject(i);
|
|
|
+ if (item != null) {
|
|
|
+ String name = item.getString("名称");
|
|
|
+ if (StringUtils.hasText(name)) {
|
|
|
+ names.add(name);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.debug("collectNamesFromArray 单元素解析异常: {}", e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<String, Double> buildContributionMap(JSONObject dataContent, String contributionPath) {
|
|
|
+ Map<String, Double> map = new HashMap<>();
|
|
|
+ try {
|
|
|
+ List<JSONObject> contributions = VectorUtils.extractArrayItemsFromJson(dataContent, contributionPath + "[*]");
|
|
|
+ for (JSONObject c : contributions) {
|
|
|
+ try {
|
|
|
+ String word = c.getString("词");
|
|
|
+ Double contribution = c.getDouble("贡献度");
|
|
|
+ if (StringUtils.hasText(word) && contribution != null) {
|
|
|
+ map.put(word, contribution);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.debug("buildContributionMap 单元素解析异常: {}", e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("构建贡献度查找表失败: {}", e.getMessage());
|
|
|
+ }
|
|
|
+ return map;
|
|
|
+ }
|
|
|
+
|
|
|
+ private JSONObject navigateToObject(JSONObject json, String path) {
|
|
|
+ if (json == null || !StringUtils.hasText(path) || !path.startsWith("$.")) return null;
|
|
|
+ try {
|
|
|
+ String pathContent = path.substring(2);
|
|
|
+ String[] parts = pathContent.split("\\.");
|
|
|
+ Object current = json;
|
|
|
+ for (String part : parts) {
|
|
|
+ if (current instanceof JSONObject) {
|
|
|
+ current = ((JSONObject) current).get(part);
|
|
|
+ } else {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return current instanceof JSONObject ? (JSONObject) current : null;
|
|
|
+ } catch (Exception e) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean isConfidenceQualified(JSONObject item, String confidenceField, double threshold) {
|
|
|
+ Object value = item.get(confidenceField);
|
|
|
+ if (value == null) return false;
|
|
|
+ if (value instanceof String) return "high".equalsIgnoreCase((String) value);
|
|
|
+ if (value instanceof Number) return ((Number) value).doubleValue() >= threshold;
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ // ====================================================================
|
|
|
+ // 通用辅助
|
|
|
+ // ====================================================================
|
|
|
+
|
|
|
+ private List<DeconstructVectorConfig> getEnabledConfigsBySourceField(String sourceField) {
|
|
|
+ DeconstructVectorConfigExample example = new DeconstructVectorConfigExample();
|
|
|
+ example.createCriteria()
|
|
|
+ .andEnabledEqualTo((short) 1)
|
|
|
+ .andSourceFieldEqualTo(sourceField);
|
|
|
+ example.setOrderByClause("priority ASC");
|
|
|
+ return vectorConfigMapper.selectByExample(example);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void awaitAndShutdown(List<Future<?>> futures, ExecutorService executor,
|
|
|
+ long timeoutMinutes, String taskDesc) {
|
|
|
+ long deadline = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(timeoutMinutes);
|
|
|
+ int completed = 0;
|
|
|
+ for (Future<?> future : futures) {
|
|
|
+ long remaining = deadline - System.currentTimeMillis();
|
|
|
+ if (remaining <= 0) {
|
|
|
+ log.error("{} 整体超时({}分钟),已取消剩余任务 (已完成 {}/{})",
|
|
|
+ taskDesc, timeoutMinutes, completed, futures.size());
|
|
|
+ for (Future<?> f : futures) {
|
|
|
+ f.cancel(true);
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ future.get(remaining, TimeUnit.MILLISECONDS);
|
|
|
+ completed++;
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("{} 并发任务等待异常: {}", taskDesc, e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ executor.shutdown();
|
|
|
+ }
|
|
|
+
|
|
|
+ private Integer parseMaxCount(String param) {
|
|
|
+ if (!StringUtils.hasText(param)) return null;
|
|
|
+ try {
|
|
|
+ int v = Integer.parseInt(param.trim());
|
|
|
+ return v > 0 ? v : null;
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private String normalizeArticleId(String bizUniqueId) {
|
|
|
+ if (!StringUtils.hasText(bizUniqueId)) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return bizUniqueId.trim();
|
|
|
+ }
|
|
|
+
|
|
|
+ private static final class ParsedArticle {
|
|
|
+ final JSONObject dataContent;
|
|
|
+
|
|
|
+ ParsedArticle(JSONObject dataContent) {
|
|
|
+ this.dataContent = dataContent;
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|