|
@@ -76,40 +76,6 @@ public class PipelineServiceImpl implements PipelineService {
|
|
executeTreeNodeBFS(root, dto);
|
|
executeTreeNodeBFS(root, dto);
|
|
}
|
|
}
|
|
|
|
|
|
-// @Override
|
|
|
|
-// public void execute(ContentUnderstandDTO dto) {
|
|
|
|
-// // 1.数据准备
|
|
|
|
-// Long pipelineId = dto.getPipelineId();
|
|
|
|
-// // 1.新增任务
|
|
|
|
-// String taskId = UUID.randomUUID().toString().replace("-", "") + System.currentTimeMillis();
|
|
|
|
-// CuTask cuTask = new CuTask();
|
|
|
|
-// cuTask.setTaskId(taskId);
|
|
|
|
-// cuTask.setPipelineId(pipelineId);
|
|
|
|
-//// cuTask.setInput();
|
|
|
|
-// int insertCount = cuTaskMapper.insertSelective(cuTask);
|
|
|
|
-// // 2.获取pipeline配置
|
|
|
|
-// Integer contentType = dto.getContentType();
|
|
|
|
-// CuPipelineStepExample example = new CuPipelineStepExample();
|
|
|
|
-// example.createCriteria().andPipelineIdEqualTo(pipelineId);
|
|
|
|
-// List<CuPipelineStep> stepList = cuPipelineStepMapper.selectByExample(example);
|
|
|
|
-// if (Objects.isNull(stepList) || stepList.isEmpty()) {
|
|
|
|
-// throw new CommonException(ExceptionEnum.CONFIG_ERROR, "无配置 pipelineId:" + pipelineId);
|
|
|
|
-// }
|
|
|
|
-// // 3.构建tree结构
|
|
|
|
-// TreeNode root = buildTree(stepList);
|
|
|
|
-// root.setType(contentType);
|
|
|
|
-// String content = getContent(dto, contentType);
|
|
|
|
-// if (Objects.isNull(content)) {
|
|
|
|
-// throw new CommonException(ExceptionEnum.DATA_ERROR, "数据异常content:" + content);
|
|
|
|
-// }
|
|
|
|
-// root.setInput(content);
|
|
|
|
-// ContentAnalyseDTO dto1 = new ContentAnalyseDTO();
|
|
|
|
-// dto1.setVideoId(dto.getVideoId());
|
|
|
|
-// dto1.setContent(content);
|
|
|
|
-// // 4.按照步骤执行pipeline每一步动作
|
|
|
|
-// executeTreeNodeBFS(root, dto1);
|
|
|
|
-// }
|
|
|
|
-
|
|
|
|
@Override
|
|
@Override
|
|
public String executeTask(ContentDTO contentDTO) {
|
|
public String executeTask(ContentDTO contentDTO) {
|
|
// 1.数据准备
|
|
// 1.数据准备
|
|
@@ -145,28 +111,10 @@ public class PipelineServiceImpl implements PipelineService {
|
|
dto1.setVideoId(contentDTO.getVideoId());
|
|
dto1.setVideoId(contentDTO.getVideoId());
|
|
dto1.setContent(content);
|
|
dto1.setContent(content);
|
|
// 4.按照步骤执行pipeline每一步动作
|
|
// 4.按照步骤执行pipeline每一步动作
|
|
- executeTreeNodeBFS(root, dto1);
|
|
|
|
|
|
+ executeTaskTreeNodeBFS(root, dto1);
|
|
return taskId;
|
|
return taskId;
|
|
}
|
|
}
|
|
|
|
|
|
-// private String getContent(ContentUnderstandDTO dto, Integer contentType) {
|
|
|
|
-// String content = null;
|
|
|
|
-// if (Objects.equals(contentType, ContentTypeEnum.TITLE.getValue())) {
|
|
|
|
-// content = dto.getTitle();
|
|
|
|
-// } else if (Objects.equals(contentType, ContentTypeEnum.COVER.getValue())) {
|
|
|
|
-// content = dto.getCoverUrl();
|
|
|
|
-// } else if (Objects.equals(contentType, ContentTypeEnum.VIDEO.getValue())) {
|
|
|
|
-// content = dto.getVideoUrl();
|
|
|
|
-// } else if (Objects.equals(contentType, ContentTypeEnum.INTRODUCTION.getValue())) {
|
|
|
|
-// content = dto.getIntroduction();
|
|
|
|
-// } else if (Objects.equals(contentType, ContentTypeEnum.AUDIO.getValue())) {
|
|
|
|
-// content = dto.getAudioUrl();
|
|
|
|
-// } else if (Objects.equals(contentType, ContentTypeEnum.SRT_VTT.getValue())) {
|
|
|
|
-// content = dto.getSrt();
|
|
|
|
-// }
|
|
|
|
-// return content;
|
|
|
|
-// }
|
|
|
|
-
|
|
|
|
public TreeNode buildTree(List<CuPipelineStep> stepList) {
|
|
public TreeNode buildTree(List<CuPipelineStep> stepList) {
|
|
Map<Long, TreeNode> nodeMap = new HashMap<>();
|
|
Map<Long, TreeNode> nodeMap = new HashMap<>();
|
|
TreeNode root = null;
|
|
TreeNode root = null;
|
|
@@ -219,6 +167,49 @@ public class PipelineServiceImpl implements PipelineService {
|
|
String input = currentNode.getInput() == null ? "" : currentNode.getInput();
|
|
String input = currentNode.getInput() == null ? "" : currentNode.getInput();
|
|
// 执行action
|
|
// 执行action
|
|
String result = executeStep(step, input, type);
|
|
String result = executeStep(step, input, type);
|
|
|
|
+ if (Objects.isNull(currentNode.getChildren()) || currentNode.getChildren().isEmpty()) {
|
|
|
|
+ if (Objects.nonNull(result)) {
|
|
|
|
+ //异常不阻塞其他分支执行
|
|
|
|
+ try {
|
|
|
|
+ Map<String, Object> resultMap = JSON.parseObject(result, new TypeReference<Map<String, Object>>() {
|
|
|
|
+ });
|
|
|
|
+ resultMap.put("video_id", videoId);
|
|
|
|
+ mongoTemplate.insert(resultMap, COLLECTION_NAME);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ log.error("step execute error step = {}, result = {}", e, JSON.toJSONString(step), result);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ for (TreeNode child : currentNode.getChildren()) {
|
|
|
|
+ child.setInput(result);
|
|
|
|
+ queue.offer(child);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 遍历执行
|
|
|
|
+ *
|
|
|
|
+ * @param root 根节点
|
|
|
|
+ * @param dto 数据
|
|
|
|
+ */
|
|
|
|
+ public void executeTaskTreeNodeBFS(TreeNode root, ContentAnalyseDTO dto) {
|
|
|
|
+ if (Objects.isNull(root) || Objects.isNull(dto) || Objects.isNull(dto.getVideoId())) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ Queue<TreeNode> queue = new LinkedList<>();
|
|
|
|
+ queue.offer(root);
|
|
|
|
+
|
|
|
|
+ Long videoId = dto.getVideoId();
|
|
|
|
+
|
|
|
|
+ while (!queue.isEmpty()) {
|
|
|
|
+ TreeNode currentNode = queue.poll();
|
|
|
|
+ CuPipelineStep step = currentNode.getStep();
|
|
|
|
+ Integer type = currentNode.getType();
|
|
|
|
+ String input = currentNode.getInput() == null ? "" : currentNode.getInput();
|
|
|
|
+ // 执行action
|
|
|
|
+ String result = executeTaskStep(step, input, type);
|
|
if (Objects.isNull(currentNode.getChildren()) || currentNode.getChildren().isEmpty()) {
|
|
if (Objects.isNull(currentNode.getChildren()) || currentNode.getChildren().isEmpty()) {
|
|
if (Objects.nonNull(result)) {
|
|
if (Objects.nonNull(result)) {
|
|
//异常不阻塞其他分支执行
|
|
//异常不阻塞其他分支执行
|
|
@@ -264,4 +255,26 @@ public class PipelineServiceImpl implements PipelineService {
|
|
actionParam.setPrompt(prompt);
|
|
actionParam.setPrompt(prompt);
|
|
return action.execute(actionParam);
|
|
return action.execute(actionParam);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 执行step 动作
|
|
|
|
+ *
|
|
|
|
+ * @param step
|
|
|
|
+ * @param input
|
|
|
|
+ * @return
|
|
|
|
+ */
|
|
|
|
+ public String executeTaskStep(CuPipelineStep step, String input, Integer type) {
|
|
|
|
+ CuPrompt cuPrompt = cuPromptMapper.selectByPrimaryKey(step.getPromptId());
|
|
|
|
+ if (Objects.isNull(cuPrompt) || Objects.isNull(cuPrompt.getId())) {
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ String prompt = String.format(cuPrompt.getPrompt(), input);
|
|
|
|
+
|
|
|
|
+ Action action = actionMap.get(step.getAction());
|
|
|
|
+ ActionParam actionParam = new ActionParam();
|
|
|
|
+ actionParam.setType(type);
|
|
|
|
+ actionParam.setInput(input);
|
|
|
|
+ actionParam.setPrompt(prompt);
|
|
|
|
+ return action.execute(actionParam);
|
|
|
|
+ }
|
|
}
|
|
}
|