diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 2d83f9a8e4..d8d8720b7f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -81,6 +81,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -1375,13 +1376,22 @@ public class WorkflowExecuteThread { * @param taskInstance task instance */ private void addTaskToStandByList(TaskInstance taskInstance) { - logger.info("add task to stand by list: {}", taskInstance.getName()); try { - if (!readyToSubmitTaskQueue.contains(taskInstance)) { - readyToSubmitTaskQueue.put(taskInstance); + if (readyToSubmitTaskQueue.contains(taskInstance)) { + logger.warn("task was found in ready submit queue, task code:{}", taskInstance.getTaskCode()); + return; + } + // need to check if the tasks with same task code is active + boolean active = hadNotFailTask(taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()); + if (active) { + logger.warn("task was found in active task list, task code:{}", taskInstance.getTaskCode()); + return; } + logger.info("add task to stand by list, task name:{}, task id:{}, task code:{}", + taskInstance.getName(), taskInstance.getId(), taskInstance.getTaskCode()); + readyToSubmitTaskQueue.put(taskInstance); } catch (Exception e) { - logger.error("add task instance to readyToSubmitTaskQueue error, taskName: {}", taskInstance.getName(), e); + logger.error("add task instance to readyToSubmitTaskQueue, taskName:{}, task id:{}", taskInstance.getName(), taskInstance.getId(), e); } } @@ -1626,4 +1636,25 @@ public class WorkflowExecuteThread { return false; } } + + /** + * check if had not fail task by taskCode and version + * @param taskCode + * @param version + * @return + */ + private boolean hadNotFailTask(long taskCode, int version) { + boolean result = false; + for (Entry entry : taskInstanceMap.entrySet()) { + TaskInstance taskInstance = entry.getValue(); + if (taskInstance.getTaskCode() == taskCode && taskInstance.getTaskDefinitionVersion() == version) { + if (!taskInstance.getState().typeIsFailure()) { + result = true; + break; + } + } + } + return result; + } + }