Explorar el Código

update job handler

supeng hace 6 horas
padre
commit
4c54f6812f
Se han modificado 16 ficheros con 554 adiciones y 169 borrados
  1. 74 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/config/ThreadPoolConfig.java
  2. 37 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/integration/ContentDeconstructionClient.java
  3. 44 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/integration/LoghubClient.java
  4. 12 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/integration/OpenRouterClient.java
  5. 4 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/entity/SdSubTask.java
  6. 0 4
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/entity/SdSubTaskResultItem.java
  7. 0 8
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/LoghubService.java
  8. 0 8
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/OpenRouterService.java
  9. 6 0
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/SubTaskService.java
  10. 9 1
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/TaskService.java
  11. 0 75
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/LoghubServiceImpl.java
  12. 0 14
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/OpenRouterServiceImpl.java
  13. 109 2
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/SubTaskServiceImpl.java
  14. 199 13
      supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/TaskServiceImpl.java
  15. 60 0
      supply-demand-engine-job/src/main/java/com/tzld/piaoquan/sde/job/SubTaskJob.java
  16. 0 44
      supply-demand-engine-job/src/main/java/com/tzld/piaoquan/sde/job/TaskJob.java

+ 74 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/config/ThreadPoolConfig.java

@@ -0,0 +1,74 @@
+package com.tzld.piaoquan.sde.config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+@Slf4j
+@Configuration
+public class ThreadPoolConfig {
+
+    /**
+     * 通用线程池(尽量按照需求创建单独的线程池使用)
+     * 加了 @Primary,方便 @Async 默认使用
+     */
+    @Bean("commonExecutor")
+    @Primary
+    public Executor commonExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(16);
+        executor.setMaxPoolSize(32);
+        executor.setQueueCapacity(200);
+        executor.setThreadNamePrefix("common-pool-");
+        // 核心业务通常使用 CallerRunsPolicy,在这个策略下,任务如果被拒绝,会由调用线程(主线程)执行
+        // 这样可以减缓提交速度,但保证业务不丢
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        executor.setWaitForTasksToCompleteOnShutdown(true);
+        executor.setAwaitTerminationSeconds(60);
+        executor.initialize();
+        return executor;
+    }
+
+    /**
+     * 阿里云日志上报线程池
+     * 特点:IO 密集、允许丢失、队列大、避免影响主业务
+     */
+    @Bean("loghubExecutor")
+    public Executor loghubExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(8);
+        executor.setMaxPoolSize(16);
+        executor.setQueueCapacity(100000);
+        executor.setThreadNamePrefix("loghub-pool-");
+        //抛异常
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
+        // 自定义拒绝策略,打印日志
+//        executor.setRejectedExecutionHandler((r, exe) -> {
+//            log.error("阿里云日志线程池已满,正在丢弃任务!请检查日志服务健康状况。");
+//        });
+        executor.setWaitForTasksToCompleteOnShutdown(true);
+        executor.initialize();
+        return executor;
+    }
+
+    /**
+     * 3. 第三方调用/通知专用线程池 (可选)
+     * 如:发送邮件、短信、Webhook
+     */
+    @Bean("subTaskExecutor")
+    public Executor subTaskExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(4);
+        executor.setMaxPoolSize(8);
+        executor.setQueueCapacity(500);
+        executor.setThreadNamePrefix("subtask-pool-");
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        executor.initialize();
+        return executor;
+    }
+}

+ 37 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/integration/ContentDeconstructionClient.java

@@ -0,0 +1,37 @@
+package com.tzld.piaoquan.sde.integration;
+
+import com.tzld.piaoquan.sde.model.entity.SdSubTask;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+/**
+ * 解构
+ *
+ * @author supeng
+ */
+@Slf4j
+@Service
+public class ContentDeconstructionClient {
+    /**
+     * 提交解构任务
+     *
+     * @param sdSubTask 子任务
+     * @return 解构任务ID
+     */
+    public String submitDeconstructionTask(SdSubTask sdSubTask) {
+        if (sdSubTask == null) {
+
+        }
+        return "";
+    }
+
+    /**
+     * 获取解构任务结果
+     *
+     * @param jobId 解构任务ID
+     * @return 解构结果
+     */
+    public String getDeconstructionTaskResult(String jobId) {
+        return "";
+    }
+}

+ 44 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/integration/LoghubClient.java

@@ -0,0 +1,44 @@
+package com.tzld.piaoquan.sde.integration;
+
+import com.tzld.commons.aliyun.log.AliyunLogManager;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+import java.util.concurrent.*;
+
+@Slf4j
+@Service
+public class LoghubClient {
+
+    @Value("${aliyun.log.project}")
+    private String project;
+    @Value("${aliyun.log.logstore.action}")
+    private String actionLogStore;
+
+    @Autowired
+    private Executor loghubExecutor;
+
+    @Autowired
+    private AliyunLogManager aliyunLogManager;
+
+    public void addLog(String logProject, String logStore, String logTopic, Map<String, Object> map) {
+        loghubExecutor.execute(() -> submitLog(logProject, logStore, logTopic, map));
+    }
+
+    public void addActionLog(Map<String, Object> map) {
+        loghubExecutor.execute(() -> submitLog(project, actionLogStore, "", map));
+    }
+
+    private void submitLog(String project, String logStore, String topic, Map<String, Object> data) {
+        try {
+            aliyunLogManager.sendLog(project, logStore, topic, data);
+        } catch (Exception e) {
+            log.error("调用阿里云logHub异常", e);
+        }
+    }
+
+
+}

+ 12 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/integration/OpenRouterClient.java

@@ -0,0 +1,12 @@
+package com.tzld.piaoquan.sde.integration;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author supeng
+ */
+@Slf4j
+@Service
+public class OpenRouterClient {
+}

+ 4 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/entity/SdSubTask.java

@@ -30,6 +30,10 @@ public class SdSubTask {
      * 子任务业务编号,用于异步回调时的精确匹配 & 对外暴露
      */
     private String subTaskNo;
+    /**
+     * 内容ID,videoId等
+     */
+    private String contentId;
     /**
      * 子任务类型:1 DECONSTRUCT(解构), 2 CLUSTER(聚类)
      *

+ 0 - 4
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/model/entity/SdSubTaskResultItem.java

@@ -20,10 +20,6 @@ public class SdSubTaskResultItem {
      */
     @TableId(type = IdType.AUTO)
     private Long id;
-    /**
-     * 冗余主任务ID,支持直接跨过子任务查全量结果,关联 sd_task.id
-     */
-    private Long taskId;
     /**
      * 来源子任务ID,关联 sd_sub_task.id
      */

+ 0 - 8
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/LoghubService.java

@@ -1,8 +0,0 @@
-package com.tzld.piaoquan.sde.service;
-
-import java.util.Map;
-
-public interface LoghubService {
-    void addLog(String logProject, String logStore, String logTopic, Map<String, Object> map);
-    void addActionLog(Map<String, Object> map);
-}

+ 0 - 8
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/OpenRouterService.java

@@ -1,8 +0,0 @@
-package com.tzld.piaoquan.sde.service;
-
-/**
- *
- * @author supeng
- */
-public interface OpenRouterService {
-}

+ 6 - 0
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/SubTaskService.java

@@ -7,11 +7,17 @@ import com.tzld.piaoquan.sde.model.request.SubTaskListParam;
 import com.tzld.piaoquan.sde.model.entity.SdSubTask;
 import com.tzld.piaoquan.sde.model.vo.SdSubTaskVO;
 
+import java.util.List;
+
 /**
  * @author supeng
  */
 public interface SubTaskService {
 
+    void create(SdSubTask sdSubTask);
+
+    void create(List<SdSubTask> sdSubTasks);
+
     /**
      * 任务列表
      *

+ 9 - 1
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/TaskService.java

@@ -9,29 +9,37 @@ import com.tzld.piaoquan.sde.model.entity.SdTask;
 import com.tzld.piaoquan.sde.model.vo.SdTaskVO;
 
 /**
- *
  * @author supeng
  */
 public interface TaskService {
     /**
      * 创建任务
+     *
      * @param request
      */
     void create(CommonRequest<TaskCreateParam> request);
 
     /**
      * 任务列表
+     *
      * @param request
      */
     Page<SdTaskVO> list(CommonRequest<TaskListParam> request);
 
     /**
      * 获取任务信息
+     *
      * @param request
      */
     SdTask get(CommonRequest<TaskGetParam> request);
 
+    /**
+     * 任务状态检查更新
+     */
     void taskStatusCheckHandler();
 
+    /**
+     * 任务执行
+     */
     void taskExecuteHandler();
 }

+ 0 - 75
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/LoghubServiceImpl.java

@@ -1,75 +0,0 @@
-package com.tzld.piaoquan.sde.service.impl;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.tzld.commons.aliyun.log.AliyunLogManager;
-import com.tzld.piaoquan.sde.service.LoghubService;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Service;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-@Slf4j
-@Service
-public class LoghubServiceImpl implements LoghubService {
-
-    @Value("${aliyun.log.project}")
-    private String project;
-    @Value("${aliyun.log.logstore.action}")
-    private String actionLogStore;
-
-    /**
-     * 线程池队列大小
-     */
-    private static final int QUEUE_MAX_SIZE = 100000;
-    /**
-     * 线程命名
-     */
-    private static final ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("loghub-service-pool-%d").build();
-
-    private static ThreadPoolExecutor executor;
-
-    @Autowired
-    private AliyunLogManager aliyunLogManager;
-
-
-    @PostConstruct
-    public void init() {
-        //init thread pool
-        executor = new ThreadPoolExecutor(64, 64,
-                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(QUEUE_MAX_SIZE), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.AbortPolicy());
-    }
-
-    @PreDestroy
-    public void destroy() {
-        //gracefully shutdown
-        executor.shutdown();
-    }
-
-    @Override
-    public void addLog(String logProject, String logStore, String logTopic, Map<String, Object> map) {
-        executor.execute(() -> submitLog(logProject, logStore, logTopic, map));
-    }
-
-    @Override
-    public void addActionLog(Map<String, Object> map) {
-        executor.execute(() -> submitLog(project, actionLogStore, "", map));
-    }
-
-    private void submitLog(String project, String logStore, String topic, Map<String, Object> data) {
-        try {
-            aliyunLogManager.sendLog(project, logStore, topic, data);
-        } catch (Exception e) {
-            log.error("调用阿里云logHub异常", e);
-        }
-    }
-
-
-}

+ 0 - 14
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/OpenRouterServiceImpl.java

@@ -1,14 +0,0 @@
-package com.tzld.piaoquan.sde.service.impl;
-
-import com.tzld.piaoquan.sde.service.OpenRouterService;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-
-/**
- * @author supeng
- */
-@Slf4j
-@Service
-public class OpenRouterServiceImpl implements OpenRouterService {
-
-}

+ 109 - 2
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/SubTaskServiceImpl.java

@@ -5,9 +5,15 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.tzld.piaoquan.sde.common.api.CommonRequest;
 import com.tzld.piaoquan.sde.common.enums.IsDeleteEnum;
+import com.tzld.piaoquan.sde.common.enums.SubTaskStatusEnum;
 import com.tzld.piaoquan.sde.common.enums.SubTaskTypeEnum;
 import com.tzld.piaoquan.sde.common.enums.TaskStatusEnum;
+import com.tzld.piaoquan.sde.integration.ContentDeconstructionClient;
 import com.tzld.piaoquan.sde.mapper.SdSubTaskMapper;
+import com.tzld.piaoquan.sde.mapper.SdSubTaskRawResultMapper;
+import com.tzld.piaoquan.sde.mapper.SdSubTaskResultItemMapper;
+import com.tzld.piaoquan.sde.mapper.SdTaskMapper;
+import com.tzld.piaoquan.sde.model.entity.SdSubTaskRawResult;
 import com.tzld.piaoquan.sde.model.request.SubTaskGetParam;
 import com.tzld.piaoquan.sde.model.request.SubTaskListParam;
 import com.tzld.piaoquan.sde.model.entity.SdSubTask;
@@ -18,7 +24,12 @@ import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Date;
+import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author supeng
@@ -27,8 +38,30 @@ import java.util.Objects;
 @Service
 public class SubTaskServiceImpl implements SubTaskService {
 
+    @Autowired
+    private ContentDeconstructionClient contentDeconstructionClient;
+
     @Autowired
     private SdSubTaskMapper sdSubTaskMapper;
+    @Autowired
+    private SdSubTaskRawResultMapper sdSubTaskRawResultMapper;
+    @Autowired
+    private SdSubTaskResultItemMapper sdSubTaskResultItemMapper;
+    @Autowired
+    private SdTaskMapper sdTaskMapper;
+
+    @Override
+    public void create(SdSubTask sdSubTask) {
+        int rows = sdSubTaskMapper.insert(sdSubTask);
+        log.info("create sub task rows = {}", rows);
+    }
+
+    @Override
+    public void create(List<SdSubTask> sdSubTasks) {
+        for (SdSubTask sdSubTask : sdSubTasks) {
+            create(sdSubTask);
+        }
+    }
 
     @Override
     public Page<SdSubTaskVO> list(CommonRequest<SubTaskListParam> request) {
@@ -80,11 +113,85 @@ public class SubTaskServiceImpl implements SubTaskService {
 
     @Override
     public void subTaskSubmitHandler() {
-
+        long start = System.nanoTime();
+        log.info("subTaskSubmitHandler start");
+        // 6 小时前
+        Date hourAgo = Date.from(
+                LocalDateTime.now()
+                        .minusHours(6)
+                        .atZone(ZoneId.systemDefault())
+                        .toInstant());
+        LambdaQueryWrapper<SdSubTask> wrapper = Wrappers.lambdaQuery(SdSubTask.class)
+                .eq(SdSubTask::getIsDeleted, IsDeleteEnum.NORMAL.getValue())
+                .eq(SdSubTask::getTaskStatus, SubTaskStatusEnum.INIT.getValue())
+                .ge(SdSubTask::getCreateTime, hourAgo);
+        List<SdSubTask> list = sdSubTaskMapper.selectList(wrapper);
+        if (Objects.isNull(list) || list.isEmpty()) {
+            log.info("there is no subTask need submit.");
+            return;
+        }
+        //提交子任务
+        for (SdSubTask sdSubTask : list) {
+            try {
+                String jobId = contentDeconstructionClient.submitDeconstructionTask(sdSubTask);
+                if (Objects.isNull(jobId) || jobId.isEmpty()) {
+                    continue;
+                }
+                SdSubTask update = new SdSubTask();
+                update.setId(sdSubTask.getId());
+                update.setExternalJobId(jobId);
+                update.setTaskStatus(SubTaskStatusEnum.SUBMITTED.getValue());
+                int rows = sdSubTaskMapper.updateById(update);
+                log.info("subTask submit success, id:{} rows = {}", sdSubTask.getId(), rows);
+            } catch (Exception e) {
+                log.error("subTask submit error {}", sdSubTask, e);
+            }
+        }
+        long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        log.info("subTaskSubmitHandler finish cost={}ms", costMs);
     }
 
     @Override
     public void subTaskSyncHandler() {
-
+        long start = System.nanoTime();
+        log.info("subTaskSyncHandler start");
+        // 6 小时前
+        Date hourAgo = Date.from(
+                LocalDateTime.now()
+                        .minusHours(6)
+                        .atZone(ZoneId.systemDefault())
+                        .toInstant());
+        LambdaQueryWrapper<SdSubTask> wrapper = Wrappers.lambdaQuery(SdSubTask.class)
+                .eq(SdSubTask::getIsDeleted, IsDeleteEnum.NORMAL.getValue())
+                .eq(SdSubTask::getTaskStatus, SubTaskStatusEnum.SUBMITTED.getValue())
+                .ge(SdSubTask::getCreateTime, hourAgo);
+        List<SdSubTask> list = sdSubTaskMapper.selectList(wrapper);
+        if (Objects.isNull(list) || list.isEmpty()) {
+            log.info("there is no subTask need sync.");
+            return;
+        }
+        for (SdSubTask sdSubTask : list) {
+            try {
+                String rawResult = contentDeconstructionClient.getDeconstructionTaskResult(sdSubTask.getExternalJobId());
+                if (Objects.isNull(rawResult) || rawResult.isEmpty()) {
+                    continue;
+                }
+                SdSubTaskRawResult sdSubTaskRawResult = new SdSubTaskRawResult();
+                sdSubTaskRawResult.setSubTaskId(sdSubTask.getId());
+                sdSubTaskRawResult.setRawResult(rawResult);
+                int rows = sdSubTaskRawResultMapper.insert(sdSubTaskRawResult);
+                log.info("subTask sync success, id:{} rows = {}", sdSubTask.getId(), rows);
+                //TODO解析结果
+//                if (rows > 0) {
+//                    SdSubTaskResultItem sdSubTaskResultItem = new SdSubTaskResultItem();
+//                    sdSubTaskResultItem.setSubTaskId(sdSubTask.getId());
+//                    sdSubTaskResultItemMapper.insert(sdSubTaskResultItem);
+//                }
+            } catch (Exception e) {
+                log.error("subTask sync error {}", sdSubTask, e);
+            }
+        }
+        long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        log.info("subTaskSyncHandler finish cost={}ms", costMs);
     }
 }

+ 199 - 13
supply-demand-engine-core/src/main/java/com/tzld/piaoquan/sde/service/impl/TaskServiceImpl.java

@@ -5,18 +5,14 @@ import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.tzld.piaoquan.sde.common.api.CommonRequest;
 import com.tzld.piaoquan.sde.common.constant.Constant;
-import com.tzld.piaoquan.sde.common.enums.ExceptionEnum;
-import com.tzld.piaoquan.sde.common.enums.IsDeleteEnum;
-import com.tzld.piaoquan.sde.common.enums.TaskStatusEnum;
-import com.tzld.piaoquan.sde.common.enums.TaskTypeEnum;
+import com.tzld.piaoquan.sde.common.enums.*;
 import com.tzld.piaoquan.sde.common.exception.BizException;
-import com.tzld.piaoquan.sde.mapper.SdStrategyMapper;
-import com.tzld.piaoquan.sde.mapper.SdTaskMapper;
+import com.tzld.piaoquan.sde.integration.OpenRouterClient;
+import com.tzld.piaoquan.sde.mapper.*;
+import com.tzld.piaoquan.sde.model.entity.*;
 import com.tzld.piaoquan.sde.model.request.TaskCreateParam;
 import com.tzld.piaoquan.sde.model.request.TaskGetParam;
 import com.tzld.piaoquan.sde.model.request.TaskListParam;
-import com.tzld.piaoquan.sde.model.entity.SdStrategy;
-import com.tzld.piaoquan.sde.model.entity.SdTask;
 import com.tzld.piaoquan.sde.model.vo.SdTaskVO;
 import com.tzld.piaoquan.sde.service.TaskService;
 import com.tzld.piaoquan.sde.util.DateUtil;
@@ -27,9 +23,9 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.time.LocalDateTime;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.time.ZoneId;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /**
@@ -46,8 +42,19 @@ public class TaskServiceImpl implements TaskService {
     @Autowired
     private SdTaskMapper sdTaskMapper;
 
+    @Autowired
+    private SdSubTaskMapper sdSubTaskMapper;
+
     @Autowired
     private SdStrategyMapper sdStrategyMapper;
+    @Autowired
+    private SdPromptTemplateMapper sdPromptTemplateMapper;
+    @Autowired
+    private SdSubTaskResultItemMapper sdSubTaskResultItemMapper;
+    @Autowired
+    private OpenRouterClient openRouterServiceClient;
+    @Autowired
+    private OpenRouterClient openRouterClient;
 
     @Override
     public void create(CommonRequest<TaskCreateParam> request) {
@@ -74,10 +81,30 @@ public class TaskServiceImpl implements TaskService {
         }
         sdTask.setTaskType(params.getTaskType());
         sdTask.setTaskStatus(TaskStatusEnum.INIT.getValue());
-        int flag = sdTaskMapper.insert(sdTask);
-        if (flag <= 0) {
+        int rows = sdTaskMapper.insert(sdTask);
+        log.info("create insert task rows = {}", rows);
+        if (rows <= 0) {
             throw new BizException(ExceptionEnum.DATA_INSERT_ERROR);
         }
+        //子任务创建
+        int total = 0;
+        for (Long videoId : params.getVideoIds()) {
+            SdSubTask sdSubTask = new SdSubTask();
+            sdSubTask.setTaskId(sdTask.getId());
+            String subTaskNo = IdGeneratorUtil.generateSubTaskNo(sdTask.getTaskNo());
+            sdSubTask.setSubTaskNo(subTaskNo);
+            sdSubTask.setContentId(String.valueOf(videoId));
+            sdSubTask.setTaskStatus(TaskStatusEnum.INIT.getValue());
+            int subRows = sdSubTaskMapper.insert(sdSubTask);
+            total += subRows;
+            log.info("create subTask rows = {}", subRows);
+        }
+        //更新子任务数
+        SdTask updateTask = new SdTask();
+        updateTask.setId(sdTask.getId());
+        updateTask.setSubTaskCount(total);
+        int updateRows = sdTaskMapper.updateById(updateTask);
+        log.info("create update task rows = {}, total = {}", updateRows, total);
     }
 
     @Override
@@ -135,11 +162,170 @@ public class TaskServiceImpl implements TaskService {
 
     @Override
     public void taskStatusCheckHandler() {
+        long start = System.nanoTime();
+        log.info("taskStatusCheckHandler start");
+        // 6 小时前
+        Date hourAgo = Date.from(
+                LocalDateTime.now()
+                        .minusHours(6)
+                        .atZone(ZoneId.systemDefault())
+                        .toInstant());
+        List<Integer> taskStatusList = Arrays.asList(TaskStatusEnum.INIT.getValue(),
+                TaskStatusEnum.PRE_PROCESSING.getValue());
+        LambdaQueryWrapper<SdTask> wrapper = Wrappers.lambdaQuery(SdTask.class)
+                .eq(SdTask::getIsDeleted, IsDeleteEnum.NORMAL.getValue())
+                .in(SdTask::getTaskStatus, taskStatusList)
+                .ge(SdTask::getCreateTime, hourAgo);
+        List<SdTask> tasks = sdTaskMapper.selectList(wrapper);
+        if (Objects.isNull(tasks) || tasks.isEmpty()) {
+            log.info("taskStatusCheckHandler tasks is empty");
+            return;
+        }
+        for (SdTask sdTask : tasks) {
+            TaskStatusEnum taskStatusEnum = TaskStatusEnum.getInstance(sdTask.getTaskStatus());
+            if (Objects.isNull(taskStatusEnum)) {
+                continue;
+            }
+            switch (taskStatusEnum) {
+                case INIT:
+                    initTaskStatusCheck(sdTask);
+                    break;
+                case PRE_PROCESSING:
+                    preProcessingTaskStatsCheck(sdTask);
+                    break;
+                default:
+                    //其他状态不在这里处理
+                    break;
+            }
+        }
+        long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        log.info("taskStatusCheckHandler finish cost={}ms", costMs);
+    }
 
+    private void preProcessingTaskStatsCheck(SdTask sdTask) {
+        try {
+            //获取完成任务数
+            List<Integer> finishTaskStatusList = Arrays.asList(TaskStatusEnum.SUCCESS.getValue(),
+                    TaskStatusEnum.FAILED.getValue(),
+                    TaskStatusEnum.TIMEOUT.getValue());
+            LambdaQueryWrapper<SdSubTask> wrapper = Wrappers.lambdaQuery(SdSubTask.class)
+                    .eq(SdSubTask::getIsDeleted, IsDeleteEnum.NORMAL.getValue())
+                    .eq(SdSubTask::getTaskId, sdTask.getId())
+                    .in(SdSubTask::getTaskStatus, finishTaskStatusList);
+            Long finishCount = sdSubTaskMapper.selectCount(wrapper);
+            //更新主任务
+            if (Objects.isNull(finishCount)) {
+                return;
+            }
+            boolean needUpdateFinishedCount = finishCount.intValue() > sdTask.getFinishedSubTaskCount();
+            boolean allFinished = finishCount.intValue() == sdTask.getSubTaskCount();
+            if (needUpdateFinishedCount || allFinished) {
+                SdTask updateTask = new SdTask();
+                updateTask.setId(sdTask.getId());
+                updateTask.setFinishedSubTaskCount(finishCount.intValue());
+                //如果全部完成,更新状态
+                if (allFinished) {
+                    updateTask.setTaskStatus(TaskStatusEnum.READY.getValue());
+                }
+                int updateRows = sdTaskMapper.updateById(updateTask);
+                log.info("preProcessingTaskStatsCheck status:{} task update rows:{}", TaskStatusEnum.PRE_PROCESSING.getDesc(), updateRows);
+            }
+        } catch (Exception e) {
+            log.error("preProcessingTaskStatsCheck error", e);
+        }
+    }
+
+    /**
+     * 初始任务状态更新
+     */
+    private void initTaskStatusCheck(SdTask sdTask) {
+        try {
+            //检查子任务状态
+            LambdaQueryWrapper<SdSubTask> wrapper = Wrappers.lambdaQuery(SdSubTask.class)
+                    .eq(SdSubTask::getIsDeleted, IsDeleteEnum.NORMAL.getValue())
+                    .eq(SdSubTask::getTaskId, sdTask.getId())
+                    .ne(SdSubTask::getTaskStatus, SubTaskStatusEnum.INIT.getValue());
+            Long count = sdSubTaskMapper.selectCount(wrapper);
+            //更新主任务状态
+            if (Objects.nonNull(count) && count > 0) {
+                SdTask updateTask = new SdTask();
+                updateTask.setId(sdTask.getId());
+                updateTask.setTaskStatus(TaskStatusEnum.PRE_PROCESSING.getValue());
+                int updateRows = sdTaskMapper.updateById(updateTask);
+                log.info("initTaskStatusCheck status:{} task update rows:{}", TaskStatusEnum.INIT.getDesc(), updateRows);
+            }
+        } catch (Exception e) {
+            log.error("initTaskStatusCheck error", e);
+        }
     }
 
     @Override
     public void taskExecuteHandler() {
+        long start = System.nanoTime();
+        log.info("taskExecuteHandler start");
+        //查询可以执行的任务
+        LambdaQueryWrapper<SdTask> wrapper = Wrappers.lambdaQuery(SdTask.class)
+                .eq(SdTask::getIsDeleted, IsDeleteEnum.NORMAL.getValue())
+                .eq(SdTask::getTaskStatus, TaskStatusEnum.READY.getValue());
+        List<SdTask> tasks = sdTaskMapper.selectList(wrapper);
+        if (Objects.isNull(tasks) || tasks.isEmpty()) {
+            log.info("taskExecuteHandler tasks is empty");
+            return;
+        }
+        for (SdTask sdTask : tasks) {
+            try {
+                //提取需求
+                SdStrategy strategy = sdStrategyMapper.selectById(sdTask.getStrategyId());
+                if (Objects.isNull(strategy)) {
+                    log.info("taskExecuteHandler strategy is null");
+                    return;
+                }
+                SdPromptTemplate promptTemplate = sdPromptTemplateMapper.selectById(strategy.getPromptTemplateId());
+                if (Objects.isNull(promptTemplate)) {
+                    log.info("taskExecuteHandler promptTemplate is null");
+                    return;
+                }
+
+                LambdaQueryWrapper<SdSubTask> subTaskWrapper = Wrappers.lambdaQuery(SdSubTask.class)
+                        .eq(SdSubTask::getIsDeleted, IsDeleteEnum.NORMAL.getValue())
+                        .eq(SdSubTask::getTaskId, sdTask.getId())
+                        .eq(SdSubTask::getTaskStatus, SubTaskStatusEnum.SUCCESS.getValue())
+                        .eq(SdSubTask::getSubTaskType, sdTask.getTaskType())
+                        .select(SdSubTask::getId);
+                List<SdSubTask> subTasks = sdSubTaskMapper.selectList(subTaskWrapper);
+                if (Objects.isNull(subTasks)) {
+                    log.info("taskExecuteHandler subTasks is null");
+                    return;
+                }
+                List<Long> subTaskIds = subTasks.stream().map(SdSubTask::getId).collect(Collectors.toList());
+                //TODO
+                String itemType = "";
+                LambdaQueryWrapper<SdSubTaskResultItem> resultItemWrapper = Wrappers.lambdaQuery(SdSubTaskResultItem.class)
+                        .eq(SdSubTaskResultItem::getIsDeleted, IsDeleteEnum.NORMAL.getValue())
+                        .in(SdSubTaskResultItem::getSubTaskId, subTaskIds)
+                        .eq(SdSubTaskResultItem::getItemType, itemType)
+                        .select(SdSubTaskResultItem::getItemContent);
+                List<SdSubTaskResultItem> resultItems = sdSubTaskResultItemMapper.selectList(resultItemWrapper);
+                if (Objects.isNull(resultItems) || resultItems.isEmpty()) {
+                    log.info("taskExecuteHandler resultItems is null");
+                    return;
+                }
+                //去掉null,换行拼接
+                String input = resultItems.stream()
+                        .map(SdSubTaskResultItem::getItemContent)
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.joining("\n"));
+
+                String prompt = promptTemplate.getPromptContent();
+                String finalPrompt = prompt.replaceAll("input", input);
+
 
+
+            } catch (Exception e) {
+                log.error("taskExecuteHandler error", e);
+            }
+        }
+        long costMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        log.info("taskExecuteHandler finish cost={}ms", costMs);
     }
 }

+ 60 - 0
supply-demand-engine-job/src/main/java/com/tzld/piaoquan/sde/job/SubTaskJob.java

@@ -0,0 +1,60 @@
+package com.tzld.piaoquan.sde.job;
+
+import com.tzld.piaoquan.sde.service.SubTaskService;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import com.xxl.job.core.log.XxlJobLogger;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author supeng
+ */
+@Slf4j
+@Component
+public class SubTaskJob {
+
+    @Autowired
+    private SubTaskService subTaskService;
+
+    /**
+     * 定时提交子任务
+     *
+     * @param params
+     * @return
+     */
+    @XxlJob("subTaskSubmitHandler")
+    public ReturnT<String> subTaskSubmitHandler(String params) {
+        XxlJobLogger.log("subTaskSubmitHandler start");
+        try {
+            subTaskService.subTaskSubmitHandler();
+        } catch (Exception e) {
+            XxlJobLogger.log("subTaskSubmitHandler error", e);
+            return ReturnT.FAIL;
+        } finally {
+            XxlJobLogger.log("subTaskSubmitHandler end");
+        }
+        return ReturnT.SUCCESS;
+    }
+
+    /**
+     * 定时更新子任务状态/结果同步
+     *
+     * @param params
+     * @return
+     */
+    @XxlJob("subTaskSyncHandler")
+    public ReturnT<String> subTaskSyncHandler(String params) {
+        XxlJobLogger.log("subTaskSyncHandler start");
+        try {
+            subTaskService.subTaskSyncHandler();
+        } catch (Exception e) {
+            XxlJobLogger.log("subTaskSyncHandler error", e);
+            return ReturnT.FAIL;
+        } finally {
+            XxlJobLogger.log("subTaskSyncHandler end");
+        }
+        return ReturnT.SUCCESS;
+    }
+}

+ 0 - 44
supply-demand-engine-job/src/main/java/com/tzld/piaoquan/sde/job/TaskJob.java

@@ -1,6 +1,5 @@
 package com.tzld.piaoquan.sde.job;
 
-import com.tzld.piaoquan.sde.service.SubTaskService;
 import com.tzld.piaoquan.sde.service.TaskService;
 import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.handler.annotation.XxlJob;
@@ -19,49 +18,6 @@ public class TaskJob {
     @Autowired
     private TaskService taskService;
 
-    @Autowired
-    private SubTaskService subTaskService;
-
-    /**
-     * 定时提交子任务
-     *
-     * @param params
-     * @return
-     */
-    @XxlJob("subTaskSubmitHandler")
-    public ReturnT<String> subTaskSubmitHandler(String params) {
-        XxlJobLogger.log("subTaskSubmitHandler start");
-        try {
-            subTaskService.subTaskSubmitHandler();
-        } catch (Exception e) {
-            XxlJobLogger.log("subTaskSubmitHandler error", e);
-            return ReturnT.FAIL;
-        } finally {
-            XxlJobLogger.log("subTaskSubmitHandler end");
-        }
-        return ReturnT.SUCCESS;
-    }
-
-    /**
-     * 定时更新子任务状态/结果同步
-     *
-     * @param params
-     * @return
-     */
-    @XxlJob("subTaskSyncHandler")
-    public ReturnT<String> subTaskSyncHandler(String params) {
-        XxlJobLogger.log("subTaskSyncHandler start");
-        try {
-            subTaskService.subTaskSyncHandler();
-        } catch (Exception e) {
-            XxlJobLogger.log("subTaskSyncHandler error", e);
-            return ReturnT.FAIL;
-        } finally {
-            XxlJobLogger.log("subTaskSyncHandler end");
-        }
-        return ReturnT.SUCCESS;
-    }
-
     /**
      * 任务状态检测更新
      *