|
@@ -21,11 +21,6 @@ import java.util.LinkedHashMap;
|
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
|
|
|
|
|
|
-/**
|
|
|
|
|
- * 文章质量评分同步 Job
|
|
|
|
|
- * 从 ODPS loghubods.article_title_his_cache 拉取发布表现数据,
|
|
|
|
|
- * 解析 his_publish_article_list JSON,聚合计算综合质量分,写入 pgVector article_quality 表。
|
|
|
|
|
- */
|
|
|
|
|
@Component
|
|
@Component
|
|
|
public class ArticleQualitySyncJob {
|
|
public class ArticleQualitySyncJob {
|
|
|
|
|
|
|
@@ -37,18 +32,6 @@ public class ArticleQualitySyncJob {
|
|
|
@Resource
|
|
@Resource
|
|
|
private ArticleQualityMapperExt articleQualityMapperExt;
|
|
private ArticleQualityMapperExt articleQualityMapperExt;
|
|
|
|
|
|
|
|
- /**
|
|
|
|
|
- * 同步文章质量评分
|
|
|
|
|
- *
|
|
|
|
|
- * 可选参数(逗号分隔):
|
|
|
|
|
- * wRead=0.4 — 阅读维度权重,默认 0.4
|
|
|
|
|
- * wOpen=0.3 — 打开率维度权重,默认 0.3
|
|
|
|
|
- * wFission=0.3 — 裂变率维度权重,默认 0.3
|
|
|
|
|
- * confidenceThreshold=3 — 置信度发文次数阈值,默认 3
|
|
|
|
|
- * dryRun=true — 仅打印不写库
|
|
|
|
|
- * dt=20260607 — ODPS 分区日期,默认昨天
|
|
|
|
|
- * maxRows=10000 — 最多读取行数,默认不限(全量)
|
|
|
|
|
- */
|
|
|
|
|
@XxlJob("articleQualityJob")
|
|
@XxlJob("articleQualityJob")
|
|
|
public ReturnT<String> articleQualityJob(String param) {
|
|
public ReturnT<String> articleQualityJob(String param) {
|
|
|
log.info("===== articleQualityJob 开始, param: {} =====", param);
|
|
log.info("===== articleQualityJob 开始, param: {} =====", param);
|
|
@@ -61,11 +44,20 @@ public class ArticleQualitySyncJob {
|
|
|
String odpsDt = parseParamString(param, "dt", LocalDate.now().minusDays(1).format(DT_FMT));
|
|
String odpsDt = parseParamString(param, "dt", LocalDate.now().minusDays(1).format(DT_FMT));
|
|
|
int maxRows = (int) parseParamDouble(param, "maxRows", 0);
|
|
int maxRows = (int) parseParamDouble(param, "maxRows", 0);
|
|
|
|
|
|
|
|
|
|
+ if (!odpsDt.matches("\\d{8}")) {
|
|
|
|
|
+ log.error("odpsDt 格式非法,期望 yyyyMMdd: {}", odpsDt);
|
|
|
|
|
+ return ReturnT.FAIL;
|
|
|
|
|
+ }
|
|
|
|
|
+ if (maxRows < 0) {
|
|
|
|
|
+ log.error("maxRows 不能为负数: {}", maxRows);
|
|
|
|
|
+ return ReturnT.FAIL;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
String dt = LocalDate.now().format(DT_FMT);
|
|
String dt = LocalDate.now().format(DT_FMT);
|
|
|
log.info("权重: r={} o={} f={}, 置信度阈值: {}, ODPS分区dt={}, 写入dt={}",
|
|
log.info("权重: r={} o={} f={}, 置信度阈值: {}, ODPS分区dt={}, 写入dt={}",
|
|
|
wRead, wOpen, wFission, confidenceThreshold, odpsDt, dt);
|
|
wRead, wOpen, wFission, confidenceThreshold, odpsDt, dt);
|
|
|
|
|
|
|
|
- // Step 0: 探针——确认分区有数据
|
|
|
|
|
|
|
+ // 确认分区有数据
|
|
|
String probeSql = "SELECT COUNT(*) AS cnt FROM loghubods.article_title_his_cache WHERE dt = '" + odpsDt + "' AND type = '9'";
|
|
String probeSql = "SELECT COUNT(*) AS cnt FROM loghubods.article_title_his_cache WHERE dt = '" + odpsDt + "' AND type = '9'";
|
|
|
log.info("探针 SQL: {}", probeSql);
|
|
log.info("探针 SQL: {}", probeSql);
|
|
|
try {
|
|
try {
|
|
@@ -83,7 +75,7 @@ public class ArticleQualitySyncJob {
|
|
|
return ReturnT.FAIL;
|
|
return ReturnT.FAIL;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Step 1: 从 ODPS 一次流式读取(OdpsUtil 底层已是流式,不会 OOM)
|
|
|
|
|
|
|
+ // 从 ODPS 流式读取
|
|
|
List<ArticleQuality> rawList = new ArrayList<>();
|
|
List<ArticleQuality> rawList = new ArrayList<>();
|
|
|
long[] totalRows = {0};
|
|
long[] totalRows = {0};
|
|
|
long[] parseFailCount = {0};
|
|
long[] parseFailCount = {0};
|
|
@@ -112,12 +104,11 @@ public class ArticleQualitySyncJob {
|
|
|
if (contentId == null || contentId.isEmpty()) return;
|
|
if (contentId == null || contentId.isEmpty()) return;
|
|
|
if (hisPublishStr == null || hisPublishStr.isEmpty()) return;
|
|
if (hisPublishStr == null || hisPublishStr.isEmpty()) return;
|
|
|
|
|
|
|
|
- // 采样前 3 条原始数据
|
|
|
|
|
if (totalRows[0] < 3) {
|
|
if (totalRows[0] < 3) {
|
|
|
String preview = hisPublishStr.length() > 500
|
|
String preview = hisPublishStr.length() > 500
|
|
|
? hisPublishStr.substring(0, 500) + "..."
|
|
? hisPublishStr.substring(0, 500) + "..."
|
|
|
: hisPublishStr;
|
|
: hisPublishStr;
|
|
|
- System.out.println("[采样" + totalRows[0] + "] contentId=" + contentId + ", json=" + preview);
|
|
|
|
|
|
|
+ log.info("[采样{}] contentId={}, json={}", totalRows[0], contentId, preview);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
ArticleQuality aq = aggregateFromHisPublishList(contentId, hisPublishStr);
|
|
ArticleQuality aq = aggregateFromHisPublishList(contentId, hisPublishStr);
|
|
@@ -125,14 +116,13 @@ public class ArticleQualitySyncJob {
|
|
|
synchronized (rawList) { rawList.add(aq); }
|
|
synchronized (rawList) { rawList.add(aq); }
|
|
|
validCount[0]++;
|
|
validCount[0]++;
|
|
|
} else {
|
|
} else {
|
|
|
- // 区分解析失败 vs 空数据
|
|
|
|
|
if (isJsonParseFail(hisPublishStr)) {
|
|
if (isJsonParseFail(hisPublishStr)) {
|
|
|
parseFailCount[0]++;
|
|
parseFailCount[0]++;
|
|
|
if (parseFailCount[0] <= 5) {
|
|
if (parseFailCount[0] <= 5) {
|
|
|
String preview = hisPublishStr.length() > 300
|
|
String preview = hisPublishStr.length() > 300
|
|
|
? hisPublishStr.substring(0, 300) + "..."
|
|
? hisPublishStr.substring(0, 300) + "..."
|
|
|
: hisPublishStr;
|
|
: hisPublishStr;
|
|
|
- System.out.println("[解析失败#" + parseFailCount[0] + "] contentId=" + contentId + ", json=" + preview);
|
|
|
|
|
|
|
+ log.info("[解析失败#{}] contentId={}, json={}", parseFailCount[0], contentId, preview);
|
|
|
}
|
|
}
|
|
|
} else {
|
|
} else {
|
|
|
emptyDataCount[0]++;
|
|
emptyDataCount[0]++;
|
|
@@ -140,7 +130,7 @@ public class ArticleQualitySyncJob {
|
|
|
}
|
|
}
|
|
|
totalRows[0]++;
|
|
totalRows[0]++;
|
|
|
if (totalRows[0] % 10000 == 0) {
|
|
if (totalRows[0] % 10000 == 0) {
|
|
|
- System.out.println("[进度] " + totalRows[0] + " 行, 有效=" + validCount[0] + ", 解析失败=" + parseFailCount[0] + ", 无数据=" + emptyDataCount[0]);
|
|
|
|
|
|
|
+ log.info("[进度] {} 行, 有效={}, 解析失败={}, 无数据={}", totalRows[0], validCount[0], parseFailCount[0], emptyDataCount[0]);
|
|
|
}
|
|
}
|
|
|
});
|
|
});
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
@@ -148,13 +138,13 @@ public class ArticleQualitySyncJob {
|
|
|
return ReturnT.FAIL;
|
|
return ReturnT.FAIL;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- System.out.println("[完成] 总行数=" + totalRows[0] + ", 有效=" + validCount[0] + ", 解析失败=" + parseFailCount[0] + ", 无数据=" + emptyDataCount[0]);
|
|
|
|
|
|
|
+ log.info("[完成] 总行数={}, 有效={}, 解析失败={}, 无数据={}", totalRows[0], validCount[0], parseFailCount[0], emptyDataCount[0]);
|
|
|
if (rawList.isEmpty()) {
|
|
if (rawList.isEmpty()) {
|
|
|
log.warn("无有效文章表现数据");
|
|
log.warn("无有效文章表现数据");
|
|
|
return ReturnT.FAIL;
|
|
return ReturnT.FAIL;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Step 2: 计算质量分
|
|
|
|
|
|
|
+ // 计算质量分
|
|
|
ArticleQualityCalculator.calculateAll(rawList, wRead, wOpen, wFission, confidenceThreshold);
|
|
ArticleQualityCalculator.calculateAll(rawList, wRead, wOpen, wFission, confidenceThreshold);
|
|
|
|
|
|
|
|
if (dryRun) {
|
|
if (dryRun) {
|
|
@@ -163,44 +153,38 @@ public class ArticleQualitySyncJob {
|
|
|
return ReturnT.SUCCESS;
|
|
return ReturnT.SUCCESS;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Step 2.5: 按 contentId 去重(ODPS 同 contentId 可能多行)
|
|
|
|
|
|
|
+ // 按 contentId 去重(ODPS 同 contentId 可能多行)
|
|
|
Map<String, ArticleQuality> deduped = new LinkedHashMap<>();
|
|
Map<String, ArticleQuality> deduped = new LinkedHashMap<>();
|
|
|
for (ArticleQuality aq : rawList) {
|
|
for (ArticleQuality aq : rawList) {
|
|
|
deduped.putIfAbsent(aq.getContentId(), aq);
|
|
deduped.putIfAbsent(aq.getContentId(), aq);
|
|
|
}
|
|
}
|
|
|
List<ArticleQuality> list = new ArrayList<>(deduped.values());
|
|
List<ArticleQuality> list = new ArrayList<>(deduped.values());
|
|
|
- System.out.println("[去重] " + rawList.size() + " → " + list.size() + " 条");
|
|
|
|
|
|
|
+ log.info("[去重] {} → {} 条", rawList.size(), list.size());
|
|
|
|
|
|
|
|
- // Step 3: 分批写入
|
|
|
|
|
- System.out.println("[写入DB] 开始, 共 " + list.size() + " 条, dt=" + dt);
|
|
|
|
|
- // 采样第一条数据
|
|
|
|
|
|
|
+ // 分批写入
|
|
|
|
|
+ log.info("[写入DB] 开始, 共 {} 条, dt={}", list.size(), dt);
|
|
|
if (!list.isEmpty()) {
|
|
if (!list.isEmpty()) {
|
|
|
ArticleQuality sample = list.get(0);
|
|
ArticleQuality sample = list.get(0);
|
|
|
- System.out.println("[写入DB 采样] contentId=" + sample.getContentId()
|
|
|
|
|
- + ", qualityScore=" + round2(sample.getQualityScore())
|
|
|
|
|
- + ", dt=" + dt);
|
|
|
|
|
|
|
+ log.info("[写入DB 采样] contentId={}, qualityScore={}, dt={}",
|
|
|
|
|
+ sample.getContentId(), round2(sample.getQualityScore()), dt);
|
|
|
}
|
|
}
|
|
|
int totalUpserted = 0;
|
|
int totalUpserted = 0;
|
|
|
for (int i = 0; i < list.size(); i += DB_BATCH_SIZE) {
|
|
for (int i = 0; i < list.size(); i += DB_BATCH_SIZE) {
|
|
|
int end = Math.min(i + DB_BATCH_SIZE, list.size());
|
|
int end = Math.min(i + DB_BATCH_SIZE, list.size());
|
|
|
List<ArticleQuality> batch = list.subList(i, end);
|
|
List<ArticleQuality> batch = list.subList(i, end);
|
|
|
- // 设置 dt
|
|
|
|
|
for (ArticleQuality aq : batch) {
|
|
for (ArticleQuality aq : batch) {
|
|
|
aq.setDt(dt);
|
|
aq.setDt(dt);
|
|
|
}
|
|
}
|
|
|
int n = articleQualityMapperExt.batchUpsert(batch);
|
|
int n = articleQualityMapperExt.batchUpsert(batch);
|
|
|
totalUpserted += n;
|
|
totalUpserted += n;
|
|
|
if ((i / DB_BATCH_SIZE) % 50 == 0) {
|
|
if ((i / DB_BATCH_SIZE) % 50 == 0) {
|
|
|
- System.out.println("[写入DB 进度] " + end + "/" + rawList.size() + ", 本批" + n + "条, 累计" + totalUpserted + "条");
|
|
|
|
|
|
|
+ log.info("[写入DB 进度] {}/{}, 本批{}条, 累计{}条", end, list.size(), n, totalUpserted);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- System.out.println("[写入DB 完成] upserted=" + totalUpserted);
|
|
|
|
|
|
|
+ log.info("[写入DB 完成] upserted={}", totalUpserted);
|
|
|
return ReturnT.SUCCESS;
|
|
return ReturnT.SUCCESS;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- /**
|
|
|
|
|
- * 解析 his_publish_article_list 并聚合为单条 ArticleQuality
|
|
|
|
|
- */
|
|
|
|
|
private ArticleQuality aggregateFromHisPublishList(String contentId, String hisPublishListJson) {
|
|
private ArticleQuality aggregateFromHisPublishList(String contentId, String hisPublishListJson) {
|
|
|
JSONArray publishList;
|
|
JSONArray publishList;
|
|
|
try {
|
|
try {
|
|
@@ -232,14 +216,13 @@ public class ArticleQualitySyncJob {
|
|
|
totalAvgRead += avgView;
|
|
totalAvgRead += avgView;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 取最新发文(itemIndex 最大)的粉丝量
|
|
|
|
|
|
|
+ // 取最新发文的粉丝量
|
|
|
int itemIndex = pub.getIntValue("itemIndex");
|
|
int itemIndex = pub.getIntValue("itemIndex");
|
|
|
if (itemIndex > maxItemIndex) {
|
|
if (itemIndex > maxItemIndex) {
|
|
|
maxItemIndex = itemIndex;
|
|
maxItemIndex = itemIndex;
|
|
|
totalFans = pub.getLongValue("fans");
|
|
totalFans = pub.getLongValue("fans");
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 首层和裂变
|
|
|
|
|
JSONArray fissionList = pub.getJSONArray("articleDetailInfoList");
|
|
JSONArray fissionList = pub.getJSONArray("articleDetailInfoList");
|
|
|
if (fissionList != null) {
|
|
if (fissionList != null) {
|
|
|
double pubFirstLevel = 0;
|
|
double pubFirstLevel = 0;
|
|
@@ -281,8 +264,6 @@ public class ArticleQualitySyncJob {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // ===== 辅助方法 =====
|
|
|
|
|
-
|
|
|
|
|
private static double parseParamDouble(String param, String key, double defaultValue) {
|
|
private static double parseParamDouble(String param, String key, double defaultValue) {
|
|
|
if (param == null || param.isEmpty()) return defaultValue;
|
|
if (param == null || param.isEmpty()) return defaultValue;
|
|
|
for (String part : param.split(",")) {
|
|
for (String part : param.split(",")) {
|
|
@@ -323,25 +304,21 @@ public class ArticleQualitySyncJob {
|
|
|
a.getQualityScore() == null ? 0 : a.getQualityScore()));
|
|
a.getQualityScore() == null ? 0 : a.getQualityScore()));
|
|
|
|
|
|
|
|
int show = Math.min(5, sorted.size());
|
|
int show = Math.min(5, sorted.size());
|
|
|
- System.out.println("===== Top " + show + " 高质量文章 =====");
|
|
|
|
|
|
|
+ log.info("===== Top {} 高质量文章 =====", show);
|
|
|
for (int i = 0; i < show; i++) {
|
|
for (int i = 0; i < show; i++) {
|
|
|
ArticleQuality aq = sorted.get(i);
|
|
ArticleQuality aq = sorted.get(i);
|
|
|
- System.out.println("[" + (i + 1) + "] contentId=" + aq.getContentId()
|
|
|
|
|
- + ", qualityScore=" + round2(aq.getQualityScore())
|
|
|
|
|
- + ", readScore=" + round2(aq.getReadScore())
|
|
|
|
|
- + ", openScore=" + round2(aq.getOpenScore())
|
|
|
|
|
- + ", fissionScore=" + round2(aq.getFissionScore())
|
|
|
|
|
- + ", conf=" + round2(aq.getConfidence()));
|
|
|
|
|
|
|
+ log.info("[{}] contentId={}, qualityScore={}, readScore={}, openScore={}, fissionScore={}, conf={}",
|
|
|
|
|
+ i + 1, aq.getContentId(), round2(aq.getQualityScore()),
|
|
|
|
|
+ round2(aq.getReadScore()), round2(aq.getOpenScore()),
|
|
|
|
|
+ round2(aq.getFissionScore()), round2(aq.getConfidence()));
|
|
|
}
|
|
}
|
|
|
- System.out.println("===== Bottom " + show + " 低质量文章 =====");
|
|
|
|
|
|
|
+ log.info("===== Bottom {} 低质量文章 =====", show);
|
|
|
for (int i = sorted.size() - 1; i >= Math.max(0, sorted.size() - show); i--) {
|
|
for (int i = sorted.size() - 1; i >= Math.max(0, sorted.size() - show); i--) {
|
|
|
ArticleQuality aq = sorted.get(i);
|
|
ArticleQuality aq = sorted.get(i);
|
|
|
- System.out.println("[" + (sorted.size() - i) + "] contentId=" + aq.getContentId()
|
|
|
|
|
- + ", qualityScore=" + round2(aq.getQualityScore())
|
|
|
|
|
- + ", readScore=" + round2(aq.getReadScore())
|
|
|
|
|
- + ", openScore=" + round2(aq.getOpenScore())
|
|
|
|
|
- + ", fissionScore=" + round2(aq.getFissionScore())
|
|
|
|
|
- + ", conf=" + round2(aq.getConfidence()));
|
|
|
|
|
|
|
+ log.info("[{}] contentId={}, qualityScore={}, readScore={}, openScore={}, fissionScore={}, conf={}",
|
|
|
|
|
+ sorted.size() - i, aq.getContentId(), round2(aq.getQualityScore()),
|
|
|
|
|
+ round2(aq.getReadScore()), round2(aq.getOpenScore()),
|
|
|
|
|
+ round2(aq.getFissionScore()), round2(aq.getConfidence()));
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|