Selaa lähdekoodia

feat:添加分品类聚类任务

zhaohaipeng 3 viikkoa sitten
vanhempi
commit
67327eafcc

+ 28 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/dto/task/XxlJobParamDto.java

@@ -0,0 +1,28 @@
+package com.tzld.piaoquan.sde.model.dto.task;
+
+import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
+
+@Data
+public class XxlJobParamDto {
+
+    private String table;
+
+    private String dt;
+
+    private String contentScope;
+
+    public String getTableOrDefault(String detaultValue){
+        if (StringUtils.isBlank(table)){
+            return detaultValue;
+        }
+        return table;
+    }
+
+    public String getContentScopeOrDefault(String detaultValue){
+        if (StringUtils.isBlank(contentScope)){
+            return detaultValue;
+        }
+        return contentScope;
+    }
+}

+ 1 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/ExecutionTaskService.java

@@ -57,4 +57,5 @@ public interface ExecutionTaskService {
 
     void batchCreateDeconstructTask(CommonRequest<JSONObject> request);
 
+    void videoClusterExecutionTaskCreateHandler(String params);
 }

+ 7 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/ContentProfileServiceImpl.java

@@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.tzld.piaoquan.sde.common.enums.ContentProfileStageEnum;
 import com.tzld.piaoquan.sde.common.enums.ContentTypeEnum;
 import com.tzld.piaoquan.sde.common.enums.IsDeleteEnum;
+import com.tzld.piaoquan.sde.common.enums.TaskTypeEnum;
 import com.tzld.piaoquan.sde.mapper.ContentProfileMapper;
 import com.tzld.piaoquan.sde.mapper.SdExecutionTaskContentMapper;
 import com.tzld.piaoquan.sde.model.dto.deconstruction.QueryResponseDataDTO;
@@ -63,6 +64,12 @@ public class ContentProfileServiceImpl implements ContentProfileService {
             if (Objects.isNull(sdExecutionTask) || Objects.isNull(queryResponseDataDTO)) {
                 return;
             }
+
+            // 聚类结果不保存
+            if (TaskTypeEnum.CLUSTER.getValue().equals(sdExecutionTask.getTaskType())) {
+                return;
+            }
+
             SdExecutionTaskContent content = this.findByExecutionTaskId(sdExecutionTask.getId());
             if (Objects.isNull(content)) {
                 return;

+ 114 - 11
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/ExecutionTaskServiceImpl.java

@@ -1,7 +1,6 @@
 package com.tzld.piaoquan.sde.service.impl;
 
 import com.alibaba.fastjson.JSONObject;
-import com.alibaba.fastjson.TypeReference;
 import com.aliyun.odps.data.Record;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
@@ -16,6 +15,7 @@ import com.tzld.piaoquan.sde.model.dto.ContentInputParamsDTO;
 import com.tzld.piaoquan.sde.model.dto.SdExecutionTaskPropertiesDTO;
 import com.tzld.piaoquan.sde.model.dto.cluster.ManualClusterExecutionConfigDTO;
 import com.tzld.piaoquan.sde.model.dto.deconstruction.QueryResponseDataDTO;
+import com.tzld.piaoquan.sde.model.dto.task.XxlJobParamDto;
 import com.tzld.piaoquan.sde.model.entity.ContentProfile;
 import com.tzld.piaoquan.sde.model.entity.SdExecutionTask;
 import com.tzld.piaoquan.sde.model.entity.SdExecutionTaskContent;
@@ -40,6 +40,7 @@ import org.springframework.util.CollectionUtils;
 
 import java.time.LocalDateTime;
 import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -72,6 +73,9 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
     @Value("${yesterday.return.video.table:loghubods.lastday_return}")
     private String yesterdayReturnVideoTable;
 
+    @Value("${merge.cate2.cluster.video.cnt.min.limit:20}")
+    private Integer mergeCate2ClusterVideoCntMinLimit;
+
 //    @Value("${video.tag.table:loghubods.video_merge_tag}")
 //    private String videoTagTable;
 
@@ -373,20 +377,14 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
 
     @Override
     public void videoExecutionTaskCreateHandler(String params) {
-        Map<String, String> paramJson = new HashMap<>();
-        try {
-            paramJson = JSONObject.parseObject(params, new TypeReference<Map<String, String>>() {
-            });
-        } catch (Exception e) {
-            log.error("parse xxl job param error params: {}", params, e);
-        }
+        XxlJobParamDto paramJson = JSONObject.parseObject(params, XxlJobParamDto.class);
 
 
         long start = System.nanoTime();
         log.info("videoExecutionTaskCreateHandler start: {}", JSONObject.toJSONString(paramJson));
 
-        String table = paramJson.getOrDefault("table", yesterdayReturnVideoTable);
-        String dt = paramJson.get("dt");
+        String table = paramJson.getTableOrDefault(yesterdayReturnVideoTable);
+        String dt = paramJson.getDt();
         if (StringUtils.isBlank(dt)) {
             LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
             dt = DateUtil.formatLocalDateTime(yesterday, "yyyyMMdd");
@@ -398,7 +396,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
             return;
         }
 
-        String contentScope = paramJson.getOrDefault("contentScope", YESTERDAY_RETURN_TOP10_VIDEO_SCOPE);
+        String contentScope = paramJson.getContentScopeOrDefault(YESTERDAY_RETURN_TOP10_VIDEO_SCOPE);
 
         XxlJobLogger.log("videoExecutionTaskCreateHandler records size={}", records.size());
         List<Record> findRecords = new ArrayList<>();
@@ -578,4 +576,109 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
         }
     }
 
+    @Override
+    public void videoClusterExecutionTaskCreateHandler(String params) {
+        log.info("videoClusterExecutionTaskCreateHandler params: {}", params);
+
+        XxlJobParamDto xxlJobParamDto = JSONObject.parseObject(params, XxlJobParamDto.class);
+        String table = xxlJobParamDto.getTable();
+        if (StringUtils.isBlank(table)) {
+            XxlJobLogger.log("table must not null");
+            return;
+        }
+        String contentScope = xxlJobParamDto.getContentScopeOrDefault(MANUAL_SELECT_VIDEO_SCOPE);
+        String dt = xxlJobParamDto.getDt();
+        if (StringUtils.isBlank(dt)) {
+            LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
+            dt = DateUtil.formatLocalDateTime(yesterday, "yyyyMMdd");
+        }
+
+        String sql = String.format("select * from %s where dt = %s", table, dt);
+        log.info("query sql: {}", sql);
+        XxlJobLogger.log("query sql: {}", sql);
+        List<Record> records = odpsManager.query(sql);
+        if (CollectionUtils.isEmpty(records)) {
+            log.info("records is empty");
+            XxlJobLogger.log("records is empty");
+            return;
+        }
+
+        Map<String, ManualClusterExecutionConfigDTO> mergeCate2AndConfigMap = new HashMap<>();
+        for (Record record : records) {
+            String mergeCate2 = record.getString("merge_leve2");
+            if (StringUtils.isBlank(mergeCate2)) {
+                continue;
+            }
+            if (!mergeCate2AndConfigMap.containsKey(mergeCate2)) {
+                ManualClusterExecutionConfigDTO manualClusterExecutionConfigDTO = new ManualClusterExecutionConfigDTO();
+                manualClusterExecutionConfigDTO.setContentScope(contentScope);
+                manualClusterExecutionConfigDTO.setCategory(mergeCate2);
+                manualClusterExecutionConfigDTO.setContents(new ArrayList<>());
+
+                String patternName = String.format("%s_%s", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")), mergeCate2);
+                manualClusterExecutionConfigDTO.setPatternName(patternName);
+
+                mergeCate2AndConfigMap.put(mergeCate2, manualClusterExecutionConfigDTO);
+            }
+
+            String contentId = record.getString("videoid");
+            Double weightScore = Double.parseDouble(record.getString("vov"));
+            ManualClusterExecutionConfigDTO.ContentDTO content = new ManualClusterExecutionConfigDTO.ContentDTO();
+            content.setContentId(contentId);
+            content.setWeightScore(weightScore);
+
+            mergeCate2AndConfigMap.get(mergeCate2).getContents().add(content);
+        }
+
+        for (Map.Entry<String, ManualClusterExecutionConfigDTO> entry : mergeCate2AndConfigMap.entrySet()) {
+            String mergeCate2 = entry.getKey();
+            ManualClusterExecutionConfigDTO configDTO = entry.getValue();
+            try {
+                if (CollectionUtils.isEmpty(configDTO.getContents()) || configDTO.getContents().size() < mergeCate2ClusterVideoCntMinLimit) {
+                    log.error("品类 {} 视频数量 {} 不够 {}, 跳过创建聚类任务", mergeCate2, configDTO.getContents().size(), mergeCate2ClusterVideoCntMinLimit);
+                    XxlJobLogger.log("品类 {} 视频数量 {} 不够 {}, 跳过创建聚类任务", mergeCate2, configDTO.getContents().size(), mergeCate2ClusterVideoCntMinLimit);
+                    continue;
+                }
+
+                SdExecutionTask sdExecutionTask = new SdExecutionTask();
+                sdExecutionTask.setTaskNo(IdGeneratorUtil.generateExecutionTaskNo());
+                sdExecutionTask.setTaskType(TaskTypeEnum.CLUSTER.getValue());
+                sdExecutionTask.setTaskStatus(ExecutionTaskStatusEnum.INIT.getValue());
+                sdExecutionTask.setContentType(ContentTypeEnum.VIDEO.getValue());
+
+                // 属性设置
+                SdExecutionTaskPropertiesDTO propertiesDTO = new SdExecutionTaskPropertiesDTO();
+                propertiesDTO.setContentScope(contentScope);
+                propertiesDTO.setPatternName(configDTO.getPatternName());
+                propertiesDTO.setCategory(configDTO.getCategory());
+                sdExecutionTask.setProperties(JSONObject.toJSONString(propertiesDTO));
+
+                // 关联内容:多条
+                List<ManualClusterExecutionConfigDTO.ContentDTO> contentDTOS = configDTO.getContents();
+                List<SdExecutionTaskContent> contentList = new ArrayList<>(contentDTOS.size());
+                for (ManualClusterExecutionConfigDTO.ContentDTO contentDTO : contentDTOS) {
+                    if (Objects.isNull(contentDTO) || Objects.isNull(contentDTO.getContentId())) {
+                        continue;
+                    }
+                    String contentId = contentDTO.getContentId();
+                    SdExecutionTaskContent sdExecutionTaskContent = new SdExecutionTaskContent();
+                    sdExecutionTaskContent.setContentType(ContentTypeEnum.VIDEO.getValue());
+                    sdExecutionTaskContent.setContentId(contentId);
+                    ContentInputParamsDTO contentInputParamsDTO = new ContentInputParamsDTO();
+                    contentInputParamsDTO.setWeightScore(contentDTO.getWeightScore());
+                    sdExecutionTaskContent.setInputParams(JSONObject.toJSONString(contentInputParamsDTO));
+                    sdExecutionTaskContent.setContent(null);
+                    contentList.add(sdExecutionTaskContent);
+                }
+
+                boolean createResult = executionTaskCreateService.create(sdExecutionTask, contentList);
+                log.info("videoClusterExecutionTaskCreateHandler category = {} sdExecutionTask create result={}", mergeCate2, createResult);
+            } catch (Exception e) {
+                log.error("videoClusterExecutionTaskCreateHandler error: {}", mergeCate2, e);
+                XxlJobLogger.log("videoClusterExecutionTaskCreateHandler error: {} \n", mergeCate2, e);
+            }
+        }
+
+    }
+
 }

+ 15 - 0
supply-demand-engine-job/src/main/java/com/tzld/piaoquan/sde/job/ExecutionTaskJob.java

@@ -98,6 +98,21 @@ public class ExecutionTaskJob {
         return ReturnT.SUCCESS;
     }
 
+    @XxlJob("videoClusterExecutionTaskCreateHandler")
+    public ReturnT<String> videoClusterExecutionTaskCreateHandler(String params) {
+        XxlJobLogger.log("videoClusterExecutionTaskCreateHandler start");
+        try {
+            executionTaskService.videoClusterExecutionTaskCreateHandler(params);
+        } catch (Exception e) {
+            log.error("videoClusterExecutionTaskCreateHandler error", e);
+            XxlJobLogger.log("videoClusterExecutionTaskCreateHandler error", e);
+            return ReturnT.FAIL;
+        } finally {
+            XxlJobLogger.log("videoClusterExecutionTaskCreateHandler end");
+        }
+        return ReturnT.SUCCESS;
+    }
+
     /**
      * 手动创建聚类任务
      *