diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index f7051dbabc..5caab6b709 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -180,6 +180,11 @@ public class WorkflowExecuteRunnable implements Callable { */ private final Map taskInstanceMap = new ConcurrentHashMap<>(); + /** + * task instance hash map, taskCode as key + */ + private final Map taskCodeInstanceMap = new ConcurrentHashMap<>(); + /** * TaskCode as Key, TaskExecuteRunnable as Value */ @@ -570,6 +575,7 @@ public class WorkflowExecuteRunnable implements Callable { } processService.packageTaskInstance(taskInstance, processInstance); taskInstanceMap.put(taskInstance.getId(), taskInstance); + taskCodeInstanceMap.put(taskInstance.getTaskCode(), taskInstance); validTaskMap.remove(taskInstance.getTaskCode()); if (Flag.YES == taskInstance.getFlag()) { @@ -617,15 +623,7 @@ public class WorkflowExecuteRunnable implements Callable { } public Optional getTaskInstance(long taskCode) { - if (taskInstanceMap.isEmpty()) { - return Optional.empty(); - } - for (TaskInstance taskInstance : taskInstanceMap.values()) { - if (taskInstance.getTaskCode() == taskCode) { - return Optional.of(taskInstance); - } - } - return Optional.empty(); + return Optional.ofNullable(taskCodeInstanceMap.get(taskCode)); } public Optional getActiveTaskInstanceByTaskCode(long taskCode) { @@ -901,6 +899,7 @@ public class WorkflowExecuteRunnable implements Callable { validTaskMap.put(task.getTaskCode(), task.getId()); taskInstanceMap.put(task.getId(), task); + taskCodeInstanceMap.put(task.getTaskCode(), task); if (task.isTaskComplete()) { log.info("TaskInstance is already complete."); @@ -1012,6 +1011,7 @@ public class WorkflowExecuteRunnable implements Callable { validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); taskInstanceMap.put(taskInstance.getId(), taskInstance); + taskCodeInstanceMap.put(taskInstance.getTaskCode(), taskInstance); taskExecuteRunnableMap.put(taskInstance.getTaskCode(), taskExecuteRunnable); // 3. acquire the task group. // if we use task group, then need to acquire the task group resource @@ -1307,14 +1307,12 @@ public class WorkflowExecuteRunnable implements Callable { completeTaskSet.forEach(taskCode -> { Optional existTaskInstanceOptional = getTaskInstance(taskCode); if (existTaskInstanceOptional.isPresent()) { - TaskInstance taskInstance = taskInstanceMap.get(existTaskInstanceOptional.get().getId()); - if (taskInstance == null) { - // This case will happen when we submit to db failed, then the taskInstanceId is 0 - log.warn("Cannot find the taskInstance from taskInstanceMap, taskConde: {}", - taskCode); - } else { - completeTaskInstanceMap.put(Long.toString(taskCode), taskInstance); - } + TaskInstance taskInstance = existTaskInstanceOptional.get(); + completeTaskInstanceMap.put(Long.toString(taskCode), taskInstance); + } else { + // This case will happen when we submit to db failed, then the taskInstanceId is 0 + log.warn("Cannot find the taskInstance from taskInstanceMap, taskConde: {}", + taskCode); } }); @@ -1436,6 +1434,7 @@ public class WorkflowExecuteRunnable implements Callable { defaultTaskExecuteRunnableFactory.createTaskExecuteRunnable(taskInstance)); taskInstanceMap.put(taskInstance.getId(), taskInstance); + taskCodeInstanceMap.put(taskInstance.getTaskCode(), taskInstance); stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance); stateWheelExecuteThread.addTask4RetryCheck(processInstance, taskInstance); return true; @@ -1552,7 +1551,7 @@ public class WorkflowExecuteRunnable implements Callable { return false; } - TaskExecutionStatus depTaskState = taskInstanceMap.get(existTaskInstanceOptional.get().getId()).getState(); + TaskExecutionStatus depTaskState = existTaskInstanceOptional.get().getState(); return !depTaskState.isFailure(); } @@ -1568,8 +1567,8 @@ public class WorkflowExecuteRunnable implements Callable { completeTaskSet.forEach(taskCode -> { Optional existTaskInstanceOptional = getTaskInstance(taskCode); if (existTaskInstanceOptional.isPresent()) { - TaskInstance taskInstance = taskInstanceMap.get(existTaskInstanceOptional.get().getId()); - if (taskInstance != null && taskInstance.getState() == state) { + TaskInstance taskInstance = existTaskInstanceOptional.get(); + if (taskInstance.getState() == state) { resultList.add(taskInstance); } } @@ -1928,6 +1927,7 @@ public class WorkflowExecuteRunnable implements Callable { removeTaskFromStandbyList(task); completeTaskSet.add(task.getTaskCode()); taskInstanceMap.put(task.getId(), task); + taskCodeInstanceMap.put(task.getTaskCode(), task); submitPostNode(Long.toString(task.getTaskCode())); continue; } @@ -1954,6 +1954,7 @@ public class WorkflowExecuteRunnable implements Callable { } completeTaskSet.add(task.getTaskCode()); taskInstanceMap.put(task.getId(), task); + taskCodeInstanceMap.put(task.getTaskCode(), task); errorTaskMap.put(task.getTaskCode(), task.getId()); taskExecuteRunnableMap.remove(task.getTaskCode()); diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java index 26b637fd1c..332a2617d7 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java @@ -209,6 +209,10 @@ public class WorkflowExecuteRunnableTest { taskInstanceMap.put(taskInstance1.getId(), taskInstance1); taskInstanceMap.put(taskInstance2.getId(), taskInstance2); + Map taskCodeInstanceMap = new ConcurrentHashMap<>(); + taskCodeInstanceMap.put(taskInstance1.getTaskCode(), taskInstance1); + taskCodeInstanceMap.put(taskInstance2.getTaskCode(), taskInstance2); + Set completeTaskSet = Sets.newConcurrentHashSet(); completeTaskSet.add(taskInstance1.getTaskCode()); completeTaskSet.add(taskInstance2.getTaskCode()); @@ -223,6 +227,10 @@ public class WorkflowExecuteRunnableTest { taskInstanceMapField.setAccessible(true); taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap); + Field taskCodeInstanceMapField = masterExecThreadClass.getDeclaredField("taskCodeInstanceMap"); + taskCodeInstanceMapField.setAccessible(true); + taskCodeInstanceMapField.set(workflowExecuteThread, taskCodeInstanceMap); + workflowExecuteThread.getPreVarPool(taskInstance, preTaskName); Assertions.assertNotNull(taskInstance.getVarPool()); @@ -281,6 +289,10 @@ public class WorkflowExecuteRunnableTest { taskInstanceMap.put(taskInstance1.getId(), taskInstance1); taskInstanceMap.put(taskInstance2.getId(), taskInstance2); + Map taskCodeInstanceMap = new ConcurrentHashMap<>(); + taskCodeInstanceMap.put(taskInstance1.getTaskCode(), taskInstance1); + taskCodeInstanceMap.put(taskInstance2.getTaskCode(), taskInstance2); + Set completeTaskSet = Sets.newConcurrentHashSet(); completeTaskSet.add(taskInstance1.getTaskCode()); completeTaskSet.add(taskInstance2.getTaskCode()); @@ -295,6 +307,10 @@ public class WorkflowExecuteRunnableTest { taskInstanceMapField.setAccessible(true); taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap); + Field taskCodeInstanceMapField = masterExecThreadClass.getDeclaredField("taskCodeInstanceMap"); + taskCodeInstanceMapField.setAccessible(true); + taskCodeInstanceMapField.set(workflowExecuteThread, taskCodeInstanceMap); + Mockito.when(processInstance.getCommandType()).thenReturn(CommandType.EXECUTE_TASK); Mockito.when(processInstance.getId()).thenReturn(123);