From 44c356db4dda185e585de7727316e756002bb720 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Thu, 18 Jan 2024 10:59:54 +0800 Subject: [PATCH] TaskGroupPriority only compare When TaskGroup is same (#15486) --- .../runner/WorkflowExecuteRunnable.java | 33 ++++++++--------- ... => StandByTaskInstancePriorityQueue.java} | 35 ++++++++++--------- ...StandByTaskInstancePriorityQueueTest.java} | 29 +++++++-------- 3 files changed, 50 insertions(+), 47 deletions(-) rename dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/{PeerTaskInstancePriorityQueue.java => StandByTaskInstancePriorityQueue.java} (79%) rename dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/{PeerTaskInstancePriorityQueueTest.java => StandByTaskInstancePriorityQueueTest.java} (84%) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java index b558d6cf37..3759f2209c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java @@ -91,7 +91,7 @@ import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.expand.CuringParamsService; import org.apache.dolphinscheduler.service.model.TaskNode; import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; +import org.apache.dolphinscheduler.service.queue.StandByTaskInstancePriorityQueue; import org.apache.dolphinscheduler.service.utils.DagHelper; import org.apache.commons.collections4.CollectionUtils; @@ -208,7 +208,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { /** * The StandBy task list, will be executed, need to know, the taskInstance in this queue may doesn't have id. */ - private final PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue(); + private final StandByTaskInstancePriorityQueue standByTaskInstancePriorityQueue = + new StandByTaskInstancePriorityQueue(); /** * wait to retry taskInstance map, taskCode as key, taskInstance as value @@ -249,7 +250,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { this.taskInstanceDao = taskInstanceDao; this.defaultTaskExecuteRunnableFactory = defaultTaskExecuteRunnableFactory; this.listenerEventAlertManager = listenerEventAlertManager; - TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size); + TaskMetrics.registerTaskPrepared(standByTaskInstancePriorityQueue::size); } /** @@ -1430,7 +1431,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { // if previous node success , post node submit for (TaskInstance task : taskInstances) { - if (readyToSubmitTaskQueue.contains(task)) { + if (standByTaskInstancePriorityQueue.contains(task)) { log.warn("Task is already at submit queue, taskInstanceName: {}", task.getName()); continue; } @@ -1665,7 +1666,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { return true; } if (workflowInstance.getFailureStrategy() == FailureStrategy.CONTINUE) { - return readyToSubmitTaskQueue.size() == 0 && taskExecuteRunnableMap.size() == 0 + return standByTaskInstancePriorityQueue.size() == 0 && taskExecuteRunnableMap.size() == 0 && waitToRetryTaskInstanceMap.size() == 0; } } @@ -1688,7 +1689,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { List pauseList = getCompleteTaskByState(TaskExecutionStatus.PAUSE); if (CollectionUtils.isNotEmpty(pauseList) || workflowInstance.isBlocked() || !isComplementEnd() - || readyToSubmitTaskQueue.size() > 0) { + || standByTaskInstancePriorityQueue.size() > 0) { return WorkflowExecutionStatus.PAUSE; } else { return WorkflowExecutionStatus.SUCCESS; @@ -1711,8 +1712,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { } } } - if (readyToSubmitTaskQueue.size() > 0) { - for (Iterator iter = readyToSubmitTaskQueue.iterator(); iter.hasNext();) { + if (standByTaskInstancePriorityQueue.size() > 0) { + for (Iterator iter = standByTaskInstancePriorityQueue.iterator(); iter.hasNext();) { iter.next().setState(TaskExecutionStatus.PAUSE); } } @@ -1773,7 +1774,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { // success if (state == WorkflowExecutionStatus.RUNNING_EXECUTION) { List killTasks = getCompleteTaskByState(TaskExecutionStatus.KILL); - if (readyToSubmitTaskQueue.size() > 0 || waitToRetryTaskInstanceMap.size() > 0) { + if (standByTaskInstancePriorityQueue.size() > 0 || waitToRetryTaskInstanceMap.size() > 0) { // tasks currently pending submission, no retries, indicating that depend is waiting to complete return WorkflowExecutionStatus.RUNNING_EXECUTION; } else if (CollectionUtils.isNotEmpty(killTasks)) { @@ -1878,7 +1879,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { * @param taskInstance task instance */ public void addTaskToStandByList(TaskInstance taskInstance) { - if (readyToSubmitTaskQueue.contains(taskInstance)) { + if (standByTaskInstancePriorityQueue.contains(taskInstance)) { log.warn("Task already exists in ready submit queue, no need to add again, task code:{}", taskInstance.getTaskCode()); return; @@ -1888,7 +1889,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { taskInstance.getId(), taskInstance.getTaskCode()); TaskMetrics.incTaskInstanceByState("submit"); - readyToSubmitTaskQueue.put(taskInstance); + standByTaskInstancePriorityQueue.put(taskInstance); } /** @@ -1897,7 +1898,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { * @param taskInstance task instance */ private boolean removeTaskFromStandbyList(TaskInstance taskInstance) { - return readyToSubmitTaskQueue.remove(taskInstance); + return standByTaskInstancePriorityQueue.remove(taskInstance); } /** @@ -1906,7 +1907,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { * @return Boolean whether has retry task in standby */ private boolean hasRetryTaskInStandBy() { - for (Iterator iter = readyToSubmitTaskQueue.iterator(); iter.hasNext();) { + for (Iterator iter = standByTaskInstancePriorityQueue.iterator(); iter.hasNext();) { if (iter.next().getState().isFailure()) { return true; } @@ -1923,8 +1924,8 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { workflowInstance.getId(), taskExecuteRunnableMap.size()); - if (readyToSubmitTaskQueue.size() > 0) { - readyToSubmitTaskQueue.clear(); + if (standByTaskInstancePriorityQueue.size() > 0) { + standByTaskInstancePriorityQueue.clear(); } for (long taskCode : taskExecuteRunnableMap.keySet()) { @@ -1965,7 +1966,7 @@ public class WorkflowExecuteRunnable implements IWorkflowExecuteRunnable { public void submitStandByTask() throws StateEventHandleException { ProcessInstance workflowInstance = workflowExecuteContext.getWorkflowInstance(); TaskInstance task; - while ((task = readyToSubmitTaskQueue.peek()) != null) { + while ((task = standByTaskInstancePriorityQueue.peek()) != null) { // stop tasks which is retrying if forced success happens if (task.getId() != null && task.taskCanRetry()) { TaskInstance retryTask = taskInstanceDao.queryById(task.getId()); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueue.java similarity index 79% rename from dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java rename to dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueue.java index 5bd89ffad5..c11c4fe5a9 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueue.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.service.queue; -import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; @@ -35,7 +34,7 @@ import com.google.common.base.Preconditions; * Task instances priority queue implementation * All the task instances are in the same process instance. */ -public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue { +public class StandByTaskInstancePriorityQueue implements TaskPriorityQueue { /** * queue size @@ -45,7 +44,8 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator()); + private final PriorityQueue queue = + new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInstancePriorityComparator()); private final Set taskInstanceIdentifySet = Collections.synchronizedSet(new HashSet<>()); /** @@ -163,24 +163,25 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue { - - /** - * compare o1 o2 - * - * @param o1 o1 - * @param o2 o2 - * @return compare result - */ + private static class TaskInstancePriorityComparator implements Comparator { + @Override public int compare(TaskInstance o1, TaskInstance o2) { - if (o1.getTaskInstancePriority().equals(o2.getTaskInstancePriority())) { - // larger number, higher priority - return Constants.OPPOSITE_VALUE * Integer.compare(o1.getTaskGroupPriority(), o2.getTaskGroupPriority()); + int taskPriorityInTaskGroup = -1 * Integer.compare(o1.getTaskGroupPriority(), o2.getTaskGroupPriority()); + int taskInstancePriorityInWorkflow = + Long.compare(o1.getTaskInstancePriority().getCode(), o2.getTaskInstancePriority().getCode()); + + if (o1.getTaskGroupId() == o2.getTaskGroupId()) { + // If at the same taskGroup + if (taskPriorityInTaskGroup != 0) { + return taskPriorityInTaskGroup; + } } - return o1.getTaskInstancePriority().compareTo(o2.getTaskInstancePriority()); + return taskInstancePriorityInWorkflow; } } } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java similarity index 84% rename from dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java index a430ab3c95..80a2513929 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/StandByTaskInstancePriorityQueueTest.java @@ -26,11 +26,11 @@ import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -public class PeerTaskInstancePriorityQueueTest { +public class StandByTaskInstancePriorityQueueTest { @Test public void put() throws TaskPriorityQueueException { - PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); + StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue(); TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1); TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); queue.put(taskInstanceHigPriority); @@ -42,7 +42,7 @@ public class PeerTaskInstancePriorityQueueTest { @Test public void take() throws Exception { - PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); + StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); int peekBeforeLength = queue.size(); queue.take(); Assertions.assertTrue(queue.size() < peekBeforeLength); @@ -50,7 +50,7 @@ public class PeerTaskInstancePriorityQueueTest { @Test public void poll() throws Exception { - PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); + StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); Assertions.assertThrows(TaskPriorityQueueException.class, () -> { queue.poll(1000, TimeUnit.MILLISECONDS); }); @@ -58,14 +58,15 @@ public class PeerTaskInstancePriorityQueueTest { @Test public void peek() throws Exception { - PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); + StandByTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); int peekBeforeLength = queue.size(); Assertions.assertEquals(peekBeforeLength, queue.size()); } @Test public void peekTaskGroupPriority() throws Exception { - PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); + StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue(); + TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 2); TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.HIGH, 1); queue.put(taskInstanceMediumPriority); @@ -80,7 +81,7 @@ public class PeerTaskInstancePriorityQueueTest { queue.put(taskInstanceHigPriority); taskInstance = queue.peek(); queue.clear(); - Assertions.assertEquals(taskInstance.getName(), "medium"); + Assertions.assertEquals("medium", taskInstance.getName()); taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1); taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 2); @@ -88,7 +89,7 @@ public class PeerTaskInstancePriorityQueueTest { queue.put(taskInstanceHigPriority); taskInstance = queue.peek(); queue.clear(); - Assertions.assertEquals(taskInstance.getName(), "high"); + Assertions.assertEquals("medium", taskInstance.getName()); taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1); taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); @@ -96,7 +97,7 @@ public class PeerTaskInstancePriorityQueueTest { queue.put(taskInstanceHigPriority); taskInstance = queue.peek(); queue.clear(); - Assertions.assertEquals(taskInstance.getName(), "high"); + Assertions.assertEquals("high", taskInstance.getName()); } @@ -107,7 +108,7 @@ public class PeerTaskInstancePriorityQueueTest { @Test public void contains() throws Exception { - PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); + StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue(); TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); queue.put(taskInstanceMediumPriority); Assertions.assertTrue(queue.contains(taskInstanceMediumPriority)); @@ -117,8 +118,8 @@ public class PeerTaskInstancePriorityQueueTest { } @Test - public void remove() throws Exception { - PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); + public void remove() { + StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue(); TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); queue.put(taskInstanceMediumPriority); int peekBeforeLength = queue.size(); @@ -133,8 +134,8 @@ public class PeerTaskInstancePriorityQueueTest { * @return queue * @throws Exception */ - private PeerTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception { - PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); + private StandByTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception { + StandByTaskInstancePriorityQueue queue = new StandByTaskInstancePriorityQueue(); TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1); TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); taskInstanceHigPriority.setTaskGroupPriority(3);