|  | @@ -2,7 +2,9 @@ package com.tzld.piaoquan.content.understanding.service.impl;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  import com.alibaba.fastjson.JSON;
 | 
	
		
			
				|  |  |  import com.alibaba.fastjson.TypeReference;
 | 
	
		
			
				|  |  | +import com.google.common.util.concurrent.ThreadFactoryBuilder;
 | 
	
		
			
				|  |  |  import com.tzld.piaoquan.content.understanding.common.enums.ExceptionEnum;
 | 
	
		
			
				|  |  | +import com.tzld.piaoquan.content.understanding.common.enums.TaskStatusEnum;
 | 
	
		
			
				|  |  |  import com.tzld.piaoquan.content.understanding.common.exception.CommonException;
 | 
	
		
			
				|  |  |  import com.tzld.piaoquan.content.understanding.dao.mapper.CuPipelineMapper;
 | 
	
		
			
				|  |  |  import com.tzld.piaoquan.content.understanding.dao.mapper.CuPipelineStepMapper;
 | 
	
	
		
			
				|  | @@ -13,10 +15,7 @@ import com.tzld.piaoquan.content.understanding.model.dto.ContentDTO;
 | 
	
		
			
				|  |  |  import com.tzld.piaoquan.content.understanding.model.dto.TreeNode;
 | 
	
		
			
				|  |  |  import com.tzld.piaoquan.content.understanding.model.param.ActionParam;
 | 
	
		
			
				|  |  |  import com.tzld.piaoquan.content.understanding.model.param.ContentAnalyseParam;
 | 
	
		
			
				|  |  | -import com.tzld.piaoquan.content.understanding.model.po.CuPipelineStep;
 | 
	
		
			
				|  |  | -import com.tzld.piaoquan.content.understanding.model.po.CuPipelineStepExample;
 | 
	
		
			
				|  |  | -import com.tzld.piaoquan.content.understanding.model.po.CuPrompt;
 | 
	
		
			
				|  |  | -import com.tzld.piaoquan.content.understanding.model.po.CuTask;
 | 
	
		
			
				|  |  | +import com.tzld.piaoquan.content.understanding.model.po.*;
 | 
	
		
			
				|  |  |  import com.tzld.piaoquan.content.understanding.service.Action;
 | 
	
		
			
				|  |  |  import com.tzld.piaoquan.content.understanding.service.PipelineService;
 | 
	
		
			
				|  |  |  import lombok.extern.slf4j.Slf4j;
 | 
	
	
		
			
				|  | @@ -25,7 +24,10 @@ import org.springframework.beans.factory.annotation.Autowired;
 | 
	
		
			
				|  |  |  import org.springframework.data.mongodb.core.MongoTemplate;
 | 
	
		
			
				|  |  |  import org.springframework.stereotype.Service;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +import javax.annotation.PostConstruct;
 | 
	
		
			
				|  |  | +import javax.annotation.PreDestroy;
 | 
	
		
			
				|  |  |  import java.util.*;
 | 
	
		
			
				|  |  | +import java.util.concurrent.*;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  /**
 | 
	
		
			
				|  |  |   * @author supeng
 | 
	
	
		
			
				|  | @@ -54,6 +56,32 @@ public class PipelineServiceImpl implements PipelineService {
 | 
	
		
			
				|  |  |      @Autowired
 | 
	
		
			
				|  |  |      private CuTaskMapper cuTaskMapper;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * 线程池队列大小
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    private static final int QUEUE_MAX_SIZE = 100000;
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * 线程命名
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    private static final ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("pipeline-service-pool-%d").build();
 | 
	
		
			
				|  |  | +    /**
 | 
	
		
			
				|  |  | +     * 线程池
 | 
	
		
			
				|  |  | +     */
 | 
	
		
			
				|  |  | +    private static ExecutorService pool;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    @PostConstruct
 | 
	
		
			
				|  |  | +    public void init() {
 | 
	
		
			
				|  |  | +        //init thread pool
 | 
	
		
			
				|  |  | +        pool = new ThreadPoolExecutor(32, 32,
 | 
	
		
			
				|  |  | +                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(QUEUE_MAX_SIZE), NAMED_THREAD_FACTORY, new ThreadPoolExecutor.AbortPolicy());
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    @PreDestroy
 | 
	
		
			
				|  |  | +    public void destroy() {
 | 
	
		
			
				|  |  | +        //gracefully shutdown
 | 
	
		
			
				|  |  | +        pool.shutdown();
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      @Override
 | 
	
		
			
				|  |  |      public void execute(ContentAnalyseParam param) {
 | 
	
		
			
				|  |  |          // 1.获取pipeline配置
 | 
	
	
		
			
				|  | @@ -100,18 +128,22 @@ public class PipelineServiceImpl implements PipelineService {
 | 
	
		
			
				|  |  |          if (Objects.isNull(stepList) || stepList.isEmpty()) {
 | 
	
		
			
				|  |  |              throw new CommonException(ExceptionEnum.CONFIG_ERROR, "无配置 pipelineId:" + pipelineId);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  | -        // 3.构建tree结构
 | 
	
		
			
				|  |  | -        TreeNode root = buildTree(stepList);
 | 
	
		
			
				|  |  | -        root.setType(contentType);
 | 
	
		
			
				|  |  |          if (Objects.isNull(content)) {
 | 
	
		
			
				|  |  |              throw new CommonException(ExceptionEnum.DATA_ERROR, "数据异常content:" + content);
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  | -        root.setInput(content);
 | 
	
		
			
				|  |  | -        ContentAnalyseDTO dto1 = new ContentAnalyseDTO();
 | 
	
		
			
				|  |  | -        dto1.setVideoId(contentDTO.getVideoId());
 | 
	
		
			
				|  |  | -        dto1.setContent(content);
 | 
	
		
			
				|  |  | -        // 4.按照步骤执行pipeline每一步动作
 | 
	
		
			
				|  |  | -        executeTaskTreeNodeBFS(root, dto1);
 | 
	
		
			
				|  |  | +        //异步执行
 | 
	
		
			
				|  |  | +        pool.execute(() -> {
 | 
	
		
			
				|  |  | +            // 3.构建tree结构
 | 
	
		
			
				|  |  | +            TreeNode root = buildTree(stepList);
 | 
	
		
			
				|  |  | +            root.setType(contentType);
 | 
	
		
			
				|  |  | +            root.setInput(content);
 | 
	
		
			
				|  |  | +            ContentAnalyseDTO dto1 = new ContentAnalyseDTO();
 | 
	
		
			
				|  |  | +            dto1.setTaskId(taskId);
 | 
	
		
			
				|  |  | +            dto1.setVideoId(contentDTO.getVideoId());
 | 
	
		
			
				|  |  | +            dto1.setContent(content);
 | 
	
		
			
				|  |  | +            // 4.按照步骤执行pipeline每一步动作
 | 
	
		
			
				|  |  | +            executeTaskTreeNodeBFS(root, dto1);
 | 
	
		
			
				|  |  | +        });
 | 
	
		
			
				|  |  |          return taskId;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -201,6 +233,7 @@ public class PipelineServiceImpl implements PipelineService {
 | 
	
		
			
				|  |  |          Queue<TreeNode> queue = new LinkedList<>();
 | 
	
		
			
				|  |  |          queue.offer(root);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +        String taskId = dto.getTaskId();
 | 
	
		
			
				|  |  |          Long videoId = dto.getVideoId();
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          while (!queue.isEmpty()) {
 | 
	
	
		
			
				|  | @@ -220,7 +253,15 @@ public class PipelineServiceImpl implements PipelineService {
 | 
	
		
			
				|  |  |                          //保存数据到mongo
 | 
	
		
			
				|  |  |                          mongoTemplate.insert(resultMap, COLLECTION_NAME);
 | 
	
		
			
				|  |  |                          //保存数据到mysql
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | +                        CuTaskExample example = new CuTaskExample();
 | 
	
		
			
				|  |  | +                        example.createCriteria().andTaskIdEqualTo(taskId);
 | 
	
		
			
				|  |  | +                        CuTask cuTask = new CuTask();
 | 
	
		
			
				|  |  | +                        cuTask.setOutput(JSON.toJSONString(resultMap));
 | 
	
		
			
				|  |  | +                        cuTask.setTaskStatus(TaskStatusEnum.SUCCESS.getValue());
 | 
	
		
			
				|  |  | +                        int insert = cuTaskMapper.updateByExampleSelective(cuTask, example);
 | 
	
		
			
				|  |  | +                        if (insert <= 0) {
 | 
	
		
			
				|  |  | +                            log.error("step execute error step = {}, result = {} insert = {}", JSON.toJSONString(step), result, insert);
 | 
	
		
			
				|  |  | +                        }
 | 
	
		
			
				|  |  |                      } catch (Exception e) {
 | 
	
		
			
				|  |  |                          log.error("step execute error step = {}, result = {}", e, JSON.toJSONString(step), result);
 | 
	
		
			
				|  |  |                      }
 |