|
|
@@ -35,6 +35,11 @@ public class ContentPlatformDemandVideoJob {
|
|
|
@Autowired
|
|
|
private MessageAttachmentService messageAttachmentService;
|
|
|
|
|
|
+ private static final List<String> SYNC_CHANNEL_NAMES = Arrays.asList(
|
|
|
+ "公众号合作-即转-稳定",
|
|
|
+ "群/企微合作-稳定"
|
|
|
+ );
|
|
|
+
|
|
|
@XxlJob("syncContentPlatformDemandVideoJob")
|
|
|
public ReturnT<String> syncContentPlatformDemandVideoJob(String param) {
|
|
|
String dt = DateUtil.getBeforeDayDateString("yyyyMMdd");
|
|
|
@@ -43,40 +48,71 @@ public class ContentPlatformDemandVideoJob {
|
|
|
}
|
|
|
log.info("syncContentPlatformDemandVideoJob start, dt={}", dt);
|
|
|
|
|
|
- try {
|
|
|
- // 调用接口获取需求匹配结果
|
|
|
+ for (String syncChannelName : SYNC_CHANNEL_NAMES) {
|
|
|
+ try {
|
|
|
+ syncByChannel(dt, syncChannelName);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("syncContentPlatformDemandVideoJob error, dt={}, channelName={}", dt, syncChannelName, e);
|
|
|
+ return ReturnT.FAIL;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ log.info("syncContentPlatformDemandVideoJob finish, dt={}", dt);
|
|
|
+ return ReturnT.SUCCESS;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static final int PAGE_SIZE = 10000;
|
|
|
+
|
|
|
+ private void syncByChannel(String dt, String syncChannelName) throws Exception {
|
|
|
+ log.info("syncByChannel start, dt={}, channelName={}", dt, syncChannelName);
|
|
|
+
|
|
|
+ Long now = System.currentTimeMillis();
|
|
|
+ List<ContentPlatformDemandVideo> saveList = new ArrayList<>();
|
|
|
+
|
|
|
+ int pageNum = 1;
|
|
|
+ int totalPages = 1;
|
|
|
+
|
|
|
+ while (pageNum <= totalPages) {
|
|
|
+ // 调用分页接口获取需求匹配结果
|
|
|
JSONObject requestParam = new JSONObject();
|
|
|
requestParam.put("dt", dt);
|
|
|
- requestParam.put("channelName", "群/企微合作-稳定");
|
|
|
+ requestParam.put("channelName", syncChannelName);
|
|
|
+ requestParam.put("pageNum", pageNum);
|
|
|
+ requestParam.put("pageSize", PAGE_SIZE);
|
|
|
|
|
|
String response = httpPoolClient.post(DEMAND_MATCH_API_URL, requestParam.toJSONString());
|
|
|
if (!StringUtils.hasText(response)) {
|
|
|
- log.error("syncContentPlatformDemandVideoJob response is empty, dt={}", dt);
|
|
|
- return ReturnT.FAIL;
|
|
|
+ log.error("syncByChannel response is empty, dt={}, channelName={}, pageNum={}", dt, syncChannelName, pageNum);
|
|
|
+ throw new RuntimeException("syncByChannel response is empty");
|
|
|
}
|
|
|
|
|
|
JSONObject result = JSONObject.parseObject(response);
|
|
|
if (result.getInteger("code") != 0) {
|
|
|
- log.error("syncContentPlatformDemandVideoJob api error, dt={}, msg={}", dt, result.getString("msg"));
|
|
|
- return ReturnT.FAIL;
|
|
|
+ log.error("syncByChannel api error, dt={}, channelName={}, pageNum={}, msg={}", dt, syncChannelName, pageNum, result.getString("msg"));
|
|
|
+ throw new RuntimeException("syncByChannel api error: " + result.getString("msg"));
|
|
|
}
|
|
|
|
|
|
- JSONArray dataArray = result.getJSONArray("data");
|
|
|
- if (dataArray == null || dataArray.isEmpty()) {
|
|
|
- log.info("syncContentPlatformDemandVideoJob no data, dt={}", dt);
|
|
|
- return ReturnT.SUCCESS;
|
|
|
+ JSONObject dataObj = result.getJSONObject("data");
|
|
|
+ if (dataObj == null) {
|
|
|
+ log.info("syncByChannel no data, dt={}, channelName={}, pageNum={}", dt, syncChannelName, pageNum);
|
|
|
+ break;
|
|
|
}
|
|
|
|
|
|
- Long now = System.currentTimeMillis();
|
|
|
- List<ContentPlatformDemandVideo> saveList = new ArrayList<>();
|
|
|
+ totalPages = dataObj.getIntValue("totalPages");
|
|
|
+ JSONArray records = dataObj.getJSONArray("records");
|
|
|
+ if (records == null || records.isEmpty()) {
|
|
|
+ log.info("syncByChannel no records, dt={}, channelName={}, pageNum={}", dt, syncChannelName, pageNum);
|
|
|
+ break;
|
|
|
+ }
|
|
|
|
|
|
- for (int i = 0; i < dataArray.size(); i++) {
|
|
|
- JSONObject demandItem = dataArray.getJSONObject(i);
|
|
|
+ for (int i = 0; i < records.size(); i++) {
|
|
|
+ JSONObject demandItem = records.getJSONObject(i);
|
|
|
String channelName = demandItem.getString("channelName");
|
|
|
String crowdSegment = demandItem.getString("crowdSegment");
|
|
|
String dimension = demandItem.getString("dimension");
|
|
|
String pointType = demandItem.getString("pointType");
|
|
|
String standardElement = demandItem.getString("standardElement");
|
|
|
+ String elementDimension = demandItem.getString("elementDimension");
|
|
|
String categoryName = demandItem.getString("categoryName");
|
|
|
String demandId = demandItem.getString("demandId");
|
|
|
String crowdPackage = demandItem.getString("crowdPackage");
|
|
|
@@ -113,6 +149,7 @@ public class ContentPlatformDemandVideoJob {
|
|
|
demandVideo.setDimension(dimension != null ? dimension : "");
|
|
|
demandVideo.setPointType(pointType != null ? pointType : "");
|
|
|
demandVideo.setStandardElement(standardElement != null ? standardElement : "");
|
|
|
+ demandVideo.setElementDimension(elementDimension != null ? elementDimension : "");
|
|
|
demandVideo.setCategoryName(categoryName != null ? categoryName : "");
|
|
|
demandVideo.setDemandId(demandId != null ? demandId : "");
|
|
|
demandVideo.setCrowdPackage(crowdPackage != null ? crowdPackage : "");
|
|
|
@@ -143,65 +180,67 @@ public class ContentPlatformDemandVideoJob {
|
|
|
demandVideo.setExperimentId(videoItem.getString("experimentId"));
|
|
|
demandVideo.setStatus(1);
|
|
|
demandVideo.setCreateTimestamp(now);
|
|
|
+ if (Objects.isNull(demandVideo.getRov()) || demandVideo.getRov() == 0.0) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
saveList.add(demandVideo);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if (CollectionUtils.isEmpty(saveList)) {
|
|
|
- log.info("syncContentPlatformDemandVideoJob no matched videos, dt={}", dt);
|
|
|
- return ReturnT.SUCCESS;
|
|
|
- }
|
|
|
+ pageNum++;
|
|
|
+ }
|
|
|
|
|
|
- // 获取视频详情(标题、封面、视频URL)
|
|
|
- List<Long> videoIds = saveList.stream().map(ContentPlatformDemandVideo::getVideoId)
|
|
|
- .distinct().collect(Collectors.toList());
|
|
|
- Map<Long, VideoDetail> videoDetailMap = new HashMap<>();
|
|
|
- for (List<Long> partition : Lists.partition(videoIds, 20)) {
|
|
|
- Set<Long> ids = new HashSet<>(partition);
|
|
|
- videoDetailMap.putAll(messageAttachmentService.getVideoDetail(ids));
|
|
|
- }
|
|
|
+ if (CollectionUtils.isEmpty(saveList)) {
|
|
|
+ log.info("syncByChannel no matched videos, dt={}, channelName={}", dt, syncChannelName);
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- // 填充视频详情
|
|
|
- for (ContentPlatformDemandVideo demandVideo : saveList) {
|
|
|
- VideoDetail detail = videoDetailMap.get(demandVideo.getVideoId());
|
|
|
- if (Objects.nonNull(detail)) {
|
|
|
- demandVideo.setTitle(detail.getTitle());
|
|
|
- String cover = detail.getCover();
|
|
|
- if (StringUtils.hasText(cover) && cover.contains("/watermark")) {
|
|
|
- cover = cover.substring(0, cover.indexOf("/watermark"));
|
|
|
- }
|
|
|
- demandVideo.setCover(cover);
|
|
|
- demandVideo.setVideo(detail.getVideoPath());
|
|
|
+ // 获取视频详情(标题、封面、视频URL)
|
|
|
+ List<Long> videoIds = saveList.stream().map(ContentPlatformDemandVideo::getVideoId)
|
|
|
+ .distinct().collect(Collectors.toList());
|
|
|
+ Map<Long, VideoDetail> videoDetailMap = new HashMap<>();
|
|
|
+ for (List<Long> partition : Lists.partition(videoIds, 20)) {
|
|
|
+ Set<Long> ids = new HashSet<>(partition);
|
|
|
+ videoDetailMap.putAll(messageAttachmentService.getVideoDetail(ids));
|
|
|
+ }
|
|
|
+
|
|
|
+ // 填充视频详情
|
|
|
+ for (ContentPlatformDemandVideo demandVideo : saveList) {
|
|
|
+ VideoDetail detail = videoDetailMap.get(demandVideo.getVideoId());
|
|
|
+ if (Objects.nonNull(detail)) {
|
|
|
+ demandVideo.setTitle(detail.getTitle());
|
|
|
+ String cover = detail.getCover();
|
|
|
+ if (StringUtils.hasText(cover) && cover.contains("/watermark")) {
|
|
|
+ cover = cover.substring(0, cover.indexOf("/watermark"));
|
|
|
}
|
|
|
+ demandVideo.setCover(cover);
|
|
|
+ demandVideo.setVideo(detail.getVideoPath());
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- // 过滤未获取到视频详情的记录
|
|
|
- saveList = saveList.stream()
|
|
|
- .filter(v -> StringUtils.hasText(v.getTitle()))
|
|
|
- .collect(Collectors.toList());
|
|
|
-
|
|
|
- // 按crowd_segment粒度去重,相同videoId保留分数最高的一条
|
|
|
- saveList = saveList.stream()
|
|
|
- .collect(Collectors.groupingBy(v -> v.getCrowdSegment() + "_" + v.getVideoId()))
|
|
|
- .values().stream()
|
|
|
- .map(group -> group.stream()
|
|
|
- .max(Comparator.comparingDouble(v -> v.getScore() != null ? v.getScore() : 0.0))
|
|
|
- .orElse(null))
|
|
|
- .filter(Objects::nonNull)
|
|
|
- .collect(Collectors.toList());
|
|
|
-
|
|
|
- // 先删除当天数据,再批量插入
|
|
|
- demandVideoMapperExt.deleteByDt(dt);
|
|
|
- for (List<ContentPlatformDemandVideo> partition : Lists.partition(saveList, 500)) {
|
|
|
- demandVideoMapperExt.batchInsert(partition);
|
|
|
- }
|
|
|
+ // 过滤未获取到视频详情的记录
|
|
|
+ saveList = saveList.stream()
|
|
|
+ .filter(v -> StringUtils.hasText(v.getTitle()))
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
|
|
- log.info("syncContentPlatformDemandVideoJob success, dt={}, count={}", dt, saveList.size());
|
|
|
- return ReturnT.SUCCESS;
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("syncContentPlatformDemandVideoJob error, dt={}", dt, e);
|
|
|
- return ReturnT.FAIL;
|
|
|
+// // 按crowd_segment粒度去重,相同videoId保留分数最高的一条
|
|
|
+// saveList = saveList.stream()
|
|
|
+// .collect(Collectors.groupingBy(v -> v.getCrowdSegment() + "_"
|
|
|
+// + v.getChannelLevel3() + "_" + v.getDimension() + "_" + v.getVideoId()))
|
|
|
+// .values().stream()
|
|
|
+// .map(group -> group.stream()
|
|
|
+// .max(Comparator.comparingDouble(v -> v.getScore() != null ? v.getScore() : 0.0))
|
|
|
+// .orElse(null))
|
|
|
+// .filter(Objects::nonNull)
|
|
|
+// .collect(Collectors.toList());
|
|
|
+
|
|
|
+ // 先删除当天该渠道数据,再批量插入
|
|
|
+ demandVideoMapperExt.deleteByDtAndChannelName(dt, syncChannelName);
|
|
|
+ for (List<ContentPlatformDemandVideo> partition : Lists.partition(saveList, 1000)) {
|
|
|
+ demandVideoMapperExt.batchInsert(partition);
|
|
|
}
|
|
|
+
|
|
|
+ log.info("syncByChannel success, dt={}, channelName={}, count={}", dt, syncChannelName, saveList.size());
|
|
|
}
|
|
|
|
|
|
@XxlJob("checkContentPlatformDemandVideoStatusJob")
|