diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java index 524d0d4a6d..cf9fc262ad 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskStateEventHandler.java @@ -23,8 +23,8 @@ import org.apache.dolphinscheduler.remote.exceptions.RemotingException; import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; -import java.util.Map; import java.util.Optional; +import java.util.Set; import lombok.extern.slf4j.Slf4j; @@ -55,7 +55,7 @@ public class TaskStateEventHandler implements StateEventHandler { "Handle task instance state event, the current task instance state {} will be changed to {}", task.getState().name(), taskStateEvent.getStatus().name()); - Map completeTaskMap = workflowExecuteRunnable.getCompleteTaskMap(); + Set completeTaskSet = workflowExecuteRunnable.getCompleteTaskCodes(); if (task.getState().isFinished() && (taskStateEvent.getStatus() != null && taskStateEvent.getStatus().isRunning())) { String errorMessage = String.format( @@ -67,8 +67,7 @@ public class TaskStateEventHandler implements StateEventHandler { } if (task.getState().isFinished()) { - if (completeTaskMap.containsKey(task.getTaskCode()) - && completeTaskMap.get(task.getTaskCode()).equals(task.getId())) { + if (completeTaskSet.contains(task.getTaskCode())) { log.warn("The task instance is already complete, stateEvent: {}", stateEvent); return true; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index 1564adbc14..88a1937b59 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -199,10 +199,10 @@ public class WorkflowExecuteRunnable implements Callable { private final Map errorTaskMap = new ConcurrentHashMap<>(); /** - * complete task map, taskCode as key, taskInstanceId as value + * complete task set * in a DAG, only one taskInstance per taskCode is valid */ - private final Map completeTaskMap = new ConcurrentHashMap<>(); + private final Set completeTaskSet = Sets.newConcurrentHashSet(); /** * depend failed task set @@ -444,7 +444,7 @@ public class WorkflowExecuteRunnable implements Callable { stateWheelExecuteThread.removeTask4RetryCheck(processInstance, taskInstance); if (taskInstance.getState().isSuccess()) { - completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); + completeTaskSet.add(taskInstance.getTaskCode()); mergeTaskInstanceVarPool(taskInstance); processInstanceDao.upsertProcessInstance(processInstance); // save the cacheKey only if the task is defined as cache task and the task is success @@ -459,7 +459,7 @@ public class WorkflowExecuteRunnable implements Callable { log.info("Retry taskInstance taskInstance state: {}", taskInstance.getState()); retryTaskInstance(taskInstance); } else if (taskInstance.getState().isFailure()) { - completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); + completeTaskSet.add(taskInstance.getTaskCode()); // There are child nodes and the failure policy is: CONTINUE if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE && DagHelper.haveAllNodeAfterNode( Long.toString(taskInstance.getTaskCode()), @@ -472,8 +472,8 @@ public class WorkflowExecuteRunnable implements Callable { } } } else if (taskInstance.getState().isFinished()) { - // todo: when the task instance type is pause, then it should not in completeTaskMap - completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); + // todo: when the task instance type is pause, then it should not in completeTaskSet + completeTaskSet.add(taskInstance.getTaskCode()); } log.info("TaskInstance finished will try to update the workflow instance state, task code:{} state:{}", taskInstance.getTaskCode(), @@ -482,9 +482,9 @@ public class WorkflowExecuteRunnable implements Callable { sendTaskLogOnMasterToRemoteIfNeeded(taskInstance.getLogPath(), taskInstance.getHost()); } catch (Exception ex) { - log.error("Task finish failed, get a exception, will remove this taskInstance from completeTaskMap", ex); + log.error("Task finish failed, get a exception, will remove this taskInstance from completeTaskSet", ex); // remove the task from complete map, so that we can finish in the next time. - completeTaskMap.remove(taskInstance.getTaskCode()); + completeTaskSet.remove(taskInstance.getTaskCode()); throw ex; } } @@ -873,7 +873,7 @@ public class WorkflowExecuteRunnable implements Callable { // do we need to clear? taskExecuteRunnableMap.clear(); dependFailedTaskSet.clear(); - completeTaskMap.clear(); + completeTaskSet.clear(); errorTaskMap.clear(); if (!isNewProcessInstance()) { @@ -909,7 +909,7 @@ public class WorkflowExecuteRunnable implements Callable { if (task.isTaskComplete()) { log.info("TaskInstance is already complete."); - completeTaskMap.put(task.getTaskCode(), task.getId()); + completeTaskSet.add(task.getTaskCode()); continue; } if (task.isConditionsTask() || DagHelper.haveConditionsAfterNode(Long.toString(task.getTaskCode()), @@ -980,9 +980,9 @@ public class WorkflowExecuteRunnable implements Callable { } } } - log.info("Initialize task queue, dependFailedTaskSet: {}, completeTaskMap: {}, errorTaskMap: {}", + log.info("Initialize task queue, dependFailedTaskSet: {}, completeTaskSet: {}, errorTaskMap: {}", dependFailedTaskSet, - completeTaskMap, + completeTaskSet, errorTaskMap); } @@ -1236,7 +1236,12 @@ public class WorkflowExecuteRunnable implements Callable { Map allTaskInstance = new HashMap<>(); if (CollectionUtils.isNotEmpty(preTask)) { for (String preTaskCode : preTask) { - Integer taskId = completeTaskMap.get(Long.parseLong(preTaskCode)); + Optional existTaskInstanceOptional = getTaskInstance(Long.parseLong(preTaskCode)); + if (!existTaskInstanceOptional.isPresent()) { + continue; + } + + Integer taskId = existTaskInstanceOptional.get().getId(); if (taskId == null) { continue; } @@ -1303,20 +1308,21 @@ public class WorkflowExecuteRunnable implements Callable { */ private Map getCompleteTaskInstanceMap() { Map completeTaskInstanceMap = new HashMap<>(); - for (Map.Entry entry : completeTaskMap.entrySet()) { - Long taskConde = entry.getKey(); - Integer taskInstanceId = entry.getValue(); - TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId); - if (taskInstance == null) { - log.warn("Cannot find the taskInstance from taskInstanceMap, taskInstanceId: {}, taskConde: {}", - taskInstanceId, - taskConde); - // This case will happen when we submit to db failed, then the taskInstanceId is 0 - continue; + + completeTaskSet.forEach(taskCode -> { + Optional existTaskInstanceOptional = getTaskInstance(taskCode); + if (existTaskInstanceOptional.isPresent()) { + TaskInstance taskInstance = taskInstanceMap.get(existTaskInstanceOptional.get().getId()); + if (taskInstance == null) { + // This case will happen when we submit to db failed, then the taskInstanceId is 0 + log.warn("Cannot find the taskInstance from taskInstanceMap, taskConde: {}", + taskCode); + } else { + completeTaskInstanceMap.put(Long.toString(taskCode), taskInstance); + } } - completeTaskInstanceMap.put(Long.toString(taskInstance.getTaskCode()), taskInstance); + }); - } return completeTaskInstanceMap; } @@ -1364,17 +1370,21 @@ public class WorkflowExecuteRunnable implements Callable { } // the end node of the branch of the dag if (StringUtils.isNotEmpty(parentNodeCode) && dag.getEndNode().contains(parentNodeCode)) { - TaskInstance endTaskInstance = taskInstanceMap.get(completeTaskMap.get(NumberUtils.toLong(parentNodeCode))); - String taskInstanceVarPool = endTaskInstance.getVarPool(); - if (StringUtils.isNotEmpty(taskInstanceVarPool)) { - Set taskProperties = new HashSet<>(JSONUtils.toList(taskInstanceVarPool, Property.class)); - String processInstanceVarPool = processInstance.getVarPool(); - if (StringUtils.isNotEmpty(processInstanceVarPool)) { - Set properties = new HashSet<>(JSONUtils.toList(processInstanceVarPool, Property.class)); - properties.addAll(taskProperties); - processInstance.setVarPool(JSONUtils.toJsonString(properties)); - } else { - processInstance.setVarPool(JSONUtils.toJsonString(taskProperties)); + Optional existTaskInstanceOptional = getTaskInstance(NumberUtils.toLong(parentNodeCode)); + if (existTaskInstanceOptional.isPresent()) { + TaskInstance endTaskInstance = taskInstanceMap.get(existTaskInstanceOptional.get().getId()); + String taskInstanceVarPool = endTaskInstance.getVarPool(); + if (StringUtils.isNotEmpty(taskInstanceVarPool)) { + Set taskProperties = new HashSet<>(JSONUtils.toList(taskInstanceVarPool, Property.class)); + String processInstanceVarPool = processInstance.getVarPool(); + if (StringUtils.isNotEmpty(processInstanceVarPool)) { + Set properties = + new HashSet<>(JSONUtils.toList(processInstanceVarPool, Property.class)); + properties.addAll(taskProperties); + processInstance.setVarPool(JSONUtils.toJsonString(properties)); + } else { + processInstance.setVarPool(JSONUtils.toJsonString(taskProperties)); + } } } } @@ -1387,7 +1397,7 @@ public class WorkflowExecuteRunnable implements Callable { continue; } - if (task.getId() != null && completeTaskMap.containsKey(task.getTaskCode())) { + if (task.getId() != null && completeTaskSet.contains(task.getTaskCode())) { log.info("Task has already run success, taskName: {}", task.getName()); continue; } @@ -1459,12 +1469,18 @@ public class WorkflowExecuteRunnable implements Callable { for (String depsNode : indirectDepCodeList) { if (dag.containsNode(depsNode) && !skipTaskNodeMap.containsKey(depsNode)) { // dependencies must be fully completed - Long despNodeTaskCode = Long.parseLong(depsNode); - if (!completeTaskMap.containsKey(despNodeTaskCode)) { + long despNodeTaskCode = Long.parseLong(depsNode); + if (!completeTaskSet.contains(despNodeTaskCode)) { return DependResult.WAITING; } - Integer depsTaskId = completeTaskMap.get(despNodeTaskCode); - TaskExecutionStatus depTaskState = taskInstanceMap.get(depsTaskId).getState(); + + Optional existTaskInstanceOptional = getTaskInstance(despNodeTaskCode); + if (!existTaskInstanceOptional.isPresent()) { + return DependResult.NON_EXEC; + } + + TaskExecutionStatus depTaskState = + taskInstanceMap.get(existTaskInstanceOptional.get().getId()).getState(); if (depTaskState.isKill()) { return DependResult.NON_EXEC; } @@ -1484,7 +1500,7 @@ public class WorkflowExecuteRunnable implements Callable { } } log.info("The dependTasks of task all success, currentTaskCode: {}, dependTaskCodes: {}", - taskCode, Arrays.toString(completeTaskMap.keySet().toArray())); + taskCode, Arrays.toString(completeTaskSet.toArray())); return DependResult.SUCCESS; } @@ -1535,8 +1551,12 @@ public class WorkflowExecuteRunnable implements Callable { .contains(nextNodeCode); } long taskCode = Long.parseLong(dependNodeCode); - Integer taskInstanceId = completeTaskMap.get(taskCode); - TaskExecutionStatus depTaskState = taskInstanceMap.get(taskInstanceId).getState(); + Optional existTaskInstanceOptional = getTaskInstance(taskCode); + if (!existTaskInstanceOptional.isPresent()) { + return false; + } + + TaskExecutionStatus depTaskState = taskInstanceMap.get(existTaskInstanceOptional.get().getId()).getState(); return !depTaskState.isFailure(); } @@ -1548,12 +1568,17 @@ public class WorkflowExecuteRunnable implements Callable { */ private List getCompleteTaskByState(TaskExecutionStatus state) { List resultList = new ArrayList<>(); - for (Integer taskInstanceId : completeTaskMap.values()) { - TaskInstance taskInstance = taskInstanceMap.get(taskInstanceId); - if (taskInstance != null && taskInstance.getState() == state) { - resultList.add(taskInstance); + + completeTaskSet.forEach(taskCode -> { + Optional existTaskInstanceOptional = getTaskInstance(taskCode); + if (existTaskInstanceOptional.isPresent()) { + TaskInstance taskInstance = taskInstanceMap.get(existTaskInstanceOptional.get().getId()); + if (taskInstance != null && taskInstance.getState() == state) { + resultList.add(taskInstance); + } } - } + }); + return resultList; } @@ -1905,7 +1930,7 @@ public class WorkflowExecuteRunnable implements Callable { "Task {} has been forced success, put it into complete task list and stop retrying, taskInstanceId: {}", task.getName(), task.getId()); removeTaskFromStandbyList(task); - completeTaskMap.put(task.getTaskCode(), task.getId()); + completeTaskSet.add(task.getTaskCode()); taskInstanceMap.put(task.getId(), task); submitPostNode(Long.toString(task.getTaskCode())); continue; @@ -1931,7 +1956,7 @@ public class WorkflowExecuteRunnable implements Callable { processInstance.getId(), task.getTaskCode()); } - completeTaskMap.put(task.getTaskCode(), task.getId()); + completeTaskSet.add(task.getTaskCode()); taskInstanceMap.put(task.getId(), task); errorTaskMap.put(task.getTaskCode(), task.getId()); @@ -2066,8 +2091,8 @@ public class WorkflowExecuteRunnable implements Callable { return false; } - public Map getCompleteTaskMap() { - return completeTaskMap; + public Set getCompleteTaskCodes() { + return completeTaskSet; } public Map getTaskExecuteRunnableMap() { @@ -2117,7 +2142,7 @@ public class WorkflowExecuteRunnable implements Callable { * 1. find all task code from sub dag (only contains related task) * 2. set the flag of tasks to Flag.NO * 3. clear varPool data from re-execute task instance in process instance - * 4. remove related task instance from taskInstanceMap, completeTaskMap, validTaskMap, errorTaskMap + * 4. remove related task instance from taskInstanceMap, completeTaskSet, validTaskMap, errorTaskMap * * @return task instance */ @@ -2176,9 +2201,14 @@ public class WorkflowExecuteRunnable implements Callable { processInstance.setVarPool(JSONUtils.toJsonString(processProperties)); processInstanceDao.updateById(processInstance); - // remove task instance from taskInstanceMap, completeTaskMap, validTaskMap, errorTaskMap + // remove task instance from taskInstanceMap, completeTaskSet, validTaskMap, errorTaskMap + // completeTaskSet remove dependency taskInstanceMap, so the sort can't change + completeTaskSet.removeIf(set -> { + Optional existTaskInstanceOptional = getTaskInstance(set); + return existTaskInstanceOptional + .filter(taskInstance -> dag.containsNode(Integer.toString(taskInstance.getId()))).isPresent(); + }); taskInstanceMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getValue().getTaskCode()))); - completeTaskMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getKey()))); validTaskMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getKey()))); errorTaskMap.entrySet().removeIf(map -> dag.containsNode(Long.toString(map.getKey()))); } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java index f224ff36c6..26b637fd1c 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java @@ -69,6 +69,8 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; import org.springframework.context.ApplicationContext; +import com.google.common.collect.Sets; + @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) public class WorkflowExecuteRunnableTest { @@ -207,15 +209,15 @@ public class WorkflowExecuteRunnableTest { taskInstanceMap.put(taskInstance1.getId(), taskInstance1); taskInstanceMap.put(taskInstance2.getId(), taskInstance2); - Map completeTaskList = new ConcurrentHashMap<>(); - completeTaskList.put(taskInstance1.getTaskCode(), taskInstance1.getId()); - completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId()); + Set completeTaskSet = Sets.newConcurrentHashSet(); + completeTaskSet.add(taskInstance1.getTaskCode()); + completeTaskSet.add(taskInstance2.getTaskCode()); Class masterExecThreadClass = WorkflowExecuteRunnable.class; - Field completeTaskMapField = masterExecThreadClass.getDeclaredField("completeTaskMap"); - completeTaskMapField.setAccessible(true); - completeTaskMapField.set(workflowExecuteThread, completeTaskList); + Field completeTaskSetField = masterExecThreadClass.getDeclaredField("completeTaskSet"); + completeTaskSetField.setAccessible(true); + completeTaskSetField.set(workflowExecuteThread, completeTaskSet); Field taskInstanceMapField = masterExecThreadClass.getDeclaredField("taskInstanceMap"); taskInstanceMapField.setAccessible(true); @@ -225,10 +227,10 @@ public class WorkflowExecuteRunnableTest { Assertions.assertNotNull(taskInstance.getVarPool()); taskInstance2.setVarPool("[{\"direct\":\"OUT\",\"prop\":\"test1\",\"type\":\"VARCHAR\",\"value\":\"2\"}]"); - completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId()); + completeTaskSet.add(taskInstance2.getTaskCode()); - completeTaskMapField.setAccessible(true); - completeTaskMapField.set(workflowExecuteThread, completeTaskList); + completeTaskSetField.setAccessible(true); + completeTaskSetField.set(workflowExecuteThread, completeTaskSet); taskInstanceMapField.setAccessible(true); taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap); @@ -279,15 +281,15 @@ public class WorkflowExecuteRunnableTest { taskInstanceMap.put(taskInstance1.getId(), taskInstance1); taskInstanceMap.put(taskInstance2.getId(), taskInstance2); - Map completeTaskList = new ConcurrentHashMap<>(); - completeTaskList.put(taskInstance1.getTaskCode(), taskInstance1.getId()); - completeTaskList.put(taskInstance2.getTaskCode(), taskInstance2.getId()); + Set completeTaskSet = Sets.newConcurrentHashSet(); + completeTaskSet.add(taskInstance1.getTaskCode()); + completeTaskSet.add(taskInstance2.getTaskCode()); Class masterExecThreadClass = WorkflowExecuteRunnable.class; - Field completeTaskMapField = masterExecThreadClass.getDeclaredField("completeTaskMap"); + Field completeTaskMapField = masterExecThreadClass.getDeclaredField("completeTaskSet"); completeTaskMapField.setAccessible(true); - completeTaskMapField.set(workflowExecuteThread, completeTaskList); + completeTaskMapField.set(workflowExecuteThread, completeTaskSet); Field taskInstanceMapField = masterExecThreadClass.getDeclaredField("taskInstanceMap"); taskInstanceMapField.setAccessible(true); @@ -318,7 +320,7 @@ public class WorkflowExecuteRunnableTest { workflowExecuteThread.clearDataIfExecuteTask(); Assertions.assertEquals(1, taskInstanceMap.size()); - Assertions.assertEquals(1, completeTaskList.size()); + Assertions.assertEquals(1, completeTaskSet.size()); }