|
|
@@ -1,7 +1,6 @@
|
|
|
package com.tzld.piaoquan.sde.service.impl;
|
|
|
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
-import com.alibaba.fastjson.TypeReference;
|
|
|
import com.aliyun.odps.data.Record;
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
|
|
@@ -16,6 +15,7 @@ import com.tzld.piaoquan.sde.model.dto.ContentInputParamsDTO;
|
|
|
import com.tzld.piaoquan.sde.model.dto.SdExecutionTaskPropertiesDTO;
|
|
|
import com.tzld.piaoquan.sde.model.dto.cluster.ManualClusterExecutionConfigDTO;
|
|
|
import com.tzld.piaoquan.sde.model.dto.deconstruction.QueryResponseDataDTO;
|
|
|
+import com.tzld.piaoquan.sde.model.dto.task.XxlJobParamDto;
|
|
|
import com.tzld.piaoquan.sde.model.entity.ContentProfile;
|
|
|
import com.tzld.piaoquan.sde.model.entity.SdExecutionTask;
|
|
|
import com.tzld.piaoquan.sde.model.entity.SdExecutionTaskContent;
|
|
|
@@ -40,6 +40,7 @@ import org.springframework.util.CollectionUtils;
|
|
|
|
|
|
import java.time.LocalDateTime;
|
|
|
import java.time.ZoneId;
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.stream.Collectors;
|
|
|
@@ -72,6 +73,9 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
@Value("${yesterday.return.video.table:loghubods.lastday_return}")
|
|
|
private String yesterdayReturnVideoTable;
|
|
|
|
|
|
+ @Value("${merge.cate2.cluster.video.cnt.min.limit:20}")
|
|
|
+ private Integer mergeCate2ClusterVideoCntMinLimit;
|
|
|
+
|
|
|
// @Value("${video.tag.table:loghubods.video_merge_tag}")
|
|
|
// private String videoTagTable;
|
|
|
|
|
|
@@ -373,20 +377,14 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
|
|
|
@Override
|
|
|
public void videoExecutionTaskCreateHandler(String params) {
|
|
|
- Map<String, String> paramJson = new HashMap<>();
|
|
|
- try {
|
|
|
- paramJson = JSONObject.parseObject(params, new TypeReference<Map<String, String>>() {
|
|
|
- });
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("parse xxl job param error params: {}", params, e);
|
|
|
- }
|
|
|
+ XxlJobParamDto paramJson = JSONObject.parseObject(params, XxlJobParamDto.class);
|
|
|
|
|
|
|
|
|
long start = System.nanoTime();
|
|
|
log.info("videoExecutionTaskCreateHandler start: {}", JSONObject.toJSONString(paramJson));
|
|
|
|
|
|
- String table = paramJson.getOrDefault("table", yesterdayReturnVideoTable);
|
|
|
- String dt = paramJson.get("dt");
|
|
|
+ String table = paramJson.getTableOrDefault(yesterdayReturnVideoTable);
|
|
|
+ String dt = paramJson.getDt();
|
|
|
if (StringUtils.isBlank(dt)) {
|
|
|
LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
|
|
|
dt = DateUtil.formatLocalDateTime(yesterday, "yyyyMMdd");
|
|
|
@@ -398,7 +396,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- String contentScope = paramJson.getOrDefault("contentScope", YESTERDAY_RETURN_TOP10_VIDEO_SCOPE);
|
|
|
+ String contentScope = paramJson.getContentScopeOrDefault(YESTERDAY_RETURN_TOP10_VIDEO_SCOPE);
|
|
|
|
|
|
XxlJobLogger.log("videoExecutionTaskCreateHandler records size={}", records.size());
|
|
|
List<Record> findRecords = new ArrayList<>();
|
|
|
@@ -578,4 +576,109 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public void videoClusterExecutionTaskCreateHandler(String params) {
|
|
|
+ log.info("videoClusterExecutionTaskCreateHandler params: {}", params);
|
|
|
+
|
|
|
+ XxlJobParamDto xxlJobParamDto = JSONObject.parseObject(params, XxlJobParamDto.class);
|
|
|
+ String table = xxlJobParamDto.getTable();
|
|
|
+ String contentScope = xxlJobParamDto.getContentScope();
|
|
|
+ if (StringUtils.isBlank(table) || StringUtils.isBlank(contentScope)) {
|
|
|
+ XxlJobLogger.log("table and contentScope must not null");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ String dt = xxlJobParamDto.getDt();
|
|
|
+ if (StringUtils.isBlank(dt)) {
|
|
|
+ LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
|
|
|
+ dt = DateUtil.formatLocalDateTime(yesterday, "yyyyMMdd");
|
|
|
+ }
|
|
|
+
|
|
|
+ String sql = String.format("select * from %s where dt = %s", table, dt);
|
|
|
+ log.info("query sql: {}", sql);
|
|
|
+ XxlJobLogger.log("query sql: {}", sql);
|
|
|
+ List<Record> records = odpsManager.query(sql);
|
|
|
+ if (CollectionUtils.isEmpty(records)) {
|
|
|
+ log.info("records is empty");
|
|
|
+ XxlJobLogger.log("records is empty");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<String, ManualClusterExecutionConfigDTO> mergeCate2AndConfigMap = new HashMap<>();
|
|
|
+ for (Record record : records) {
|
|
|
+ String mergeCate2 = record.getString("merge_leve2");
|
|
|
+ if (StringUtils.isBlank(mergeCate2)) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (!mergeCate2AndConfigMap.containsKey(mergeCate2)) {
|
|
|
+ ManualClusterExecutionConfigDTO manualClusterExecutionConfigDTO = new ManualClusterExecutionConfigDTO();
|
|
|
+ manualClusterExecutionConfigDTO.setContentScope(contentScope);
|
|
|
+ manualClusterExecutionConfigDTO.setCategory(mergeCate2);
|
|
|
+ manualClusterExecutionConfigDTO.setContents(new ArrayList<>());
|
|
|
+
|
|
|
+ String patternName = String.format("%s_%s_%s", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")), mergeCate2, contentScope);
|
|
|
+ manualClusterExecutionConfigDTO.setPatternName(patternName);
|
|
|
+
|
|
|
+ mergeCate2AndConfigMap.put(mergeCate2, manualClusterExecutionConfigDTO);
|
|
|
+ }
|
|
|
+
|
|
|
+ String contentId = record.getString("videoid");
|
|
|
+ Double weightScore = Double.parseDouble(record.getString("vov"));
|
|
|
+ ManualClusterExecutionConfigDTO.ContentDTO content = new ManualClusterExecutionConfigDTO.ContentDTO();
|
|
|
+ content.setContentId(contentId);
|
|
|
+ content.setWeightScore(weightScore);
|
|
|
+
|
|
|
+ mergeCate2AndConfigMap.get(mergeCate2).getContents().add(content);
|
|
|
+ }
|
|
|
+
|
|
|
+ for (Map.Entry<String, ManualClusterExecutionConfigDTO> entry : mergeCate2AndConfigMap.entrySet()) {
|
|
|
+ String mergeCate2 = entry.getKey();
|
|
|
+ ManualClusterExecutionConfigDTO configDTO = entry.getValue();
|
|
|
+ try {
|
|
|
+ if (CollectionUtils.isEmpty(configDTO.getContents()) || configDTO.getContents().size() < mergeCate2ClusterVideoCntMinLimit) {
|
|
|
+ log.error("品类 {} 视频数量 {} 不够 {}, 跳过创建聚类任务", mergeCate2, configDTO.getContents().size(), mergeCate2ClusterVideoCntMinLimit);
|
|
|
+ XxlJobLogger.log("品类 {} 视频数量 {} 不够 {}, 跳过创建聚类任务", mergeCate2, configDTO.getContents().size(), mergeCate2ClusterVideoCntMinLimit);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ SdExecutionTask sdExecutionTask = new SdExecutionTask();
|
|
|
+ sdExecutionTask.setTaskNo(IdGeneratorUtil.generateExecutionTaskNo());
|
|
|
+ sdExecutionTask.setTaskType(TaskTypeEnum.CLUSTER.getValue());
|
|
|
+ sdExecutionTask.setTaskStatus(ExecutionTaskStatusEnum.INIT.getValue());
|
|
|
+ sdExecutionTask.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
|
+
|
|
|
+ // 属性设置
|
|
|
+ SdExecutionTaskPropertiesDTO propertiesDTO = new SdExecutionTaskPropertiesDTO();
|
|
|
+ propertiesDTO.setContentScope(contentScope);
|
|
|
+ propertiesDTO.setPatternName(configDTO.getPatternName());
|
|
|
+ propertiesDTO.setCategory(configDTO.getCategory());
|
|
|
+ sdExecutionTask.setProperties(JSONObject.toJSONString(propertiesDTO));
|
|
|
+
|
|
|
+ // 关联内容:多条
|
|
|
+ List<ManualClusterExecutionConfigDTO.ContentDTO> contentDTOS = configDTO.getContents();
|
|
|
+ List<SdExecutionTaskContent> contentList = new ArrayList<>(contentDTOS.size());
|
|
|
+ for (ManualClusterExecutionConfigDTO.ContentDTO contentDTO : contentDTOS) {
|
|
|
+ if (Objects.isNull(contentDTO) || Objects.isNull(contentDTO.getContentId())) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ String contentId = contentDTO.getContentId();
|
|
|
+ SdExecutionTaskContent sdExecutionTaskContent = new SdExecutionTaskContent();
|
|
|
+ sdExecutionTaskContent.setContentType(ContentTypeEnum.VIDEO.getValue());
|
|
|
+ sdExecutionTaskContent.setContentId(contentId);
|
|
|
+ ContentInputParamsDTO contentInputParamsDTO = new ContentInputParamsDTO();
|
|
|
+ contentInputParamsDTO.setWeightScore(contentDTO.getWeightScore());
|
|
|
+ sdExecutionTaskContent.setInputParams(JSONObject.toJSONString(contentInputParamsDTO));
|
|
|
+ sdExecutionTaskContent.setContent(null);
|
|
|
+ contentList.add(sdExecutionTaskContent);
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean createResult = executionTaskCreateService.create(sdExecutionTask, contentList);
|
|
|
+ log.info("videoClusterExecutionTaskCreateHandler category = {} sdExecutionTask create result={}", mergeCate2, createResult);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("videoClusterExecutionTaskCreateHandler error: {}", mergeCate2, e);
|
|
|
+ XxlJobLogger.log("videoClusterExecutionTaskCreateHandler error: {} \n", mergeCate2, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
}
|