|
|
@@ -0,0 +1,330 @@
|
|
|
+package com.tzld.videoVector.job;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.tzld.videoVector.dao.mapper.pgVector.ext.ArticleQualityMapperExt;
|
|
|
+import com.tzld.videoVector.model.po.pgVector.ArticleQuality;
|
|
|
+import com.tzld.videoVector.util.ArticleQualityCalculator;
|
|
|
+import com.tzld.videoVector.util.OdpsUtil;
|
|
|
+import com.xxl.job.core.biz.model.ReturnT;
|
|
|
+import com.xxl.job.core.handler.annotation.XxlJob;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.time.LocalDate;
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.LinkedHashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+@Component
|
|
|
+public class ArticleQualitySyncJob {
|
|
|
+
|
|
|
+ private static final Logger log = LoggerFactory.getLogger(ArticleQualitySyncJob.class);
|
|
|
+
|
|
|
+ private static final DateTimeFormatter DT_FMT = DateTimeFormatter.ofPattern("yyyyMMdd");
|
|
|
+ private static final int DB_BATCH_SIZE = 200;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private ArticleQualityMapperExt articleQualityMapperExt;
|
|
|
+
|
|
|
+ @XxlJob("articleQualityJob")
|
|
|
+ public ReturnT<String> articleQualityJob(String param) {
|
|
|
+ log.info("===== articleQualityJob 开始, param: {} =====", param);
|
|
|
+
|
|
|
+ double wRead = parseParamDouble(param, "wRead", 0.4);
|
|
|
+ double wOpen = parseParamDouble(param, "wOpen", 0.3);
|
|
|
+ double wFission = parseParamDouble(param, "wFission", 0.3);
|
|
|
+ int confidenceThreshold = (int) parseParamDouble(param, "confidenceThreshold", 3);
|
|
|
+ boolean dryRun = parseParamBool(param, "dryRun");
|
|
|
+ String odpsDt = parseParamString(param, "dt", LocalDate.now().minusDays(1).format(DT_FMT));
|
|
|
+ int maxRows = (int) parseParamDouble(param, "maxRows", 0);
|
|
|
+
|
|
|
+ if (!odpsDt.matches("\\d{8}")) {
|
|
|
+ log.error("odpsDt 格式非法,期望 yyyyMMdd: {}", odpsDt);
|
|
|
+ return ReturnT.FAIL;
|
|
|
+ }
|
|
|
+ if (maxRows < 0) {
|
|
|
+ log.error("maxRows 不能为负数: {}", maxRows);
|
|
|
+ return ReturnT.FAIL;
|
|
|
+ }
|
|
|
+
|
|
|
+ String safeOdpsDt = odpsDt.replace("'", "''");
|
|
|
+
|
|
|
+ String dt = LocalDate.now().format(DT_FMT);
|
|
|
+ log.info("权重: r={} o={} f={}, 置信度阈值: {}, ODPS分区dt={}, 写入dt={}",
|
|
|
+ wRead, wOpen, wFission, confidenceThreshold, safeOdpsDt, dt);
|
|
|
+
|
|
|
+ // 确认分区有数据
|
|
|
+ String probeSql = "SELECT COUNT(*) AS cnt FROM loghubods.article_title_his_cache WHERE dt = '" + safeOdpsDt + "' AND type = '9'";
|
|
|
+ log.info("探针 SQL: {}", probeSql);
|
|
|
+ try {
|
|
|
+ long[] rowCount = {0};
|
|
|
+ OdpsUtil.getOdpsDataStream(probeSql, record -> {
|
|
|
+ rowCount[0] = record.getBigint("cnt") != null ? record.getBigint("cnt") : 0;
|
|
|
+ });
|
|
|
+ log.info("探针: dt={} 总行数={}", odpsDt, rowCount[0]);
|
|
|
+ if (rowCount[0] == 0) {
|
|
|
+ log.warn("分区 dt={} 无数据,请确认分区值是否正确", odpsDt);
|
|
|
+ return ReturnT.FAIL;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("探针查询失败: {}", e.getMessage(), e);
|
|
|
+ return ReturnT.FAIL;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 从 ODPS 流式读取
|
|
|
+ List<ArticleQuality> rawList = new ArrayList<>();
|
|
|
+ long[] totalRows = {0};
|
|
|
+ long[] parseFailCount = {0};
|
|
|
+ long[] emptyDataCount = {0};
|
|
|
+ long[] validCount = {0};
|
|
|
+
|
|
|
+ String sql = maxRows > 0
|
|
|
+ ? "SELECT source_id, his_publish_article_list "
|
|
|
+ + "FROM loghubods.article_title_his_cache "
|
|
|
+ + "WHERE dt = '" + safeOdpsDt + "' "
|
|
|
+ + "AND type = '9' "
|
|
|
+ + "AND his_publish_article_list IS NOT NULL "
|
|
|
+ + "LIMIT " + maxRows
|
|
|
+ : "SELECT source_id, his_publish_article_list "
|
|
|
+ + "FROM loghubods.article_title_his_cache "
|
|
|
+ + "WHERE dt = '" + safeOdpsDt + "' "
|
|
|
+ + "AND type = '9' "
|
|
|
+ + "AND his_publish_article_list IS NOT NULL";
|
|
|
+
|
|
|
+ log.info("ODPS SQL: {}", sql);
|
|
|
+ try {
|
|
|
+ OdpsUtil.getOdpsDataStream(sql, record -> {
|
|
|
+ String contentId = record.getString("source_id");
|
|
|
+ String hisPublishStr = record.getString("his_publish_article_list");
|
|
|
+
|
|
|
+ if (contentId == null || contentId.isEmpty()) return;
|
|
|
+ if (hisPublishStr == null || hisPublishStr.isEmpty()) return;
|
|
|
+
|
|
|
+ if (totalRows[0] < 3) {
|
|
|
+ String preview = hisPublishStr.length() > 500
|
|
|
+ ? hisPublishStr.substring(0, 500) + "..."
|
|
|
+ : hisPublishStr;
|
|
|
+ log.info("[采样{}] contentId={}, json={}", totalRows[0], contentId, preview);
|
|
|
+ }
|
|
|
+
|
|
|
+ ArticleQuality aq = aggregateFromHisPublishList(contentId, hisPublishStr);
|
|
|
+ if (aq != null) {
|
|
|
+ synchronized (rawList) { rawList.add(aq); }
|
|
|
+ validCount[0]++;
|
|
|
+ } else {
|
|
|
+ if (isJsonParseFail(hisPublishStr)) {
|
|
|
+ parseFailCount[0]++;
|
|
|
+ if (parseFailCount[0] <= 5) {
|
|
|
+ String preview = hisPublishStr.length() > 300
|
|
|
+ ? hisPublishStr.substring(0, 300) + "..."
|
|
|
+ : hisPublishStr;
|
|
|
+ log.info("[解析失败#{}] contentId={}, json={}", parseFailCount[0], contentId, preview);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ emptyDataCount[0]++;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ totalRows[0]++;
|
|
|
+ if (totalRows[0] % 10000 == 0) {
|
|
|
+ log.info("[进度] {} 行, 有效={}, 解析失败={}, 无数据={}", totalRows[0], validCount[0], parseFailCount[0], emptyDataCount[0]);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("ODPS 查询异常: {}", e.getMessage(), e);
|
|
|
+ return ReturnT.FAIL;
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("[完成] 总行数={}, 有效={}, 解析失败={}, 无数据={}", totalRows[0], validCount[0], parseFailCount[0], emptyDataCount[0]);
|
|
|
+ if (rawList.isEmpty()) {
|
|
|
+ log.warn("无有效文章表现数据");
|
|
|
+ return ReturnT.FAIL;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 计算质量分
|
|
|
+ ArticleQualityCalculator.calculateAll(rawList, wRead, wOpen, wFission, confidenceThreshold);
|
|
|
+
|
|
|
+ if (dryRun) {
|
|
|
+ log.info("===== DRY RUN 模式, 不写入DB =====");
|
|
|
+ printTopBottom(rawList);
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 按 contentId 去重(ODPS 同 contentId 可能多行)
|
|
|
+ Map<String, ArticleQuality> deduped = new LinkedHashMap<>();
|
|
|
+ for (ArticleQuality aq : rawList) {
|
|
|
+ deduped.putIfAbsent(aq.getContentId(), aq);
|
|
|
+ }
|
|
|
+ List<ArticleQuality> list = new ArrayList<>(deduped.values());
|
|
|
+ log.info("[去重] {} → {} 条", rawList.size(), list.size());
|
|
|
+
|
|
|
+ // 分批写入
|
|
|
+ log.info("[写入DB] 开始, 共 {} 条, dt={}", list.size(), dt);
|
|
|
+ if (!list.isEmpty()) {
|
|
|
+ ArticleQuality sample = list.get(0);
|
|
|
+ log.info("[写入DB 采样] contentId={}, qualityScore={}, dt={}",
|
|
|
+ sample.getContentId(), round2(sample.getQualityScore()), dt);
|
|
|
+ }
|
|
|
+ int totalUpserted = 0;
|
|
|
+ for (int i = 0; i < list.size(); i += DB_BATCH_SIZE) {
|
|
|
+ int end = Math.min(i + DB_BATCH_SIZE, list.size());
|
|
|
+ List<ArticleQuality> batch = list.subList(i, end);
|
|
|
+ for (ArticleQuality aq : batch) {
|
|
|
+ aq.setDt(dt);
|
|
|
+ }
|
|
|
+ int n = articleQualityMapperExt.batchUpsert(batch);
|
|
|
+ totalUpserted += n;
|
|
|
+ if ((i / DB_BATCH_SIZE) % 50 == 0) {
|
|
|
+ log.info("[写入DB 进度] {}/{}, 本批{}条, 累计{}条", end, list.size(), n, totalUpserted);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.info("[写入DB 完成] upserted={}", totalUpserted);
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
+ }
|
|
|
+
|
|
|
+ private ArticleQuality aggregateFromHisPublishList(String contentId, String hisPublishListJson) {
|
|
|
+ JSONArray publishList;
|
|
|
+ try {
|
|
|
+ publishList = JSON.parseArray(hisPublishListJson);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.info("contentId={} his_publish_article_list JSON 解析失败: {}", contentId, e.getMessage());
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ if (publishList == null || publishList.isEmpty()) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ long totalRead = 0;
|
|
|
+ double totalAvgRead = 0;
|
|
|
+ long totalFans = 0;
|
|
|
+ int maxItemIndex = -1;
|
|
|
+ double totalFirstLevel = 0;
|
|
|
+ double totalFission = 0;
|
|
|
+
|
|
|
+ for (int i = 0; i < publishList.size(); i++) {
|
|
|
+ JSONObject pub = publishList.getJSONObject(i);
|
|
|
+ if (pub == null) continue;
|
|
|
+
|
|
|
+ long viewCount = pub.getLongValue("viewCount");
|
|
|
+ totalRead += viewCount;
|
|
|
+
|
|
|
+ Double avgView = pub.getDouble("avgViewCount");
|
|
|
+ if (avgView != null) {
|
|
|
+ totalAvgRead += avgView;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 取最新发文的粉丝量
|
|
|
+ int itemIndex = pub.getIntValue("itemIndex");
|
|
|
+ if (itemIndex > maxItemIndex) {
|
|
|
+ maxItemIndex = itemIndex;
|
|
|
+ totalFans = pub.getLongValue("fans");
|
|
|
+ }
|
|
|
+
|
|
|
+ JSONArray fissionList = pub.getJSONArray("articleDetailInfoList");
|
|
|
+ if (fissionList != null) {
|
|
|
+ double pubFirstLevel = 0;
|
|
|
+ double pubFission = 0;
|
|
|
+ for (int j = 0; j < fissionList.size(); j++) {
|
|
|
+ JSONObject fi = fissionList.getJSONObject(j);
|
|
|
+ if (fi == null) continue;
|
|
|
+ pubFirstLevel += fi.getDoubleValue("firstLevel");
|
|
|
+ pubFission += fi.getDoubleValue("fission0")
|
|
|
+ + fi.getDoubleValue("fission1")
|
|
|
+ + fi.getDoubleValue("fission2");
|
|
|
+ }
|
|
|
+ totalFirstLevel += pubFirstLevel;
|
|
|
+ totalFission += pubFission;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (totalRead <= 0 && totalAvgRead <= 0) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ ArticleQuality aq = new ArticleQuality();
|
|
|
+ aq.setContentId(contentId);
|
|
|
+ aq.setTotalRead(totalRead);
|
|
|
+ aq.setAvgRead(totalAvgRead);
|
|
|
+ aq.setTotalFans(totalFans);
|
|
|
+ aq.setPublishCount(publishList.size());
|
|
|
+ aq.setOpenRate(totalRead > 0 ? totalFirstLevel / totalRead : 0);
|
|
|
+ aq.setFissionRate(totalFirstLevel > 0 ? totalFission / totalFirstLevel : 0);
|
|
|
+ return aq;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static boolean isJsonParseFail(String json) {
|
|
|
+ try {
|
|
|
+ JSON.parseArray(json);
|
|
|
+ return false;
|
|
|
+ } catch (Exception e) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static double parseParamDouble(String param, String key, double defaultValue) {
|
|
|
+ if (param == null || param.isEmpty()) return defaultValue;
|
|
|
+ for (String part : param.split(",")) {
|
|
|
+ String[] kv = part.trim().split("=", 2);
|
|
|
+ if (kv.length == 2 && kv[0].trim().equals(key)) {
|
|
|
+ try { return Double.parseDouble(kv[1].trim()); } catch (NumberFormatException ignored) { }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return defaultValue;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static String parseParamString(String param, String key, String defaultValue) {
|
|
|
+ if (param == null || param.isEmpty()) return defaultValue;
|
|
|
+ for (String part : param.split(",")) {
|
|
|
+ String[] kv = part.trim().split("=", 2);
|
|
|
+ if (kv.length == 2 && kv[0].trim().equals(key)) {
|
|
|
+ return kv[1].trim();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return defaultValue;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static boolean parseParamBool(String param, String key) {
|
|
|
+ if (param == null || param.isEmpty()) return false;
|
|
|
+ for (String part : param.split(",")) {
|
|
|
+ String[] kv = part.trim().split("=", 2);
|
|
|
+ if (kv.length == 2 && kv[0].trim().equals(key)) {
|
|
|
+ return "true".equalsIgnoreCase(kv[1].trim());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static void printTopBottom(List<ArticleQuality> list) {
|
|
|
+ List<ArticleQuality> sorted = new ArrayList<>(list);
|
|
|
+ sorted.sort((a, b) -> Double.compare(
|
|
|
+ b.getQualityScore() == null ? 0 : b.getQualityScore(),
|
|
|
+ a.getQualityScore() == null ? 0 : a.getQualityScore()));
|
|
|
+
|
|
|
+ int show = Math.min(5, sorted.size());
|
|
|
+ log.info("===== Top {} 高质量文章 =====", show);
|
|
|
+ for (int i = 0; i < show; i++) {
|
|
|
+ ArticleQuality aq = sorted.get(i);
|
|
|
+ log.info("[{}] contentId={}, qualityScore={}, readScore={}, openScore={}, fissionScore={}, conf={}",
|
|
|
+ i + 1, aq.getContentId(), round2(aq.getQualityScore()),
|
|
|
+ round2(aq.getReadScore()), round2(aq.getOpenScore()),
|
|
|
+ round2(aq.getFissionScore()), round2(aq.getConfidence()));
|
|
|
+ }
|
|
|
+ log.info("===== Bottom {} 低质量文章 =====", show);
|
|
|
+ for (int i = sorted.size() - 1; i >= Math.max(0, sorted.size() - show); i--) {
|
|
|
+ ArticleQuality aq = sorted.get(i);
|
|
|
+ log.info("[{}] contentId={}, qualityScore={}, readScore={}, openScore={}, fissionScore={}, conf={}",
|
|
|
+ sorted.size() - i, aq.getContentId(), round2(aq.getQualityScore()),
|
|
|
+ round2(aq.getReadScore()), round2(aq.getOpenScore()),
|
|
|
+ round2(aq.getFissionScore()), round2(aq.getConfidence()));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static double round2(Double v) {
|
|
|
+ return ArticleQualityCalculator.round2(v);
|
|
|
+ }
|
|
|
+}
|