Browse Source

[Fix-4222][Master]Add the priority queue to ensure that tasks are submitted according to priority. (#4250)

* [Fix-4222][Master]Add the priority queue to ensure that tasks are submitted according to priority.

* [Fix-4222][Master]Add the priority queue to ensure that tasks are submitted according to priority.

* [Fix-4222][Master]Remove useless import

* [Fix-4222][Master]Reformat code style

* [Fix-4222][Master]Reformat code style

* [Fix-4222][Master]Reformat code style

* [Fix-4222][Master]add PeerTaskInstancePriorityQueueTest

* [Fix-4222][Master]Fix code smell

* [Fix-4222][Master]Reformat code style

* [Fix-4222][Master]Fix code smell

Co-authored-by: xingchun-chen <55787491+xingchun-chen@users.noreply.github.com>
pull/3/MERGE
lgcareer 4 years ago committed by GitHub
parent
commit
9fe5c3717d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  2. 72
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  3. 44
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java
  4. 127
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
  5. 21
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
  6. 38
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java
  7. 89
      dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java
  8. 1
      pom.xml

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -90,7 +90,7 @@ public class TaskPriorityQueueConsumer extends Thread {
* taskUpdateQueue
*/
@Autowired
private TaskPriorityQueue taskPriorityQueue;
private TaskPriorityQueue<String> taskPriorityQueue;
/**
* processService

72
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -55,6 +55,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.apache.commons.io.FileUtils;
@ -120,9 +121,9 @@ public class MasterExecThread implements Runnable {
private Map<String, TaskInstance> completeTaskList = new ConcurrentHashMap<>();
/**
* ready to submit task list
* ready to submit task queue
*/
private Map<String, TaskInstance> readyToSubmitTaskList = new ConcurrentHashMap<>();
private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
/**
* depend failed task map
@ -178,8 +179,8 @@ public class MasterExecThread implements Runnable {
/**
* constructor of MasterExecThread
*
* @param processInstance processInstance
* @param processService processService
* @param processInstance processInstance
* @param processService processService
* @param nettyRemotingClient nettyRemotingClient
*/
public MasterExecThread(ProcessInstance processInstance
@ -483,7 +484,7 @@ public class MasterExecThread implements Runnable {
* encapsulation task
*
* @param processInstance process instance
* @param nodeName node name
* @param nodeName node name
* @return TaskInstance
*/
private TaskInstance createTaskInstance(ProcessInstance processInstance, String nodeName,
@ -563,7 +564,7 @@ public class MasterExecThread implements Runnable {
// if previous node success , post node submit
for (TaskInstance task : taskInstances) {
if (readyToSubmitTaskList.containsKey(task.getName())) {
if (readyToSubmitTaskQueue.contains(task)) {
continue;
}
@ -699,7 +700,7 @@ public class MasterExecThread implements Runnable {
return true;
}
if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
return readyToSubmitTaskList.size() == 0 || activeTaskNode.size() == 0;
return readyToSubmitTaskQueue.size() == 0 || activeTaskNode.size() == 0;
}
}
return false;
@ -731,7 +732,7 @@ public class MasterExecThread implements Runnable {
List<TaskInstance> pauseList = getCompleteTaskByState(ExecutionStatus.PAUSE);
if (CollectionUtils.isNotEmpty(pauseList)
|| !isComplementEnd()
|| readyToSubmitTaskList.size() > 0) {
|| readyToSubmitTaskQueue.size() > 0) {
return ExecutionStatus.PAUSE;
} else {
return ExecutionStatus.SUCCESS;
@ -782,7 +783,7 @@ public class MasterExecThread implements Runnable {
// success
if (state == ExecutionStatus.RUNNING_EXECUTION) {
List<TaskInstance> killTasks = getCompleteTaskByState(ExecutionStatus.KILL);
if (readyToSubmitTaskList.size() > 0) {
if (readyToSubmitTaskQueue.size() > 0) {
//tasks currently pending submission, no retries, indicating that depend is waiting to complete
return ExecutionStatus.RUNNING_EXECUTION;
} else if (CollectionUtils.isNotEmpty(killTasks)) {
@ -804,8 +805,8 @@ public class MasterExecThread implements Runnable {
boolean result = false;
for (String taskName : readyToSubmitTaskList.keySet()) {
TaskInstance task = readyToSubmitTaskList.get(taskName);
for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator(); iter.hasNext(); ) {
TaskInstance task = iter.next();
if (task.getState().typeIsFailure()) {
result = true;
break;
@ -872,7 +873,11 @@ public class MasterExecThread implements Runnable {
*/
private void addTaskToStandByList(TaskInstance taskInstance) {
logger.info("add task to stand by list: {}", taskInstance.getName());
readyToSubmitTaskList.putIfAbsent(taskInstance.getName(), taskInstance);
try {
readyToSubmitTaskQueue.put(taskInstance);
} catch (Exception e) {
logger.error("add task instance to readyToSubmitTaskQueue error");
}
}
/**
@ -882,7 +887,11 @@ public class MasterExecThread implements Runnable {
*/
private void removeTaskFromStandbyList(TaskInstance taskInstance) {
logger.info("remove task from stand by list: {}", taskInstance.getName());
readyToSubmitTaskList.remove(taskInstance.getName());
try {
readyToSubmitTaskQueue.remove(taskInstance);
} catch (Exception e) {
logger.error("remove task instance from readyToSubmitTaskQueue error");
}
}
/**
@ -891,8 +900,8 @@ public class MasterExecThread implements Runnable {
* @return Boolean whether has retry task in standby
*/
private boolean hasRetryTaskInStandBy() {
for (Map.Entry<String, TaskInstance> entry : readyToSubmitTaskList.entrySet()) {
if (entry.getValue().getState().typeIsFailure()) {
for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator(); iter.hasNext(); ) {
if (iter.next().getState().typeIsFailure()) {
return true;
}
}
@ -1083,20 +1092,25 @@ public class MasterExecThread implements Runnable {
* handling the list of tasks to be submitted
*/
private void submitStandByTask() {
for (Map.Entry<String, TaskInstance> entry : readyToSubmitTaskList.entrySet()) {
TaskInstance task = entry.getValue();
DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) {
if (retryTaskIntervalOverTime(task)) {
submitTaskExec(task);
try {
int length = readyToSubmitTaskQueue.size();
for (int i = 0; i < length; i++) {
TaskInstance task = readyToSubmitTaskQueue.peek();
DependResult dependResult = getDependResultForTask(task);
if (DependResult.SUCCESS == dependResult) {
if (retryTaskIntervalOverTime(task)) {
submitTaskExec(task);
removeTaskFromStandbyList(task);
}
} else if (DependResult.FAILED == dependResult) {
// if the dependency fails, the current node is not submitted and the state changes to failure.
dependFailedTask.put(task.getName(), task);
removeTaskFromStandbyList(task);
logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
}
} else if (DependResult.FAILED == dependResult) {
// if the dependency fails, the current node is not submitted and the state changes to failure.
dependFailedTask.put(entry.getKey(), task);
removeTaskFromStandbyList(task);
logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
}
} catch (Exception e) {
logger.error("submit standby task error", e);
}
}
@ -1185,9 +1199,9 @@ public class MasterExecThread implements Runnable {
* generate flow dag
*
* @param processDefinitionJson process definition json
* @param startNodeNameList start node name list
* @param recoveryNodeNameList recovery node name list
* @param depNodeType depend node type
* @param startNodeNameList start node name list
* @param recoveryNodeNameList recovery node name list
* @param depNodeType depend node type
* @return ProcessDag process dag
* @throws Exception exception
*/

44
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/exceptions/TaskPriorityQueueException.java

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.exceptions;
/**
* task priority queue exception
*/
public class TaskPriorityQueueException extends Exception {
/**
* Construct a new runtime exception with the detail message
*
* @param message message
*/
public TaskPriorityQueueException(String message) {
super(message);
}
/**
* Construct a new runtime exception with the detail message and cause
*
* @param message message
* @param cause cause
*/
public TaskPriorityQueueException(String message, Throwable cause) {
super(message, cause);
}
}

127
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java

@ -0,0 +1,127 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.PriorityQueue;
/**
* Task instances priority queue implementation
* All the task instances are in the same process instance.
*/
public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInstance> {
/**
* queue size
*/
private static final Integer QUEUE_MAX_SIZE = 3000;
/**
* queue
*/
private PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
/**
* put task instance to priority queue
*
* @param taskInstance taskInstance
* @throws TaskPriorityQueueException
*/
public void put(TaskInstance taskInstance) throws TaskPriorityQueueException {
queue.add(taskInstance);
}
/**
* take task info
*
* @return task instance
* @throws TaskPriorityQueueException
*/
@Override
public TaskInstance take() throws TaskPriorityQueueException {
return queue.poll();
}
/**
* peek taskInfo
*
* @return task instance
*/
public TaskInstance peek() {
return queue.peek();
}
/**
* queue size
*
* @return size
*/
public int size() {
return queue.size();
}
/**
* whether contains the task instance
*
* @param taskInstance task instance
* @return true is contains
*/
public boolean contains(TaskInstance taskInstance) {
return queue.contains(taskInstance);
}
/**
* remove task
*
* @param taskInstance task instance
* @return true if remove success
*/
public boolean remove(TaskInstance taskInstance) {
return queue.remove(taskInstance);
}
/**
* get iterator
*
* @return Iterator
*/
public Iterator<TaskInstance> iterator() {
return queue.iterator();
}
/**
* TaskInfoComparator
*/
private class TaskInfoComparator implements Comparator<TaskInstance> {
/**
* compare o1 o2
*
* @param o1 o1
* @param o2 o2
* @return compare result
*/
@Override
public int compare(TaskInstance o1, TaskInstance o2) {
return o1.getTaskInstancePriority().compareTo(o2.getTaskInstancePriority());
}
}
}

21
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java

@ -14,31 +14,38 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
public interface TaskPriorityQueue {
/**
* task priority queue
* @param <T>
*/
public interface TaskPriorityQueue<T> {
/**
* put task info
*
* @param taskInfo taskInfo
* @throws Exception
* @throws TaskPriorityQueueException
*/
void put(String taskInfo) throws Exception;
void put(T taskInfo) throws TaskPriorityQueueException;
/**
* take taskInfo
*
* @return taskInfo
* @throws Exception
* @throws TaskPriorityQueueException
*/
String take()throws Exception;
T take() throws TaskPriorityQueueException, InterruptedException;
/**
* size
*
* @return size
* @throws Exception
* @throws TaskPriorityQueueException
*/
int size() throws Exception;
int size() throws TaskPriorityQueueException;
}

38
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java

@ -14,22 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.queue;
import static org.apache.dolphinscheduler.common.Constants.TASK_INFO_LENGTH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import org.springframework.stereotype.Service;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import java.util.*;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import static org.apache.dolphinscheduler.common.Constants.*;
import org.springframework.stereotype.Service;
/**
* A singleton of a task queue implemented with zookeeper
* tasks queue implementation
*/
@Service
public class TaskPriorityQueueImpl implements TaskPriorityQueue {
public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
/**
* queue size
*/
@ -44,40 +49,43 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue {
* put task takePriorityInfo
*
* @param taskPriorityInfo takePriorityInfo
* @throws Exception
* @throws TaskPriorityQueueException
*/
@Override
public void put(String taskPriorityInfo) throws Exception {
public void put(String taskPriorityInfo) throws TaskPriorityQueueException {
queue.put(taskPriorityInfo);
}
/**
* take taskInfo
*
* @return taskInfo
* @throws Exception
* @throws TaskPriorityQueueException
*/
@Override
public String take() throws Exception {
public String take() throws TaskPriorityQueueException, InterruptedException {
return queue.take();
}
/**
* queue size
*
* @return size
* @throws Exception
* @throws TaskPriorityQueueException
*/
@Override
public int size() throws Exception {
public int size() throws TaskPriorityQueueException {
return queue.size();
}
/**
* TaskInfoComparator
*/
private class TaskInfoComparator implements Comparator<String>{
private class TaskInfoComparator implements Comparator<String> {
/**
* compare o1 o2
*
* @param o1 o1
* @param o2 o2
* @return compare result
@ -87,15 +95,15 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue {
String s1 = o1;
String s2 = o2;
String[] s1Array = s1.split(UNDERLINE);
if(s1Array.length > TASK_INFO_LENGTH){
if (s1Array.length > TASK_INFO_LENGTH) {
// warning: if this length > 5, need to be changed
s1 = s1.substring(0, s1.lastIndexOf(UNDERLINE) );
s1 = s1.substring(0, s1.lastIndexOf(UNDERLINE));
}
String[] s2Array = s2.split(UNDERLINE);
if(s2Array.length > TASK_INFO_LENGTH){
if (s2Array.length > TASK_INFO_LENGTH) {
// warning: if this length > 5, need to be changed
s2 = s2.substring(0, s2.lastIndexOf(UNDERLINE) );
s2 = s2.substring(0, s2.lastIndexOf(UNDERLINE));
}
return s1.compareTo(s2);

89
dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package queue;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.junit.Assert;
import org.junit.Test;
/**
* Task instances priority queue implementation
* All the task instances are in the same process instance.
*/
public class PeerTaskInstancePriorityQueueTest {
@Test
public void testPut() throws Exception {
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH);
TaskInstance taskInstanceMediumPriority = createTaskInstance("high", Priority.MEDIUM);
queue.put(taskInstanceHigPriority);
queue.put(taskInstanceMediumPriority);
Assert.assertEquals(2,queue.size());
}
@Test
public void testPeek() throws Exception {
PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
int peekBeforeLength = queue.size();
queue.peek();
Assert.assertEquals(peekBeforeLength,queue.size());
}
@Test
public void testTake() throws Exception {
PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
int peekBeforeLength = queue.size();
queue.take();
Assert.assertTrue(queue.size() < peekBeforeLength);
}
/**
* get queue
*
* @return queue
* @throws Exception
*/
private PeerTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception {
PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue();
TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH);
TaskInstance taskInstanceMediumPriority = createTaskInstance("high", Priority.MEDIUM);
queue.put(taskInstanceHigPriority);
queue.put(taskInstanceMediumPriority);
return queue;
}
/**
* create task instance
*
* @param name name
* @param priority priority
* @return
*/
private TaskInstance createTaskInstance(String name, Priority priority) {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setName(name);
taskInstance.setTaskInstancePriority(priority);
return taskInstance;
}
}

1
pom.xml

@ -862,6 +862,7 @@
<include>**/service/zk/ZKServerTest.java</include>
<include>**/service/zk/CuratorZookeeperClientTest.java</include>
<include>**/service/queue/TaskUpdateQueueTest.java</include>
<include>**/service/queue/PeerTaskInstancePriorityQueueTest.java</include>
<include>**/dao/mapper/DataSourceUserMapperTest.java</include>
<!--<iTaskUpdateQueueConsumerThreadnclude>**/dao/mapper/ErrorCommandMapperTest.java</iTaskUpdateQueueConsumerThreadnclude>-->

Loading…
Cancel
Save