diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 57b5c92970..19fbdb90ca 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -66,6 +66,8 @@ public final class Constants { public static final String BUCKET_NAME = "dolphinscheduler-test"; + public static final String EMPTY_STRING = ""; + /** * fs.defaultFS */ @@ -422,6 +424,8 @@ public final class Constants { */ public static final int DEFINITION_FAILURE = -1; + public static final int OPPOSITE_VALUE = -1; + /** * process or task definition first version */ diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index 6ab0c340f1..4ec1dfde2b 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -713,6 +713,8 @@ public class TaskInstance implements Serializable { + ", executorName='" + executorName + '\'' + ", delayTime=" + delayTime + ", dryRun=" + dryRun + + ", taskGroupId=" + taskGroupId + + ", taskGroupPriority=" + taskGroupPriority + '}'; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index 5833bc56c1..ffeb89a0d2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -114,7 +114,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor { TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(), processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(), - taskInstance.getId(), org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP); + taskInstance.getId(), taskInstance.getTaskGroupPriority(), org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP); TaskExecutionContext taskExecutionContext = getTaskExecutionContext(taskInstance); if (taskExecutionContext == null) { diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index ec3f58aca0..05590e4541 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -98,7 +98,7 @@ public class TaskPriorityQueueConsumerTest { processDefinition.setUserId(2); taskInstance.setProcessDefine(processDefinition); - TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default"); + TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "default"); taskPriorityQueue.put(taskPriority); TimeUnit.SECONDS.sleep(10); @@ -125,7 +125,7 @@ public class TaskPriorityQueueConsumerTest { ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setUserId(2); taskInstance.setProcessDefine(processDefinition); - TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default"); + TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "default"); taskPriorityQueue.put(taskPriority); DataSource dataSource = new DataSource(); @@ -166,7 +166,7 @@ public class TaskPriorityQueueConsumerTest { ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setUserId(2); taskInstance.setProcessDefine(processDefinition); - TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default"); + TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "default"); taskPriorityQueue.put(taskPriority); DataSource dataSource = new DataSource(); @@ -205,7 +205,7 @@ public class TaskPriorityQueueConsumerTest { ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setUserId(2); taskInstance.setProcessDefine(processDefinition); - TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default"); + TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "default"); taskPriorityQueue.put(taskPriority); DataSource dataSource = new DataSource(); @@ -266,7 +266,7 @@ public class TaskPriorityQueueConsumerTest { Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); - TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup"); + TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "NoWorkGroup"); taskPriorityQueue.put(taskPriority); TimeUnit.SECONDS.sleep(10); @@ -335,7 +335,7 @@ public class TaskPriorityQueueConsumerTest { Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1); - TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup"); + TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "NoWorkGroup"); taskPriorityQueue.put(taskPriority); taskPriorityQueueConsumer.run(); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java index 7502607bcf..231fd2a20f 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.service.queue; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; @@ -168,6 +169,10 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue { */ private long checkpoint; + private int taskGroupPriority; + public TaskPriority() { this.checkpoint = System.currentTimeMillis(); } @@ -74,11 +78,13 @@ public class TaskPriority implements Comparable { public TaskPriority(int processInstancePriority, int processInstanceId, int taskInstancePriority, - int taskId, String groupName) { + int taskId, + int taskGroupPriority, String groupName) { this.processInstancePriority = processInstancePriority; this.processInstanceId = processInstanceId; this.taskInstancePriority = taskInstancePriority; this.taskId = taskId; + this.taskGroupPriority = taskGroupPriority; this.groupName = groupName; this.checkpoint = System.currentTimeMillis(); } @@ -147,6 +153,14 @@ public class TaskPriority implements Comparable { this.checkpoint = checkpoint; } + public int getTaskGroupPriority() { + return taskGroupPriority; + } + + public void setTaskGroupPriority(int taskGroupPriority) { + this.taskGroupPriority = taskGroupPriority; + } + @Override public int compareTo(TaskPriority other) { if (this.getProcessInstancePriority() > other.getProcessInstancePriority()) { @@ -169,15 +183,22 @@ public class TaskPriority implements Comparable { if (this.getTaskInstancePriority() < other.getTaskInstancePriority()) { return -1; } - + if(this.getTaskGroupPriority() != other.getTaskGroupPriority()){ + // larger number, higher priority + return Constants.OPPOSITE_VALUE * Integer.compare(this.getTaskGroupPriority(), other.getTaskGroupPriority()); + } if (this.getTaskId() > other.getTaskId()) { return 1; } if (this.getTaskId() < other.getTaskId()) { return -1; } - - return this.getGroupName().compareTo(other.getGroupName()); + String thisGroupName = StringUtils.isNotBlank(this.getGroupName()) ? this.getGroupName() : Constants.EMPTY_STRING; + String otherGroupName = StringUtils.isNotBlank(other.getGroupName()) ? other.getGroupName() : Constants.EMPTY_STRING; + if(!thisGroupName.equals(otherGroupName)){ + return thisGroupName.compareTo(otherGroupName); + } + return Long.compare(this.getCheckpoint(), other.getCheckpoint()); } @Override @@ -193,11 +214,13 @@ public class TaskPriority implements Comparable { && processInstanceId == that.processInstanceId && taskInstancePriority == that.taskInstancePriority && taskId == that.taskId + && taskGroupPriority == that.taskGroupPriority && Objects.equals(groupName, that.groupName); } @Override public int hashCode() { - return Objects.hash(processInstancePriority, processInstanceId, taskInstancePriority, taskId, groupName); + return Objects.hash(processInstancePriority, processInstanceId, taskInstancePriority, taskId, taskGroupPriority, groupName); } + } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java index 7feb4c676b..8da3a6c194 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java @@ -31,8 +31,8 @@ public class PeerTaskInstancePriorityQueueTest { @Test public void put() throws TaskPriorityQueueException { PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); - TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH); - TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM); + TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1); + TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); queue.put(taskInstanceHigPriority); queue.put(taskInstanceMediumPriority); Assert.assertEquals(2, queue.size()); @@ -60,10 +60,46 @@ public class PeerTaskInstancePriorityQueueTest { public void peek() throws Exception { PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); int peekBeforeLength = queue.size(); - queue.peek(); Assert.assertEquals(peekBeforeLength, queue.size()); } + @Test + public void peekTaskGroupPriority() throws Exception{ + PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); + TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 2); + TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.HIGH, 1); + queue.put(taskInstanceMediumPriority); + queue.put(taskInstanceHigPriority); + TaskInstance taskInstance = queue.peek(); + queue.clear(); + Assert.assertEquals(taskInstance.getName(), "high"); + + taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1); + taskInstanceMediumPriority = createTaskInstance("medium", Priority.HIGH, 2); + queue.put(taskInstanceMediumPriority); + queue.put(taskInstanceHigPriority); + taskInstance = queue.peek(); + queue.clear(); + Assert.assertEquals(taskInstance.getName(), "medium"); + + taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1); + taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 2); + queue.put(taskInstanceMediumPriority); + queue.put(taskInstanceHigPriority); + taskInstance = queue.peek(); + queue.clear(); + Assert.assertEquals(taskInstance.getName(), "high"); + + taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1); + taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); + queue.put(taskInstanceMediumPriority); + queue.put(taskInstanceHigPriority); + taskInstance = queue.peek(); + queue.clear(); + Assert.assertEquals(taskInstance.getName(), "high"); + + } + @Test public void size() throws Exception { Assert.assertEquals(2, getPeerTaskInstancePriorityQueue().size()); @@ -72,7 +108,7 @@ public class PeerTaskInstancePriorityQueueTest { @Test public void contains() throws Exception { PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); - TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM); + TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); queue.put(taskInstanceMediumPriority); Assert.assertTrue(queue.contains(taskInstanceMediumPriority)); } @@ -80,7 +116,7 @@ public class PeerTaskInstancePriorityQueueTest { @Test public void remove() throws Exception { PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); - TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM); + TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); queue.put(taskInstanceMediumPriority); int peekBeforeLength = queue.size(); queue.remove(taskInstanceMediumPriority); @@ -95,10 +131,12 @@ public class PeerTaskInstancePriorityQueueTest { */ private PeerTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception { PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); - TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH); - TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM); - queue.put(taskInstanceHigPriority); + TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1); + TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1); + taskInstanceHigPriority.setTaskGroupPriority(3); + taskInstanceMediumPriority.setTaskGroupPriority(2); queue.put(taskInstanceMediumPriority); + queue.put(taskInstanceHigPriority); return queue; } @@ -109,10 +147,11 @@ public class PeerTaskInstancePriorityQueueTest { * @param priority priority * @return */ - private TaskInstance createTaskInstance(String name, Priority priority) { + private TaskInstance createTaskInstance(String name, Priority priority, int taskGroupPriority) { TaskInstance taskInstance = new TaskInstance(); taskInstance.setName(name); taskInstance.setTaskInstancePriority(priority); + taskInstance.setTaskGroupPriority(taskGroupPriority); return taskInstance; } } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java index 3888d3d93c..c2fb14acfa 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java @@ -31,9 +31,9 @@ public class TaskPriorityQueueImplTest { @Test public void testSort() { - TaskPriority priorityOne = new TaskPriority(1, 0, 0, 0, "default"); - TaskPriority priorityTwo = new TaskPriority(2, 0, 0, 0, "default"); - TaskPriority priorityThree = new TaskPriority(3, 0, 0, 0, "default"); + TaskPriority priorityOne = new TaskPriority(1, 0, 0, 0, 1, "default"); + TaskPriority priorityTwo = new TaskPriority(2, 0, 0, 0, 1, "default"); + TaskPriority priorityThree = new TaskPriority(3, 0, 0, 0, 1, "default"); List taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); Collections.sort(taskPrioritys); Assert.assertEquals( @@ -41,9 +41,9 @@ public class TaskPriorityQueueImplTest { taskPrioritys ); - priorityOne = new TaskPriority(0, 1, 0, 0, "default"); - priorityTwo = new TaskPriority(0, 2, 0, 0, "default"); - priorityThree = new TaskPriority(0, 3, 0, 0, "default"); + priorityOne = new TaskPriority(0, 1, 0, 0, 1, "default"); + priorityTwo = new TaskPriority(0, 2, 0, 0, 1, "default"); + priorityThree = new TaskPriority(0, 3, 0, 0, 1, "default"); taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); Collections.sort(taskPrioritys); Assert.assertEquals( @@ -51,9 +51,9 @@ public class TaskPriorityQueueImplTest { taskPrioritys ); - priorityOne = new TaskPriority(0, 0, 1, 0, "default"); - priorityTwo = new TaskPriority(0, 0, 2, 0, "default"); - priorityThree = new TaskPriority(0, 0, 3, 0, "default"); + priorityOne = new TaskPriority(0, 0, 1, 0, 1, "default"); + priorityTwo = new TaskPriority(0, 0, 2, 0, 1, "default"); + priorityThree = new TaskPriority(0, 0, 3, 0, 1, "default"); taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); Collections.sort(taskPrioritys); Assert.assertEquals( @@ -61,9 +61,9 @@ public class TaskPriorityQueueImplTest { taskPrioritys ); - priorityOne = new TaskPriority(0, 0, 0, 1, "default"); - priorityTwo = new TaskPriority(0, 0, 0, 2, "default"); - priorityThree = new TaskPriority(0, 0, 0, 3, "default"); + priorityOne = new TaskPriority(0, 0, 0, 1, 1, "default"); + priorityTwo = new TaskPriority(0, 0, 0, 2, 1, "default"); + priorityThree = new TaskPriority(0, 0, 0, 3, 1, "default"); taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); Collections.sort(taskPrioritys); Assert.assertEquals( @@ -71,15 +71,45 @@ public class TaskPriorityQueueImplTest { taskPrioritys ); - priorityOne = new TaskPriority(0, 0, 0, 0, "default_1"); - priorityTwo = new TaskPriority(0, 0, 0, 0, "default_2"); - priorityThree = new TaskPriority(0, 0, 0, 0, "default_3"); + priorityOne = new TaskPriority(0, 0, 0, 0, 1, "default_1"); + priorityTwo = new TaskPriority(0, 0, 0, 0, 1,"default_2"); + priorityThree = new TaskPriority(0, 0, 0, 0, 1, "default_3"); taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); Collections.sort(taskPrioritys); Assert.assertEquals( Arrays.asList(priorityOne, priorityTwo, priorityThree), taskPrioritys ); + + priorityOne = new TaskPriority(0, 0, 0, 0, 2, "default_1"); + priorityTwo = new TaskPriority(0, 0, 0, 0, 1,"default_2"); + priorityThree = new TaskPriority(0, 0, 0, 0, 3, "default_3"); + taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); + Collections.sort(taskPrioritys); + Assert.assertEquals( + Arrays.asList(priorityThree, priorityOne, priorityTwo), + taskPrioritys + ); + + priorityOne = new TaskPriority(0, 0, 0, 0, 1, "default_1"); + priorityTwo = new TaskPriority(0, 0, 0, 0, 1,"default_2"); + priorityThree = new TaskPriority(0, 0, 0, 0, 3, "default_3"); + taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); + Collections.sort(taskPrioritys); + Assert.assertEquals( + Arrays.asList(priorityThree, priorityOne, priorityTwo), + taskPrioritys + ); + + priorityTwo = new TaskPriority(0, 0, 0, 0, 1,"default_1"); + priorityOne = new TaskPriority(0, 0, 0, 0, 1, "default_1"); + priorityThree = new TaskPriority(0, 0, 0, 0, 3, "default_1"); + taskPrioritys = Arrays.asList(priorityTwo, priorityOne, priorityThree); + Collections.sort(taskPrioritys); + Assert.assertEquals( + Arrays.asList(priorityThree, priorityTwo, priorityOne), + taskPrioritys + ); } @Test @@ -134,7 +164,7 @@ public class TaskPriorityQueueImplTest { * @return */ private TaskPriority createTaskPriority(Integer priority, Integer processInstanceId) { - TaskPriority priorityOne = new TaskPriority(priority, processInstanceId, 0, 0, "default"); + TaskPriority priorityOne = new TaskPriority(priority, processInstanceId, 0, 0, 1, "default"); return priorityOne; } } \ No newline at end of file