Browse Source

[Bug] [Mater] Not got latest task intance by task code (#14529)

* add task code->instance map

---------

Co-authored-by: Eric Gao <ericgao.apache@gmail.com>
3.2.1-prepare
旺阳 1 year ago committed by GitHub
parent
commit
c812bf9d49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 35
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java
  2. 16
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java

35
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java

@ -180,6 +180,11 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
*/ */
private final Map<Integer, TaskInstance> taskInstanceMap = new ConcurrentHashMap<>(); private final Map<Integer, TaskInstance> taskInstanceMap = new ConcurrentHashMap<>();
/**
* task instance hash map, taskCode as key
*/
private final Map<Long, TaskInstance> taskCodeInstanceMap = new ConcurrentHashMap<>();
/** /**
* TaskCode as Key, TaskExecuteRunnable as Value * TaskCode as Key, TaskExecuteRunnable as Value
*/ */
@ -570,6 +575,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
} }
processService.packageTaskInstance(taskInstance, processInstance); processService.packageTaskInstance(taskInstance, processInstance);
taskInstanceMap.put(taskInstance.getId(), taskInstance); taskInstanceMap.put(taskInstance.getId(), taskInstance);
taskCodeInstanceMap.put(taskInstance.getTaskCode(), taskInstance);
validTaskMap.remove(taskInstance.getTaskCode()); validTaskMap.remove(taskInstance.getTaskCode());
if (Flag.YES == taskInstance.getFlag()) { if (Flag.YES == taskInstance.getFlag()) {
@ -617,15 +623,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
} }
public Optional<TaskInstance> getTaskInstance(long taskCode) { public Optional<TaskInstance> getTaskInstance(long taskCode) {
if (taskInstanceMap.isEmpty()) { return Optional.ofNullable(taskCodeInstanceMap.get(taskCode));
return Optional.empty();
}
for (TaskInstance taskInstance : taskInstanceMap.values()) {
if (taskInstance.getTaskCode() == taskCode) {
return Optional.of(taskInstance);
}
}
return Optional.empty();
} }
public Optional<TaskInstance> getActiveTaskInstanceByTaskCode(long taskCode) { public Optional<TaskInstance> getActiveTaskInstanceByTaskCode(long taskCode) {
@ -901,6 +899,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
validTaskMap.put(task.getTaskCode(), task.getId()); validTaskMap.put(task.getTaskCode(), task.getId());
taskInstanceMap.put(task.getId(), task); taskInstanceMap.put(task.getId(), task);
taskCodeInstanceMap.put(task.getTaskCode(), task);
if (task.isTaskComplete()) { if (task.isTaskComplete()) {
log.info("TaskInstance is already complete."); log.info("TaskInstance is already complete.");
@ -1012,6 +1011,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId()); validTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
taskInstanceMap.put(taskInstance.getId(), taskInstance); taskInstanceMap.put(taskInstance.getId(), taskInstance);
taskCodeInstanceMap.put(taskInstance.getTaskCode(), taskInstance);
taskExecuteRunnableMap.put(taskInstance.getTaskCode(), taskExecuteRunnable); taskExecuteRunnableMap.put(taskInstance.getTaskCode(), taskExecuteRunnable);
// 3. acquire the task group. // 3. acquire the task group.
// if we use task group, then need to acquire the task group resource // if we use task group, then need to acquire the task group resource
@ -1307,14 +1307,12 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
completeTaskSet.forEach(taskCode -> { completeTaskSet.forEach(taskCode -> {
Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(taskCode); Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(taskCode);
if (existTaskInstanceOptional.isPresent()) { if (existTaskInstanceOptional.isPresent()) {
TaskInstance taskInstance = taskInstanceMap.get(existTaskInstanceOptional.get().getId()); TaskInstance taskInstance = existTaskInstanceOptional.get();
if (taskInstance == null) { completeTaskInstanceMap.put(Long.toString(taskCode), taskInstance);
} else {
// This case will happen when we submit to db failed, then the taskInstanceId is 0 // This case will happen when we submit to db failed, then the taskInstanceId is 0
log.warn("Cannot find the taskInstance from taskInstanceMap, taskConde: {}", log.warn("Cannot find the taskInstance from taskInstanceMap, taskConde: {}",
taskCode); taskCode);
} else {
completeTaskInstanceMap.put(Long.toString(taskCode), taskInstance);
}
} }
}); });
@ -1436,6 +1434,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
defaultTaskExecuteRunnableFactory.createTaskExecuteRunnable(taskInstance)); defaultTaskExecuteRunnableFactory.createTaskExecuteRunnable(taskInstance));
taskInstanceMap.put(taskInstance.getId(), taskInstance); taskInstanceMap.put(taskInstance.getId(), taskInstance);
taskCodeInstanceMap.put(taskInstance.getTaskCode(), taskInstance);
stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance); stateWheelExecuteThread.addTask4TimeoutCheck(processInstance, taskInstance);
stateWheelExecuteThread.addTask4RetryCheck(processInstance, taskInstance); stateWheelExecuteThread.addTask4RetryCheck(processInstance, taskInstance);
return true; return true;
@ -1552,7 +1551,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
return false; return false;
} }
TaskExecutionStatus depTaskState = taskInstanceMap.get(existTaskInstanceOptional.get().getId()).getState(); TaskExecutionStatus depTaskState = existTaskInstanceOptional.get().getState();
return !depTaskState.isFailure(); return !depTaskState.isFailure();
} }
@ -1568,8 +1567,8 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
completeTaskSet.forEach(taskCode -> { completeTaskSet.forEach(taskCode -> {
Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(taskCode); Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(taskCode);
if (existTaskInstanceOptional.isPresent()) { if (existTaskInstanceOptional.isPresent()) {
TaskInstance taskInstance = taskInstanceMap.get(existTaskInstanceOptional.get().getId()); TaskInstance taskInstance = existTaskInstanceOptional.get();
if (taskInstance != null && taskInstance.getState() == state) { if (taskInstance.getState() == state) {
resultList.add(taskInstance); resultList.add(taskInstance);
} }
} }
@ -1928,6 +1927,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
removeTaskFromStandbyList(task); removeTaskFromStandbyList(task);
completeTaskSet.add(task.getTaskCode()); completeTaskSet.add(task.getTaskCode());
taskInstanceMap.put(task.getId(), task); taskInstanceMap.put(task.getId(), task);
taskCodeInstanceMap.put(task.getTaskCode(), task);
submitPostNode(Long.toString(task.getTaskCode())); submitPostNode(Long.toString(task.getTaskCode()));
continue; continue;
} }
@ -1954,6 +1954,7 @@ public class WorkflowExecuteRunnable implements Callable<WorkflowSubmitStatus> {
} }
completeTaskSet.add(task.getTaskCode()); completeTaskSet.add(task.getTaskCode());
taskInstanceMap.put(task.getId(), task); taskInstanceMap.put(task.getId(), task);
taskCodeInstanceMap.put(task.getTaskCode(), task);
errorTaskMap.put(task.getTaskCode(), task.getId()); errorTaskMap.put(task.getTaskCode(), task.getId());
taskExecuteRunnableMap.remove(task.getTaskCode()); taskExecuteRunnableMap.remove(task.getTaskCode());

16
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnableTest.java

@ -209,6 +209,10 @@ public class WorkflowExecuteRunnableTest {
taskInstanceMap.put(taskInstance1.getId(), taskInstance1); taskInstanceMap.put(taskInstance1.getId(), taskInstance1);
taskInstanceMap.put(taskInstance2.getId(), taskInstance2); taskInstanceMap.put(taskInstance2.getId(), taskInstance2);
Map<Long, TaskInstance> taskCodeInstanceMap = new ConcurrentHashMap<>();
taskCodeInstanceMap.put(taskInstance1.getTaskCode(), taskInstance1);
taskCodeInstanceMap.put(taskInstance2.getTaskCode(), taskInstance2);
Set<Long> completeTaskSet = Sets.newConcurrentHashSet(); Set<Long> completeTaskSet = Sets.newConcurrentHashSet();
completeTaskSet.add(taskInstance1.getTaskCode()); completeTaskSet.add(taskInstance1.getTaskCode());
completeTaskSet.add(taskInstance2.getTaskCode()); completeTaskSet.add(taskInstance2.getTaskCode());
@ -223,6 +227,10 @@ public class WorkflowExecuteRunnableTest {
taskInstanceMapField.setAccessible(true); taskInstanceMapField.setAccessible(true);
taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap); taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap);
Field taskCodeInstanceMapField = masterExecThreadClass.getDeclaredField("taskCodeInstanceMap");
taskCodeInstanceMapField.setAccessible(true);
taskCodeInstanceMapField.set(workflowExecuteThread, taskCodeInstanceMap);
workflowExecuteThread.getPreVarPool(taskInstance, preTaskName); workflowExecuteThread.getPreVarPool(taskInstance, preTaskName);
Assertions.assertNotNull(taskInstance.getVarPool()); Assertions.assertNotNull(taskInstance.getVarPool());
@ -281,6 +289,10 @@ public class WorkflowExecuteRunnableTest {
taskInstanceMap.put(taskInstance1.getId(), taskInstance1); taskInstanceMap.put(taskInstance1.getId(), taskInstance1);
taskInstanceMap.put(taskInstance2.getId(), taskInstance2); taskInstanceMap.put(taskInstance2.getId(), taskInstance2);
Map<Long, TaskInstance> taskCodeInstanceMap = new ConcurrentHashMap<>();
taskCodeInstanceMap.put(taskInstance1.getTaskCode(), taskInstance1);
taskCodeInstanceMap.put(taskInstance2.getTaskCode(), taskInstance2);
Set<Long> completeTaskSet = Sets.newConcurrentHashSet(); Set<Long> completeTaskSet = Sets.newConcurrentHashSet();
completeTaskSet.add(taskInstance1.getTaskCode()); completeTaskSet.add(taskInstance1.getTaskCode());
completeTaskSet.add(taskInstance2.getTaskCode()); completeTaskSet.add(taskInstance2.getTaskCode());
@ -295,6 +307,10 @@ public class WorkflowExecuteRunnableTest {
taskInstanceMapField.setAccessible(true); taskInstanceMapField.setAccessible(true);
taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap); taskInstanceMapField.set(workflowExecuteThread, taskInstanceMap);
Field taskCodeInstanceMapField = masterExecThreadClass.getDeclaredField("taskCodeInstanceMap");
taskCodeInstanceMapField.setAccessible(true);
taskCodeInstanceMapField.set(workflowExecuteThread, taskCodeInstanceMap);
Mockito.when(processInstance.getCommandType()).thenReturn(CommandType.EXECUTE_TASK); Mockito.when(processInstance.getCommandType()).thenReturn(CommandType.EXECUTE_TASK);
Mockito.when(processInstance.getId()).thenReturn(123); Mockito.when(processInstance.getId()).thenReturn(123);

Loading…
Cancel
Save