Browse Source

Fix condition task will cause workflow instance failed (#16152)

dev
Wenjun Ruan 6 months ago committed by GitHub
parent
commit
5df4b1cbc3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 44
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  2. 36
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  3. 29
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java
  4. 11
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTaskPluginFactory.java
  5. 9
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManager.java
  6. 52
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java
  7. 20
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManagerTest.java
  8. 51
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java
  9. 18
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java

44
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -31,11 +31,15 @@ import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
import org.apache.commons.lang3.StringUtils;
import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -192,6 +196,9 @@ public class TaskInstance implements Serializable {
@TableField(exist = false)
private DependentParameters dependency;
@TableField(exist = false)
private ConditionsParameters conditionsParameters;
/**
* switch dependency
*/
@ -318,6 +325,43 @@ public class TaskInstance implements Serializable {
this.dependency = dependency;
}
public ConditionsParameters getConditionsParameters() {
if (this.conditionsParameters == null) {
Map<String, Object> taskParamsMap =
JSONUtils.parseObject(this.getTaskParams(), new TypeReference<Map<String, Object>>() {
});
this.conditionsParameters =
JSONUtils.parseObject((String) taskParamsMap.get(Constants.DEPENDENCE), ConditionsParameters.class);
}
return conditionsParameters;
}
public ConditionsParameters.ConditionResult getConditionResult() {
Map<String, Object> taskParamsMap =
JSONUtils.parseObject(this.getTaskParams(), new TypeReference<Map<String, Object>>() {
});
String conditionResult = (String) taskParamsMap.getOrDefault(Constants.CONDITION_RESULT, "");
if (StringUtils.isNotEmpty(conditionResult)) {
return JSONUtils.parseObject(conditionResult, new TypeReference<ConditionsParameters.ConditionResult>() {
});
}
return null;
}
public void setConditionResult(ConditionsParameters conditionsParameters) {
if (conditionsParameters == null) {
return;
}
Map<String, Object> taskParamsMap =
JSONUtils.parseObject(this.getTaskParams(), new TypeReference<Map<String, Object>>() {
});
if (taskParamsMap == null) {
taskParamsMap = new HashMap<>();
}
taskParamsMap.put(Constants.CONDITION_RESULT, JSONUtils.toJsonString(conditionsParameters));
this.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
}
public SwitchParameters getSwitchDependency() {
// todo: We need to directly use Jackson to deserialize the taskParam, rather than parse the map and get from
// field.

36
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -385,18 +385,19 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
// retry task
log.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
retryTaskInstance(taskInstance);
} else if (taskInstance.getState().isFailure()) {
} else if (taskInstance.getState().isFailure() || taskInstance.getState().isKill()
|| taskInstance.getState().isStop()) {
completeTaskSet.add(taskInstance.getTaskCode());
ProjectUser projectUser =
processService.queryProjectWithUserByProcessInstanceId(workflowInstance.getId());
listenerEventAlertManager.publishTaskFailListenerEvent(workflowInstance, taskInstance, projectUser);
listenerEventAlertManager.publishTaskFailListenerEvent(workflowInstance, taskInstance);
if (isTaskNeedPutIntoErrorMap(taskInstance)) {
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
}
// There are child nodes and the failure policy is: CONTINUE
if (workflowInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode(
taskInstance.getTaskCode(),
workflowExecuteContext.getWorkflowGraph().getDag())) {
submitPostNode(taskInstance.getTaskCode());
} else {
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
if (workflowInstance.getFailureStrategy() == FailureStrategy.END) {
killAllTasks();
}
@ -805,10 +806,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
completeTaskSet.add(task.getTaskCode());
continue;
}
if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(task.getTaskCode(),
workflowExecuteContext.getWorkflowGraph().getDag())) {
continue;
}
if (task.taskCanRetry()) {
if (task.getState().isNeedFaultTolerance()) {
log.info("TaskInstance needs fault tolerance, will be added to standby list.");
@ -824,7 +822,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
}
continue;
}
if (task.getState().isFailure()) {
if (isTaskNeedPutIntoErrorMap(task)) {
errorTaskMap.put(task.getTaskCode(), task.getId());
}
} finally {
@ -2015,6 +2013,24 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable {
}
}
/**
* Whether the task instance need to put into {@link #errorTaskMap}.
* Only the task instance is failed or killed, and it is parent of condition task.
* Then it should be put into {@link #errorTaskMap}.
* <p> Once a task instance is put into {@link #errorTaskMap}, it will be thought as failed and make the workflow be failed.
*/
private boolean isTaskNeedPutIntoErrorMap(TaskInstance taskInstance) {
if (!taskInstance.getState().isFailure() && !taskInstance.getState().isStop()
&& !taskInstance.getState().isKill()) {
return false;
}
TaskNode taskNode = workflowExecuteContext.getWorkflowGraph().getTaskNodeByCode(taskInstance.getTaskCode());
if (DagHelper.haveConditionsAfterNode(taskNode.getCode(), workflowExecuteContext.getWorkflowGraph().getDag())) {
return false;
}
return true;
}
private enum WorkflowRunnableStatus {
CREATED, INITIALIZE_QUEUE, STARTED,
;

29
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTask.java

@ -25,10 +25,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.DependentUtils;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.exception.LogicTaskInitializeException;
import org.apache.dolphinscheduler.server.master.runner.task.BaseSyncLogicTask;
import java.util.List;
@ -40,37 +38,34 @@ import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ConditionLogicTask extends BaseSyncLogicTask<DependentParameters> {
public class ConditionLogicTask extends BaseSyncLogicTask<ConditionsParameters> {
public static final String TASK_TYPE = "CONDITIONS";
private final TaskInstanceDao taskInstanceDao;
private final ProcessInstanceDao workflowInstanceDao;
private final TaskInstance taskInstance;
public ConditionLogicTask(TaskExecutionContext taskExecutionContext,
ProcessInstanceExecCacheManager processInstanceExecCacheManager,
TaskInstance taskInstance,
TaskInstanceDao taskInstanceDao,
ProcessInstanceDao workflowInstanceDao) throws LogicTaskInitializeException {
ProcessInstanceDao workflowInstanceDao) {
// todo: we need to change the parameter in front-end, so that we can directly use json to parse
super(taskExecutionContext,
processInstanceExecCacheManager.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId())
.getTaskInstance(taskExecutionContext.getTaskInstanceId())
.orElseThrow(() -> new LogicTaskInitializeException(
"Cannot find the task instance in workflow execute runnable"))
.getDependency());
// todo:check the parameters, why we don't use conditionTask? taskInstance.getDependency();
super(taskExecutionContext, taskInstance.getConditionsParameters());
this.taskInstanceDao = taskInstanceDao;
this.workflowInstanceDao = workflowInstanceDao;
this.taskInstance = taskInstance;
}
@Override
public void handle() {
// calculate the conditionResult
DependResult conditionResult = calculateConditionResult();
TaskExecutionStatus taskExecutionStatus =
(conditionResult == DependResult.SUCCESS) ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE;
log.info("The condition result is {}, task instance statue will be: {}", conditionResult, taskExecutionStatus);
taskExecutionContext.setCurrentExecutionStatus(taskExecutionStatus);
log.info("The condition result is {}", conditionResult);
taskParameters.setConditionSuccess(conditionResult == DependResult.SUCCESS);
taskInstance.setConditionsParameters(taskParameters);
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
}
private DependResult calculateConditionResult() {

11
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/condition/ConditionLogicTaskPluginFactory.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.runner.task.condition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.repository.ProcessInstanceDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -43,7 +44,15 @@ public class ConditionLogicTaskPluginFactory implements ILogicTaskPluginFactory<
@Override
public ConditionLogicTask createLogicTask(TaskExecutionContext taskExecutionContext) throws LogicTaskInitializeException {
return new ConditionLogicTask(taskExecutionContext, processInstanceExecCacheManager, taskInstanceDao,
TaskInstance taskInstance =
processInstanceExecCacheManager.getByProcessInstanceId(taskExecutionContext.getProcessInstanceId())
.getTaskInstance(taskExecutionContext.getTaskInstanceId())
.orElseThrow(() -> new LogicTaskInitializeException(
"Cannot find the task instance in workflow execute runnable"));
return new ConditionLogicTask(
taskExecutionContext,
taskInstance,
taskInstanceDao,
processInstanceDao);
}

9
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManager.java

@ -43,6 +43,7 @@ import org.apache.dolphinscheduler.dao.entity.event.TaskFailListenerEvent;
import org.apache.dolphinscheduler.dao.entity.event.TaskStartListenerEvent;
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.collections4.CollectionUtils;
@ -71,6 +72,9 @@ public class ListenerEventAlertManager {
@Autowired
private AlertPluginInstanceMapper alertPluginInstanceMapper;
@Autowired
private ProcessService processService;
public void publishServerDownListenerEvent(String host, String type) {
ServerDownListenerEvent event = new ServerDownListenerEvent();
event.setEventTime(new Date());
@ -214,8 +218,9 @@ public class ListenerEventAlertManager {
}
public void publishTaskFailListenerEvent(ProcessInstance processInstance,
TaskInstance taskInstance,
ProjectUser projectUser) {
TaskInstance taskInstance) {
ProjectUser projectUser = processService.queryProjectWithUserByProcessInstanceId(processInstance.getId());
TaskFailListenerEvent event = new TaskFailListenerEvent();
event.setProjectCode(projectUser.getProjectCode());
event.setProjectName(projectUser.getProjectName());

52
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/DagHelper.java

@ -76,10 +76,10 @@ public class DagHelper {
/**
* generate task nodes needed by dag
*
* @param taskNodeList taskNodeList
* @param startNodeNameList startNodeNameList
* @param taskNodeList taskNodeList
* @param startNodeNameList startNodeNameList
* @param recoveryNodeCodeList recoveryNodeCodeList
* @param taskDependType taskDependType
* @param taskDependType taskDependType
* @return task node list
*/
public static List<TaskNode> generateFlowNodeListByStartNode(List<TaskNode> taskNodeList,
@ -139,7 +139,7 @@ public class DagHelper {
/**
* find all the nodes that depended on the start node
*
* @param startNode startNode
* @param startNode startNode
* @param taskNodeList taskNodeList
* @return task node list
*/
@ -166,9 +166,9 @@ public class DagHelper {
/**
* find all nodes that start nodes depend on.
*
* @param startNode startNode
* @param startNode startNode
* @param recoveryNodeCodeList recoveryNodeCodeList
* @param taskNodeList taskNodeList
* @param taskNodeList taskNodeList
* @return task node list
*/
private static List<TaskNode> getFlowNodeListPre(TaskNode startNode,
@ -204,10 +204,10 @@ public class DagHelper {
/**
* generate dag by start nodes and recovery nodes
*
* @param totalTaskNodeList totalTaskNodeList
* @param startNodeNameList startNodeNameList
* @param totalTaskNodeList totalTaskNodeList
* @param startNodeNameList startNodeNameList
* @param recoveryNodeCodeList recoveryNodeCodeList
* @param depNodeType depNodeType
* @param depNodeType depNodeType
* @return process dag
* @throws Exception if error throws Exception
*/
@ -232,7 +232,7 @@ public class DagHelper {
* find node by node name
*
* @param nodeDetails nodeDetails
* @param nodeName nodeName
* @param nodeName nodeName
* @return task node
*/
public static TaskNode findNodeByName(List<TaskNode> nodeDetails, String nodeName) {
@ -248,7 +248,7 @@ public class DagHelper {
* find node by node code
*
* @param nodeDetails nodeDetails
* @param nodeCode nodeCode
* @param nodeCode nodeCode
* @return task node
*/
public static TaskNode findNodeByCode(List<TaskNode> nodeDetails, Long nodeCode) {
@ -263,8 +263,8 @@ public class DagHelper {
/**
* the task can be submit when all the depends nodes are forbidden or complete
*
* @param taskNode taskNode
* @param dag dag
* @param taskNode taskNode
* @param dag dag
* @param completeTaskList completeTaskList
* @return can submit
*/
@ -369,22 +369,20 @@ public class DagHelper {
return conditionTaskList;
}
TaskInstance taskInstance = completeTaskList.get(nodeCode);
ConditionsParameters conditionsParameters =
JSONUtils.parseObject(taskNode.getConditionResult(), ConditionsParameters.class);
ConditionsParameters conditionsParameters = taskInstance.getConditionsParameters();
ConditionsParameters.ConditionResult conditionResult = taskInstance.getConditionResult();
List<Long> skipNodeList = new ArrayList<>();
if (taskInstance.getState().isSuccess()) {
conditionTaskList = conditionsParameters.getSuccessNode();
skipNodeList = conditionsParameters.getFailedNode();
} else if (taskInstance.getState().isFailure()) {
conditionTaskList = conditionsParameters.getFailedNode();
skipNodeList = conditionsParameters.getSuccessNode();
if (conditionsParameters.isConditionSuccess()) {
conditionTaskList = conditionResult.getSuccessNode();
skipNodeList = conditionResult.getFailedNode();
} else {
conditionTaskList.add(nodeCode);
conditionTaskList = conditionResult.getFailedNode();
skipNodeList = conditionResult.getSuccessNode();
}
// the skipNodeList maybe null if no next task
skipNodeList = Optional.ofNullable(skipNodeList).orElse(new ArrayList<>());
for (Long failedNode : skipNodeList) {
setTaskNodeSkip(failedNode, dag, completeTaskList, skipTaskNodeList);
if (CollectionUtils.isNotEmpty(skipNodeList)) {
skipNodeList.forEach(skipNode -> setTaskNodeSkip(skipNode, dag, completeTaskList, skipTaskNodeList));
}
// the conditionTaskList maybe null if no next task
conditionTaskList = Optional.ofNullable(conditionTaskList).orElse(new ArrayList<>());
@ -447,6 +445,7 @@ public class DagHelper {
/**
* get all downstream nodes of the branch that the switch node needs to execute
*
* @param taskCode
* @param dag
* @param switchNeedWorkCodes
@ -480,6 +479,7 @@ public class DagHelper {
}
}
}
/**
* set task node and the post nodes skip flag
*/

20
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/alert/ListenerEventAlertManagerTest.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.service.alert;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@ -30,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ListenerEventMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.List;
@ -40,8 +42,6 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* ProcessAlertManager Test
@ -49,8 +49,6 @@ import org.slf4j.LoggerFactory;
@ExtendWith(MockitoExtension.class)
public class ListenerEventAlertManagerTest {
private static final Logger logger = LoggerFactory.getLogger(ListenerEventAlertManagerTest.class);
@InjectMocks
ListenerEventAlertManager listenerEventAlertManager;
@ -60,6 +58,9 @@ public class ListenerEventAlertManagerTest {
@Mock
ListenerEventMapper listenerEventMapper;
@Mock
ProcessService processService;
@Test
public void sendServerDownListenerEventTest() {
String host = "127.0.0.1";
@ -67,7 +68,7 @@ public class ListenerEventAlertManagerTest {
List<AlertPluginInstance> globalPluginInstanceList = new ArrayList<>();
AlertPluginInstance instance = new AlertPluginInstance(1, "instanceParams", "instanceName");
globalPluginInstanceList.add(instance);
Mockito.when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList())
when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList())
.thenReturn(globalPluginInstanceList);
Mockito.doNothing().when(listenerEventMapper).insertServerDownEvent(any(), any());
listenerEventAlertManager.publishServerDownListenerEvent(host, type);
@ -82,9 +83,9 @@ public class ListenerEventAlertManagerTest {
AlertPluginInstance instance = new AlertPluginInstance(1, "instanceParams", "instanceName");
List<AlertPluginInstance> globalPluginInstanceList = new ArrayList<>();
globalPluginInstanceList.add(instance);
Mockito.when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList())
when(alertPluginInstanceMapper.queryAllGlobalAlertPluginInstanceList())
.thenReturn(globalPluginInstanceList);
Mockito.when(listenerEventMapper.insert(any())).thenReturn(1);
when(listenerEventMapper.insert(any())).thenReturn(1);
listenerEventAlertManager.publishProcessDefinitionCreatedListenerEvent(user, processDefinition,
taskDefinitionLogs, processTaskRelationLogs);
}
@ -142,7 +143,8 @@ public class ListenerEventAlertManagerTest {
public void sendTaskFailListenerEvent() {
ProcessInstance processInstance = Mockito.mock(ProcessInstance.class);
TaskInstance taskInstance = Mockito.mock(TaskInstance.class);
ProjectUser projectUser = Mockito.mock(ProjectUser.class);
listenerEventAlertManager.publishTaskFailListenerEvent(processInstance, taskInstance, projectUser);
when(processService.queryProjectWithUserByProcessInstanceId(processInstance.getId()))
.thenReturn(new ProjectUser());
listenerEventAlertManager.publishTaskFailListenerEvent(processInstance, taskInstance);
}
}

51
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/utils/DagHelperTest.java

@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.SwitchResultVo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
import org.apache.dolphinscheduler.service.model.TaskNode;
import org.apache.dolphinscheduler.service.process.ProcessDag;
@ -259,9 +260,14 @@ public class DagHelperTest {
completeTaskList.put(1L, new TaskInstance());
completeTaskList.put(2L, new TaskInstance());
completeTaskList.put(4L, new TaskInstance());
TaskNode node3 = dag.getNode(3L);
node3.setType(TASK_TYPE_CONDITIONS);
node3.setConditionResult("{\n"
TaskInstance taskInstance3 = new TaskInstance();
taskInstance3.setTaskType(TASK_TYPE_CONDITIONS);
Map<String, Object> params = new HashMap<>();
ConditionsParameters conditionsParameters = new ConditionsParameters();
conditionsParameters.setConditionSuccess(true);
params.put(Constants.DEPENDENCE, "{\"conditionSuccess\": true}");
params.put(Constants.CONDITION_RESULT, "{\n"
+
" \"successNode\": [5\n"
+
@ -272,11 +278,12 @@ public class DagHelperTest {
" ]\n"
+
" }");
completeTaskList.remove(3L);
TaskInstance taskInstance = new TaskInstance();
taskInstance.setState(TaskExecutionStatus.SUCCESS);
taskInstance3.setTaskParams(JSONUtils.toJsonString(params));
taskInstance3.setState(TaskExecutionStatus.SUCCESS);
TaskNode node3 = dag.getNode(3L);
node3.setType(TASK_TYPE_CONDITIONS);
// complete 1/2/3/4 expect:8
completeTaskList.put(3L, taskInstance);
completeTaskList.put(3L, taskInstance3);
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assertions.assertEquals(1, postNodes.size());
Assertions.assertTrue(postNodes.contains(8L));
@ -291,7 +298,6 @@ public class DagHelperTest {
// 3.complete 1/2/3/4/5/8 expect post:7 skip:6
skipNodeList.clear();
TaskInstance taskInstance1 = new TaskInstance();
taskInstance.setState(TaskExecutionStatus.SUCCESS);
completeTaskList.put(5L, taskInstance1);
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assertions.assertEquals(1, postNodes.size());
@ -299,35 +305,6 @@ public class DagHelperTest {
Assertions.assertEquals(1, skipNodeList.size());
Assertions.assertTrue(skipNodeList.containsKey(6L));
// dag: 1-2-3-5-7 4-3-6
// 3-if , complete:1/2/3/4
// 1.failure:3 expect post:6 skip:5/7
skipNodeList.clear();
completeTaskList.remove(3L);
taskInstance = new TaskInstance();
Map<String, Object> taskParamsMap = new HashMap<>();
taskParamsMap.put(Constants.SWITCH_RESULT, "");
taskInstance.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
taskInstance.setState(TaskExecutionStatus.FAILURE);
completeTaskList.put(3L, taskInstance);
postNodes = DagHelper.parsePostNodes(null, skipNodeList, dag, completeTaskList);
Assertions.assertEquals(1, postNodes.size());
Assertions.assertTrue(postNodes.contains(6L));
Assertions.assertEquals(2, skipNodeList.size());
Assertions.assertTrue(skipNodeList.containsKey(5L));
Assertions.assertTrue(skipNodeList.containsKey(7L));
// dag: 1-2-3-5-7 4-3-6
// 3-if , complete:1/2/3/4
// 1.failure:3 expect post:6 skip:5/7
dag = generateDag2();
skipNodeList.clear();
completeTaskList.clear();
taskInstance.setSwitchDependency(getSwitchNode());
completeTaskList.put(1L, taskInstance);
postNodes = DagHelper.parsePostNodes(1L, skipNodeList, dag, completeTaskList);
Assertions.assertEquals(1, postNodes.size());
}
@Test

18
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/ConditionsParameters.java

@ -31,13 +31,9 @@ public class ConditionsParameters extends AbstractParameters {
// depend node list and state, only need task name
private List<DependentTaskModel> dependTaskList;
private DependentRelation dependRelation;
private DependentRelation relation;
// node list to run when success
private List<Long> successNode;
// node list to run when failed
private List<Long> failedNode;
private boolean conditionSuccess;
@Override
public boolean checkParameters() {
@ -49,11 +45,11 @@ public class ConditionsParameters extends AbstractParameters {
return new ArrayList<>();
}
public String getConditionResult() {
return "{"
+ "\"successNode\": [\"" + successNode.get(0)
+ "\"],\"failedNode\": [\"" + failedNode.get(0)
+ "\"]}";
@Data
public static class ConditionResult {
private List<Long> successNode;
private List<Long> failedNode;
}
}

Loading…
Cancel
Save