supeng пре 1 недеља
родитељ
комит
b0af479f2b
25 измењених фајлова са 695 додато и 104 уклоњено
  1. 32 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/ContentTypeEnum.java
  2. 3 2
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/ExceptionEnum.java
  3. 3 3
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/ExecutionTaskTypeEnum.java
  4. 6 6
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/WorkflowTaskStatusEnum.java
  5. 34 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/deconstruction/DeconstructionTaskStatusEnum.java
  6. 32 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/deconstruction/SenceTypeEnum.java
  7. 138 3
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/integration/ContentDeconstructionClient.java
  8. 15 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/dto/deconstruction/ApiResponse.java
  9. 8 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/dto/deconstruction/QueryResponseDataDTO.java
  10. 8 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/dto/deconstruction/SubmitResponseDataDTO.java
  11. 22 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/dto/longvideoapi/JsonView.java
  12. 15 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/dto/longvideoapi/WxCoverImgVO.java
  13. 29 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/dto/longvideoapi/WxUserInfoForVideoVO.java
  14. 131 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/dto/longvideoapi/WxVideoV2VO.java
  15. 6 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/entity/SdExecutionTask.java
  16. 2 3
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/entity/SdWorkflowTask.java
  17. 23 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/request/DeconstructionTaskSubmitParam.java
  18. 19 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/request/VideoInfoGetParam.java
  19. 2 2
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/vo/SdWorkflowTaskVO.java
  20. 7 2
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/ExecutionTaskService.java
  21. 82 9
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/ExecutionTaskServiceImpl.java
  22. 23 23
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/WorkflowTaskServiceImpl.java
  23. 32 48
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/util/IdGeneratorUtil.java
  24. 2 2
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/util/OdpsManager.java
  25. 21 1
      supply-demand-engine-job/src/main/java/com/tzld/piaoquan/sde/job/ExecutionTaskJob.java

+ 32 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/ContentTypeEnum.java

@@ -0,0 +1,32 @@
+package com.tzld.piaoquan.sde.common.enums;
+
+import lombok.Getter;
+
+/**
+ * 子任务类型
+ *
+ * @author supeng
+ */
+@Getter
+public enum ContentTypeEnum {
+    TEXT(1, "文本"),
+    IMAGE(2, "图片"),
+    VIDEO(3, "视频");
+
+    private final Integer value;
+    private final String desc;
+
+    ContentTypeEnum(Integer value, String desc) {
+        this.value = value;
+        this.desc = desc;
+    }
+
+    public static ContentTypeEnum getInstance(Integer value) {
+        for (ContentTypeEnum contentTypeEnum : ContentTypeEnum.values()) {
+            if (contentTypeEnum.getValue().equals(value)) {
+                return contentTypeEnum;
+            }
+        }
+        return null;
+    }
+}

+ 3 - 2
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/ExceptionEnum.java

@@ -50,13 +50,14 @@ public enum ExceptionEnum {
     EXTERNAL_SERVICE_TIMEOUT(5001, "外部服务超时"),
     NETWORK_ERROR(5002, "网络错误"),
     HTTP_REQUEST_ERROR(5003, "HTTP请求异常"),
+    API_REQUEST_ERROR(5004, "API请求异常"),
 
     // ========================= 6000 业务规则/操作类 =========================
     ILLEGAL_OPERATION(6000, "非法操作"),
     BUSINESS_RULE_VIOLATION(6001, "业务规则冲突"),
     FEATURE_FLAG_CONFLICT(6002, "功能开关冲突"),
-    NOT_SUPPORT_TASK_TYPE(6003, "不支持该任务类型"),
-    NOT_SUPPORT_SUB_TASK_TYPE(6004, "不支持该子任务类型"),
+    NOT_SUPPORT_WORKFLOW_TASK_TYPE(6003, "不支持该流程任务类型"),
+    NOT_SUPPORT_EXECUTION_TASK_TYPE(6004, "不支持该执行任务类型"),
 
     // ========================= 7000 并发/事务类 =========================
     CONCURRENT_MODIFICATION(7000, "并发修改冲突"),

+ 3 - 3
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/ExecutionTaskTypeEnum.java

@@ -20,9 +20,9 @@ public enum ExecutionTaskTypeEnum {
         this.desc = desc;
     }
     public static ExecutionTaskTypeEnum getInstance(Integer value) {
-        for (ExecutionTaskTypeEnum subTaskTypeEnum : ExecutionTaskTypeEnum.values()) {
-            if (subTaskTypeEnum.getValue().equals(value)) {
-                return subTaskTypeEnum;
+        for (ExecutionTaskTypeEnum executionTaskTypeEnum : ExecutionTaskTypeEnum.values()) {
+            if (executionTaskTypeEnum.getValue().equals(value)) {
+                return executionTaskTypeEnum;
             }
         }
         return null;

+ 6 - 6
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/TaskStatusEnum.java → supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/WorkflowTaskStatusEnum.java

@@ -8,7 +8,7 @@ import lombok.Getter;
  * @author supeng
  */
 @Getter
-public enum TaskStatusEnum {
+public enum WorkflowTaskStatusEnum {
     INIT(0, "初始化"),
     PRE_PROCESSING(1, "前置处理/子任务处理中"),
     READY(2, "就绪"),
@@ -20,15 +20,15 @@ public enum TaskStatusEnum {
     private final Integer value;
     private final String desc;
 
-    TaskStatusEnum(Integer value, String desc) {
+    WorkflowTaskStatusEnum(Integer value, String desc) {
         this.value = value;
         this.desc = desc;
     }
 
-    public static TaskStatusEnum getInstance(Integer value) {
-        for (TaskStatusEnum taskStatusEnum : TaskStatusEnum.values()) {
-            if (taskStatusEnum.getValue().equals(value)) {
-                return taskStatusEnum;
+    public static WorkflowTaskStatusEnum getInstance(Integer value) {
+        for (WorkflowTaskStatusEnum workflowTaskStatusEnum : WorkflowTaskStatusEnum.values()) {
+            if (workflowTaskStatusEnum.getValue().equals(value)) {
+                return workflowTaskStatusEnum;
             }
         }
         return null;

+ 34 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/deconstruction/DeconstructionTaskStatusEnum.java

@@ -0,0 +1,34 @@
+package com.tzld.piaoquan.sde.common.enums.deconstruction;
+
+import lombok.Getter;
+
+/**
+ * 解构任务运行状态
+ *
+ * @author supeng
+ */
+@Getter
+public enum DeconstructionTaskStatusEnum {
+    PENDING(0, "待执行"),
+    RUNNING(1, "执行中"),
+    SUCCESS(2, "成功"),
+    FAILED(3, "失败"),
+    ERROR(-1, "异常");
+
+    private final Integer value;
+    private final String desc;
+
+    DeconstructionTaskStatusEnum(Integer value, String desc) {
+        this.value = value;
+        this.desc = desc;
+    }
+
+    public static DeconstructionTaskStatusEnum getInstance(Integer value) {
+        for (DeconstructionTaskStatusEnum deconstructionTaskStatusEnum : DeconstructionTaskStatusEnum.values()) {
+            if (deconstructionTaskStatusEnum.getValue().equals(value)) {
+                return deconstructionTaskStatusEnum;
+            }
+        }
+        return null;
+    }
+}

+ 32 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/common/enums/deconstruction/SenceTypeEnum.java

@@ -0,0 +1,32 @@
+package com.tzld.piaoquan.sde.common.enums.deconstruction;
+
+import lombok.Getter;
+
+/**
+ * 子任务类型
+ *
+ * @author supeng
+ */
+@Getter
+public enum SenceTypeEnum {
+    TOPIC_SELECTION(0, "选题"),
+    CONTENT_CREATION(1, "创作"),
+    PRODUCTION(2, "制作");
+
+    private final Integer value;
+    private final String desc;
+
+    SenceTypeEnum(Integer value, String desc) {
+        this.value = value;
+        this.desc = desc;
+    }
+
+    public static SenceTypeEnum getInstance(Integer value) {
+        for (SenceTypeEnum senceTypeEnum : SenceTypeEnum.values()) {
+            if (senceTypeEnum.getValue().equals(value)) {
+                return senceTypeEnum;
+            }
+        }
+        return null;
+    }
+}

+ 138 - 3
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/integration/ContentDeconstructionClient.java

@@ -1,9 +1,28 @@
 package com.tzld.piaoquan.sde.integration;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.TypeReference;
+import com.tzld.piaoquan.sde.common.enums.ContentTypeEnum;
+import com.tzld.piaoquan.sde.common.enums.ExceptionEnum;
+import com.tzld.piaoquan.sde.common.enums.deconstruction.SenceTypeEnum;
+import com.tzld.piaoquan.sde.common.exception.BizException;
+import com.tzld.piaoquan.sde.common.exception.HttpServiceException;
+import com.tzld.piaoquan.sde.model.dto.deconstruction.ApiResponse;
+import com.tzld.piaoquan.sde.model.dto.deconstruction.SubmitResponseDataDTO;
+import com.tzld.piaoquan.sde.model.dto.longvideoapi.JsonView;
+import com.tzld.piaoquan.sde.model.dto.longvideoapi.WxVideoV2VO;
 import com.tzld.piaoquan.sde.model.entity.SdExecutionTask;
+import com.tzld.piaoquan.sde.model.request.DeconstructionTaskSubmitParam;
+import com.tzld.piaoquan.sde.model.request.VideoInfoGetParam;
 import lombok.extern.slf4j.Slf4j;
+import okhttp3.*;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
 /**
  * 解构
  *
@@ -12,6 +31,21 @@ import org.springframework.stereotype.Service;
 @Slf4j
 @Service
 public class ContentDeconstructionClient {
+
+    private static final MediaType MEDIA_TYPE = MediaType.parse("application/json, charset=utf-8");
+
+    @Value("${deconstruction.task.submit.url:http://supply-content-deconstruction-api.piaoquantv.com/api/v1/content/tasks/decode}")
+    private String deconstructionTaskSubmitUrl;
+
+    @Value("${deconstruction.task.query.url:http://supply-content-deconstruction-api.piaoquantv.com/api/v1/content/tasks/%s}")
+    private String deconstructionTaskQueryUrl;
+
+    @Value("${longvideoapi.videoinfo.get.url:http://longvideoapi-internal.piaoquantv.com/longvideoapi/openapi/video/getVideoInfo}")
+    private String longVideoApiVideoInfoGetUrl;
+
+    private static final OkHttpClient client = new OkHttpClient().newBuilder().connectTimeout(10, TimeUnit.SECONDS)
+            .readTimeout(10, TimeUnit.SECONDS).writeTimeout(10, TimeUnit.SECONDS).build();
+
     /**
      * 提交解构任务
      *
@@ -19,10 +53,90 @@ public class ContentDeconstructionClient {
      * @return 解构任务ID
      */
     public String submitDeconstructionTask(SdExecutionTask sdExecutionTask) {
-        if (sdExecutionTask == null) {
+        if (Objects.isNull(sdExecutionTask) || Objects.isNull(sdExecutionTask.getContentType())
+                || Objects.isNull(sdExecutionTask.getContentId())) {
+            throw new IllegalArgumentException("sdExecutionTask is null");
+        }
+        DeconstructionTaskSubmitParam param = new DeconstructionTaskSubmitParam();
+        param.setScene(SenceTypeEnum.TOPIC_SELECTION.getValue());
+        param.setContent_type(sdExecutionTask.getContentType());
+        DeconstructionTaskSubmitParam.Content content = new DeconstructionTaskSubmitParam.Content();
+        String contentId = sdExecutionTask.getContentId();
+        content.setChannel_content_id(contentId);
+        if (Objects.equals(sdExecutionTask.getContentType(), ContentTypeEnum.VIDEO.getValue())) {
+            Long videoId = Long.parseLong(contentId);
+            //获取视频信息
+            WxVideoV2VO wxVideoV2VO = getVideoInfo(videoId);
+            if (Objects.isNull(wxVideoV2VO)) {
+                throw new BizException(ExceptionEnum.DATA_NOT_EXIST, "videoInfo get error videoId:" + videoId);
+            }
+            String videoPath = wxVideoV2VO.getVideoPath();
+            if (Objects.isNull(videoPath) || videoPath.endsWith(".m3u8")) {
+                throw new BizException(ExceptionEnum.DATA_ERROR, "视频格式不支持解构 videoId:" + videoId);
+            }
+            content.setTitle(wxVideoV2VO.getTitle());
+            content.setVideo_url(videoPath);
+            content.setChannel_account_id(wxVideoV2VO.getUid().toString());
+            if (Objects.nonNull(wxVideoV2VO.getUser())) {
+                content.setChannel_account_name(wxVideoV2VO.getUser().getNickName());
+            }
+        }
+        param.setContent(content);
+        //提交任务
+        return submitDeconstructionTask(param);
+    }
 
+    private String submitDeconstructionTask(DeconstructionTaskSubmitParam param) {
+        RequestBody body = RequestBody.create(MEDIA_TYPE, JSON.toJSONString(param));
+        Request request = new Request.Builder()
+                .url(deconstructionTaskSubmitUrl)
+                .post(body)
+                .addHeader("Content-Type", "application/json")
+                .addHeader("Accept", "application/json")
+                .build();
+        try (Response response = client.newCall(request).execute()) {
+            if (!response.isSuccessful()) {
+                throw new HttpServiceException(ExceptionEnum.HTTP_REQUEST_ERROR.getCode(), "code: " + response.code() + ", msg: " + response.message());
+            }
+            String respBody = response.body().string();
+            // 5. 反序列化返回值
+            ApiResponse<SubmitResponseDataDTO> apiResponse = JSON.parseObject(respBody, new TypeReference<ApiResponse<SubmitResponseDataDTO>>() {
+            });
+            if (apiResponse.getCode() != 0) {
+                throw new BizException(ExceptionEnum.API_REQUEST_ERROR.getCode(), JSON.toJSONString(apiResponse));
+            }
+            return apiResponse.getData().getTaskId();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
-        return "";
+    }
+
+    private WxVideoV2VO getVideoInfo(Long videoId) {
+        VideoInfoGetParam videoInfoGetParam = new VideoInfoGetParam();
+        videoInfoGetParam.setVideoId(videoId);
+        RequestBody body = RequestBody.create(MEDIA_TYPE, JSON.toJSONString(videoInfoGetParam));
+        Request request = new Request.Builder()
+                .url(longVideoApiVideoInfoGetUrl)
+                .post(body)
+                .addHeader("Content-Type", "application/json")
+                .addHeader("Accept", "application/json")
+                .build();
+        try (Response response = client.newCall(request).execute()) {
+            if (!response.isSuccessful()) {
+                throw new HttpServiceException(ExceptionEnum.HTTP_REQUEST_ERROR.getCode(), "code: " + response.code() + ", msg: " + response.message());
+            }
+            String respBody = response.body().string();
+            // 5. 反序列化返回值
+            JsonView<WxVideoV2VO> jsonView = JSON.parseObject(respBody, new TypeReference<JsonView<WxVideoV2VO>>() {
+            });
+            if (jsonView.getCode() != 0) {
+                throw new BizException(ExceptionEnum.API_REQUEST_ERROR.getCode(), JSON.toJSONString(jsonView));
+            }
+            return jsonView.getData();
+        } catch (IOException e) {
+            log.error(e.getMessage(), e);
+        }
+        return null;
     }
 
     /**
@@ -32,6 +146,27 @@ public class ContentDeconstructionClient {
      * @return 解构结果
      */
     public String getDeconstructionTaskResult(String jobId) {
-        return "";
+        String url = String.format(deconstructionTaskQueryUrl, jobId);
+        Request request = new Request.Builder()
+                .url(url)
+                .get()
+                .addHeader("Content-Type", "application/json")
+                .addHeader("Accept", "application/json")
+                .build();
+        try (Response response = client.newCall(request).execute()) {
+            if (!response.isSuccessful()) {
+                throw new HttpServiceException(ExceptionEnum.HTTP_REQUEST_ERROR.getCode(), "code: " + response.code() + ", msg: " + response.message());
+            }
+            String respBody = response.body().string();
+            // 5. 反序列化返回值
+            ApiResponse<SubmitResponseDataDTO> apiResponse = JSON.parseObject(respBody, new TypeReference<ApiResponse<SubmitResponseDataDTO>>() {
+            });
+            if (apiResponse.getCode() != 0) {
+                throw new BizException(ExceptionEnum.API_REQUEST_ERROR.getCode(), JSON.toJSONString(apiResponse));
+            }
+            return apiResponse.getData().getTaskId();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 }

+ 15 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/dto/deconstruction/ApiResponse.java

@@ -0,0 +1,15 @@
+package com.tzld.piaoquan.sde.model.dto.deconstruction;
+
+import lombok.Data;
+
+/**
+ * @author supeng
+ */
+@Data
+public class ApiResponse<T> {
+    private Integer code;
+    private String msg;
+    private T data;
+    private String reason;
+
+}

+ 8 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/dto/deconstruction/QueryResponseDataDTO.java

@@ -0,0 +1,8 @@
+package com.tzld.piaoquan.sde.model.dto.deconstruction;
+
+import lombok.Data;
+
+@Data
+public class QueryResponseDataDTO {
+    private String taskId;
+}

+ 8 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/dto/deconstruction/SubmitResponseDataDTO.java

@@ -0,0 +1,8 @@
+package com.tzld.piaoquan.sde.model.dto.deconstruction;
+
+import lombok.Data;
+
+@Data
+public class SubmitResponseDataDTO {
+    private String taskId;
+}

+ 22 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/dto/longvideoapi/JsonView.java

@@ -0,0 +1,22 @@
+package com.tzld.piaoquan.sde.model.dto.longvideoapi;
+
+import lombok.Data;
+
+/**
+ * copy from longvideoapi project
+ * @param <T>
+ */
+@Data
+public final class JsonView<T> {
+	private int code = 0;
+	private String msg = "success";
+	//@JsonInclude(Include.NON_NULL)
+	private T data;
+	
+	private Object errorData;
+//	private ExtData extData;
+
+	private T otherData;
+	
+	private String extInfo;
+}

+ 15 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/dto/longvideoapi/WxCoverImgVO.java

@@ -0,0 +1,15 @@
+package com.tzld.piaoquan.sde.model.dto.longvideoapi;
+
+import lombok.Data;
+
+/**
+ * copy from longvideoapi project
+ */
+@Data
+public class WxCoverImgVO {
+	private String coverImgPath;
+	private Long coverImgId;
+	private Integer width;
+	private Integer height;
+	private String thumbnailImagePath;
+}

+ 29 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/dto/longvideoapi/WxUserInfoForVideoVO.java

@@ -0,0 +1,29 @@
+package com.tzld.piaoquan.sde.model.dto.longvideoapi;
+
+
+import lombok.Data;
+
+/**
+ * copy from longvideoapi project
+ */
+@Data
+public class WxUserInfoForVideoVO {
+    private Long uid;
+    private String nickName;
+    private String avatarUrl;
+    private Boolean isBothFollow = false;
+    private Boolean isFollowed = false;
+    private Integer userType;
+    private Integer vipStatus = 0;
+    private String vipDesc;
+    private Integer subscribeStatus = 0;
+    private String introduction;
+    private Integer positionType = 1;
+    private Integer otherVideoShowCount = 10;
+    private Integer sensitiveStatus;
+    private Integer videos;
+    private Long idols;
+    private Long fans;
+    private Long playCountTotal;
+    private Boolean isCrawler;
+}

+ 131 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/dto/longvideoapi/WxVideoV2VO.java

@@ -0,0 +1,131 @@
+package com.tzld.piaoquan.sde.model.dto.longvideoapi;
+
+import lombok.Data;
+
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * copy from longvideoapi project
+ */
+@Data
+public class WxVideoV2VO {
+    private Long id;
+    private Integer status;
+    private WxUserInfoForVideoVO user;
+    private WxCoverImgVO coverImg;
+    //    private List<VideoBarrage> barrage;
+    private Long uid;
+    private String videoPath;
+    private String ossVideoPath;
+    private Integer playCount;
+    private Integer playCountTotal;
+    private Integer shareCount;
+    private Integer totalTime;
+    private Date gmtCreate;
+    private Date gmtModifie;
+    private Long gmtCreateTimestamp;
+    private Long gmtModifiedTimestamp;
+    private Integer transcodeStatus;
+    private Integer width;
+    private Integer height;
+    private String rotate;
+    private Long bitRate;
+    private String shareImgPath;
+    private Long shareImgId;
+    private String thumbnailImagePath;
+    private String fileExtensions;
+    private String title;
+    private Long titleId;
+    private Boolean favorited = false;
+    private Integer shareCountFriend;
+    private Long favoriteds;
+    private Long playTime;
+    private int playBeforeDay;
+    private int sendBeforeDay;
+    private Long lastTimestamp;
+    private String gmtCreateDescr;
+    private Integer shareLinkType;
+    private String playCountFormatStr;
+    private String shareTitle;
+    private Long shareTitleId;
+    private Integer sensitiveStatus;
+    private String sensitiveMsg = "";
+    private Integer barrageSwitch = 1;
+    private Long videoCollectionId;
+    private String tabShareImgPath;
+    private String h5ShareImgPath;
+    private Map<String, String> h5ShareImgPathMap;
+    private Boolean showHotRecommend = true;
+    private Integer commentCount = 0;
+    private Integer barrageCount = 0;
+    private Long size;
+    private String descr;
+    private Integer measureType = 0;
+    private Integer encryption = 1;
+    private String pwd;
+    private boolean firstPicture = false;
+    private String videoCoverSnapshotPath;
+    private Integer hasTailVideo = 0;
+    private Integer tailType = 0;
+    private String videoTailPath;
+    private Long measureId;
+    private Integer measure = 0;
+
+    // 新加试看视频相关字段
+    // 试看视频的转码
+    private String sampleRequestId;
+    private String sampleJobId;
+    private Integer sampleTranscodeStatus;
+    private String sampleTransedVideoPath;
+    private Integer sampleTotalTime;
+    //    private ChargeVO chargeDetail;
+    private Boolean hasShareSpaceData = false;
+    private Integer isRecommendShare = 0;
+    private Integer recommendSource = 0;
+    private String videoReportMeta;
+    private Integer auditStatus;
+    private Integer appAuditStatus;
+    private String auditTranscationId;
+    private String auditReason;
+    //    private VideoProcessShareTailLabVO processShareTailLab;
+//    private VideoProcessShareHeadLabVO processShareHeadLab;
+    private Integer videoLevelCode;
+    private Integer sharePageType;
+    //    private VideoShareJumpModel videoShareJumpModel;
+//    private ActivityProductionVideoVoteVO apVoteData;
+    private Integer recommendStatus;
+    private String cutVoStr;
+    private String recommendId;
+    private String recommendLogVO;
+    private Integer appRecommendStatus;
+    //    private ProduceProjectLinkVO produceProjectData;
+//    private ProduceProjectDataV2VO produceProjectDataV2;
+//    private TopicVO topicInfo;
+    private Integer lehuoquanRecommendStatus;
+    private String flowPool;
+    private Integer isTopRUOV = 0;
+    private Integer originalStatus;
+    private Integer frozenStatus;
+    //	@ApiModelProperty(value = "是否有权限播放高清视频:0 否 1 是")
+//	private Integer hasMultiBitRatePermission;
+//	@ApiModelProperty(value = "多码率信息")
+//	private List<VideoMultiBitRateVO> multiBitRate;
+    private Integer isInFlowPool;
+    private String recomTraceId;
+    private Integer canBeReward = 0;
+    private Integer rewardCount = 0;
+    //    private List<VideoDetailRewardUserVO> rewardUsers;
+    private Integer h5CanBeReward = 0;
+    //	private VideoTitleInfoVO originalTitle;
+//	private VideoTitleInfoVO distributeTitle;
+//	private VideoCoverInfoVO originalCover;
+//	private VideoCoverInfoVO distributeCover;
+    private Integer trafficType;
+    private Integer coverTrafficType;
+    private String h265VideoPath;
+    private Long h265BitRate;
+    private Integer aiLevel = 0;
+    private String aiAlertText;
+
+}

+ 6 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/entity/SdExecutionTask.java

@@ -24,6 +24,12 @@ public class SdExecutionTask {
      * 执行任务编号,用于异步回调时的精确匹配 & 对外暴露
      */
     private String taskNo;
+    /**
+     * 内容类型
+     *
+     * @see com.tzld.piaoquan.sde.common.enums.ContentTypeEnum
+     */
+    private Integer contentType;
     /**
      * 内容ID,videoId等
      */

+ 2 - 3
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/entity/SdWorkflowTask.java

@@ -3,8 +3,7 @@ package com.tzld.piaoquan.sde.model.entity;
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
-import com.tzld.piaoquan.sde.common.enums.TaskStatusEnum;
-import com.tzld.piaoquan.sde.common.enums.TaskTypeEnum;
+import com.tzld.piaoquan.sde.common.enums.WorkflowTaskStatusEnum;
 import lombok.Data;
 
 import java.util.Date;
@@ -43,7 +42,7 @@ public class SdWorkflowTask {
      * 任务状态:0 INIT(初始化), 1 PRE_PROCESSING(前置处理/子任务处理中),
      * 2 READY(就绪), 3 PROCESSING(处理中), 4 SUCCESS(成功) 5 FAILED(失败) 6 TIMEOUT(超时)
      *
-     * @see TaskStatusEnum
+     * @see WorkflowTaskStatusEnum
      */
     private Integer taskStatus;
     /**

+ 23 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/request/DeconstructionTaskSubmitParam.java

@@ -0,0 +1,23 @@
+package com.tzld.piaoquan.sde.model.request;
+
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+public class DeconstructionTaskSubmitParam {
+    private Integer scene;
+    private Integer content_type;
+    private Content content;
+
+    @Data
+    public static class Content {
+        private String channel_content_id;
+        private String video_url;
+        private List<String> images;
+        private String body_text;
+        private String title;
+        private String channel_account_id;
+        private String channel_account_name;
+    }
+}

+ 19 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/request/VideoInfoGetParam.java

@@ -0,0 +1,19 @@
+package com.tzld.piaoquan.sde.model.request;
+
+import lombok.Data;
+import javax.validation.constraints.NotNull;
+
+/**
+ * 视频ID
+ *
+ * @author supeng
+ */
+@Data
+public class VideoInfoGetParam {
+    /**
+     * 视频ID
+     */
+    @NotNull(message = "视频ID")
+    private Long videoId;
+
+}

+ 2 - 2
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/vo/SdWorkflowTaskVO.java

@@ -1,6 +1,6 @@
 package com.tzld.piaoquan.sde.model.vo;
 
-import com.tzld.piaoquan.sde.common.enums.TaskStatusEnum;
+import com.tzld.piaoquan.sde.common.enums.WorkflowTaskStatusEnum;
 import com.tzld.piaoquan.sde.common.enums.TaskTypeEnum;
 import lombok.Data;
 
@@ -35,7 +35,7 @@ public class SdWorkflowTaskVO {
      * 任务状态:0 INIT(初始化), 1 PRE_PROCESSING(前置处理/子任务处理中),
      * 2 READY(就绪), 3 PROCESSING(处理中), 4 SUCCESS(成功) 5 FAILED(失败) 6 TIMEOUT(超时)
      *
-     * @see TaskStatusEnum
+     * @see WorkflowTaskStatusEnum
      */
     private Integer taskStatus;
     private String taskStatusLabel;

+ 7 - 2
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/ExecutionTaskService.java

@@ -33,12 +33,17 @@ public interface ExecutionTaskService {
     SdExecutionTask get(CommonRequest<ExecutionTaskGetParam> request);
 
     /**
-     * 任务提交
+     * 执行任务提交
      */
     void executionTaskSubmitHandler();
 
     /**
-     * 更新任务状态/结果同步
+     * 更新执行任务状态/结果同步
      */
     void executionTaskSyncHandler();
+
+    /**
+     * 昨日回流Top内容定时解构
+     */
+    void yesterdayTopReturnVideoExecutionTaskCreateHandler();
 }

+ 82 - 9
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/ExecutionTaskServiceImpl.java

@@ -1,13 +1,12 @@
 package com.tzld.piaoquan.sde.service.impl;
 
+import com.aliyun.odps.data.Record;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.tzld.piaoquan.sde.common.api.CommonRequest;
-import com.tzld.piaoquan.sde.common.enums.ExecutionTaskStatusEnum;
-import com.tzld.piaoquan.sde.common.enums.ExecutionTaskTypeEnum;
-import com.tzld.piaoquan.sde.common.enums.IsDeleteEnum;
-import com.tzld.piaoquan.sde.common.enums.TaskStatusEnum;
+import com.tzld.piaoquan.sde.common.enums.*;
+import com.tzld.piaoquan.sde.common.exception.HttpServiceException;
 import com.tzld.piaoquan.sde.integration.ContentDeconstructionClient;
 import com.tzld.piaoquan.sde.mapper.SdExecutionTaskMapper;
 import com.tzld.piaoquan.sde.mapper.SdExecutionTaskRawResultMapper;
@@ -15,14 +14,17 @@ import com.tzld.piaoquan.sde.mapper.SdExecutionTaskResultItemMapper;
 import com.tzld.piaoquan.sde.mapper.SdWorkflowTaskMapper;
 import com.tzld.piaoquan.sde.model.entity.SdExecutionTask;
 import com.tzld.piaoquan.sde.model.entity.SdExecutionTaskRawResult;
-import com.tzld.piaoquan.sde.model.entity.SdWorkflowTask;
 import com.tzld.piaoquan.sde.model.request.ExecutionTaskGetParam;
 import com.tzld.piaoquan.sde.model.request.ExecutionTaskListParam;
 import com.tzld.piaoquan.sde.model.vo.SdExecutionTaskVO;
 import com.tzld.piaoquan.sde.service.ExecutionTaskService;
+import com.tzld.piaoquan.sde.util.DateUtil;
+import com.tzld.piaoquan.sde.util.IdGeneratorUtil;
+import com.tzld.piaoquan.sde.util.OdpsManager;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 import java.time.LocalDateTime;
@@ -51,6 +53,14 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
     @Autowired
     private SdWorkflowTaskMapper sdWorkflowTaskMapper;
 
+    @Value("${yesterday.return.video.table:loghubods.lastday_return}")
+    private String yesterdayReturnVideoTable;
+
+    private static final Integer TOP_N = 10;
+
+    @Autowired
+    private OdpsManager odpsManager;
+
     @Override
     public void create(SdExecutionTask sdExecutionTask) {
         int rows = sdExecutionTaskMapper.insert(sdExecutionTask);
@@ -97,9 +107,9 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
             if (Objects.nonNull(executionTaskTypeEnum)) {
                 sdExecutionTaskVO.setTaskTypeLabel(executionTaskTypeEnum.getDesc());
             }
-            TaskStatusEnum taskStatusEnum = TaskStatusEnum.getInstance(sdExecutionTask.getTaskStatus());
-            if (Objects.nonNull(taskStatusEnum)) {
-                sdExecutionTaskVO.setTaskStatusLabel(taskStatusEnum.getDesc());
+            WorkflowTaskStatusEnum workflowTaskStatusEnum = WorkflowTaskStatusEnum.getInstance(sdExecutionTask.getTaskStatus());
+            if (Objects.nonNull(workflowTaskStatusEnum)) {
+                sdExecutionTaskVO.setTaskStatusLabel(workflowTaskStatusEnum.getDesc());
             }
             pageVO.getRecords().add(sdExecutionTaskVO);
         }
@@ -134,8 +144,24 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
         //提交子任务
         for (SdExecutionTask sdExecutionTask : list) {
             try {
-                String jobId = contentDeconstructionClient.submitDeconstructionTask(sdExecutionTask);
+                ExecutionTaskTypeEnum executionTaskTypeEnum = ExecutionTaskTypeEnum.getInstance(sdExecutionTask.getTaskType());
+                String jobId = null;
+                switch (Objects.requireNonNull(executionTaskTypeEnum)) {
+                    case DECONSTRUCT:
+                        jobId = contentDeconstructionClient.submitDeconstructionTask(sdExecutionTask);
+                        break;
+                    case CLUSTER:
+                        break;
+                    default:
+                        log.error("executionTaskTypeEnum is illegal.");
+                }
                 if (Objects.isNull(jobId) || jobId.isEmpty()) {
+                    SdExecutionTask update = new SdExecutionTask();
+                    update.setId(sdExecutionTask.getId());
+                    update.setExternalJobId("任务提交失败:未获取到jobId");
+                    update.setTaskStatus(ExecutionTaskStatusEnum.FAILED.getValue());
+                    int rows = sdExecutionTaskMapper.updateById(update);
+                    log.info("subTask submit failure, id:{} rows = {}", sdExecutionTask.getId(), rows);
                     continue;
                 }
                 SdExecutionTask update = new SdExecutionTask();
@@ -146,6 +172,16 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
                 log.info("subTask submit success, id:{} rows = {}", sdExecutionTask.getId(), rows);
             } catch (Exception e) {
                 log.error("subTask submit error {}", sdExecutionTask, e);
+                if (e instanceof HttpServiceException) {
+                    //http请求异常 不直接失败;下次重试
+                    continue;
+                }
+                SdExecutionTask update = new SdExecutionTask();
+                update.setId(sdExecutionTask.getId());
+                update.setExternalJobId("任务提交失败:" + e.getMessage());
+                update.setTaskStatus(ExecutionTaskStatusEnum.FAILED.getValue());
+                int rows = sdExecutionTaskMapper.updateById(update);
+                log.info("subTask submit failure, id:{} rows = {}", sdExecutionTask.getId(), rows);
             }
         }
         long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
@@ -195,4 +231,41 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
         long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
         log.info("executionTaskSyncHandler finish cost={}ms", costMs);
     }
+
+    @Override
+    public void yesterdayTopReturnVideoExecutionTaskCreateHandler() {
+        long start = System.nanoTime();
+        log.info("yesterdayTopReturnVideoExecutionTaskCreateHandler start");
+        //获取昨日Top10视频
+        LocalDateTime yesterday = LocalDateTime.now().minusDays(1);
+        String dt = DateUtil.formatLocalDateTime(yesterday, "yyyyMMdd");
+        String sql = "select * from " + yesterdayReturnVideoTable + " where dt='" + dt + "' ORDER BY 回流人数 DESC LIMIT " + TOP_N + ";";
+        List<Record> records = odpsManager.query(sql);
+        if (Objects.isNull(records) || records.isEmpty()) {
+            log.info("yesterdayTopReturnVideoExecutionTaskCreateHandler records is empty");
+            return;
+        }
+        long topCostMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        log.info("yesterdayTopReturnVideoExecutionTaskCreateHandler get top video cost={}ms", topCostMs);
+        //创建解构任务
+        int count = 0;
+        for (Record record : records) {
+            try {
+                String videoId = record.getString("videoid");
+                SdExecutionTask sdExecutionTask = new SdExecutionTask();
+                sdExecutionTask.setTaskNo(IdGeneratorUtil.generateExecutionTaskNo());
+                sdExecutionTask.setTaskType(TaskTypeEnum.DECONSTRUCT.getValue());
+                sdExecutionTask.setTaskStatus(ExecutionTaskStatusEnum.INIT.getValue());
+                sdExecutionTask.setContentType(ContentTypeEnum.VIDEO.getValue());
+                sdExecutionTask.setContentId(videoId);
+                create(sdExecutionTask);
+                count++;
+            } catch (Exception e) {
+                log.error("yesterdayTopReturnVideoExecutionTaskCreateHandler error {}", record, e);
+            }
+        }
+        long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        log.info("yesterdayTopReturnVideoExecutionTaskCreateHandler recordSize = {}  count = {} finish cost = {}ms", records.size(), count, costMs);
+    }
+
 }

+ 23 - 23
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/WorkflowTaskServiceImpl.java

@@ -64,7 +64,7 @@ public class WorkflowTaskServiceImpl implements WorkflowTaskService {
     public void create(CommonRequest<WorkflowTaskCreateParam> request) {
         WorkflowTaskCreateParam params = request.getParams();
         SdWorkflowTask sdWorkflowTask = new SdWorkflowTask();
-        sdWorkflowTask.setTaskNo(IdGeneratorUtil.generateTaskNo());
+        sdWorkflowTask.setTaskNo(IdGeneratorUtil.generateWorkflowTaskNo());
         String taskName = params.getTaskName();
         if (Objects.isNull(taskName) || taskName.trim().isEmpty()) {
             taskName = PREFIX_TASK_NAME + Constant.LINE + DateUtil.formatLocalDateTime(LocalDateTime.now(), TASK_NAME_DATETIME_PATTEN);
@@ -81,10 +81,10 @@ public class WorkflowTaskServiceImpl implements WorkflowTaskService {
         sdWorkflowTask.setStrategyId(params.getStrategyId());
         TaskTypeEnum taskTypeEnum = TaskTypeEnum.getInstance(params.getTaskType());
         if (Objects.isNull(taskTypeEnum)) {
-            throw new BizException(ExceptionEnum.NOT_SUPPORT_TASK_TYPE);
+            throw new BizException(ExceptionEnum.NOT_SUPPORT_WORKFLOW_TASK_TYPE);
         }
 //        sdWorkflowTask.setTaskType(params.getTaskType());
-        sdWorkflowTask.setTaskStatus(TaskStatusEnum.INIT.getValue());
+        sdWorkflowTask.setTaskStatus(WorkflowTaskStatusEnum.INIT.getValue());
         int rows = sdWorkflowTaskMapper.insert(sdWorkflowTask);
         log.info("create insert task rows = {}", rows);
         if (rows <= 0) {
@@ -94,10 +94,10 @@ public class WorkflowTaskServiceImpl implements WorkflowTaskService {
         int total = 0;
         for (Long videoId : params.getVideoIds()) {
             SdExecutionTask sdExecutionTask = new SdExecutionTask();
-            String executionTaskNo = IdGeneratorUtil.generateSubTaskNo(sdWorkflowTask.getTaskNo());
+            String executionTaskNo = IdGeneratorUtil.generateExecutionTaskNo();
             sdExecutionTask.setTaskNo(executionTaskNo);
             sdExecutionTask.setContentId(String.valueOf(videoId));
-            sdExecutionTask.setTaskStatus(TaskStatusEnum.INIT.getValue());
+            sdExecutionTask.setTaskStatus(WorkflowTaskStatusEnum.INIT.getValue());
             int subRows = sdExecutionTaskMapper.insert(sdExecutionTask);
             total += subRows;
             log.info("create ExecutionTask rows = {}", subRows);
@@ -139,9 +139,9 @@ public class WorkflowTaskServiceImpl implements WorkflowTaskService {
         for (SdWorkflowTask sdWorkflowTask : pageList.getRecords()) {
             SdWorkflowTaskVO sdWorkflowTaskVO = new SdWorkflowTaskVO();
             BeanUtils.copyProperties(sdWorkflowTask, sdWorkflowTaskVO);
-            TaskStatusEnum taskStatusEnum = TaskStatusEnum.getInstance(sdWorkflowTask.getTaskStatus());
-            if (Objects.nonNull(taskStatusEnum)) {
-                sdWorkflowTaskVO.setTaskStatusLabel(taskStatusEnum.getDesc());
+            WorkflowTaskStatusEnum workflowTaskStatusEnum = WorkflowTaskStatusEnum.getInstance(sdWorkflowTask.getTaskStatus());
+            if (Objects.nonNull(workflowTaskStatusEnum)) {
+                sdWorkflowTaskVO.setTaskStatusLabel(workflowTaskStatusEnum.getDesc());
             }
             String strategyName = strategyMap.get(sdWorkflowTask.getStrategyId());
             if (Objects.nonNull(strategyName)) {
@@ -168,8 +168,8 @@ public class WorkflowTaskServiceImpl implements WorkflowTaskService {
                         .minusHours(6)
                         .atZone(ZoneId.systemDefault())
                         .toInstant());
-        List<Integer> taskStatusList = Arrays.asList(TaskStatusEnum.INIT.getValue(),
-                TaskStatusEnum.PRE_PROCESSING.getValue());
+        List<Integer> taskStatusList = Arrays.asList(WorkflowTaskStatusEnum.INIT.getValue(),
+                WorkflowTaskStatusEnum.PRE_PROCESSING.getValue());
         LambdaQueryWrapper<SdWorkflowTask> wrapper = Wrappers.lambdaQuery(SdWorkflowTask.class)
                 .eq(SdWorkflowTask::getIsDeleted, IsDeleteEnum.NORMAL.getValue())
                 .in(SdWorkflowTask::getTaskStatus, taskStatusList)
@@ -180,11 +180,11 @@ public class WorkflowTaskServiceImpl implements WorkflowTaskService {
             return;
         }
         for (SdWorkflowTask sdWorkflowTask : tasks) {
-            TaskStatusEnum taskStatusEnum = TaskStatusEnum.getInstance(sdWorkflowTask.getTaskStatus());
-            if (Objects.isNull(taskStatusEnum)) {
+            WorkflowTaskStatusEnum workflowTaskStatusEnum = WorkflowTaskStatusEnum.getInstance(sdWorkflowTask.getTaskStatus());
+            if (Objects.isNull(workflowTaskStatusEnum)) {
                 continue;
             }
-            switch (taskStatusEnum) {
+            switch (workflowTaskStatusEnum) {
                 case INIT:
                     initTaskStatusCheck(sdWorkflowTask);
                     break;
@@ -203,9 +203,9 @@ public class WorkflowTaskServiceImpl implements WorkflowTaskService {
     private void preProcessingTaskStatsCheck(SdWorkflowTask sdWorkflowTask) {
         try {
             //获取完成任务数
-            List<Integer> finishTaskStatusList = Arrays.asList(TaskStatusEnum.SUCCESS.getValue(),
-                    TaskStatusEnum.FAILED.getValue(),
-                    TaskStatusEnum.TIMEOUT.getValue());
+            List<Integer> finishTaskStatusList = Arrays.asList(WorkflowTaskStatusEnum.SUCCESS.getValue(),
+                    WorkflowTaskStatusEnum.FAILED.getValue(),
+                    WorkflowTaskStatusEnum.TIMEOUT.getValue());
             LambdaQueryWrapper<SdExecutionTask> wrapper = Wrappers.lambdaQuery(SdExecutionTask.class)
                     .eq(SdExecutionTask::getIsDeleted, IsDeleteEnum.NORMAL.getValue())
 //                    .eq(SdExecutionTask::getTaskId, sdworkflowTask.getId())
@@ -223,10 +223,10 @@ public class WorkflowTaskServiceImpl implements WorkflowTaskService {
                 updateTask.setFinishedExecutionTaskCount(finishCount.intValue());
                 //如果全部完成,更新状态
                 if (allFinished) {
-                    updateTask.setTaskStatus(TaskStatusEnum.READY.getValue());
+                    updateTask.setTaskStatus(WorkflowTaskStatusEnum.READY.getValue());
                 }
                 int updateRows = sdWorkflowTaskMapper.updateById(updateTask);
-                log.info("preProcessingTaskStatsCheck status:{} task update rows:{}", TaskStatusEnum.PRE_PROCESSING.getDesc(), updateRows);
+                log.info("preProcessingTaskStatsCheck status:{} task update rows:{}", WorkflowTaskStatusEnum.PRE_PROCESSING.getDesc(), updateRows);
             }
         } catch (Exception e) {
             log.error("preProcessingTaskStatsCheck error", e);
@@ -248,9 +248,9 @@ public class WorkflowTaskServiceImpl implements WorkflowTaskService {
             if (Objects.nonNull(count) && count > 0) {
                 SdWorkflowTask updateTask = new SdWorkflowTask();
                 updateTask.setId(sdTask.getId());
-                updateTask.setTaskStatus(TaskStatusEnum.PRE_PROCESSING.getValue());
+                updateTask.setTaskStatus(WorkflowTaskStatusEnum.PRE_PROCESSING.getValue());
                 int updateRows = sdWorkflowTaskMapper.updateById(updateTask);
-                log.info("initTaskStatusCheck status:{} task update rows:{}", TaskStatusEnum.INIT.getDesc(), updateRows);
+                log.info("initTaskStatusCheck status:{} task update rows:{}", WorkflowTaskStatusEnum.INIT.getDesc(), updateRows);
             }
         } catch (Exception e) {
             log.error("initTaskStatusCheck error", e);
@@ -264,7 +264,7 @@ public class WorkflowTaskServiceImpl implements WorkflowTaskService {
         //查询可以执行的任务
         LambdaQueryWrapper<SdWorkflowTask> wrapper = Wrappers.lambdaQuery(SdWorkflowTask.class)
                 .eq(SdWorkflowTask::getIsDeleted, IsDeleteEnum.NORMAL.getValue())
-                .eq(SdWorkflowTask::getTaskStatus, TaskStatusEnum.READY.getValue());
+                .eq(SdWorkflowTask::getTaskStatus, WorkflowTaskStatusEnum.READY.getValue());
         List<SdWorkflowTask> tasks = sdWorkflowTaskMapper.selectList(wrapper);
         if (Objects.isNull(tasks) || tasks.isEmpty()) {
             log.info("workflowTaskExecuteHandler tasks is empty");
@@ -284,7 +284,7 @@ public class WorkflowTaskServiceImpl implements WorkflowTaskService {
                 if (Objects.isNull(strategyResultDTO)) {
                     SdWorkflowTask updateTask = new SdWorkflowTask();
                     updateTask.setId(sdWorkflowTask.getId());
-                    updateTask.setTaskStatus(TaskStatusEnum.FAILED.getValue());
+                    updateTask.setTaskStatus(WorkflowTaskStatusEnum.FAILED.getValue());
                     updateTask.setErrorMsg("");
                     int updateRows = sdWorkflowTaskMapper.updateById(updateTask);
                     log.info("");
@@ -299,7 +299,7 @@ public class WorkflowTaskServiceImpl implements WorkflowTaskService {
                 if (rows > 0) {
                     SdWorkflowTask updateTask = new SdWorkflowTask();
                     updateTask.setId(sdWorkflowTask.getId());
-                    updateTask.setTaskStatus(TaskStatusEnum.SUCCESS.getValue());
+                    updateTask.setTaskStatus(WorkflowTaskStatusEnum.SUCCESS.getValue());
                     int updateRows = sdWorkflowTaskMapper.updateById(updateTask);
                     log.info("task success update rows:{}", updateRows);
                 }

+ 32 - 48
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/util/IdGeneratorUtil.java

@@ -17,12 +17,12 @@ public class IdGeneratorUtil {
     /**
      * 供需任务前缀
      */
-    private static final String TASK_PREFIX = "TK";
+    private static final String WORKFLOW_TASK_PREFIX = "WT";
 
     /**
      * 子任务分隔符
      */
-    private static final String SUB_TASK_SEPARATOR = "-";
+    private static final String EXECUTION_TASK_SEPARATOR = "ET";
 
     /**
      * 日期格式 (用于主任务前缀,方便按天归档/分表)
@@ -40,68 +40,52 @@ public class IdGeneratorUtil {
     private static final char[] ALPHABET = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz".toCharArray();
 
     // ================= 核心方法 =================
+
     /**
-     * 生成主任务编号 (taskNo)
-     * 格式: TK + yyyyMMdd + 雪花ID
-     * 示例: TK202601141756281923812
+     * 生成 业务工作流编号 (WorkflowNo)
+     * 格式: WF + yyyyMMdd + 雪花ID
+     * 示例: WF202601231756281923812
+     *
+     * @return 唯一的业务单号
      */
-    public static String generateTaskNo() {
-        // 1. 获取日期前缀
-        String dateStr = LocalDateTime.now().format(DATE_FMT);
-
-        // 2. 获取 MyBatis-Plus 内置雪花ID (Long)
-        long workerId = IdWorker.getId();
-
-        // 3. 拼接返回
-        return TASK_PREFIX + dateStr + workerId;
+    public static String generateWorkflowTaskNo() {
+        return generateNo(WORKFLOW_TASK_PREFIX);
     }
 
     /**
-     * 生成单个子任务编号 (SubTaskNo)
-     * 格式: TaskNo + "-" + 6位随机码
-     * 示例: TK202601141756281923812-Ab9Xz1
+     * 生成 底层执行任务编号 (ExecutionNo)
+     * 优化点:不再依赖 taskNo,支持独立生成
+     * 格式: EX + yyyyMMdd + 雪花ID
+     * 示例: EX202601231756289988776
+     *
+     * @return 唯一的执行单号
      */
-    public static String generateSubTaskNo(String taskNo) {
-        if (taskNo == null || taskNo.isEmpty()) {
-            throw new IllegalArgumentException("TaskNo cannot be empty");
-        }
-        // 预分配 StringBuilder 容量,避免扩容
-        StringBuilder sb = new StringBuilder(taskNo.length() + 1 + SUFFIX_LEN);
-        sb.append(taskNo).append(SUB_TASK_SEPARATOR);
-
-        ThreadLocalRandom random = ThreadLocalRandom.current();
-        for (int i = 0; i < SUFFIX_LEN; i++) {
-            sb.append(ALPHABET[random.nextInt(ALPHABET.length)]);
-        }
-        return sb.toString();
+    public static String generateExecutionTaskNo() {
+        return generateNo(EXECUTION_TASK_SEPARATOR);
     }
 
     /**
-     * 批量生成子任务编号
-     *
-     * @param taskNo 主任务编号
-     * @param count  需要生成的数量
+     * 通用生成逻辑
      */
-    public static List<String> generateSubTaskNos(String taskNo, int count) {
-        if (count <= 0) {
-            return new ArrayList<>(0);
-        }
-        List<String> list = new ArrayList<>(count);
-        for (int i = 0; i < count; i++) {
-            list.add(generateSubTaskNo(taskNo));
-        }
-        return list;
+    private static String generateNo(String prefix) {
+        // 1. 获取日期部分 (8位)
+        String dateStr = LocalDateTime.now().format(DATE_FMT);
+
+        // 2. 获取分布式唯一ID (MyBatis-Plus IdWorker, 19位)
+        long workerId = IdWorker.getId();
+
+        // 3. 拼接 (总长度约 2 + 8 + 19 = 29字符)
+        return prefix + dateStr + workerId;
     }
 
     // ================= 测试 Main =================
     public static void main(String[] args) {
         // 1. 测试生成主任务
-        String mainTaskNo = generateTaskNo();
-        System.out.println("主任务 No: " + mainTaskNo);
+        String workflowTaskNo = generateWorkflowTaskNo();
+        System.out.println("workflowTaskNo: " + workflowTaskNo);
 
         // 2. 测试生成 5 个子任务
-        System.out.println("\n--- 子任务列表 ---");
-        List<String> subIds = generateSubTaskNos(mainTaskNo, 5);
-        subIds.forEach(System.out::println);
+        String executionTaskNo = generateExecutionTaskNo();
+        System.out.printf("executionTaskNo: " + executionTaskNo);
     }
 }

+ 2 - 2
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/util/ODPSManager.java → supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/util/OdpsManager.java

@@ -15,7 +15,7 @@ import java.util.*;
 
 @Slf4j
 @Component
-public class ODPSManager {
+public class OdpsManager {
     private final static String ACCESSID = "LTAIWYUujJAm7CbH";
     private final static String ACCESSKEY = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
     private final static String ENDPOINT = "http://service.cn.maxcompute.aliyun.com/api";
@@ -30,7 +30,7 @@ public class ODPSManager {
             i = SQLTask.run(odps, sql);
             i.waitForSuccess();
             List<Record> records = SQLTask.getResultByInstanceTunnel(i);
-            if (Objects.nonNull(records) && records.size() != 0) {
+            if (Objects.nonNull(records) && !records.isEmpty()) {
                 return records;
             }
         } catch (Exception e) {

+ 21 - 1
supply-demand-engine-job/src/main/java/com/tzld/piaoquan/sde/job/ExecutionTaskJob.java

@@ -19,7 +19,7 @@ public class ExecutionTaskJob {
     private ExecutionTaskService executionTaskService;
 
     /**
-     * 定时提交任务
+     * 定时提交执行任务
      *
      * @param params
      * @return
@@ -57,4 +57,24 @@ public class ExecutionTaskJob {
         }
         return ReturnT.SUCCESS;
     }
+
+    /**
+     * 昨日回流Top内容定时解构
+     *
+     * @param params
+     * @return
+     */
+    @XxlJob("yesterdayTopReturnVideoExecutionTaskCreateHandler")
+    public ReturnT<String> yesterdayTopReturnVideoExecutionTaskCreateHandler(String params) {
+        XxlJobLogger.log("yesterdayTopReturnVideoExecutionTaskCreateHandler start");
+        try {
+            executionTaskService.yesterdayTopReturnVideoExecutionTaskCreateHandler();
+        } catch (Exception e) {
+            XxlJobLogger.log("yesterdayTopReturnVideoExecutionTaskCreateHandler error", e);
+            return ReturnT.FAIL;
+        } finally {
+            XxlJobLogger.log("yesterdayTopReturnVideoExecutionTaskCreateHandler end");
+        }
+        return ReturnT.SUCCESS;
+    }
 }