浏览代码

optimize import

supeng 1 周之前
父节点
当前提交
b6f2db312d
共有 13 个文件被更改,包括 379 次插入44 次删除
  1. 28 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/config/MybatisPlusSqlInjectorConfig.java
  2. 111 10
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/integration/ContentDeconstructionClusterClient.java
  3. 13 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/mapper/SdExecutionTaskContentMapper.java
  4. 4 4
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/entity/SdExecutionTask.java
  5. 53 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/entity/SdExecutionTaskContent.java
  6. 23 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/request/ClusterTaskSubmitParam.java
  7. 29 5
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/request/ExecutionTaskListParam.java
  8. 14 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/ExecutionTaskCreateService.java
  9. 2 1
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/ExecutionTaskService.java
  10. 66 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/ExecutionTaskCreateServiceImpl.java
  11. 27 18
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/ExecutionTaskServiceImpl.java
  12. 5 2
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/WorkflowTaskServiceImpl.java
  13. 4 4
      supply-demand-engine-job/src/test/java/com/tzld/piaoquan/sde/ContentDeconstructionClusterClientTest.java

+ 28 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/config/MybatisPlusSqlInjectorConfig.java

@@ -0,0 +1,28 @@
+package com.tzld.piaoquan.sde.config;
+
+import com.baomidou.mybatisplus.core.injector.AbstractMethod;
+import com.baomidou.mybatisplus.core.injector.DefaultSqlInjector;
+import com.baomidou.mybatisplus.core.metadata.TableInfo;
+import com.baomidou.mybatisplus.extension.injector.methods.InsertBatchSomeColumn;
+import org.apache.ibatis.session.Configuration;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * mybatis plus 注入方法
+ *
+ * @author heyu
+ */
+@Component
+public class MybatisPlusSqlInjectorConfig extends DefaultSqlInjector {
+
+    @Override
+    public List<AbstractMethod> getMethodList(Configuration configuration,
+                                              Class<?> mapperClass,
+                                              TableInfo tableInfo) {
+        List<AbstractMethod> methodList = super.getMethodList(configuration, mapperClass, tableInfo);
+        methodList.add(new InsertBatchSomeColumn());
+        return methodList;
+    }
+}

+ 111 - 10
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/integration/ContentDeconstructionClient.java → supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/integration/ContentDeconstructionClusterClient.java

@@ -2,18 +2,25 @@ package com.tzld.piaoquan.sde.integration;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.TypeReference;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.tzld.piaoquan.sde.common.enums.ContentTypeEnum;
 import com.tzld.piaoquan.sde.common.enums.ExceptionEnum;
-import com.tzld.piaoquan.sde.common.enums.deconstruction.DeconstructionTaskStatusEnum;
+import com.tzld.piaoquan.sde.common.enums.IsDeleteEnum;
 import com.tzld.piaoquan.sde.common.enums.deconstruction.SenceTypeEnum;
 import com.tzld.piaoquan.sde.common.exception.BizException;
 import com.tzld.piaoquan.sde.common.exception.HttpServiceException;
+//import com.tzld.piaoquan.sde.mapper.SdContentMapper;
+//import com.tzld.piaoquan.sde.mapper.SdExecutionTaskContentRelMapper;
+import com.tzld.piaoquan.sde.mapper.SdExecutionTaskContentMapper;
 import com.tzld.piaoquan.sde.model.dto.deconstruction.ApiResponse;
 import com.tzld.piaoquan.sde.model.dto.deconstruction.QueryResponseDataDTO;
 import com.tzld.piaoquan.sde.model.dto.deconstruction.SubmitResponseDataDTO;
 import com.tzld.piaoquan.sde.model.dto.longvideoapi.JsonView;
 import com.tzld.piaoquan.sde.model.dto.longvideoapi.WxVideoV2VO;
+//import com.tzld.piaoquan.sde.model.entity.SdContent;
 import com.tzld.piaoquan.sde.model.entity.SdExecutionTask;
+import com.tzld.piaoquan.sde.model.entity.SdExecutionTaskContent;
+import com.tzld.piaoquan.sde.model.request.ClusterTaskSubmitParam;
 import com.tzld.piaoquan.sde.model.request.DeconstructionTaskSubmitParam;
 import com.tzld.piaoquan.sde.model.request.VideoInfoGetParam;
 import lombok.extern.slf4j.Slf4j;
@@ -21,9 +28,8 @@ import okhttp3.*;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
-import javax.management.Query;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
@@ -34,15 +40,21 @@ import java.util.concurrent.TimeUnit;
  */
 @Slf4j
 @Service
-public class ContentDeconstructionClient {
+public class ContentDeconstructionClusterClient {
 
     private static final MediaType MEDIA_TYPE = MediaType.parse("application/json, charset=utf-8");
+    private final SdExecutionTaskContentMapper sdExecutionTaskContentMapper;
+//    private final SdContentMapper sdContentMapper;
+//    private final SdExecutionTaskContentRelMapper sdExecutionTaskContentRelMapper;
 
     @Value("${deconstruction.task.submit.url:http://supply-content-deconstruction-api.piaoquantv.com/api/v1/content/tasks/decode}")
     private String deconstructionTaskSubmitUrl;
 
     @Value("${deconstruction.task.query.url:http://supply-content-deconstruction-api.piaoquantv.com/api/v1/content/tasks/%s}")
-    private String deconstructionTaskQueryUrl;
+    private String taskQueryUrl;
+
+    @Value("${cluster.task.submit.url:http://supply-content-deconstruction-api.piaoquantv.com/api/v1/content/tasks/pattern}")
+    private String clusterTaskSubmitUrl;
 
     @Value("${longvideoapi.videoinfo.get.url:http://longvideoapi-internal.piaoquantv.com/longvideoapi/openapi/video/getVideoInfo}")
     private String longVideoApiVideoInfoGetUrl;
@@ -50,6 +62,10 @@ public class ContentDeconstructionClient {
     private static final OkHttpClient client = new OkHttpClient().newBuilder().connectTimeout(10, TimeUnit.SECONDS)
             .readTimeout(10, TimeUnit.SECONDS).writeTimeout(10, TimeUnit.SECONDS).build();
 
+    public ContentDeconstructionClusterClient(SdExecutionTaskContentMapper sdExecutionTaskContentMapper) {
+        this.sdExecutionTaskContentMapper = sdExecutionTaskContentMapper;
+    }
+
     /**
      * 提交解构任务
      *
@@ -57,15 +73,24 @@ public class ContentDeconstructionClient {
      * @return 解构任务ID
      */
     public String submitDeconstructionTask(SdExecutionTask sdExecutionTask) {
-        if (Objects.isNull(sdExecutionTask) || Objects.isNull(sdExecutionTask.getContentType())
-                || Objects.isNull(sdExecutionTask.getContentId())) {
+        if (Objects.isNull(sdExecutionTask) || Objects.isNull(sdExecutionTask.getId())
+                || Objects.isNull(sdExecutionTask.getContentType())) {
             throw new IllegalArgumentException("sdExecutionTask is null");
         }
         DeconstructionTaskSubmitParam param = new DeconstructionTaskSubmitParam();
         param.setScene(SenceTypeEnum.TOPIC_SELECTION.getValue());
         param.setContent_type(sdExecutionTask.getContentType());
         DeconstructionTaskSubmitParam.Content content = new DeconstructionTaskSubmitParam.Content();
-        String contentId = sdExecutionTask.getContentId();
+        Long executionTaskId = sdExecutionTask.getId();
+        LambdaQueryWrapper<SdExecutionTaskContent> wrapper = new LambdaQueryWrapper<>();
+        wrapper.eq(SdExecutionTaskContent::getExecutionTaskId, executionTaskId)
+                .eq(SdExecutionTaskContent::getIsDeleted, IsDeleteEnum.NORMAL.getValue());
+        List<SdExecutionTaskContent> contentList = sdExecutionTaskContentMapper.selectList(wrapper);
+        if (contentList == null || contentList.isEmpty()) {
+            throw new BizException(ExceptionEnum.DATA_NOT_EXIST, "任务内容不存在");
+        }
+//        String contentId = sdExecutionTask.getContentId();
+        String contentId = contentList.get(0).getContentId();
         content.setChannel_content_id(contentId);
         if (Objects.equals(sdExecutionTask.getContentType(), ContentTypeEnum.VIDEO.getValue())) {
             Long videoId = Long.parseLong(contentId);
@@ -155,8 +180,8 @@ public class ContentDeconstructionClient {
      * @param jobId 解构任务ID
      * @return 解构结果
      */
-    public QueryResponseDataDTO getDeconstructionTaskResult(String jobId) {
-        String url = String.format(deconstructionTaskQueryUrl, jobId);
+    public QueryResponseDataDTO getTaskResult(String jobId) {
+        String url = String.format(taskQueryUrl, jobId);
         Request request = new Request.Builder()
                 .url(url)
                 .get()
@@ -179,4 +204,80 @@ public class ContentDeconstructionClient {
             throw new RuntimeException(e);
         }
     }
+
+    /**
+     * 提交聚类任务
+     *
+     * @param sdExecutionTask
+     * @return
+     */
+    public String submitClusterTask(SdExecutionTask sdExecutionTask) {
+        if (Objects.isNull(sdExecutionTask) || Objects.isNull(sdExecutionTask.getContentType())) {
+            throw new IllegalArgumentException("sdExecutionTask is null");
+        }
+        ClusterTaskSubmitParam param = new ClusterTaskSubmitParam();
+        param.setScene(SenceTypeEnum.TOPIC_SELECTION.getValue());
+        param.setContent_type(sdExecutionTask.getContentType());
+        DeconstructionTaskSubmitParam.Content content = new DeconstructionTaskSubmitParam.Content();
+        Long executionTaskId = sdExecutionTask.getId();
+        LambdaQueryWrapper<SdExecutionTaskContent> wrapper = new LambdaQueryWrapper<>();
+        wrapper.eq(SdExecutionTaskContent::getExecutionTaskId, executionTaskId)
+                .eq(SdExecutionTaskContent::getIsDeleted, IsDeleteEnum.NORMAL.getValue());
+        List<SdExecutionTaskContent> contentList = sdExecutionTaskContentMapper.selectList(wrapper);
+        if (contentList == null || contentList.isEmpty()) {
+            throw new BizException(ExceptionEnum.DATA_NOT_EXIST, "任务内容不存在");
+        }
+//        String contentId = sdExecutionTask.getContentId();
+        String contentId = contentList.get(0).getContentId();
+        content.setChannel_content_id(contentId);
+        if (Objects.equals(sdExecutionTask.getContentType(), ContentTypeEnum.VIDEO.getValue())) {
+            Long videoId = Long.parseLong(contentId);
+            //获取视频信息
+            WxVideoV2VO wxVideoV2VO = getVideoInfo(videoId);
+            if (Objects.isNull(wxVideoV2VO)) {
+                throw new BizException(ExceptionEnum.DATA_NOT_EXIST, "videoInfo get error videoId:" + videoId);
+            }
+            String videoPath = wxVideoV2VO.getVideoPath();
+            if (Objects.isNull(videoPath) || videoPath.endsWith(".m3u8")) {
+                throw new BizException(ExceptionEnum.DATA_ERROR, "视频格式不支持解构 videoId:" + videoId + ",videoPath:" + videoPath);
+            }
+            content.setTitle(wxVideoV2VO.getTitle());
+            content.setVideo_url(videoPath);
+            content.setChannel_account_id(wxVideoV2VO.getUid().toString());
+            if (Objects.nonNull(wxVideoV2VO.getUser())) {
+                content.setChannel_account_name(wxVideoV2VO.getUser().getNickName());
+            }
+        }
+//        param.setContents(content);
+        //提交任务
+        return submitClusterTask(param);
+    }
+
+
+    private String submitClusterTask(ClusterTaskSubmitParam param) {
+        RequestBody body = RequestBody.create(MEDIA_TYPE, JSON.toJSONString(param));
+        Request request = new Request.Builder()
+                .url(deconstructionTaskSubmitUrl)
+                .post(body)
+                .addHeader("Content-Type", "application/json")
+                .addHeader("Accept", "application/json")
+                .build();
+        log.info("submitDeconstructionTask deconstructionTaskSubmitUrl= {} param = {}", deconstructionTaskSubmitUrl, JSON.toJSONString(param));
+        try (Response response = client.newCall(request).execute()) {
+            if (!response.isSuccessful()) {
+                throw new HttpServiceException(ExceptionEnum.HTTP_REQUEST_ERROR.getCode(), "code: " + response.code() + ", msg: " + response.message());
+            }
+            String respBody = response.body().string();
+            log.info("submitDeconstructionTask respBody = {}", respBody);
+            // 5. 反序列化返回值
+            ApiResponse<SubmitResponseDataDTO> apiResponse = JSON.parseObject(respBody, new TypeReference<ApiResponse<SubmitResponseDataDTO>>() {
+            });
+            if (apiResponse.getCode() != 0) {
+                throw new BizException(ExceptionEnum.API_REQUEST_ERROR.getCode(), JSON.toJSONString(apiResponse));
+            }
+            return apiResponse.getData().getTaskId();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
 }

+ 13 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/mapper/SdExecutionTaskContentMapper.java

@@ -0,0 +1,13 @@
+package com.tzld.piaoquan.sde.mapper;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import com.tzld.piaoquan.sde.model.entity.SdExecutionTaskContent;
+import org.apache.ibatis.annotations.Mapper;
+
+import java.util.List;
+
+@Mapper
+public interface SdExecutionTaskContentMapper extends BaseMapper<SdExecutionTaskContent> {
+
+    int insertBatchSomeColumn(List<SdExecutionTaskContent> list);
+}

+ 4 - 4
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/entity/SdExecutionTask.java

@@ -30,10 +30,10 @@ public class SdExecutionTask {
      * @see com.tzld.piaoquan.sde.common.enums.ContentTypeEnum
      */
     private Integer contentType;
-    /**
-     * 内容ID,videoId等
-     */
-    private String contentId;
+//    /**
+//     * 内容ID,videoId等
+//     */
+//    private String contentId;
     /**
      * 执行任务编号:1 DECONSTRUCT(解构), 2 CLUSTER(聚类)
      *

+ 53 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/entity/SdExecutionTaskContent.java

@@ -0,0 +1,53 @@
+package com.tzld.piaoquan.sde.model.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Data;
+
+import java.util.Date;
+
+/**
+ * 内容
+ *
+ * @author supeng
+ */
+@Data
+@TableName("sd_execution_task_content")
+public class SdExecutionTaskContent {
+    /**
+     * 主键ID
+     */
+    @TableId(type = IdType.AUTO)
+    private Long id;
+    /**
+     * 执行任务ID
+     */
+    private Long executionTaskId;
+    /**
+     * 内容类型
+     *
+     * @see com.tzld.piaoquan.sde.common.enums.ContentTypeEnum
+     */
+    private Integer contentType;
+    /**
+     * 业务侧内容ID,videoId等
+     */
+    private String contentId;
+    /**
+     * 内容:文本/URL等
+     */
+    private String content;
+    /**
+     * 记录创建时间,由数据库自动生成
+     */
+    private Date createTime;
+    /**
+     * 记录最后更新时间,由数据库自动维护
+     */
+    private Date updateTime;
+    /**
+     * 逻辑删除标记:0-正常状态,1-已删除
+     */
+    private Integer isDeleted;
+}

+ 23 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/request/ClusterTaskSubmitParam.java

@@ -0,0 +1,23 @@
+package com.tzld.piaoquan.sde.model.request;
+
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+public class ClusterTaskSubmitParam {
+    private Integer scene;
+    private Integer content_type;
+    private List<Content> contents;
+
+    @Data
+    public static class Content {
+        private String channel_content_id;
+        private String video_url;
+        private List<String> images;
+        private String body_text;
+        private String title;
+        private String channel_account_id;
+        private String channel_account_name;
+    }
+}

+ 29 - 5
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/request/ExecutionTaskListParam.java

@@ -3,6 +3,8 @@ package com.tzld.piaoquan.sde.model.request;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 
+import java.util.Date;
+
 /**
  * 任务创建参数
  *
@@ -12,15 +14,25 @@ import lombok.EqualsAndHashCode;
 @Data
 public class ExecutionTaskListParam extends PageParam {
     /**
-     * 父任务ID,关联 sd_task.id
+     * 执行任务编号,用于异步回调时的精确匹配 & 对外暴露
      */
-    private Long taskId;
+    private String taskNo;
     /**
-     * 子任务业务编号,用于异步回调时的精确匹配 & 对外暴露
+     * 内容类型
+     *
+     * @see com.tzld.piaoquan.sde.common.enums.ContentTypeEnum
      */
-    private String taskNo;
+    private Integer contentType;
+    /**
+     * 内容ID,videoId等
+     */
+    private String contentId;
+    /**
+     * 视频 管理后台地址
+     */
+    private String contentDetailUrl;
     /**
-     * 子任务类型:1 DECONSTRUCT(解构), 2 CLUSTER(聚类)
+     * 执行任务编号:1 DECONSTRUCT(解构), 2 CLUSTER(聚类)
      *
      * @see com.tzld.piaoquan.sde.common.enums.ExecutionTaskTypeEnum
      */
@@ -35,4 +47,16 @@ public class ExecutionTaskListParam extends PageParam {
      * 第三方服务的JobID
      */
     private String externalJobId;
+    /**
+     * 执行失败时的错误详细描述
+     */
+    private String errorMsg;
+    /**
+     * 记录创建时间,由数据库自动生成
+     */
+    private Date createTime;
+    /**
+     * 记录最后更新时间,由数据库自动维护
+     */
+    private Date updateTime;
 }

+ 14 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/ExecutionTaskCreateService.java

@@ -0,0 +1,14 @@
+package com.tzld.piaoquan.sde.service;
+
+import com.tzld.piaoquan.sde.model.entity.SdExecutionTask;
+import com.tzld.piaoquan.sde.model.entity.SdExecutionTaskContent;
+
+import java.util.List;
+
+/**
+ * @author supeng
+ */
+public interface ExecutionTaskCreateService {
+
+    boolean create(SdExecutionTask sdSubTask, List<SdExecutionTaskContent> contentList);
+}

+ 2 - 1
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/ExecutionTaskService.java

@@ -3,6 +3,7 @@ package com.tzld.piaoquan.sde.service;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.tzld.piaoquan.sde.common.api.CommonRequest;
 import com.tzld.piaoquan.sde.model.entity.SdExecutionTask;
+import com.tzld.piaoquan.sde.model.entity.SdExecutionTaskContent;
 import com.tzld.piaoquan.sde.model.request.ExecutionTaskGetParam;
 import com.tzld.piaoquan.sde.model.request.ExecutionTaskListParam;
 import com.tzld.piaoquan.sde.model.vo.SdExecutionTaskVO;
@@ -14,7 +15,7 @@ import java.util.List;
  */
 public interface ExecutionTaskService {
 
-    void create(SdExecutionTask sdSubTask);
+    boolean create(SdExecutionTask sdSubTask);
 
     void create(List<SdExecutionTask> sdSubTasks);
 

+ 66 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/ExecutionTaskCreateServiceImpl.java

@@ -0,0 +1,66 @@
+package com.tzld.piaoquan.sde.service.impl;
+
+import com.aliyun.odps.data.Record;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.tzld.piaoquan.sde.common.api.CommonRequest;
+import com.tzld.piaoquan.sde.common.enums.*;
+import com.tzld.piaoquan.sde.common.enums.deconstruction.DeconstructionTaskStatusEnum;
+import com.tzld.piaoquan.sde.common.exception.HttpServiceException;
+import com.tzld.piaoquan.sde.integration.ContentDeconstructionClusterClient;
+import com.tzld.piaoquan.sde.mapper.*;
+import com.tzld.piaoquan.sde.model.dto.deconstruction.QueryResponseDataDTO;
+import com.tzld.piaoquan.sde.model.entity.SdExecutionTask;
+import com.tzld.piaoquan.sde.model.entity.SdExecutionTaskContent;
+import com.tzld.piaoquan.sde.model.entity.SdExecutionTaskRawResult;
+import com.tzld.piaoquan.sde.model.request.ExecutionTaskGetParam;
+import com.tzld.piaoquan.sde.model.request.ExecutionTaskListParam;
+import com.tzld.piaoquan.sde.model.vo.SdExecutionTaskVO;
+import com.tzld.piaoquan.sde.service.ExecutionTaskCreateService;
+import com.tzld.piaoquan.sde.service.ExecutionTaskService;
+import com.tzld.piaoquan.sde.util.DateUtil;
+import com.tzld.piaoquan.sde.util.IdGeneratorUtil;
+import com.tzld.piaoquan.sde.util.OdpsManager;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeanUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author supeng
+ */
+@Slf4j
+@Service
+public class ExecutionTaskCreateServiceImpl implements ExecutionTaskCreateService {
+
+    @Autowired
+    private SdExecutionTaskMapper sdExecutionTaskMapper;
+    @Autowired
+    private SdExecutionTaskContentMapper sdExecutionTaskContentMapper;
+
+    @Transactional(rollbackFor = Exception.class)
+    @Override
+    public boolean create(SdExecutionTask sdExecutionTask, List<SdExecutionTaskContent> contentList) {
+        int rows = sdExecutionTaskMapper.insert(sdExecutionTask);
+        log.info("create Execution task rows = {}", rows);
+        if (rows > 0 && Objects.nonNull(contentList)) {
+            Long executionTaskId = sdExecutionTask.getId();
+            for (SdExecutionTaskContent content : contentList) {
+                content.setExecutionTaskId(executionTaskId);
+            }
+            int batchRows = sdExecutionTaskContentMapper.insertBatchSomeColumn(contentList);
+            log.info("create Execution task batch rows = {}", batchRows);
+            return batchRows > 0;
+        }
+        return false;
+    }
+
+}

+ 27 - 18
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/ExecutionTaskServiceImpl.java

@@ -8,17 +8,17 @@ import com.tzld.piaoquan.sde.common.api.CommonRequest;
 import com.tzld.piaoquan.sde.common.enums.*;
 import com.tzld.piaoquan.sde.common.enums.deconstruction.DeconstructionTaskStatusEnum;
 import com.tzld.piaoquan.sde.common.exception.HttpServiceException;
-import com.tzld.piaoquan.sde.integration.ContentDeconstructionClient;
-import com.tzld.piaoquan.sde.mapper.SdExecutionTaskMapper;
-import com.tzld.piaoquan.sde.mapper.SdExecutionTaskRawResultMapper;
-import com.tzld.piaoquan.sde.mapper.SdExecutionTaskResultItemMapper;
-import com.tzld.piaoquan.sde.mapper.SdWorkflowTaskMapper;
+import com.tzld.piaoquan.sde.integration.ContentDeconstructionClusterClient;
+import com.tzld.piaoquan.sde.mapper.*;
 import com.tzld.piaoquan.sde.model.dto.deconstruction.QueryResponseDataDTO;
+//import com.tzld.piaoquan.sde.model.entity.SdContent;
 import com.tzld.piaoquan.sde.model.entity.SdExecutionTask;
+import com.tzld.piaoquan.sde.model.entity.SdExecutionTaskContent;
 import com.tzld.piaoquan.sde.model.entity.SdExecutionTaskRawResult;
 import com.tzld.piaoquan.sde.model.request.ExecutionTaskGetParam;
 import com.tzld.piaoquan.sde.model.request.ExecutionTaskListParam;
 import com.tzld.piaoquan.sde.model.vo.SdExecutionTaskVO;
+import com.tzld.piaoquan.sde.service.ExecutionTaskCreateService;
 import com.tzld.piaoquan.sde.service.ExecutionTaskService;
 import com.tzld.piaoquan.sde.util.DateUtil;
 import com.tzld.piaoquan.sde.util.IdGeneratorUtil;
@@ -28,13 +28,11 @@ import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
 
 import java.time.LocalDateTime;
 import java.time.ZoneId;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-import java.util.Objects;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -45,8 +43,7 @@ import java.util.concurrent.TimeUnit;
 public class ExecutionTaskServiceImpl implements ExecutionTaskService {
 
     @Autowired
-    private ContentDeconstructionClient contentDeconstructionClient;
-
+    private ContentDeconstructionClusterClient contentDeconstructionClusterClient;
     @Autowired
     private SdExecutionTaskMapper sdExecutionTaskMapper;
     @Autowired
@@ -55,6 +52,8 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
     private SdExecutionTaskResultItemMapper sdExecutionTaskResultItemMapper;
     @Autowired
     private SdWorkflowTaskMapper sdWorkflowTaskMapper;
+    @Autowired
+    private SdExecutionTaskContentMapper sdExecutionTaskContentMapper;
 
     @Value("${yesterday.return.video.table:loghubods.lastday_return}")
     private String yesterdayReturnVideoTable;
@@ -64,10 +63,14 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
     @Autowired
     private OdpsManager odpsManager;
 
+    @Autowired
+    private ExecutionTaskCreateService executionTaskCreateService;
+
     @Override
-    public void create(SdExecutionTask sdExecutionTask) {
+    public boolean create(SdExecutionTask sdExecutionTask) {
         int rows = sdExecutionTaskMapper.insert(sdExecutionTask);
         log.info("create Execution task rows = {}", rows);
+        return rows > 0;
     }
 
     @Override
@@ -82,7 +85,6 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
         ExecutionTaskListParam params = request.getParams();
         int pageNo = params.getPageNo();
         int pageSize = params.getPageSize();
-        Long taskId = params.getTaskId();
         String executionTaskNo = params.getTaskNo();
         Integer taskStatus = params.getTaskStatus();
         Integer executionTaskType = params.getTaskType();
@@ -154,9 +156,10 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
                 String jobId = null;
                 switch (executionTaskTypeEnum) {
                     case DECONSTRUCT:
-                        jobId = contentDeconstructionClient.submitDeconstructionTask(sdExecutionTask);
+                        jobId = contentDeconstructionClusterClient.submitDeconstructionTask(sdExecutionTask);
                         break;
                     case CLUSTER:
+//                        jobId = contentDeconstructionClusterClient.submitClusterTask(sdExecutionTask);
                         break;
                     default:
                         log.error("executionTaskTypeEnum is illegal.");
@@ -204,7 +207,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
                         .minusHours(6)
                         .atZone(ZoneId.systemDefault())
                         .toInstant());
-        List<Integer> taskStatusList = Arrays.asList(ExecutionTaskStatusEnum.SUBMITTED.getValue(),ExecutionTaskStatusEnum.RUNNING.getValue());
+        List<Integer> taskStatusList = Arrays.asList(ExecutionTaskStatusEnum.SUBMITTED.getValue(), ExecutionTaskStatusEnum.RUNNING.getValue());
         LambdaQueryWrapper<SdExecutionTask> wrapper = Wrappers.lambdaQuery(SdExecutionTask.class)
                 .eq(SdExecutionTask::getIsDeleted, IsDeleteEnum.NORMAL.getValue())
                 .in(SdExecutionTask::getTaskStatus, taskStatusList)
@@ -216,7 +219,7 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
         }
         for (SdExecutionTask sdExecutionTask : list) {
             try {
-                QueryResponseDataDTO queryResponseDataDTO = contentDeconstructionClient.getDeconstructionTaskResult(sdExecutionTask.getExternalJobId());
+                QueryResponseDataDTO queryResponseDataDTO = contentDeconstructionClusterClient.getTaskResult(sdExecutionTask.getExternalJobId());
                 DeconstructionTaskStatusEnum deconstructionTaskStatusEnum = DeconstructionTaskStatusEnum.getInstance(queryResponseDataDTO.getStatus());
                 if (Objects.isNull(deconstructionTaskStatusEnum)) {
                     continue;
@@ -301,8 +304,14 @@ public class ExecutionTaskServiceImpl implements ExecutionTaskService {
                 sdExecutionTask.setTaskType(TaskTypeEnum.DECONSTRUCT.getValue());
                 sdExecutionTask.setTaskStatus(ExecutionTaskStatusEnum.INIT.getValue());
                 sdExecutionTask.setContentType(ContentTypeEnum.VIDEO.getValue());
-                sdExecutionTask.setContentId(videoId);
-                create(sdExecutionTask);
+//                sdExecutionTask.setContentId(videoId);
+                SdExecutionTaskContent sdExecutionTaskContent = new SdExecutionTaskContent();
+                sdExecutionTaskContent.setContentType(ContentTypeEnum.VIDEO.getValue());
+                sdExecutionTaskContent.setContentId(videoId);
+                List<SdExecutionTaskContent> contentList = new ArrayList<>();
+                contentList.add(sdExecutionTaskContent);
+                boolean createResult = executionTaskCreateService.create(sdExecutionTask, contentList);
+                log.info("yesterdayTopReturnVideoExecutionTaskCreateHandler sdExecutionTask create videoId = {} result={}", videoId, createResult);
                 count++;
             } catch (Exception e) {
                 log.error("yesterdayTopReturnVideoExecutionTaskCreateHandler error {}", record, e);

+ 5 - 2
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/WorkflowTaskServiceImpl.java

@@ -55,7 +55,7 @@ public class WorkflowTaskServiceImpl implements WorkflowTaskService {
     private OpenRouterClient openRouterServiceClient;
     @Autowired
     private OpenRouterClient openRouterClient;
-//    @Autowired
+    //    @Autowired
 //    private Map<Integer, DemandExtractionStrategy> strategyMap;
     @Autowired
     private SdWorkflowTaskResultMapper sdWorkflowTaskResultMapper;
@@ -96,9 +96,12 @@ public class WorkflowTaskServiceImpl implements WorkflowTaskService {
             SdExecutionTask sdExecutionTask = new SdExecutionTask();
             String executionTaskNo = IdGeneratorUtil.generateExecutionTaskNo();
             sdExecutionTask.setTaskNo(executionTaskNo);
-            sdExecutionTask.setContentId(String.valueOf(videoId));
+//            sdExecutionTask.setContentId(String.valueOf(videoId));
             sdExecutionTask.setTaskStatus(WorkflowTaskStatusEnum.INIT.getValue());
             int subRows = sdExecutionTaskMapper.insert(sdExecutionTask);
+            if (subRows > 0) {
+
+            }
             total += subRows;
             log.info("create ExecutionTask rows = {}", subRows);
         }

+ 4 - 4
supply-demand-engine-job/src/test/java/com/tzld/piaoquan/sde/ContentDeconstructionClientTest.java → supply-demand-engine-job/src/test/java/com/tzld/piaoquan/sde/ContentDeconstructionClusterClientTest.java

@@ -1,15 +1,15 @@
 package com.tzld.piaoquan.sde;
 
-import com.tzld.piaoquan.sde.integration.ContentDeconstructionClient;
+import com.tzld.piaoquan.sde.integration.ContentDeconstructionClusterClient;
 import com.tzld.piaoquan.sde.mapper.SdExecutionTaskMapper;
 import com.tzld.piaoquan.sde.model.entity.SdExecutionTask;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
-public class ContentDeconstructionClientTest extends BaseTest {
+public class ContentDeconstructionClusterClientTest extends BaseTest {
 
     @Autowired
-    private ContentDeconstructionClient contentDeconstructionClient;
+    private ContentDeconstructionClusterClient contentDeconstructionClusterClient;
 
     @Autowired
     private SdExecutionTaskMapper sdExecutionTaskMapper;
@@ -17,7 +17,7 @@ public class ContentDeconstructionClientTest extends BaseTest {
     @Test
     public void testDeconstruction() {
         SdExecutionTask sdExecutionTask = sdExecutionTaskMapper.selectById(1);
-        contentDeconstructionClient.submitDeconstructionTask(sdExecutionTask);
+        contentDeconstructionClusterClient.submitDeconstructionTask(sdExecutionTask);
     }
 
 }