diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index 4897449753..6fc14d5cac 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -31,11 +31,15 @@ import org.apache.dolphinscheduler.common.enums.TaskExecuteType; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; +import org.apache.commons.lang3.StringUtils; + import java.io.Serializable; import java.util.Date; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -192,6 +196,9 @@ public class TaskInstance implements Serializable { @TableField(exist = false) private DependentParameters dependency; + @TableField(exist = false) + private ConditionsParameters conditionsParameters; + /** * switch dependency */ @@ -318,6 +325,43 @@ public class TaskInstance implements Serializable { this.dependency = dependency; } + public ConditionsParameters getConditionsParameters() { + if (this.conditionsParameters == null) { + Map taskParamsMap = + JSONUtils.parseObject(this.getTaskParams(), new TypeReference>() { + }); + this.conditionsParameters = + JSONUtils.parseObject((String) taskParamsMap.get(Constants.DEPENDENCE), ConditionsParameters.class); + } + return conditionsParameters; + } + + public ConditionsParameters.ConditionResult getConditionResult() { + Map taskParamsMap = + JSONUtils.parseObject(this.getTaskParams(), new TypeReference>() { + }); + String conditionResult = (String) taskParamsMap.getOrDefault(Constants.CONDITION_RESULT, ""); + if (StringUtils.isNotEmpty(conditionResult)) { + return JSONUtils.parseObject(conditionResult, new TypeReference() { + }); + } + return null; + } + + public void setConditionResult(ConditionsParameters conditionsParameters) { + if (conditionsParameters == null) { + return; + } + Map taskParamsMap = + JSONUtils.parseObject(this.getTaskParams(), new TypeReference>() { + }); + if (taskParamsMap == null) { + taskParamsMap = new HashMap<>(); + } + taskParamsMap.put(Constants.CONDITION_RESULT, JSONUtils.toJsonString(conditionsParameters)); + this.setTaskParams(JSONUtils.toJsonString(taskParamsMap)); + } + public SwitchParameters getSwitchDependency() { // todo: We need to directly use Jackson to deserialize the taskParam, rather than parse the map and get from // field. diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 919ca844c4..21a23fbe5b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -385,18 +385,19 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { // retry task log.info("Retry taskInstance taskInstance state: {}", taskInstance.getState()); retryTaskInstance(taskInstance); - } else if (taskInstance.getState().isFailure()) { + } else if (taskInstance.getState().isFailure() || taskInstance.getState().isKill() + || taskInstance.getState().isStop()) { completeTaskSet.add(taskInstance.getTaskCode()); - ProjectUser projectUser = - processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId()); - listenerEventAlertManager.publishTaskFailListenerEvent(workflowInstance, taskInstance, projectUser); + listenerEventAlertManager.publishTaskFailListenerEvent(workflowInstance, taskInstance); + if (isTaskNeedPutIntoErrorMap(taskInstance)) { + errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); + } // There are child nodes and the failure policy is: CONTINUE if (workflowInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode( taskInstance.getTaskCode(), workflowExecuteContext.getWorkflowGraph().getDag())) { submitPostNode(taskInstance.getTaskCode()); } else { - errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); if (workflowInstance.getFailureStrategy() == FailureStrategy.END) { killAllTasks(); } @@ -805,10 +806,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { completeTaskSet.add(task.getTaskCode()); continue; } - if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(task.getTaskCode(), - workflowExecuteContext.getWorkflowGraph().getDag())) { - continue; - } + if (task.taskCanRetry()) { if (task.getState().isNeedFaultTolerance()) { log.info("TaskInstance needs fault tolerance, will be added to standby list."); @@ -824,7 +822,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { } continue; } - if (task.getState().isFailure()) { + if (isTaskNeedPutIntoErrorMap(task)) { errorTaskMap.put(task.getTaskCode(), task.getId()); } } finally { @@ -2015,6 +2013,24 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { } } + /** + * Whether the task instance need to put into {@link #errorTaskMap}. + * Only the task instance is failed or killed, and it is parent of condition task. + * Then it should be put into {@link #errorTaskMap}. + *

Once a task instance is put into {@link #errorTaskMap}, it will be thought as failed and make the workflow be failed. + */ + private boolean isTaskNeedPutIntoErrorMap(TaskInstance taskInstance) { + if (!taskInstance.getState().isFailure() && !taskInstance.getState().isStop() + && !taskInstance.getState().isKill()) { + return false; + } + TaskNode taskNode = workflowExecuteContext.getWorkflowGraph().getTaskNodeByCode(taskInstance.getTaskCode()); + if (DagHelper.haveConditionsAfterNode(taskNode.getCode(), workflowExecuteContext.getWorkflowGraph().getDag())) { + return false; + } + return true; + } + private enum WorkflowRunnableStatus { CREATED, INITIALIZE_QUEUE, STARTED, ; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java index 803a8043ff..b54f10d48d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java @@ -25,10 +25,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem; -import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters; +import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils; -import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; -import org.apache.dolphinscheduler.server.master.exception.LogicTaskInitializeException; import org.apache.dolphinscheduler.server.master.runner.task.BaseSyncLogicTask; import java.util.List; @@ -40,37 +38,34 @@ import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @Slf4j -public class ConditionLogicTask extends BaseSyncLogicTask { +public class ConditionLogicTask extends BaseSyncLogicTask { public static final String TASK_TYPE = "CONDITIONS"; private final TaskInstanceDao taskInstanceDao; private final ProcessInstanceDao workflowInstanceDao; + private final TaskInstance taskInstance; + public ConditionLogicTask(TaskExecutionContext taskExecutionContext, - ProcessInstanceExecCacheManager processInstanceExecCacheManager, + TaskInstance taskInstance, TaskInstanceDao taskInstanceDao, - ProcessInstanceDao workflowInstanceDao) throws LogicTaskInitializeException { + ProcessInstanceDao workflowInstanceDao) { // todo: we need to change the parameter in front-end, so that we can directly use json to parse - super(taskExecutionContext, - processInstanceExecCacheManager.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId()) - .getTaskInstance(taskExecutionContext.getTaskInstanceId()) - .orElseThrow(() -> new LogicTaskInitializeException( - "Cannot find the task instance in workflow execute runnable")) - .getDependency()); - // todoļ¼šcheck the parameters, why we don't use conditionTask? taskInstance.getDependency(); + super(taskExecutionContext, taskInstance.getConditionsParameters()); this.taskInstanceDao = taskInstanceDao; this.workflowInstanceDao = workflowInstanceDao; + this.taskInstance = taskInstance; } @Override public void handle() { // calculate the conditionResult DependResult conditionResult = calculateConditionResult(); - TaskExecutionStatus taskExecutionStatus = - (conditionResult == DependResult.SUCCESS) ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE; - log.info("The condition result is {}, task instance statue will be: {}", conditionResult, taskExecutionStatus); - taskExecutionContext.setCurrentExecutionStatus(taskExecutionStatus); + log.info("The condition result is {}", conditionResult); + taskParameters.setConditionSuccess(conditionResult == DependResult.SUCCESS); + taskInstance.setConditionsParameters(taskParameters); + taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS); } private DependResult calculateConditionResult() { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTaskPluginFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTaskPluginFactory.java index d6887df6b5..4aee27f36d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTaskPluginFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTaskPluginFactory.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.runner.task.condition; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; @@ -43,7 +44,15 @@ public class ConditionLogicTaskPluginFactory implements ILogicTaskPluginFactory< @Override public ConditionLogicTask createLogicTask(TaskExecutionContext taskExecutionContext) throws LogicTaskInitializeException { - return new ConditionLogicTask(taskExecutionContext, processInstanceExecCacheManager, taskInstanceDao, + TaskInstance taskInstance = + processInstanceExecCacheManager.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId()) + .getTaskInstance(taskExecutionContext.getTaskInstanceId()) + .orElseThrow(() -> new LogicTaskInitializeException( + "Cannot find the task instance in workflow execute runnable")); + return new ConditionLogicTask( + taskExecutionContext, + taskInstance, + taskInstanceDao, processInstanceDao); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManager.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManager.java index 182ed678a0..3056a14f5b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManager.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManager.java @@ -43,6 +43,7 @@ import org.apache.dolphinscheduler.dao.entity.event.TaskFailListenerEvent; import org.apache.dolphinscheduler.dao.entity.event.TaskStartListenerEvent; import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper; +import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.collections4.CollectionUtils; @@ -71,6 +72,9 @@ public class ListenerEventAlertManager { @Autowired private AlertPluginInstanceMapper alertPluginInstanceMapper; + @Autowired + private ProcessService processService; + public void publishServerDownListenerEvent(String host, String type) { ServerDownListenerEvent event = new ServerDownListenerEvent(); event.setEventTime(new Date()); @@ -214,8 +218,9 @@ public class ListenerEventAlertManager { } public void publishTaskFailListenerEvent(ProcessInstance processInstance, - TaskInstance taskInstance, - ProjectUser projectUser) { + TaskInstance taskInstance) { + ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()); + TaskFailListenerEvent event = new TaskFailListenerEvent(); event.setProjectCode(projectUser.getProjectCode()); event.setProjectName(projectUser.getProjectName()); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java index ee5e97cf55..4a6235c0e0 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java @@ -76,10 +76,10 @@ public class DagHelper { /** * generate task nodes needed by dag * - * @param taskNodeList taskNodeList - * @param startNodeNameList startNodeNameList + * @param taskNodeList taskNodeList + * @param startNodeNameList startNodeNameList * @param recoveryNodeCodeList recoveryNodeCodeList - * @param taskDependType taskDependType + * @param taskDependType taskDependType * @return task node list */ public static List generateFlowNodeListByStartNode(List taskNodeList, @@ -139,7 +139,7 @@ public class DagHelper { /** * find all the nodes that depended on the start node * - * @param startNode startNode + * @param startNode startNode * @param taskNodeList taskNodeList * @return task node list */ @@ -166,9 +166,9 @@ public class DagHelper { /** * find all nodes that start nodes depend on. * - * @param startNode startNode + * @param startNode startNode * @param recoveryNodeCodeList recoveryNodeCodeList - * @param taskNodeList taskNodeList + * @param taskNodeList taskNodeList * @return task node list */ private static List getFlowNodeListPre(TaskNode startNode, @@ -204,10 +204,10 @@ public class DagHelper { /** * generate dag by start nodes and recovery nodes * - * @param totalTaskNodeList totalTaskNodeList - * @param startNodeNameList startNodeNameList + * @param totalTaskNodeList totalTaskNodeList + * @param startNodeNameList startNodeNameList * @param recoveryNodeCodeList recoveryNodeCodeList - * @param depNodeType depNodeType + * @param depNodeType depNodeType * @return process dag * @throws Exception if error throws Exception */ @@ -232,7 +232,7 @@ public class DagHelper { * find node by node name * * @param nodeDetails nodeDetails - * @param nodeName nodeName + * @param nodeName nodeName * @return task node */ public static TaskNode findNodeByName(List nodeDetails, String nodeName) { @@ -248,7 +248,7 @@ public class DagHelper { * find node by node code * * @param nodeDetails nodeDetails - * @param nodeCode nodeCode + * @param nodeCode nodeCode * @return task node */ public static TaskNode findNodeByCode(List nodeDetails, Long nodeCode) { @@ -263,8 +263,8 @@ public class DagHelper { /** * the task can be submit when all the depends nodes are forbidden or complete * - * @param taskNode taskNode - * @param dag dag + * @param taskNode taskNode + * @param dag dag * @param completeTaskList completeTaskList * @return can submit */ @@ -369,22 +369,20 @@ public class DagHelper { return conditionTaskList; } TaskInstance taskInstance = completeTaskList.get(nodeCode); - ConditionsParameters conditionsParameters = - JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class); + ConditionsParameters conditionsParameters = taskInstance.getConditionsParameters(); + ConditionsParameters.ConditionResult conditionResult = taskInstance.getConditionResult(); + List skipNodeList = new ArrayList<>(); - if (taskInstance.getState().isSuccess()) { - conditionTaskList = conditionsParameters.getSuccessNode(); - skipNodeList = conditionsParameters.getFailedNode(); - } else if (taskInstance.getState().isFailure()) { - conditionTaskList = conditionsParameters.getFailedNode(); - skipNodeList = conditionsParameters.getSuccessNode(); + if (conditionsParameters.isConditionSuccess()) { + conditionTaskList = conditionResult.getSuccessNode(); + skipNodeList = conditionResult.getFailedNode(); } else { - conditionTaskList.add(nodeCode); + conditionTaskList = conditionResult.getFailedNode(); + skipNodeList = conditionResult.getSuccessNode(); } - // the skipNodeList maybe null if no next task - skipNodeList = Optional.ofNullable(skipNodeList).orElse(new ArrayList<>()); - for (Long failedNode : skipNodeList) { - setTaskNodeSkip(failedNode, dag, completeTaskList, skipTaskNodeList); + + if (CollectionUtils.isNotEmpty(skipNodeList)) { + skipNodeList.forEach(skipNode -> setTaskNodeSkip(skipNode, dag, completeTaskList, skipTaskNodeList)); } // the conditionTaskList maybe null if no next task conditionTaskList = Optional.ofNullable(conditionTaskList).orElse(new ArrayList<>()); @@ -447,6 +445,7 @@ public class DagHelper { /** * get all downstream nodes of the branch that the switch node needs to execute + * * @param taskCode * @param dag * @param switchNeedWorkCodes @@ -480,6 +479,7 @@ public class DagHelper { } } } + /** * set task node and the post nodes skip flag */ diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManagerTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManagerTest.java index 981b0a8050..4ba958e71d 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManagerTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManagerTest.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.service.alert; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; @@ -30,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper; +import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.ArrayList; import java.util.List; @@ -40,8 +42,6 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * ProcessAlertManager Test @@ -49,8 +49,6 @@ import org.slf4j.LoggerFactory; @ExtendWith(MockitoExtension.class) public class ListenerEventAlertManagerTest { - private static final Logger logger = LoggerFactory.getLogger(ListenerEventAlertManagerTest.class); - @InjectMocks ListenerEventAlertManager listenerEventAlertManager; @@ -60,6 +58,9 @@ public class ListenerEventAlertManagerTest { @Mock ListenerEventMapper listenerEventMapper; + @Mock + ProcessService processService; + @Test public void sendServerDownListenerEventTest() { String host = "127.0.0.1"; @@ -67,7 +68,7 @@ public class ListenerEventAlertManagerTest { List globalPluginInstanceList = new ArrayList<>(); AlertPluginInstance instance = new AlertPluginInstance(1, "instanceParams", "instanceName"); globalPluginInstanceList.add(instance); - Mockito.when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList()) + when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList()) .thenReturn(globalPluginInstanceList); Mockito.doNothing().when(listenerEventMapper).insertServerDownEvent(any(), any()); listenerEventAlertManager.publishServerDownListenerEvent(host, type); @@ -82,9 +83,9 @@ public class ListenerEventAlertManagerTest { AlertPluginInstance instance = new AlertPluginInstance(1, "instanceParams", "instanceName"); List globalPluginInstanceList = new ArrayList<>(); globalPluginInstanceList.add(instance); - Mockito.when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList()) + when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList()) .thenReturn(globalPluginInstanceList); - Mockito.when(listenerEventMapper.insert(any())).thenReturn(1); + when(listenerEventMapper.insert(any())).thenReturn(1); listenerEventAlertManager.publishProcessDefinitionCreatedListenerEvent(user, processDefinition, taskDefinitionLogs, processTaskRelationLogs); } @@ -142,7 +143,8 @@ public class ListenerEventAlertManagerTest { public void sendTaskFailListenerEvent() { ProcessInstance processInstance = Mockito.mock(ProcessInstance.class); TaskInstance taskInstance = Mockito.mock(TaskInstance.class); - ProjectUser projectUser = Mockito.mock(ProjectUser.class); - listenerEventAlertManager.publishTaskFailListenerEvent(processInstance, taskInstance, projectUser); + when(processService.queryProjectWithUserByProcessInstanceId(processInstance.getId())) + .thenReturn(new ProjectUser()); + listenerEventAlertManager.publishTaskFailListenerEvent(processInstance, taskInstance); } } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java index c19812303c..a36c414272 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo; +import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters; import org.apache.dolphinscheduler.service.model.TaskNode; import org.apache.dolphinscheduler.service.process.ProcessDag; @@ -259,9 +260,14 @@ public class DagHelperTest { completeTaskList.put(1L, new TaskInstance()); completeTaskList.put(2L, new TaskInstance()); completeTaskList.put(4L, new TaskInstance()); - TaskNode node3 = dag.getNode(3L); - node3.setType(TASK_TYPE_CONDITIONS); - node3.setConditionResult("{\n" + + TaskInstance taskInstance3 = new TaskInstance(); + taskInstance3.setTaskType(TASK_TYPE_CONDITIONS); + Map params = new HashMap<>(); + ConditionsParameters conditionsParameters = new ConditionsParameters(); + conditionsParameters.setConditionSuccess(true); + params.put(Constants.DEPENDENCE, "{\"conditionSuccess\": true}"); + params.put(Constants.CONDITION_RESULT, "{\n" + " \"successNode\": [5\n" + @@ -272,11 +278,12 @@ public class DagHelperTest { " ]\n" + " }"); - completeTaskList.remove(3L); - TaskInstance taskInstance = new TaskInstance(); - taskInstance.setState(TaskExecutionStatus.SUCCESS); + taskInstance3.setTaskParams(JSONUtils.toJsonString(params)); + taskInstance3.setState(TaskExecutionStatus.SUCCESS); + TaskNode node3 = dag.getNode(3L); + node3.setType(TASK_TYPE_CONDITIONS); // complete 1/2/3/4 expect:8 - completeTaskList.put(3L, taskInstance); + completeTaskList.put(3L, taskInstance3); postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); Assertions.assertEquals(1, postNodes.size()); Assertions.assertTrue(postNodes.contains(8L)); @@ -291,7 +298,6 @@ public class DagHelperTest { // 3.complete 1/2/3/4/5/8 expect post:7 skip:6 skipNodeList.clear(); TaskInstance taskInstance1 = new TaskInstance(); - taskInstance.setState(TaskExecutionStatus.SUCCESS); completeTaskList.put(5L, taskInstance1); postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); Assertions.assertEquals(1, postNodes.size()); @@ -299,35 +305,6 @@ public class DagHelperTest { Assertions.assertEquals(1, skipNodeList.size()); Assertions.assertTrue(skipNodeList.containsKey(6L)); - // dag: 1-2-3-5-7 4-3-6 - // 3-if , complete:1/2/3/4 - // 1.failure:3 expect post:6 skip:5/7 - skipNodeList.clear(); - completeTaskList.remove(3L); - taskInstance = new TaskInstance(); - - Map taskParamsMap = new HashMap<>(); - taskParamsMap.put(Constants.SWITCH_RESULT, ""); - taskInstance.setTaskParams(JSONUtils.toJsonString(taskParamsMap)); - taskInstance.setState(TaskExecutionStatus.FAILURE); - completeTaskList.put(3L, taskInstance); - postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList); - Assertions.assertEquals(1, postNodes.size()); - Assertions.assertTrue(postNodes.contains(6L)); - Assertions.assertEquals(2, skipNodeList.size()); - Assertions.assertTrue(skipNodeList.containsKey(5L)); - Assertions.assertTrue(skipNodeList.containsKey(7L)); - - // dag: 1-2-3-5-7 4-3-6 - // 3-if , complete:1/2/3/4 - // 1.failure:3 expect post:6 skip:5/7 - dag = generateDag2(); - skipNodeList.clear(); - completeTaskList.clear(); - taskInstance.setSwitchDependency(getSwitchNode()); - completeTaskList.put(1L, taskInstance); - postNodes = DagHelper.parsePostNodes(1L, skipNodeList, dag, completeTaskList); - Assertions.assertEquals(1, postNodes.size()); } @Test diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java index 15141937b0..59b0d08fc5 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java @@ -31,13 +31,9 @@ public class ConditionsParameters extends AbstractParameters { // depend node list and state, only need task name private List dependTaskList; - private DependentRelation dependRelation; + private DependentRelation relation; - // node list to run when success - private List successNode; - - // node list to run when failed - private List failedNode; + private boolean conditionSuccess; @Override public boolean checkParameters() { @@ -49,11 +45,11 @@ public class ConditionsParameters extends AbstractParameters { return new ArrayList<>(); } - public String getConditionResult() { - return "{" - + "\"successNode\": [\"" + successNode.get(0) - + "\"],\"failedNode\": [\"" + failedNode.get(0) - + "\"]}"; + @Data + public static class ConditionResult { + + private List successNode; + private List failedNode; } }