diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java index 53ad4e6ef7..c54aa5aff8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java @@ -79,6 +79,7 @@ public class GlobalTaskDispatchWaitingQueueLooper extends BaseDaemonThread imple taskDispatcher.dispatchTask(defaultTaskExecuteRunnable); DISPATCHED_TIMES.set(0); } catch (Exception e) { + defaultTaskExecuteRunnable.getTaskExecutionContext().increaseDispatchFailTimes(); globalTaskDispatchWaitingQueue.submitNeedToDispatchTaskExecuteRunnable(defaultTaskExecuteRunnable); if (DISPATCHED_TIMES.incrementAndGet() > MAX_DISPATCHED_FAILED_TIMES) { ThreadUtils.sleep(10 * 1000L); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnable.java index b78762b21d..2e23feb7bd 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnable.java @@ -56,6 +56,13 @@ public abstract class PriorityTaskExecuteRunnable implements TaskExecuteRunnable @Override public int compareTo(@NotNull TaskExecuteRunnable other) { + // the smaller dispatch fail times, the higher priority + int dispatchFailTimesCompareResult = taskExecutionContext.getDispatchFailTimes() + - other.getTaskExecutionContext().getDispatchFailTimes(); + if (dispatchFailTimesCompareResult != 0) { + return dispatchFailTimesCompareResult; + } + int workflowInstancePriorityCompareResult = workflowInstance.getProcessInstancePriority().getCode() - other.getWorkflowInstance().getProcessInstancePriority().getCode(); if (workflowInstancePriorityCompareResult != 0) { @@ -67,7 +74,7 @@ public abstract class PriorityTaskExecuteRunnable implements TaskExecuteRunnable } int taskInstancePriorityCompareResult = taskInstance.getTaskInstancePriority().getCode() - other.getTaskInstance().getTaskInstancePriority().getCode(); - if (taskInstancePriorityCompareResult > 0) { + if (taskInstancePriorityCompareResult != 0) { return taskInstancePriorityCompareResult; } // larger number, higher priority diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnableTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnableTest.java new file mode 100644 index 0000000000..979a467148 --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/execute/PriorityTaskExecuteRunnableTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.execute; + +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.server.master.runner.operator.TaskOperatorManager; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class PriorityTaskExecuteRunnableTest { + + @Test + public void testCompareTo() { + TaskOperatorManager taskOperatorManager = new TaskOperatorManager(); + + ProcessInstance workflowInstance = new ProcessInstance(); + workflowInstance.setId(1); + workflowInstance.setProcessInstancePriority(Priority.HIGH); + + TaskInstance t1 = new TaskInstance(); + t1.setId(1); + t1.setTaskInstancePriority(Priority.HIGH); + + TaskInstance t2 = new TaskInstance(); + t2.setId(1); + t2.setTaskInstancePriority(Priority.HIGH); + + TaskExecutionContext context1 = new TaskExecutionContext(); + TaskExecutionContext context2 = new TaskExecutionContext(); + PriorityTaskExecuteRunnable p1 = + new DefaultTaskExecuteRunnable(workflowInstance, t1, context1, taskOperatorManager); + PriorityTaskExecuteRunnable p2 = + new DefaultTaskExecuteRunnable(workflowInstance, t2, context2, taskOperatorManager); + + Assertions.assertEquals(0, p1.compareTo(p2)); + + // the higher priority, the higher priority + t2.setTaskInstancePriority(Priority.MEDIUM); + Assertions.assertTrue(p1.compareTo(p2) < 0); + + // the smaller dispatch fail times, the higher priority + context1.setDispatchFailTimes(1); + Assertions.assertTrue(p1.compareTo(p2) > 0); + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java index de9a727567..61f61fbf9c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java @@ -262,4 +262,13 @@ public class TaskExecutionContext implements Serializable { private int testFlag; private boolean logBufferEnable; + + /** + * dispatch fail times + */ + private int dispatchFailTimes; + + public void increaseDispatchFailTimes() { + this.dispatchFailTimes++; + } }