Browse Source

[BUG][TaskGroup] Task group does not take effect (#10093)

* fix 10092: Task group does not take effect

* fix 10092: Task group does not take effect

* fix 10092: Task group does not take effect

(cherry picked from commit ee2b855ced)
3.0.0/version-upgrade
BaoLiang 3 years ago committed by Jiajie Zhong
parent
commit
b016037a6f
  1. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
  3. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java
  4. 12
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
  5. 5
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
  6. 33
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java
  7. 57
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java
  8. 62
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -66,6 +66,8 @@ public final class Constants {
public static final String BUCKET_NAME = "dolphinscheduler-test";
public static final String EMPTY_STRING = "";
/**
* fs.defaultFS
*/
@ -422,6 +424,8 @@ public final class Constants {
*/
public static final int DEFINITION_FAILURE = -1;
public static final int OPPOSITE_VALUE = -1;
/**
* process or task definition first version
*/

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java

@ -713,6 +713,8 @@ public class TaskInstance implements Serializable {
+ ", executorName='" + executorName + '\''
+ ", delayTime=" + delayTime
+ ", dryRun=" + dryRun
+ ", taskGroupId=" + taskGroupId
+ ", taskGroupPriority=" + taskGroupPriority
+ '}';
}

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java

@ -114,7 +114,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
TaskPriority taskPriority = new TaskPriority(processInstance.getProcessInstancePriority().getCode(),
processInstance.getId(), taskInstance.getProcessInstancePriority().getCode(),
taskInstance.getId(), org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
taskInstance.getId(), taskInstance.getTaskGroupPriority(), org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
TaskExecutionContext taskExecutionContext = getTaskExecutionContext(taskInstance);
if (taskExecutionContext == null) {

12
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@ -98,7 +98,7 @@ public class TaskPriorityQueueConsumerTest {
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "default");
taskPriorityQueue.put(taskPriority);
TimeUnit.SECONDS.sleep(10);
@ -125,7 +125,7 @@ public class TaskPriorityQueueConsumerTest {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "default");
taskPriorityQueue.put(taskPriority);
DataSource dataSource = new DataSource();
@ -166,7 +166,7 @@ public class TaskPriorityQueueConsumerTest {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "default");
taskPriorityQueue.put(taskPriority);
DataSource dataSource = new DataSource();
@ -205,7 +205,7 @@ public class TaskPriorityQueueConsumerTest {
ProcessDefinition processDefinition = new ProcessDefinition();
processDefinition.setUserId(2);
taskInstance.setProcessDefine(processDefinition);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "default");
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "default");
taskPriorityQueue.put(taskPriority);
DataSource dataSource = new DataSource();
@ -266,7 +266,7 @@ public class TaskPriorityQueueConsumerTest {
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup");
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "NoWorkGroup");
taskPriorityQueue.put(taskPriority);
TimeUnit.SECONDS.sleep(10);
@ -335,7 +335,7 @@ public class TaskPriorityQueueConsumerTest {
Mockito.doReturn(taskInstance).when(processService).findTaskInstanceById(1);
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, "NoWorkGroup");
TaskPriority taskPriority = new TaskPriority(2, 1, 2, 1, 1, "NoWorkGroup");
taskPriorityQueue.put(taskPriority);
taskPriorityQueueConsumer.run();

5
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
@ -168,6 +169,10 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
*/
@Override
public int compare(TaskInstance o1, TaskInstance o2) {
if(o1.getTaskInstancePriority().equals(o2.getTaskInstancePriority())){
// larger number, higher priority
return Constants.OPPOSITE_VALUE * Integer.compare(o1.getTaskGroupPriority(),o2.getTaskGroupPriority());
}
return o1.getTaskInstancePriority().compareTo(o2.getTaskInstancePriority());
}
}

33
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriority.java

@ -17,7 +17,9 @@
package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.Map;
import java.util.Objects;
@ -67,6 +69,8 @@ public class TaskPriority implements Comparable<TaskPriority> {
*/
private long checkpoint;
private int taskGroupPriority;
public TaskPriority() {
this.checkpoint = System.currentTimeMillis();
}
@ -74,11 +78,13 @@ public class TaskPriority implements Comparable<TaskPriority> {
public TaskPriority(int processInstancePriority,
int processInstanceId,
int taskInstancePriority,
int taskId, String groupName) {
int taskId,
int taskGroupPriority, String groupName) {
this.processInstancePriority = processInstancePriority;
this.processInstanceId = processInstanceId;
this.taskInstancePriority = taskInstancePriority;
this.taskId = taskId;
this.taskGroupPriority = taskGroupPriority;
this.groupName = groupName;
this.checkpoint = System.currentTimeMillis();
}
@ -147,6 +153,14 @@ public class TaskPriority implements Comparable<TaskPriority> {
this.checkpoint = checkpoint;
}
public int getTaskGroupPriority() {
return taskGroupPriority;
}
public void setTaskGroupPriority(int taskGroupPriority) {
this.taskGroupPriority = taskGroupPriority;
}
@Override
public int compareTo(TaskPriority other) {
if (this.getProcessInstancePriority() > other.getProcessInstancePriority()) {
@ -169,15 +183,22 @@ public class TaskPriority implements Comparable<TaskPriority> {
if (this.getTaskInstancePriority() < other.getTaskInstancePriority()) {
return -1;
}
if(this.getTaskGroupPriority() != other.getTaskGroupPriority()){
// larger number, higher priority
return Constants.OPPOSITE_VALUE * Integer.compare(this.getTaskGroupPriority(), other.getTaskGroupPriority());
}
if (this.getTaskId() > other.getTaskId()) {
return 1;
}
if (this.getTaskId() < other.getTaskId()) {
return -1;
}
return this.getGroupName().compareTo(other.getGroupName());
String thisGroupName = StringUtils.isNotBlank(this.getGroupName()) ? this.getGroupName() : Constants.EMPTY_STRING;
String otherGroupName = StringUtils.isNotBlank(other.getGroupName()) ? other.getGroupName() : Constants.EMPTY_STRING;
if(!thisGroupName.equals(otherGroupName)){
return thisGroupName.compareTo(otherGroupName);
}
return Long.compare(this.getCheckpoint(), other.getCheckpoint());
}
@Override
@ -193,11 +214,13 @@ public class TaskPriority implements Comparable<TaskPriority> {
&& processInstanceId == that.processInstanceId
&& taskInstancePriority == that.taskInstancePriority
&& taskId == that.taskId
&& taskGroupPriority == that.taskGroupPriority
&& Objects.equals(groupName, that.groupName);
}
@Override
public int hashCode() {
return Objects.hash(processInstancePriority, processInstanceId, taskInstancePriority, taskId, groupName);
return Objects.hash(processInstancePriority, processInstanceId, taskInstancePriority, taskId, taskGroupPriority, groupName);
}
}

57
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java

@ -31,8 +31,8 @@ public class PeerTaskInstancePriorityQueueTest {
@Test
public void put() throws TaskPriorityQueueException {
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH);
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM);
TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
queue.put(taskInstanceHigPriority);
queue.put(taskInstanceMediumPriority);
Assert.assertEquals(2, queue.size());
@ -60,10 +60,46 @@ public class PeerTaskInstancePriorityQueueTest {
public void peek() throws Exception {
PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
int peekBeforeLength = queue.size();
queue.peek();
Assert.assertEquals(peekBeforeLength, queue.size());
}
@Test
public void peekTaskGroupPriority() throws Exception{
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 2);
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.HIGH, 1);
queue.put(taskInstanceMediumPriority);
queue.put(taskInstanceHigPriority);
TaskInstance taskInstance = queue.peek();
queue.clear();
Assert.assertEquals(taskInstance.getName(), "high");
taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
taskInstanceMediumPriority = createTaskInstance("medium", Priority.HIGH, 2);
queue.put(taskInstanceMediumPriority);
queue.put(taskInstanceHigPriority);
taskInstance = queue.peek();
queue.clear();
Assert.assertEquals(taskInstance.getName(), "medium");
taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 2);
queue.put(taskInstanceMediumPriority);
queue.put(taskInstanceHigPriority);
taskInstance = queue.peek();
queue.clear();
Assert.assertEquals(taskInstance.getName(), "high");
taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
queue.put(taskInstanceMediumPriority);
queue.put(taskInstanceHigPriority);
taskInstance = queue.peek();
queue.clear();
Assert.assertEquals(taskInstance.getName(), "high");
}
@Test
public void size() throws Exception {
Assert.assertEquals(2, getPeerTaskInstancePriorityQueue().size());
@ -72,7 +108,7 @@ public class PeerTaskInstancePriorityQueueTest {
@Test
public void contains() throws Exception {
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM);
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
queue.put(taskInstanceMediumPriority);
Assert.assertTrue(queue.contains(taskInstanceMediumPriority));
}
@ -80,7 +116,7 @@ public class PeerTaskInstancePriorityQueueTest {
@Test
public void remove() throws Exception {
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM);
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
queue.put(taskInstanceMediumPriority);
int peekBeforeLength = queue.size();
queue.remove(taskInstanceMediumPriority);
@ -95,10 +131,12 @@ public class PeerTaskInstancePriorityQueueTest {
*/
private PeerTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception {
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH);
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM);
queue.put(taskInstanceHigPriority);
TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH, 1);
TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM, 1);
taskInstanceHigPriority.setTaskGroupPriority(3);
taskInstanceMediumPriority.setTaskGroupPriority(2);
queue.put(taskInstanceMediumPriority);
queue.put(taskInstanceHigPriority);
return queue;
}
@ -109,10 +147,11 @@ public class PeerTaskInstancePriorityQueueTest {
* @param priority priority
* @return
*/
private TaskInstance createTaskInstance(String name, Priority priority) {
private TaskInstance createTaskInstance(String name, Priority priority, int taskGroupPriority) {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setName(name);
taskInstance.setTaskInstancePriority(priority);
taskInstance.setTaskGroupPriority(taskGroupPriority);
return taskInstance;
}
}

62
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java

@ -31,9 +31,9 @@ public class TaskPriorityQueueImplTest {
@Test
public void testSort() {
TaskPriority priorityOne = new TaskPriority(1, 0, 0, 0, "default");
TaskPriority priorityTwo = new TaskPriority(2, 0, 0, 0, "default");
TaskPriority priorityThree = new TaskPriority(3, 0, 0, 0, "default");
TaskPriority priorityOne = new TaskPriority(1, 0, 0, 0, 1, "default");
TaskPriority priorityTwo = new TaskPriority(2, 0, 0, 0, 1, "default");
TaskPriority priorityThree = new TaskPriority(3, 0, 0, 0, 1, "default");
List<TaskPriority> taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
Collections.sort(taskPrioritys);
Assert.assertEquals(
@ -41,9 +41,9 @@ public class TaskPriorityQueueImplTest {
taskPrioritys
);
priorityOne = new TaskPriority(0, 1, 0, 0, "default");
priorityTwo = new TaskPriority(0, 2, 0, 0, "default");
priorityThree = new TaskPriority(0, 3, 0, 0, "default");
priorityOne = new TaskPriority(0, 1, 0, 0, 1, "default");
priorityTwo = new TaskPriority(0, 2, 0, 0, 1, "default");
priorityThree = new TaskPriority(0, 3, 0, 0, 1, "default");
taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
Collections.sort(taskPrioritys);
Assert.assertEquals(
@ -51,9 +51,9 @@ public class TaskPriorityQueueImplTest {
taskPrioritys
);
priorityOne = new TaskPriority(0, 0, 1, 0, "default");
priorityTwo = new TaskPriority(0, 0, 2, 0, "default");
priorityThree = new TaskPriority(0, 0, 3, 0, "default");
priorityOne = new TaskPriority(0, 0, 1, 0, 1, "default");
priorityTwo = new TaskPriority(0, 0, 2, 0, 1, "default");
priorityThree = new TaskPriority(0, 0, 3, 0, 1, "default");
taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
Collections.sort(taskPrioritys);
Assert.assertEquals(
@ -61,9 +61,9 @@ public class TaskPriorityQueueImplTest {
taskPrioritys
);
priorityOne = new TaskPriority(0, 0, 0, 1, "default");
priorityTwo = new TaskPriority(0, 0, 0, 2, "default");
priorityThree = new TaskPriority(0, 0, 0, 3, "default");
priorityOne = new TaskPriority(0, 0, 0, 1, 1, "default");
priorityTwo = new TaskPriority(0, 0, 0, 2, 1, "default");
priorityThree = new TaskPriority(0, 0, 0, 3, 1, "default");
taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
Collections.sort(taskPrioritys);
Assert.assertEquals(
@ -71,15 +71,45 @@ public class TaskPriorityQueueImplTest {
taskPrioritys
);
priorityOne = new TaskPriority(0, 0, 0, 0, "default_1");
priorityTwo = new TaskPriority(0, 0, 0, 0, "default_2");
priorityThree = new TaskPriority(0, 0, 0, 0, "default_3");
priorityOne = new TaskPriority(0, 0, 0, 0, 1, "default_1");
priorityTwo = new TaskPriority(0, 0, 0, 0, 1,"default_2");
priorityThree = new TaskPriority(0, 0, 0, 0, 1, "default_3");
taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
Collections.sort(taskPrioritys);
Assert.assertEquals(
Arrays.asList(priorityOne, priorityTwo, priorityThree),
taskPrioritys
);
priorityOne = new TaskPriority(0, 0, 0, 0, 2, "default_1");
priorityTwo = new TaskPriority(0, 0, 0, 0, 1,"default_2");
priorityThree = new TaskPriority(0, 0, 0, 0, 3, "default_3");
taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
Collections.sort(taskPrioritys);
Assert.assertEquals(
Arrays.asList(priorityThree, priorityOne, priorityTwo),
taskPrioritys
);
priorityOne = new TaskPriority(0, 0, 0, 0, 1, "default_1");
priorityTwo = new TaskPriority(0, 0, 0, 0, 1,"default_2");
priorityThree = new TaskPriority(0, 0, 0, 0, 3, "default_3");
taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo);
Collections.sort(taskPrioritys);
Assert.assertEquals(
Arrays.asList(priorityThree, priorityOne, priorityTwo),
taskPrioritys
);
priorityTwo = new TaskPriority(0, 0, 0, 0, 1,"default_1");
priorityOne = new TaskPriority(0, 0, 0, 0, 1, "default_1");
priorityThree = new TaskPriority(0, 0, 0, 0, 3, "default_1");
taskPrioritys = Arrays.asList(priorityTwo, priorityOne, priorityThree);
Collections.sort(taskPrioritys);
Assert.assertEquals(
Arrays.asList(priorityThree, priorityTwo, priorityOne),
taskPrioritys
);
}
@Test
@ -134,7 +164,7 @@ public class TaskPriorityQueueImplTest {
* @return
*/
private TaskPriority createTaskPriority(Integer priority, Integer processInstanceId) {
TaskPriority priorityOne = new TaskPriority(priority, processInstanceId, 0, 0, "default");
TaskPriority priorityOne = new TaskPriority(priority, processInstanceId, 0, 0, 1, "default");
return priorityOne;
}
}
Loading…
Cancel
Save