diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 92862ae619..085c1d569d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -76,7 +76,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -1172,35 +1171,13 @@ public class WorkflowExecuteThread implements Runnable { * @param taskInstance task instance */ private void addTaskToStandByList(TaskInstance taskInstance) { + logger.info("add task to stand by list, task name: {} , task id:{}", taskInstance.getName(), taskInstance.getId()); try { - if (readyToSubmitTaskQueue.contains(taskInstance)) { - logger.warn("task was found in ready submit queue, task code:{}", taskInstance.getTaskCode()); - return; + if (!readyToSubmitTaskQueue.contains(taskInstance)) { + readyToSubmitTaskQueue.put(taskInstance); } - // need to check if the tasks with same task code is active - boolean active = false; - Map taskInstanceMap = taskInstanceHashMap.column(taskInstance.getTaskCode()); - if (taskInstanceMap != null && taskInstanceMap.size() > 0) { - for (Entry entry : taskInstanceMap.entrySet()) { - Integer taskInstanceId = entry.getKey(); - if (activeTaskProcessorMaps.containsKey(taskInstanceId)) { - TaskInstance latestTaskInstance = processService.findTaskInstanceById(taskInstanceId); - if (latestTaskInstance != null && !latestTaskInstance.getState().typeIsFailure()) { - active = true; - break; - } - } - } - } - if (active) { - logger.warn("task was found in active task list, task code:{}", taskInstance.getTaskCode()); - return; - } - logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}", - taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode()); - readyToSubmitTaskQueue.put(taskInstance); } catch (Exception e) { - logger.error("add task instance to readyToSubmitTaskQueue, taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e); + logger.error("add task instance to readyToSubmitTaskQueue, taskName: {}, task id: {}", taskInstance.getName(), taskInstance.getId(), e); } }