|
@@ -0,0 +1,352 @@
|
|
|
|
|
+package com.tzld.videoVector.job;
|
|
|
|
|
+
|
|
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
|
|
+import com.tzld.videoVector.dao.mapper.pgVector.ext.ArticleQualityMapperExt;
|
|
|
|
|
+import com.tzld.videoVector.model.po.pgVector.ArticleQuality;
|
|
|
|
|
+import com.tzld.videoVector.util.ArticleQualityCalculator;
|
|
|
|
|
+import com.tzld.videoVector.util.OdpsUtil;
|
|
|
|
|
+import com.xxl.job.core.biz.model.ReturnT;
|
|
|
|
|
+import com.xxl.job.core.handler.annotation.XxlJob;
|
|
|
|
|
+import org.slf4j.Logger;
|
|
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
|
|
+
|
|
|
|
|
+import javax.annotation.Resource;
|
|
|
|
|
+import java.time.LocalDate;
|
|
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
|
|
+import java.util.LinkedHashMap;
|
|
|
|
|
+import java.util.List;
|
|
|
|
|
+import java.util.Map;
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * 文章质量评分同步 Job
|
|
|
|
|
+ * 从 ODPS loghubods.article_title_his_cache 拉取发布表现数据,
|
|
|
|
|
+ * 解析 his_publish_article_list JSON,聚合计算综合质量分,写入 pgVector article_quality 表。
|
|
|
|
|
+ */
|
|
|
|
|
+@Component
|
|
|
|
|
+public class ArticleQualitySyncJob {
|
|
|
|
|
+
|
|
|
|
|
+ private static final Logger log = LoggerFactory.getLogger(ArticleQualitySyncJob.class);
|
|
|
|
|
+
|
|
|
|
|
+ private static final DateTimeFormatter DT_FMT = DateTimeFormatter.ofPattern("yyyyMMdd");
|
|
|
|
|
+ private static final int DB_BATCH_SIZE = 200;
|
|
|
|
|
+
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private ArticleQualityMapperExt articleQualityMapperExt;
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 同步文章质量评分
|
|
|
|
|
+ *
|
|
|
|
|
+ * 可选参数(逗号分隔):
|
|
|
|
|
+ * wRead=0.4 — 阅读维度权重,默认 0.4
|
|
|
|
|
+ * wOpen=0.3 — 打开率维度权重,默认 0.3
|
|
|
|
|
+ * wFission=0.3 — 裂变率维度权重,默认 0.3
|
|
|
|
|
+ * confidenceThreshold=3 — 置信度发文次数阈值,默认 3
|
|
|
|
|
+ * dryRun=true — 仅打印不写库
|
|
|
|
|
+ * dt=20260607 — ODPS 分区日期,默认昨天
|
|
|
|
|
+ * maxRows=10000 — 最多读取行数,默认不限(全量)
|
|
|
|
|
+ */
|
|
|
|
|
+ @XxlJob("articleQualityJob")
|
|
|
|
|
+ public ReturnT<String> articleQualityJob(String param) {
|
|
|
|
|
+ log.info("===== articleQualityJob 开始, param: {} =====", param);
|
|
|
|
|
+
|
|
|
|
|
+ double wRead = parseParamDouble(param, "wRead", 0.4);
|
|
|
|
|
+ double wOpen = parseParamDouble(param, "wOpen", 0.3);
|
|
|
|
|
+ double wFission = parseParamDouble(param, "wFission", 0.3);
|
|
|
|
|
+ int confidenceThreshold = (int) parseParamDouble(param, "confidenceThreshold", 3);
|
|
|
|
|
+ boolean dryRun = parseParamBool(param, "dryRun");
|
|
|
|
|
+ String odpsDt = parseParamString(param, "dt", LocalDate.now().minusDays(1).format(DT_FMT));
|
|
|
|
|
+ int maxRows = (int) parseParamDouble(param, "maxRows", 0);
|
|
|
|
|
+
|
|
|
|
|
+ String dt = LocalDate.now().format(DT_FMT);
|
|
|
|
|
+ log.info("权重: r={} o={} f={}, 置信度阈值: {}, ODPS分区dt={}, 写入dt={}",
|
|
|
|
|
+ wRead, wOpen, wFission, confidenceThreshold, odpsDt, dt);
|
|
|
|
|
+
|
|
|
|
|
+ // Step 0: 探针——确认分区有数据
|
|
|
|
|
+ String probeSql = "SELECT COUNT(*) AS cnt FROM loghubods.article_title_his_cache WHERE dt = '" + odpsDt + "' AND type = '9'";
|
|
|
|
|
+ log.info("探针 SQL: {}", probeSql);
|
|
|
|
|
+ try {
|
|
|
|
|
+ long[] rowCount = {0};
|
|
|
|
|
+ OdpsUtil.getOdpsDataStream(probeSql, record -> {
|
|
|
|
|
+ rowCount[0] = record.getBigint("cnt") != null ? record.getBigint("cnt") : 0;
|
|
|
|
|
+ });
|
|
|
|
|
+ log.info("探针: dt={} 总行数={}", odpsDt, rowCount[0]);
|
|
|
|
|
+ if (rowCount[0] == 0) {
|
|
|
|
|
+ log.warn("分区 dt={} 无数据,请确认分区值是否正确", odpsDt);
|
|
|
|
|
+ return ReturnT.FAIL;
|
|
|
|
|
+ }
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("探针查询失败: {}", e.getMessage(), e);
|
|
|
|
|
+ return ReturnT.FAIL;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Step 1: 从 ODPS 一次流式读取(OdpsUtil 底层已是流式,不会 OOM)
|
|
|
|
|
+ List<ArticleQuality> rawList = new ArrayList<>();
|
|
|
|
|
+ long[] totalRows = {0};
|
|
|
|
|
+ long[] parseFailCount = {0};
|
|
|
|
|
+ long[] emptyDataCount = {0};
|
|
|
|
|
+ long[] validCount = {0};
|
|
|
|
|
+
|
|
|
|
|
+ String sql = maxRows > 0
|
|
|
|
|
+ ? "SELECT source_id, his_publish_article_list "
|
|
|
|
|
+ + "FROM loghubods.article_title_his_cache "
|
|
|
|
|
+ + "WHERE dt = '" + odpsDt + "' "
|
|
|
|
|
+ + "AND type = '9' "
|
|
|
|
|
+ + "AND his_publish_article_list IS NOT NULL "
|
|
|
|
|
+ + "LIMIT " + maxRows
|
|
|
|
|
+ : "SELECT source_id, his_publish_article_list "
|
|
|
|
|
+ + "FROM loghubods.article_title_his_cache "
|
|
|
|
|
+ + "WHERE dt = '" + odpsDt + "' "
|
|
|
|
|
+ + "AND type = '9' "
|
|
|
|
|
+ + "AND his_publish_article_list IS NOT NULL";
|
|
|
|
|
+
|
|
|
|
|
+ log.info("ODPS SQL: {}", sql);
|
|
|
|
|
+ try {
|
|
|
|
|
+ OdpsUtil.getOdpsDataStream(sql, record -> {
|
|
|
|
|
+ String contentId = record.getString("source_id");
|
|
|
|
|
+ String hisPublishStr = record.getString("his_publish_article_list");
|
|
|
|
|
+
|
|
|
|
|
+ if (contentId == null || contentId.isEmpty()) return;
|
|
|
|
|
+ if (hisPublishStr == null || hisPublishStr.isEmpty()) return;
|
|
|
|
|
+
|
|
|
|
|
+ // 采样前 3 条原始数据
|
|
|
|
|
+ if (totalRows[0] < 3) {
|
|
|
|
|
+ String preview = hisPublishStr.length() > 500
|
|
|
|
|
+ ? hisPublishStr.substring(0, 500) + "..."
|
|
|
|
|
+ : hisPublishStr;
|
|
|
|
|
+ System.out.println("[采样" + totalRows[0] + "] contentId=" + contentId + ", json=" + preview);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ ArticleQuality aq = aggregateFromHisPublishList(contentId, hisPublishStr);
|
|
|
|
|
+ if (aq != null) {
|
|
|
|
|
+ synchronized (rawList) { rawList.add(aq); }
|
|
|
|
|
+ validCount[0]++;
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // 区分解析失败 vs 空数据
|
|
|
|
|
+ if (isJsonParseFail(hisPublishStr)) {
|
|
|
|
|
+ parseFailCount[0]++;
|
|
|
|
|
+ if (parseFailCount[0] <= 5) {
|
|
|
|
|
+ String preview = hisPublishStr.length() > 300
|
|
|
|
|
+ ? hisPublishStr.substring(0, 300) + "..."
|
|
|
|
|
+ : hisPublishStr;
|
|
|
|
|
+ System.out.println("[解析失败#" + parseFailCount[0] + "] contentId=" + contentId + ", json=" + preview);
|
|
|
|
|
+ }
|
|
|
|
|
+ } else {
|
|
|
|
|
+ emptyDataCount[0]++;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ totalRows[0]++;
|
|
|
|
|
+ if (totalRows[0] % 10000 == 0) {
|
|
|
|
|
+ System.out.println("[进度] " + totalRows[0] + " 行, 有效=" + validCount[0] + ", 解析失败=" + parseFailCount[0] + ", 无数据=" + emptyDataCount[0]);
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.error("ODPS 查询异常: {}", e.getMessage(), e);
|
|
|
|
|
+ return ReturnT.FAIL;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ System.out.println("[完成] 总行数=" + totalRows[0] + ", 有效=" + validCount[0] + ", 解析失败=" + parseFailCount[0] + ", 无数据=" + emptyDataCount[0]);
|
|
|
|
|
+ if (rawList.isEmpty()) {
|
|
|
|
|
+ log.warn("无有效文章表现数据");
|
|
|
|
|
+ return ReturnT.FAIL;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Step 2: 计算质量分
|
|
|
|
|
+ ArticleQualityCalculator.calculateAll(rawList, wRead, wOpen, wFission, confidenceThreshold);
|
|
|
|
|
+
|
|
|
|
|
+ if (dryRun) {
|
|
|
|
|
+ log.info("===== DRY RUN 模式, 不写入DB =====");
|
|
|
|
|
+ printTopBottom(rawList);
|
|
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Step 2.5: 按 contentId 去重(ODPS 同 contentId 可能多行)
|
|
|
|
|
+ Map<String, ArticleQuality> deduped = new LinkedHashMap<>();
|
|
|
|
|
+ for (ArticleQuality aq : rawList) {
|
|
|
|
|
+ deduped.putIfAbsent(aq.getContentId(), aq);
|
|
|
|
|
+ }
|
|
|
|
|
+ List<ArticleQuality> list = new ArrayList<>(deduped.values());
|
|
|
|
|
+ System.out.println("[去重] " + rawList.size() + " → " + list.size() + " 条");
|
|
|
|
|
+
|
|
|
|
|
+ // Step 3: 分批写入
|
|
|
|
|
+ System.out.println("[写入DB] 开始, 共 " + list.size() + " 条, dt=" + dt);
|
|
|
|
|
+ // 采样第一条数据
|
|
|
|
|
+ if (!list.isEmpty()) {
|
|
|
|
|
+ ArticleQuality sample = list.get(0);
|
|
|
|
|
+ System.out.println("[写入DB 采样] contentId=" + sample.getContentId()
|
|
|
|
|
+ + ", qualityScore=" + round2(sample.getQualityScore())
|
|
|
|
|
+ + ", dt=" + dt);
|
|
|
|
|
+ }
|
|
|
|
|
+ int totalUpserted = 0;
|
|
|
|
|
+ for (int i = 0; i < list.size(); i += DB_BATCH_SIZE) {
|
|
|
|
|
+ int end = Math.min(i + DB_BATCH_SIZE, list.size());
|
|
|
|
|
+ List<ArticleQuality> batch = list.subList(i, end);
|
|
|
|
|
+ // 设置 dt
|
|
|
|
|
+ for (ArticleQuality aq : batch) {
|
|
|
|
|
+ aq.setDt(dt);
|
|
|
|
|
+ }
|
|
|
|
|
+ int n = articleQualityMapperExt.batchUpsert(batch);
|
|
|
|
|
+ totalUpserted += n;
|
|
|
|
|
+ if ((i / DB_BATCH_SIZE) % 50 == 0) {
|
|
|
|
|
+ System.out.println("[写入DB 进度] " + end + "/" + rawList.size() + ", 本批" + n + "条, 累计" + totalUpserted + "条");
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ System.out.println("[写入DB 完成] upserted=" + totalUpserted);
|
|
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 解析 his_publish_article_list 并聚合为单条 ArticleQuality
|
|
|
|
|
+ */
|
|
|
|
|
+ private ArticleQuality aggregateFromHisPublishList(String contentId, String hisPublishListJson) {
|
|
|
|
|
+ JSONArray publishList;
|
|
|
|
|
+ try {
|
|
|
|
|
+ publishList = JSON.parseArray(hisPublishListJson);
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ log.info("contentId={} his_publish_article_list JSON 解析失败: {}", contentId, e.getMessage());
|
|
|
|
|
+ return null;
|
|
|
|
|
+ }
|
|
|
|
|
+ if (publishList == null || publishList.isEmpty()) {
|
|
|
|
|
+ return null;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ long totalRead = 0;
|
|
|
|
|
+ double totalAvgRead = 0;
|
|
|
|
|
+ long totalFans = 0;
|
|
|
|
|
+ int maxItemIndex = -1;
|
|
|
|
|
+ double totalFirstLevel = 0;
|
|
|
|
|
+ double totalFission = 0;
|
|
|
|
|
+
|
|
|
|
|
+ for (int i = 0; i < publishList.size(); i++) {
|
|
|
|
|
+ JSONObject pub = publishList.getJSONObject(i);
|
|
|
|
|
+ if (pub == null) continue;
|
|
|
|
|
+
|
|
|
|
|
+ long viewCount = pub.getLongValue("viewCount");
|
|
|
|
|
+ totalRead += viewCount;
|
|
|
|
|
+
|
|
|
|
|
+ Double avgView = pub.getDouble("avgViewCount");
|
|
|
|
|
+ if (avgView != null) {
|
|
|
|
|
+ totalAvgRead += avgView;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 取最新发文(itemIndex 最大)的粉丝量
|
|
|
|
|
+ int itemIndex = pub.getIntValue("itemIndex");
|
|
|
|
|
+ if (itemIndex > maxItemIndex) {
|
|
|
|
|
+ maxItemIndex = itemIndex;
|
|
|
|
|
+ totalFans = pub.getLongValue("fans");
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 首层和裂变
|
|
|
|
|
+ JSONArray fissionList = pub.getJSONArray("articleDetailInfoList");
|
|
|
|
|
+ if (fissionList != null) {
|
|
|
|
|
+ double pubFirstLevel = 0;
|
|
|
|
|
+ double pubFission = 0;
|
|
|
|
|
+ for (int j = 0; j < fissionList.size(); j++) {
|
|
|
|
|
+ JSONObject fi = fissionList.getJSONObject(j);
|
|
|
|
|
+ if (fi == null) continue;
|
|
|
|
|
+ pubFirstLevel += fi.getDoubleValue("firstLevel");
|
|
|
|
|
+ pubFission += fi.getDoubleValue("fission0")
|
|
|
|
|
+ + fi.getDoubleValue("fission1")
|
|
|
|
|
+ + fi.getDoubleValue("fission2");
|
|
|
|
|
+ }
|
|
|
|
|
+ totalFirstLevel += pubFirstLevel;
|
|
|
|
|
+ totalFission += pubFission;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (totalRead <= 0 && totalAvgRead <= 0) {
|
|
|
|
|
+ return null;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ ArticleQuality aq = new ArticleQuality();
|
|
|
|
|
+ aq.setContentId(contentId);
|
|
|
|
|
+ aq.setTotalRead(totalRead);
|
|
|
|
|
+ aq.setAvgRead(totalAvgRead);
|
|
|
|
|
+ aq.setTotalFans(totalFans);
|
|
|
|
|
+ aq.setPublishCount(publishList.size());
|
|
|
|
|
+ aq.setOpenRate(totalRead > 0 ? totalFirstLevel / totalRead : 0);
|
|
|
|
|
+ aq.setFissionRate(totalFirstLevel > 0 ? totalFission / totalFirstLevel : 0);
|
|
|
|
|
+ return aq;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private static boolean isJsonParseFail(String json) {
|
|
|
|
|
+ try {
|
|
|
|
|
+ JSON.parseArray(json);
|
|
|
|
|
+ return false;
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
|
+ return true;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // ===== 辅助方法 =====
|
|
|
|
|
+
|
|
|
|
|
+ private static double parseParamDouble(String param, String key, double defaultValue) {
|
|
|
|
|
+ if (param == null || param.isEmpty()) return defaultValue;
|
|
|
|
|
+ for (String part : param.split(",")) {
|
|
|
|
|
+ String[] kv = part.trim().split("=", 2);
|
|
|
|
|
+ if (kv.length == 2 && kv[0].trim().equals(key)) {
|
|
|
|
|
+ try { return Double.parseDouble(kv[1].trim()); } catch (NumberFormatException ignored) { }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ return defaultValue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private static String parseParamString(String param, String key, String defaultValue) {
|
|
|
|
|
+ if (param == null || param.isEmpty()) return defaultValue;
|
|
|
|
|
+ for (String part : param.split(",")) {
|
|
|
|
|
+ String[] kv = part.trim().split("=", 2);
|
|
|
|
|
+ if (kv.length == 2 && kv[0].trim().equals(key)) {
|
|
|
|
|
+ return kv[1].trim();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ return defaultValue;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private static boolean parseParamBool(String param, String key) {
|
|
|
|
|
+ if (param == null || param.isEmpty()) return false;
|
|
|
|
|
+ for (String part : param.split(",")) {
|
|
|
|
|
+ String[] kv = part.trim().split("=", 2);
|
|
|
|
|
+ if (kv.length == 2 && kv[0].trim().equals(key)) {
|
|
|
|
|
+ return "true".equalsIgnoreCase(kv[1].trim());
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ return false;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private static void printTopBottom(List<ArticleQuality> list) {
|
|
|
|
|
+ List<ArticleQuality> sorted = new ArrayList<>(list);
|
|
|
|
|
+ sorted.sort((a, b) -> Double.compare(
|
|
|
|
|
+ b.getQualityScore() == null ? 0 : b.getQualityScore(),
|
|
|
|
|
+ a.getQualityScore() == null ? 0 : a.getQualityScore()));
|
|
|
|
|
+
|
|
|
|
|
+ int show = Math.min(5, sorted.size());
|
|
|
|
|
+ System.out.println("===== Top " + show + " 高质量文章 =====");
|
|
|
|
|
+ for (int i = 0; i < show; i++) {
|
|
|
|
|
+ ArticleQuality aq = sorted.get(i);
|
|
|
|
|
+ System.out.println("[" + (i + 1) + "] contentId=" + aq.getContentId()
|
|
|
|
|
+ + ", qualityScore=" + round2(aq.getQualityScore())
|
|
|
|
|
+ + ", readScore=" + round2(aq.getReadScore())
|
|
|
|
|
+ + ", openScore=" + round2(aq.getOpenScore())
|
|
|
|
|
+ + ", fissionScore=" + round2(aq.getFissionScore())
|
|
|
|
|
+ + ", conf=" + round2(aq.getConfidence()));
|
|
|
|
|
+ }
|
|
|
|
|
+ System.out.println("===== Bottom " + show + " 低质量文章 =====");
|
|
|
|
|
+ for (int i = sorted.size() - 1; i >= Math.max(0, sorted.size() - show); i--) {
|
|
|
|
|
+ ArticleQuality aq = sorted.get(i);
|
|
|
|
|
+ System.out.println("[" + (sorted.size() - i) + "] contentId=" + aq.getContentId()
|
|
|
|
|
+ + ", qualityScore=" + round2(aq.getQualityScore())
|
|
|
|
|
+ + ", readScore=" + round2(aq.getReadScore())
|
|
|
|
|
+ + ", openScore=" + round2(aq.getOpenScore())
|
|
|
|
|
+ + ", fissionScore=" + round2(aq.getFissionScore())
|
|
|
|
|
+ + ", conf=" + round2(aq.getConfidence()));
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private static double round2(Double v) {
|
|
|
|
|
+ if (v == null) return 0;
|
|
|
|
|
+ return Math.round(v * 100.0) / 100.0;
|
|
|
|
|
+ }
|
|
|
|
|
+}
|