Browse Source

feat:添加解构创作任务

zhaohaipeng 3 weeks ago
parent
commit
553677f040
14 changed files with 325 additions and 108 deletions
  1. 0 34
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/ContentProfileStageEnum.java
  2. 34 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/TaskStageEnum.java
  3. 4 4
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/deconstruction/SceneTypeEnum.java
  4. 20 6
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/integration/ContentDeconstructionClusterClient.java
  5. 14 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/dto/deconstruction/DeconstructionODPSRecordInfo.java
  6. 0 14
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/dto/task/XxlJobParamDto.java
  7. 8 1
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/entity/ContentProfile.java
  8. 7 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/entity/SdExecutionTask.java
  9. 7 2
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/ContentProfileService.java
  10. 2 1
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/ExecutionTaskService.java
  11. 40 7
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/ContentProfileServiceImpl.java
  12. 158 38
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/ExecutionTaskServiceImpl.java
  13. 30 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/util/CoverUtil.java
  14. 1 1
      supply-demand-engine-job/src/main/java/com/tzld/piaoquan/sde/job/ExecutionTaskJob.java

+ 0 - 34
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/ContentProfileStageEnum.java

@@ -1,34 +0,0 @@
-package com.tzld.piaoquan.sde.common.enums;
-
-import lombok.Getter;
-
-import java.util.Arrays;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-@Getter
-public enum ContentProfileStageEnum {
-
-    DECONSTRUCTION_SELECT_TOPIC(1, "解构-选题"),
-    DECONSTRUCTION_CREATION(2, "解构-创作"),
-    DECONSTRUCTION_MAKE(3, "解构-制作"),
-    ;
-
-    private final int value;
-
-    private final String desc;
-
-    ContentProfileStageEnum(int value, String desc) {
-        this.value = value;
-        this.desc = desc;
-    }
-
-    private static final Map<Integer, ContentProfileStageEnum> VALUE_MAP = Arrays.stream(ContentProfileStageEnum.values())
-            .collect(Collectors.toMap(ContentProfileStageEnum::getValue, Function.identity()));
-
-    public static ContentProfileStageEnum getByValue(int value) {
-        return VALUE_MAP.get(value);
-    }
-
-}

+ 34 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/TaskStageEnum.java

@@ -0,0 +1,34 @@
+package com.tzld.piaoquan.sde.common.enums;
+
+import lombok.Getter;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+@Getter
+public enum TaskStageEnum {
+
+    SELECT_TOPIC(1, "解构-选题"),
+    CONTENT_CREATION(2, "解构-创作"),
+    PRODUCTION(3, "解构-制作"),
+    ;
+
+    private final int value;
+
+    private final String desc;
+
+    TaskStageEnum(int value, String desc) {
+        this.value = value;
+        this.desc = desc;
+    }
+
+    private static final Map<Integer, TaskStageEnum> VALUE_MAP = Arrays.stream(TaskStageEnum.values())
+            .collect(Collectors.toMap(TaskStageEnum::getValue, Function.identity()));
+
+    public static TaskStageEnum getByValue(int value) {
+        return VALUE_MAP.get(value);
+    }
+
+}

+ 4 - 4
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/deconstruction/SenceTypeEnum.java → supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/deconstruction/SceneTypeEnum.java

@@ -8,7 +8,7 @@ import lombok.Getter;
  * @author supeng
  */
 @Getter
-public enum SenceTypeEnum {
+public enum SceneTypeEnum {
     TOPIC_SELECTION(0, "选题"),
     CONTENT_CREATION(1, "创作"),
     PRODUCTION(2, "制作");
@@ -16,13 +16,13 @@ public enum SenceTypeEnum {
     private final Integer value;
     private final String desc;
 
-    SenceTypeEnum(Integer value, String desc) {
+    SceneTypeEnum(Integer value, String desc) {
         this.value = value;
         this.desc = desc;
     }
 
-    public static SenceTypeEnum getInstance(Integer value) {
-        for (SenceTypeEnum senceTypeEnum : SenceTypeEnum.values()) {
+    public static SceneTypeEnum getInstance(Integer value) {
+        for (SceneTypeEnum senceTypeEnum : SceneTypeEnum.values()) {
             if (senceTypeEnum.getValue().equals(value)) {
                 return senceTypeEnum;
             }

+ 20 - 6
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/integration/ContentDeconstructionClusterClient.java

@@ -6,7 +6,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.tzld.piaoquan.sde.common.enums.ContentTypeEnum;
 import com.tzld.piaoquan.sde.common.enums.ExceptionEnum;
 import com.tzld.piaoquan.sde.common.enums.IsDeleteEnum;
-import com.tzld.piaoquan.sde.common.enums.deconstruction.SenceTypeEnum;
+import com.tzld.piaoquan.sde.common.enums.TaskStageEnum;
 import com.tzld.piaoquan.sde.common.exception.BizException;
 import com.tzld.piaoquan.sde.common.exception.HttpServiceException;
 import com.tzld.piaoquan.sde.mapper.SdExecutionTaskContentMapper;
@@ -22,6 +22,7 @@ import com.tzld.piaoquan.sde.model.entity.SdExecutionTaskContent;
 import com.tzld.piaoquan.sde.model.request.ClusterTaskSubmitParam;
 import com.tzld.piaoquan.sde.model.request.DeconstructionTaskSubmitParam;
 import com.tzld.piaoquan.sde.model.request.VideoInfoGetParam;
+import com.tzld.piaoquan.sde.util.CoverUtil;
 import lombok.extern.slf4j.Slf4j;
 import okhttp3.*;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -30,7 +31,6 @@ import org.springframework.stereotype.Service;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
@@ -61,8 +61,8 @@ public class ContentDeconstructionClusterClient {
     @Value("${longvideoapi.videoinfo.get.url:http://longvideoapi-internal.piaoquantv.com/longvideoapi/openapi/video/getVideoInfo}")
     private String longVideoApiVideoInfoGetUrl;
 
-    private static final OkHttpClient client = new OkHttpClient().newBuilder().connectTimeout(60, TimeUnit.SECONDS)
-            .readTimeout(60, TimeUnit.SECONDS).writeTimeout(60, TimeUnit.SECONDS).build();
+    private static final OkHttpClient client = new OkHttpClient().newBuilder().connectTimeout(5, TimeUnit.MINUTES)
+            .readTimeout(5, TimeUnit.MINUTES).writeTimeout(5, TimeUnit.MINUTES).build();
 
     public ContentDeconstructionClusterClient(SdExecutionTaskContentMapper sdExecutionTaskContentMapper) {
         this.sdExecutionTaskContentMapper = sdExecutionTaskContentMapper;
@@ -79,8 +79,15 @@ public class ContentDeconstructionClusterClient {
                 || Objects.isNull(sdExecutionTask.getContentType())) {
             throw new IllegalArgumentException("sdExecutionTask is null");
         }
+
+        TaskStageEnum taskStage = TaskStageEnum.getByValue(sdExecutionTask.getContentType());
+        // 历史数据没有taskStage字段,默认都为选题
+        if (Objects.isNull(taskStage)){
+            taskStage = TaskStageEnum.SELECT_TOPIC;
+        }
+
         DeconstructionTaskSubmitParam param = new DeconstructionTaskSubmitParam();
-        param.setScene(SenceTypeEnum.TOPIC_SELECTION.getValue());
+        param.setScene(CoverUtil.getSceneTypeByStage(taskStage).getValue());
         param.setContent_type(sdExecutionTask.getContentType());
         DeconstructionTaskSubmitParam.Content content = new DeconstructionTaskSubmitParam.Content();
         Long executionTaskId = sdExecutionTask.getId();
@@ -217,9 +224,16 @@ public class ContentDeconstructionClusterClient {
         if (Objects.isNull(sdExecutionTask.getProperties()) || sdExecutionTask.getProperties().isEmpty()) {
             throw new BizException(ExceptionEnum.DATA_ERROR, "sdExecutionTask properties is null");
         }
+
+        TaskStageEnum taskStage = TaskStageEnum.getByValue(sdExecutionTask.getContentType());
+        // 历史数据没有taskStage字段,默认都为选题
+        if (Objects.isNull(taskStage)){
+            taskStage = TaskStageEnum.SELECT_TOPIC;
+        }
+
         SdExecutionTaskPropertiesDTO sdExecutionTaskPropertiesDTO = JSON.parseObject(sdExecutionTask.getProperties(), SdExecutionTaskPropertiesDTO.class);
         ClusterTaskSubmitParam param = new ClusterTaskSubmitParam();
-        param.setScene(SenceTypeEnum.TOPIC_SELECTION.getValue());
+        param.setScene(CoverUtil.getSceneTypeByStage(taskStage).getValue());
         param.setContent_type(sdExecutionTask.getContentType());
         param.setPattern_name(sdExecutionTaskPropertiesDTO.getPatternName());
         Long executionTaskId = sdExecutionTask.getId();

+ 14 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/dto/deconstruction/DeconstructionODPSRecordInfo.java

@@ -0,0 +1,14 @@
+package com.tzld.piaoquan.sde.model.dto.deconstruction;
+
+import lombok.Data;
+
+@Data
+public class DeconstructionODPSRecordInfo {
+
+    private String channelContentId;
+
+    private String mergeCate2;
+
+    private Double vov;
+
+}

+ 0 - 14
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/dto/task/XxlJobParamDto.java

@@ -1,7 +1,6 @@
 package com.tzld.piaoquan.sde.model.dto.task;
 
 import lombok.Data;
-import org.apache.commons.lang3.StringUtils;
 
 @Data
 public class XxlJobParamDto {
@@ -12,17 +11,4 @@ public class XxlJobParamDto {
 
     private String contentScope;
 
-    public String getTableOrDefault(String detaultValue){
-        if (StringUtils.isBlank(table)){
-            return detaultValue;
-        }
-        return table;
-    }
-
-    public String getContentScopeOrDefault(String detaultValue){
-        if (StringUtils.isBlank(contentScope)){
-            return detaultValue;
-        }
-        return contentScope;
-    }
 }

+ 8 - 1
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/entity/ContentProfile.java

@@ -3,6 +3,7 @@ package com.tzld.piaoquan.sde.model.entity;
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
+import com.tzld.piaoquan.sde.common.enums.TaskStageEnum;
 import lombok.Data;
 
 import java.util.Date;
@@ -40,7 +41,7 @@ public class ContentProfile {
     /**
      * 步骤/阶段:1 解构-选题 2 解构-创作 3 解构-制作
      *
-     * @see com.tzld.piaoquan.sde.common.enums.ContentProfileStageEnum
+     * @see TaskStageEnum
      */
     private Integer stage;
 
@@ -59,6 +60,12 @@ public class ContentProfile {
      */
     private String version;
 
+    /**
+     * 状态
+     * @see com.tzld.piaoquan.sde.common.enums.ExecutionTaskStatusEnum
+     */
+    private Integer status;
+
     /**
      * 记录创建时间,由数据库自动生成
      */

+ 7 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/entity/SdExecutionTask.java

@@ -32,6 +32,7 @@ public class SdExecutionTask {
     private Integer contentType;
     /**
      * 属性配置等:JSON
+     *
      * @see com.tzld.piaoquan.sde.model.dto.SdExecutionTaskPropertiesDTO
      */
     private String properties;
@@ -41,6 +42,12 @@ public class SdExecutionTask {
      * @see com.tzld.piaoquan.sde.common.enums.ExecutionTaskTypeEnum
      */
     private Integer taskType;
+    /**
+     * 任务执行步骤
+     *
+     * @see com.tzld.piaoquan.sde.common.enums.TaskStageEnum
+     */
+    private Integer taskStage;
     /**
      * 执行状态:0 INIT, 1 SUBMITTED(已提交), 2 RUNNING(运行中), 3 SUCCESS, 4 FAILED 5 TIMEOUT
      *

+ 7 - 2
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/ContentProfileService.java

@@ -1,6 +1,7 @@
 package com.tzld.piaoquan.sde.service;
 
-import com.tzld.piaoquan.sde.common.enums.ContentProfileStageEnum;
+import com.tzld.piaoquan.sde.common.enums.ExecutionTaskStatusEnum;
+import com.tzld.piaoquan.sde.common.enums.TaskStageEnum;
 import com.tzld.piaoquan.sde.model.dto.deconstruction.QueryResponseDataDTO;
 import com.tzld.piaoquan.sde.model.entity.ContentProfile;
 import com.tzld.piaoquan.sde.model.entity.SdExecutionTask;
@@ -14,9 +15,13 @@ public interface ContentProfileService {
      * @param stageEnum 步骤
      * @return 内容库数据库
      */
-    ContentProfile findContentProfileByIdAndStage(String contentId, ContentProfileStageEnum stageEnum);
+    ContentProfile findContentProfileByIdAndStage(String contentId, TaskStageEnum stageEnum);
 
     void insertOrUpdate(ContentProfile contentProfile);
 
     void deconstructSelectTopicResultSync(SdExecutionTask sdExecutionTask, QueryResponseDataDTO queryResponseDataDTO);
+
+    void updateContentStageStatus(String channelContentId, TaskStageEnum taskStage, ExecutionTaskStatusEnum taskStatus);
+
+    void updateStatusByExecutionTask(SdExecutionTask sdExecutionTask, ExecutionTaskStatusEnum taskStatus);
 }

+ 2 - 1
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/ExecutionTaskService.java

@@ -48,8 +48,9 @@ public interface ExecutionTaskService {
      */
     void yesterdayTopReturnVideoExecutionTaskCreateHandler();
 
-    void videoExecutionTaskCreateHandler(String params);
+    void videoDeconstructionSelectTopicExecutionTaskCreateHandler(String params);
 
+    void videoDeconstructionContentCreationExecutionTaskCreateHandler(String params);
     /**
      * 聚类
      */

+ 40 - 7
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/ContentProfileServiceImpl.java

@@ -1,10 +1,7 @@
 package com.tzld.piaoquan.sde.service.impl;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import com.tzld.piaoquan.sde.common.enums.ContentProfileStageEnum;
-import com.tzld.piaoquan.sde.common.enums.ContentTypeEnum;
-import com.tzld.piaoquan.sde.common.enums.IsDeleteEnum;
-import com.tzld.piaoquan.sde.common.enums.TaskTypeEnum;
+import com.tzld.piaoquan.sde.common.enums.*;
 import com.tzld.piaoquan.sde.mapper.ContentProfileMapper;
 import com.tzld.piaoquan.sde.mapper.SdExecutionTaskContentMapper;
 import com.tzld.piaoquan.sde.model.dto.deconstruction.QueryResponseDataDTO;
@@ -29,7 +26,7 @@ public class ContentProfileServiceImpl implements ContentProfileService {
     private SdExecutionTaskContentMapper sdExecutionTaskContentMapper;
 
     @Override
-    public ContentProfile findContentProfileByIdAndStage(String contentId, ContentProfileStageEnum stageEnum) {
+    public ContentProfile findContentProfileByIdAndStage(String contentId, TaskStageEnum stageEnum) {
         if (StringUtils.isEmpty(contentId) || Objects.isNull(stageEnum)) {
             throw new RuntimeException("contentId and stageEnum must not null");
         }
@@ -46,7 +43,7 @@ public class ContentProfileServiceImpl implements ContentProfileService {
         if (StringUtils.isEmpty(contentProfile.getContentId()) || Objects.isNull(contentProfile.getStage())) {
             throw new RuntimeException("contentId and stageEnum must not null");
         }
-        ContentProfile profile = this.findContentProfileByIdAndStage(contentProfile.getContentId(), ContentProfileStageEnum.getByValue(contentProfile.getStage()));
+        ContentProfile profile = this.findContentProfileByIdAndStage(contentProfile.getContentId(), TaskStageEnum.getByValue(contentProfile.getStage()));
         if (Objects.isNull(profile)) {
             contentProfile.setIsDeleted(IsDeleteEnum.NORMAL.getValue());
             contentProfileMapper.insert(contentProfile);
@@ -78,7 +75,7 @@ public class ContentProfileServiceImpl implements ContentProfileService {
             ContentProfile contentProfile = new ContentProfile();
             contentProfile.setContentId(content.getContentId());
             contentProfile.setContentType(content.getContentType());
-            contentProfile.setStage(ContentProfileStageEnum.DECONSTRUCTION_SELECT_TOPIC.getValue());
+            contentProfile.setStage(TaskStageEnum.SELECT_TOPIC.getValue());
             contentProfile.setRawResult(queryResponseDataDTO.getResult());
             contentProfile.setRawUrl(queryResponseDataDTO.getUrl());
 
@@ -88,6 +85,42 @@ public class ContentProfileServiceImpl implements ContentProfileService {
         }
     }
 
+    @Override
+    public void updateContentStageStatus(String channelContentId, TaskStageEnum taskStage, ExecutionTaskStatusEnum taskStatus) {
+        if (StringUtils.isEmpty(channelContentId) || Objects.isNull(taskStage) || Objects.isNull(taskStatus)) {
+            return;
+        }
+        ContentProfile contentProfile = this.findContentProfileByIdAndStage(channelContentId, taskStage);
+        if (Objects.isNull(contentProfile)) {
+            return;
+        }
+        contentProfile.setStatus(taskStatus.getValue());
+        contentProfileMapper.updateById(contentProfile);
+    }
+
+    @Override
+    public void updateStatusByExecutionTask(SdExecutionTask sdExecutionTask, ExecutionTaskStatusEnum taskStatus) {
+        try {
+            if (Objects.isNull(sdExecutionTask) || Objects.isNull(taskStatus)) {
+                return;
+            }
+            // 聚类任务不处理
+            if (TaskTypeEnum.CLUSTER.getValue().equals(sdExecutionTask.getTaskType())) {
+                return;
+            }
+            SdExecutionTaskContent executionTaskContent = this.findByExecutionTaskId(sdExecutionTask.getId());
+            if (Objects.isNull(executionTaskContent)) {
+                return;
+            }
+
+            TaskStageEnum taskStage = TaskStageEnum.getByValue(sdExecutionTask.getTaskStage());
+
+            this.updateContentStageStatus(executionTaskContent.getContentId(), taskStage, taskStatus);
+        } catch (Exception e) {
+            log.error("updateStatusByExecutionTask error {} status {}", sdExecutionTask.getId(), taskStatus);
+        }
+    }
+
     private SdExecutionTaskContent findByExecutionTaskId(Long executionTaskId) {
         if (Objects.isNull(executionTaskId)) {
             return null;

+ 158 - 38
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/ExecutionTaskServiceImpl.java

@@ -5,6 +5,7 @@ import com.aliyun.odps.data.Record;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.google.common.collect.Sets;
 import com.tzld.piaoquan.sde.common.api.CommonRequest;
 import com.tzld.piaoquan.sde.common.enums.*;
 import com.tzld.piaoquan.sde.common.enums.deconstruction.DeconstructionTaskStatusEnum;
@@ -14,6 +15,7 @@ import com.tzld.piaoquan.sde.mapper.*;
 import com.tzld.piaoquan.sde.model.dto.ContentInputParamsDTO;
 import com.tzld.piaoquan.sde.model.dto.SdExecutionTaskPropertiesDTO;
 import com.tzld.piaoquan.sde.model.dto.cluster.ManualClusterExecutionConfigDTO;
+import com.tzld.piaoquan.sde.model.dto.deconstruction.DeconstructionODPSRecordInfo;
 import com.tzld.piaoquan.sde.model.dto.deconstruction.QueryResponseDataDTO;
 import com.tzld.piaoquan.sde.model.dto.task.XxlJobParamDto;
 import com.tzld.piaoquan.sde.model.entity.ContentProfile;
@@ -26,6 +28,7 @@ import com.tzld.piaoquan.sde.model.vo.SdExecutionTaskVO;
 import com.tzld.piaoquan.sde.service.ContentProfileService;
 import com.tzld.piaoquan.sde.service.ExecutionTaskCreateService;
 import com.tzld.piaoquan.sde.service.ExecutionTaskService;
+import com.tzld.piaoquan.sde.util.CoverUtil;
 import com.tzld.piaoquan.sde.util.DateUtil;
 import com.tzld.piaoquan.sde.util.IdGeneratorUtil;
 import com.tzld.piaoquan.sde.util.OdpsManager;
@@ -66,6 +69,8 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
     private SdExecutionTaskContentMapper sdExecutionTaskContentMapper;
     @Autowired
     private ContentProfileService contentProfileService;
+    @Autowired
+    private ContentProfileMapper contentProfileMapper;
 
     private static final String YESTERDAY_RETURN_TOP10_VIDEO_SCOPE = "yesterday_return_top10_video_scope";
     private static final String MANUAL_SELECT_VIDEO_SCOPE = "manual_select_video_scope";
@@ -192,6 +197,9 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
                     update.setTaskStatus(ExecutionTaskStatusEnum.FAILED.getValue());
                     int rows = sdExecutionTaskMapper.updateById(update);
                     log.info("executionTask submit failure, id:{} rows = {}", sdExecutionTask.getId(), rows);
+
+                    // 同步更新内容库状态
+                    contentProfileService.updateStatusByExecutionTask(sdExecutionTask, ExecutionTaskStatusEnum.FAILED);
                     continue;
                 }
                 SdExecutionTask update = new SdExecutionTask();
@@ -211,6 +219,9 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
                 update.setErrorMsg("任务提交失败:" + e.getMessage());
                 update.setTaskStatus(ExecutionTaskStatusEnum.FAILED.getValue());
                 int rows = sdExecutionTaskMapper.updateById(update);
+
+                // 同步更新内容库状态
+                contentProfileService.updateStatusByExecutionTask(sdExecutionTask, ExecutionTaskStatusEnum.FAILED);
                 log.info("executionTask submit failure, id:{} rows = {}", sdExecutionTask.getId(), rows);
             }
         }
@@ -288,6 +299,12 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
 
                     contentProfileService.deconstructSelectTopicResultSync(sdExecutionTask, queryResponseDataDTO);
                 }
+
+                // 任务执行完成之后,更新内容库状态
+                if (Sets.newHashSet(ExecutionTaskStatusEnum.SUCCESS, ExecutionTaskStatusEnum.FAILED).contains(executionTaskStatusEnum)) {
+                    contentProfileService.updateStatusByExecutionTask(sdExecutionTask, executionTaskStatusEnum);
+                }
+
             } catch (Exception e) {
                 log.error("executionTask sync error {}", sdExecutionTask, e);
                 if (e instanceof HttpServiceException) {
@@ -298,6 +315,9 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
                 update.setTaskStatus(ExecutionTaskStatusEnum.FAILED.getValue());
                 update.setErrorMsg(e.getMessage());
                 int rows = sdExecutionTaskMapper.updateById(update);
+
+                contentProfileService.updateStatusByExecutionTask(sdExecutionTask, ExecutionTaskStatusEnum.FAILED);
+
                 log.error("executionTask failure, id:{} rows = {}", sdExecutionTask.getId(), rows);
             }
         }
@@ -376,62 +396,163 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
     }
 
     @Override
-    public void videoExecutionTaskCreateHandler(String params) {
-        XxlJobParamDto paramJson = JSONObject.parseObject(params, XxlJobParamDto.class);
+    public void videoDeconstructionSelectTopicExecutionTaskCreateHandler(String params) {
+        log.info("videoExecutionTaskCreateHandler start: {}", params);
+        long start = System.nanoTime();
 
 
-        long start = System.nanoTime();
-        log.info("videoExecutionTaskCreateHandler start: {}", JSONObject.toJSONString(paramJson));
+        XxlJobParamDto paramJson = JSONObject.parseObject(params, XxlJobParamDto.class);
 
-        String table = paramJson.getTableOrDefault(yesterdayReturnVideoTable);
+        String table = paramJson.getTable();
         String dt = paramJson.getDt();
+        String contentScope = paramJson.getContentScope();
+        if (StringUtils.isAnyBlank(table, contentScope)) {
+            log.error("table and contentScopre must not null");
+            XxlJobLogger.log("table and contentScopre must not null");
+            return;
+        }
         if (StringUtils.isBlank(dt)) {
-            LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
-            dt = DateUtil.formatLocalDateTime(yesterday, "yyyyMMdd");
+            dt = DateUtil.formatLocalDateTime(LocalDateTime.now().minusDays(1), "yyyyMMdd");
         }
-        String sql = "select * from " + table + " where dt='" + dt + "' ORDER BY sort_field DESC;";
+
+        String sql = String.format("select * from %s where dt='%s' ORDER BY sort_field DESC;", table, dt);
         List<Record> records = odpsManager.query(sql);
-        if (Objects.isNull(records) || records.isEmpty()) {
+        if (CollectionUtils.isEmpty(records)) {
             log.info("videoExecutionTaskCreateHandler records is empty");
+            XxlJobLogger.log("videoExecutionTaskCreateHandler records is empty");
             return;
         }
 
-        String contentScope = paramJson.getContentScopeOrDefault(YESTERDAY_RETURN_TOP10_VIDEO_SCOPE);
+        List<DeconstructionODPSRecordInfo> recordInfos = records.stream()
+                .filter(Objects::nonNull)
+                .map(CoverUtil::deconstructionODPSCoverRecordInfo)
+                .collect(Collectors.toList());
 
         XxlJobLogger.log("videoExecutionTaskCreateHandler records size={}", records.size());
-        List<Record> findRecords = new ArrayList<>();
-        for (Record record : records) {
-            if (Objects.isNull(record)) {
-                continue;
-            }
-            String videoId = record.getString("videoid");
-            // int count = sdExecutionTaskContentMapper.countByContentId(ContentTypeEnum.VIDEO.getValue(), videoId);
-            // if (count > 0) {
-            //     continue;
-            // }
 
-            ContentProfile contentProfile = contentProfileService.findContentProfileByIdAndStage(videoId, ContentProfileStageEnum.DECONSTRUCTION_SELECT_TOPIC);
+        int count = this.deconstructionTaskCreateHandler(recordInfos, contentScope, TaskStageEnum.SELECT_TOPIC);
+
+        long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        log.info("videoExecutionTaskCreateHandler recordSize = {} count= {} finish cost = {}ms", records.size(), count, costMs);
+
+    }
+
+    @Override
+    public void videoDeconstructionContentCreationExecutionTaskCreateHandler(String params) {
+        log.info("videoDeconstructionContentCreationExecutionTaskCreateHandler start: {}", params);
+        long start = System.nanoTime();
+
+        XxlJobParamDto paramJson = JSONObject.parseObject(params, XxlJobParamDto.class);
+        String table = paramJson.getTable();
+        String dt = paramJson.getDt();
+        String contentScope = paramJson.getContentScope();
+        if (StringUtils.isAnyBlank(table, contentScope)) {
+            log.error("table and contentScopre must not null");
+            XxlJobLogger.log("table and contentScopre must not null");
+            return;
+        }
+        if (StringUtils.isBlank(dt)) {
+            dt = DateUtil.formatLocalDateTime(LocalDateTime.now().minusDays(1), "yyyyMMdd");
+        }
+
+        String sql = String.format("select * from %s where dt='%s' ORDER BY sort_field DESC;", table, dt);
+        List<Record> records = odpsManager.query(sql);
+        if (CollectionUtils.isEmpty(records)) {
+            log.info("videoDeconstructionContentCreationExecutionTaskCreateHandler records is empty");
+            XxlJobLogger.log("videoDeconstructionContentCreationExecutionTaskCreateHandler records is empty");
+            return;
+        }
+
+        List<DeconstructionODPSRecordInfo> recordInfos = records.stream()
+                .filter(Objects::nonNull)
+                .map(CoverUtil::deconstructionODPSCoverRecordInfo)
+                .collect(Collectors.toList());
+
+
+        // 判断前置的解构任务是否都已完成
+        List<String> videoIds = recordInfos.stream()
+                .map(DeconstructionODPSRecordInfo::getChannelContentId)
+                .collect(Collectors.toList());
+        LambdaQueryWrapper<ContentProfile> queryWrapper = new LambdaQueryWrapper<>();
+        queryWrapper.in(ContentProfile::getContentId, videoIds)
+                .eq(ContentProfile::getIsDeleted, IsDeleteEnum.NORMAL.getValue());
+        List<ContentProfile> contentProfiles = contentProfileMapper.selectList(queryWrapper);
+
+        // 存在未解构的视频
+        if (CollectionUtils.isEmpty(contentProfiles) || recordInfos.size() != contentProfiles.size()) {
+            XxlJobLogger.log("videoDeconstructionContentCreationExecutionTaskCreateHandler exist not deconstruction video");
+            return;
+        }
+
+        // 存在未解构完成视频
+        Set<Integer> taskCompleteStatus = Sets.newHashSet(ExecutionTaskStatusEnum.SUCCESS.getValue(), ExecutionTaskStatusEnum.FAILED.getValue());
+        long taskNotCompleteCnt = contentProfiles.stream()
+                .filter(i -> !taskCompleteStatus.contains(i.getStatus()))
+                .count();
+        if (taskNotCompleteCnt > 0) {
+            log.error("videoDeconstructionContentCreationExecutionTaskCreateHandler depend task exist not complete");
+            XxlJobLogger.log("videoDeconstructionContentCreationExecutionTaskCreateHandler depend task exist not complete");
+            return;
+        }
+
+        // 获取解构成功,vov top10的视频
+        Set<String> deconstructionSuccessContentIds = contentProfiles.stream()
+                .filter(i -> ExecutionTaskStatusEnum.SUBMITTED.getValue().equals(i.getStatus()))
+                .map(ContentProfile::getContentId)
+                .collect(Collectors.toSet());
+
+        if (CollectionUtils.isEmpty(deconstructionSuccessContentIds)) {
+            XxlJobLogger.log("videoDeconstructionContentCreationExecutionTaskCreateHandler video all not deconstruction success");
+            return;
+        }
+        log.info("videoDeconstructionContentCreationExecutionTaskCreateHandler deconstruction success video size {}", deconstructionSuccessContentIds.size());
+
+        List<DeconstructionODPSRecordInfo> top10RecordInfos = recordInfos.stream()
+                .filter(i -> deconstructionSuccessContentIds.contains(i.getChannelContentId()))
+                .sorted(Comparator.comparingDouble(DeconstructionODPSRecordInfo::getVov).reversed())
+                .limit(10)
+                .collect(Collectors.toList());
+        XxlJobLogger.log("videoDeconstructionContentCreationExecutionTaskCreateHandler records size={}", records.size());
+        int count = this.deconstructionTaskCreateHandler(top10RecordInfos, contentScope, TaskStageEnum.CONTENT_CREATION);
+
+        long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        log.info("videoDeconstructionContentCreationExecutionTaskCreateHandler recordSize = {} count= {} finish cost = {}ms", records.size(), count, costMs);
+
+    }
+
+    private int deconstructionTaskCreateHandler(List<DeconstructionODPSRecordInfo> recordInfos, String contentScope, TaskStageEnum taskStage) {
+        if (CollectionUtils.isEmpty(recordInfos) || Objects.isNull(taskStage)) {
+            log.error("deconstructionTaskCreateHandler recordInfos and taskStage must not null");
+            XxlJobLogger.log("deconstructionTaskCreateHandler recordInfos and taskStage must not null");
+            return 0;
+        }
+        // 全局过滤,已经创建过同stage的视频不再创建
+        List<DeconstructionODPSRecordInfo> needCreateTaskRecords = new ArrayList<>();
+        for (DeconstructionODPSRecordInfo record : recordInfos) {
+            String videoId = record.getChannelContentId();
+            ContentProfile contentProfile = contentProfileService.findContentProfileByIdAndStage(videoId, taskStage);
             if (Objects.nonNull(contentProfile)) {
                 continue;
             }
-
-            findRecords.add(record);
+            needCreateTaskRecords.add(record);
         }
-        if (findRecords.isEmpty()) {
-            XxlJobLogger.log("videoExecutionTaskCreateHandler findRecords is empty");
-            return;
+
+        if (CollectionUtils.isEmpty(needCreateTaskRecords)) {
+            log.info("deconstructionTaskCreateHandler needCreateTaskRecords is empty");
+            XxlJobLogger.log("deconstructionTaskCreateHandler needCreateTaskRecords is empty");
+            return 0;
         }
-        XxlJobLogger.log("videoExecutionTaskCreateHandler findRecords size={}", findRecords.size());
-        long topCostMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-        log.info("videoExecutionTaskCreateHandler get top video cost={}ms", topCostMs);
-        // 创建解构任务
+
+        XxlJobLogger.log("deconstructionTaskCreateHandler needCreateTaskRecords size={}", needCreateTaskRecords.size());
         int count = 0;
-        for (Record record : findRecords) {
+
+        for (DeconstructionODPSRecordInfo record : recordInfos) {
             try {
-                String videoId = record.getString("videoid");
+                String videoId = record.getChannelContentId();
                 SdExecutionTask sdExecutionTask = new SdExecutionTask();
                 sdExecutionTask.setTaskNo(IdGeneratorUtil.generateExecutionTaskNo());
                 sdExecutionTask.setTaskType(TaskTypeEnum.DECONSTRUCT.getValue());
+                sdExecutionTask.setTaskStage(taskStage.getValue());
                 sdExecutionTask.setTaskStatus(ExecutionTaskStatusEnum.INIT.getValue());
                 sdExecutionTask.setContentType(ContentTypeEnum.VIDEO.getValue());
                 // 属性设置
@@ -446,23 +567,22 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
                 List<SdExecutionTaskContent> contentList = new ArrayList<>();
                 contentList.add(sdExecutionTaskContent);
                 boolean createResult = executionTaskCreateService.create(sdExecutionTask, contentList);
-                log.info("videoExecutionTaskCreateHandler sdExecutionTask create videoId = {} result={}", videoId, createResult);
+                log.info("deconstructionTaskCreateHandler sdExecutionTask create videoId = {} result={}", videoId, createResult);
 
                 // 内容库添加数据
                 ContentProfile contentProfile = new ContentProfile();
                 contentProfile.setContentId(videoId);
                 contentProfile.setContentType(ContentTypeEnum.VIDEO.getValue());
-                contentProfile.setStage(ContentProfileStageEnum.DECONSTRUCTION_SELECT_TOPIC.getValue());
+                contentProfile.setStage(taskStage.getValue());
                 contentProfileService.insertOrUpdate(contentProfile);
 
                 count++;
             } catch (Exception e) {
-                log.error("videoExecutionTaskCreateHandler error {}", record, e);
+                log.error("deconstructionTaskCreateHandler error {}", record, e);
             }
         }
-        long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-        log.info("videoExecutionTaskCreateHandler recordSize = {}  count = {} finish cost = {}ms", records.size(), count, costMs);
 
+        return count;
     }
 
     @Override
@@ -572,7 +692,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
             paramJson.put("table", "loghubods.supply_demand_task_video_collect_exp_top");
             paramJson.put("contentScope", contentScope);
             paramJson.put("dt", dt);
-            this.videoExecutionTaskCreateHandler(paramJson.toJSONString());
+            this.videoDeconstructionSelectTopicExecutionTaskCreateHandler(paramJson.toJSONString());
         }
     }
 

+ 30 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/util/CoverUtil.java

@@ -0,0 +1,30 @@
+package com.tzld.piaoquan.sde.util;
+
+import com.aliyun.odps.data.Record;
+import com.tzld.piaoquan.sde.common.enums.TaskStageEnum;
+import com.tzld.piaoquan.sde.common.enums.deconstruction.SceneTypeEnum;
+import com.tzld.piaoquan.sde.model.dto.deconstruction.DeconstructionODPSRecordInfo;
+
+public class CoverUtil {
+
+    public static SceneTypeEnum getSceneTypeByStage(TaskStageEnum taskStage) {
+        switch (taskStage) {
+            case SELECT_TOPIC:
+                return SceneTypeEnum.TOPIC_SELECTION;
+            case CONTENT_CREATION:
+                return SceneTypeEnum.CONTENT_CREATION;
+            case PRODUCTION:
+                return SceneTypeEnum.PRODUCTION;
+            default:
+                throw new RuntimeException("TaskStageEnum " + taskStage + "not exist");
+        }
+    }
+
+    public static DeconstructionODPSRecordInfo deconstructionODPSCoverRecordInfo(Record record) {
+        DeconstructionODPSRecordInfo recordInfo = new DeconstructionODPSRecordInfo();
+        recordInfo.setChannelContentId(record.getString("videoid"));
+        recordInfo.setVov(record.getDouble("vov"));
+        recordInfo.setMergeCate2(record.getString("merge_cate2"));
+        return recordInfo;
+    }
+}

+ 1 - 1
supply-demand-engine-job/src/main/java/com/tzld/piaoquan/sde/job/ExecutionTaskJob.java

@@ -87,7 +87,7 @@ public class ExecutionTaskJob {
     public ReturnT<String> videoExecutionTaskCreateHandler(String params) {
         XxlJobLogger.log("videoExecutionTaskCreateHandler start");
         try {
-            executionTaskService.videoExecutionTaskCreateHandler(params);
+            executionTaskService.videoDeconstructionSelectTopicExecutionTaskCreateHandler(params);
         } catch (Exception e) {
             log.error("videoExecutionTaskCreateHandler error", e);
             XxlJobLogger.log("videoExecutionTaskCreateHandler error", e);