From dcc9d64ef6c5a8353722aa371eb4920d33d6d756 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Sun, 24 Mar 2024 20:52:00 +0800 Subject: [PATCH] Fix Switch logic task doesn't check the branch exist (#15755) --- .../task/switchtask/SwitchLogicTask.java | 116 +++++++++--------- 1 file changed, 57 insertions(+), 59 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java index 244926c096..1f52f9287d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/switchtask/SwitchLogicTask.java @@ -18,10 +18,8 @@ package org.apache.dolphinscheduler.server.master.runner.task.switchtask; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo; @@ -34,7 +32,6 @@ import org.apache.dolphinscheduler.server.master.runner.task.BaseSyncLogicTask; import org.apache.dolphinscheduler.server.master.utils.SwitchTaskUtils; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; import java.util.List; import java.util.Map; @@ -47,7 +44,7 @@ public class SwitchLogicTask extends BaseSyncLogicTask { public static final String TASK_TYPE = "SWITCH"; - private final ProcessInstance processInstance; + private final WorkflowExecuteRunnable workflowExecuteRunnable; private final TaskInstance taskInstance; public SwitchLogicTask(TaskExecutionContext taskExecutionContext, @@ -59,9 +56,8 @@ public class SwitchLogicTask extends BaseSyncLogicTask { .orElseThrow(() -> new LogicTaskInitializeException( "Cannot find the task instance in workflow execute runnable")) .getSwitchDependency()); - WorkflowExecuteRunnable workflowExecuteRunnable = + this.workflowExecuteRunnable = processInstanceExecCacheManager.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId()); - this.processInstance = workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowInstance(); this.taskInstance = workflowExecuteRunnable.getTaskInstance(taskExecutionContext.getTaskInstanceId()) .orElseThrow(() -> new LogicTaskInitializeException( "Cannot find the task instance in workflow execute runnable")); @@ -69,84 +65,86 @@ public class SwitchLogicTask extends BaseSyncLogicTask { @Override public void handle() throws MasterTaskExecuteException { - DependResult conditionResult = calculateConditionResult(); - TaskExecutionStatus status = - (conditionResult == DependResult.SUCCESS) ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE; - log.info("Switch task execute finished, condition result is: {}, task status is: {}", conditionResult, - status.name()); - taskExecutionContext.setCurrentExecutionStatus(status); + // Calculate the condition result and get the next node + if (CollectionUtils.isEmpty(taskParameters.getDependTaskList())) { + moveToDefaultBranch(); + } else { + calculateSwitchBranch(); + } + taskInstance.setSwitchDependency(taskParameters); + log.info("Switch task execute finished"); + taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS); } - // todo: don't use depend result, use switch result - private DependResult calculateConditionResult() { - DependResult conditionResult = DependResult.SUCCESS; + private void moveToDefaultBranch() { + checkIfBranchExist(taskParameters.getNextNode()); List switchResultVos = taskParameters.getDependTaskList(); + switchResultVos.add(new SwitchResultVo(null, taskParameters.getNextNode())); + taskParameters.setResultConditionLocation(switchResultVos.size() - 1); - SwitchResultVo switchResultVo = new SwitchResultVo(); - switchResultVo.setNextNode(taskParameters.getNextNode()); - switchResultVos.add(switchResultVo); - // todo: refactor these calculate code - int finalConditionLocation = switchResultVos.size() - 1; - int i = 0; + log.info("The condition is not satisfied, move to the default branch: {}", + taskParameters.getNextNode().stream().map(node -> workflowExecuteRunnable.getWorkflowExecuteContext() + .getWorkflowGraph().getDag().getNode(node).getName()).collect(Collectors.toList())); + } - Map globalParams = JSONUtils - .toList(processInstance.getGlobalParams(), Property.class) - .stream() - .collect(Collectors.toMap(Property::getProp, Property -> Property)); + private void calculateSwitchBranch() { + List switchResultVos = taskParameters.getDependTaskList(); + if (CollectionUtils.isEmpty(switchResultVos)) { + moveToDefaultBranch(); + } + Map globalParams = taskExecutionContext.getPrepareParamsMap(); Map varParams = JSONUtils .toList(taskInstance.getVarPool(), Property.class) .stream() .collect(Collectors.toMap(Property::getProp, Property -> Property)); - for (SwitchResultVo info : switchResultVos) { - log.info("Begin to execute {} condition: {} ", (i + 1), info.getCondition()); - if (StringUtils.isEmpty(info.getCondition())) { - finalConditionLocation = i; - break; - } - String content = - SwitchTaskUtils.generateContentWithTaskParams(info.getCondition(), globalParams, varParams); + int finalConditionLocation = -1; + for (int i = 0; i < switchResultVos.size(); i++) { + SwitchResultVo switchResultVo = switchResultVos.get(i); + log.info("Begin to execute {} condition: {} ", i, switchResultVo.getCondition()); + String content = SwitchTaskUtils.generateContentWithTaskParams(switchResultVo.getCondition(), globalParams, + varParams); log.info("Format condition sentence::{} successfully", content); - Boolean result; + boolean result; try { result = SwitchTaskUtils.evaluate(content); log.info("Execute condition sentence: {} successfully: {}", content, result); + if (result) { + finalConditionLocation = i; + } } catch (Exception e) { log.info("Execute condition sentence: {} failed", content, e); - conditionResult = DependResult.FAILED; - break; - } - if (result) { - finalConditionLocation = i; - break; } - i++; } - taskParameters.setDependTaskList(switchResultVos); - taskParameters.setResultConditionLocation(finalConditionLocation); - taskInstance.setSwitchDependency(taskParameters); - - if (!isValidSwitchResult(switchResultVos.get(finalConditionLocation))) { - conditionResult = DependResult.FAILED; - log.error("The switch task depend result is invalid, result:{}, switch branch:{}", conditionResult, - finalConditionLocation); + if (finalConditionLocation >= 0) { + checkIfBranchExist(switchResultVos.get(finalConditionLocation).getNextNode()); + log.info("The condition is satisfied, move to the branch: {}", + switchResultVos.get(finalConditionLocation).getNextNode().stream() + .map(node -> workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowGraph().getDag() + .getNode(node).getName()) + .collect(Collectors.toList())); + taskParameters.setResultConditionLocation(finalConditionLocation); + } else { + log.info("All conditions are not satisfied, move to the default branch"); + moveToDefaultBranch(); } - - log.info("The switch task depend result:{}, switch branch:{}", conditionResult, finalConditionLocation); - return conditionResult; } - private boolean isValidSwitchResult(SwitchResultVo switchResult) { - if (CollectionUtils.isEmpty(switchResult.getNextNode())) { - return false; + private void checkIfBranchExist(List branchNode) { + if (CollectionUtils.isEmpty(branchNode)) { + throw new IllegalArgumentException("The branchNode is empty, please check the switch task configuration"); } - for (Long nextNode : switchResult.getNextNode()) { - if (nextNode == null) { - return false; + for (Long branch : branchNode) { + if (branch == null) { + throw new IllegalArgumentException("The branch is empty, please check the switch task configuration"); + } + if (!workflowExecuteRunnable.getWorkflowExecuteContext().getWorkflowGraph().getDag().containsNode(branch)) { + throw new IllegalArgumentException( + "The branch(code= " + branchNode + + ") is not in the dag, please check the switch task configuration"); } } - return true; } }