diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 8e20f92ead..8f99f1bb93 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -84,7 +84,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -642,6 +641,18 @@ public class WorkflowExecuteThread { return null; } + public TaskInstance getTaskInstance(long taskCode) { + if (taskInstanceMap == null || taskInstanceMap.size() == 0) { + return null; + } + for (TaskInstance taskInstance : taskInstanceMap.values()) { + if (taskInstance.getTaskCode() == taskCode) { + return taskInstance; + } + } + return null; + } + public TaskInstance getActiveTaskInstanceByTaskCode(long taskCode) { if (activeTaskProcessorMaps.containsKey(taskCode)) { return activeTaskProcessorMaps.get(taskCode).taskInstance(); @@ -1281,7 +1292,9 @@ public class WorkflowExecuteThread { List taskInstances = new ArrayList<>(); for (String taskNode : submitTaskNodeList) { TaskNode taskNodeObject = dag.getNode(taskNode); - if (checkTaskInstanceByCode(taskNodeObject.getCode())) { + TaskInstance existTaskInstance = getTaskInstance(taskNodeObject.getCode()); + if (existTaskInstance != null) { + taskInstances.add(existTaskInstance); continue; } TaskInstance task = createTaskInstance(processInstance, taskNodeObject); @@ -1693,12 +1706,6 @@ public class WorkflowExecuteThread { logger.warn("task was found in ready submit queue, task code:{}", taskInstance.getTaskCode()); return; } - // need to check if the tasks with same task code is active - boolean active = hadNotFailTask(taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()); - if (active) { - logger.warn("task was found in active task list, task code:{}", taskInstance.getTaskCode()); - return; - } logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}", taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode()); readyToSubmitTaskQueue.put(taskInstance); @@ -1950,25 +1957,4 @@ public class WorkflowExecuteThread { } } - /** - * check if had not fail task by taskCode and version - * - * @param taskCode - * @param version - * @return - */ - private boolean hadNotFailTask(long taskCode, int version) { - boolean result = false; - for (Entry entry : taskInstanceMap.entrySet()) { - TaskInstance taskInstance = entry.getValue(); - if (taskInstance.getTaskCode() == taskCode && taskInstance.getTaskDefinitionVersion() == version) { - if (!taskInstance.getState().typeIsFailure()) { - result = true; - break; - } - } - } - return result; - } - } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 7fee2416df..1b1421ea6b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -899,7 +899,7 @@ public class ProcessServiceImpl implements ProcessService { } else { processInstance = this.findProcessInstanceDetailById(processInstanceId); if (processInstance == null) { - return processInstance; + return null; } } if (cmdParam != null) { @@ -1482,12 +1482,13 @@ public class ProcessServiceImpl implements ProcessService { @Override public TaskInstance submitTaskInstanceToDB(TaskInstance taskInstance, ProcessInstance processInstance) { ExecutionStatus processInstanceState = processInstance.getState(); - if (processInstanceState.typeIsFinished() - || processInstanceState == ExecutionStatus.READY_PAUSE - || processInstanceState == ExecutionStatus.READY_STOP) { + if (processInstanceState.typeIsFinished() || processInstanceState == ExecutionStatus.READY_STOP) { logger.warn("processInstance {} was {}, skip submit task", processInstance.getProcessDefinitionCode(), processInstanceState); return null; } + if (processInstanceState == ExecutionStatus.READY_PAUSE) { + taskInstance.setState(ExecutionStatus.PAUSE); + } taskInstance.setExecutorId(processInstance.getExecutorId()); taskInstance.setProcessInstancePriority(processInstance.getProcessInstancePriority()); taskInstance.setState(getSubmitTaskState(taskInstance, processInstance));