Browse Source

[Fix-4222][Master]Add the priority queue to ensure that tasks are submitted according to priority.

lgcareer 4 years ago
parent
commit
c5bb72c6e3
  1. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  2. 65
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
  3. 124
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
  4. 6
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
  5. 7
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -71,7 +71,7 @@ public class TaskPriorityQueueConsumer extends Thread{
* taskUpdateQueue
*/
@Autowired
private TaskPriorityQueue taskPriorityQueue;
private TaskPriorityQueue<String> taskPriorityQueue;
/**
* processService

65
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java

@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.master.runner;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.graph.DAG;
@ -38,12 +37,10 @@ import org.apache.dolphinscheduler.server.utils.AlertManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@ -97,9 +94,9 @@ public class MasterExecThread implements Runnable {
private Map<String, TaskInstance> completeTaskList = new ConcurrentHashMap<>();
/**
* ready to submit task list
* ready to submit task queue
*/
private Map<String, TaskInstance> readyToSubmitTaskList = new ConcurrentHashMap<>();
private PeerTaskInstancePriorityQueue readyToSubmitTaskQueue = new PeerTaskInstancePriorityQueue();
/**
* depend failed task map
@ -491,7 +488,7 @@ public class MasterExecThread implements Runnable {
}
// if previous node success , post node submit
for(TaskInstance task : taskInstances){
if(readyToSubmitTaskList.containsKey(task.getName())){
if(readyToSubmitTaskQueue.contains(task)){
continue;
}
if(completeTaskList.containsKey(task.getName())){
@ -625,7 +622,7 @@ public class MasterExecThread implements Runnable {
return true;
}
if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
return readyToSubmitTaskList.size() == 0 || activeTaskNode.size() == 0;
return readyToSubmitTaskQueue.size() == 0 || activeTaskNode.size() == 0;
}
}
return false;
@ -655,7 +652,7 @@ public class MasterExecThread implements Runnable {
List<TaskInstance> pauseList = getCompleteTaskByState(ExecutionStatus.PAUSE);
if(CollectionUtils.isNotEmpty(pauseList)
|| !isComplementEnd()
|| readyToSubmitTaskList.size() > 0){
|| readyToSubmitTaskQueue.size() > 0){
return ExecutionStatus.PAUSE;
}else{
return ExecutionStatus.SUCCESS;
@ -706,7 +703,7 @@ public class MasterExecThread implements Runnable {
// success
if(state == ExecutionStatus.RUNNING_EXEUTION){
List<TaskInstance> killTasks = getCompleteTaskByState(ExecutionStatus.KILL);
if(readyToSubmitTaskList.size() > 0){
if(readyToSubmitTaskQueue.size() > 0){
//tasks currently pending submission, no retries, indicating that depend is waiting to complete
return ExecutionStatus.RUNNING_EXEUTION;
}else if(CollectionUtils.isNotEmpty(killTasks)){
@ -729,8 +726,8 @@ public class MasterExecThread implements Runnable {
boolean result = false;
for(String taskName : readyToSubmitTaskList.keySet()){
TaskInstance task = readyToSubmitTaskList.get(taskName);
for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator(); iter.hasNext();) {
TaskInstance task = iter.next();
if(task.getState().typeIsFailure()){
result = true;
break;
@ -794,7 +791,11 @@ public class MasterExecThread implements Runnable {
*/
private void addTaskToStandByList(TaskInstance taskInstance){
logger.info("add task to stand by list: {}", taskInstance.getName());
readyToSubmitTaskList.putIfAbsent(taskInstance.getName(), taskInstance);
try {
readyToSubmitTaskQueue.put(taskInstance);
} catch (Exception e) {
logger.error("add task instance to readyToSubmitTaskQueue error");
}
}
/**
@ -803,7 +804,11 @@ public class MasterExecThread implements Runnable {
*/
private void removeTaskFromStandbyList(TaskInstance taskInstance){
logger.info("remove task from stand by list: {}", taskInstance.getName());
readyToSubmitTaskList.remove(taskInstance.getName());
try {
readyToSubmitTaskQueue.remove(taskInstance);
} catch (Exception e) {
logger.error("remove task instance from readyToSubmitTaskQueue error");
}
}
/**
@ -811,10 +816,11 @@ public class MasterExecThread implements Runnable {
* @return Boolean whether has retry task in standby
*/
private boolean hasRetryTaskInStandBy(){
for (Map.Entry<String, TaskInstance> entry: readyToSubmitTaskList.entrySet()) {
if(entry.getValue().getState().typeIsFailure()){
for (Iterator<TaskInstance> iter = readyToSubmitTaskQueue.iterator(); iter.hasNext();) {
if(iter.next().getState().typeIsFailure()){
return true;
}
}
return false;
}
@ -996,20 +1002,25 @@ public class MasterExecThread implements Runnable {
* handling the list of tasks to be submitted
*/
private void submitStandByTask(){
for(Map.Entry<String, TaskInstance> entry: readyToSubmitTaskList.entrySet()) {
TaskInstance task = entry.getValue();
DependResult dependResult = getDependResultForTask(task);
if(DependResult.SUCCESS == dependResult){
if(retryTaskIntervalOverTime(task)){
submitTaskExec(task);
try {
for (int i=0;i<readyToSubmitTaskQueue.size();i++) {
TaskInstance task = readyToSubmitTaskQueue.peek();
DependResult dependResult = getDependResultForTask(task);
if(DependResult.SUCCESS == dependResult){
if(retryTaskIntervalOverTime(task)){
submitTaskExec(task);
removeTaskFromStandbyList(task);
}
}else if(DependResult.FAILED == dependResult){
// if the dependency fails, the current node is not submitted and the state changes to failure.
dependFailedTask.put(task.getName(), task);
removeTaskFromStandbyList(task);
logger.info("task {},id:{} depend result : {}",task.getName(), task.getId(), dependResult);
}
}else if(DependResult.FAILED == dependResult){
// if the dependency fails, the current node is not submitted and the state changes to failure.
dependFailedTask.put(entry.getKey(), task);
removeTaskFromStandbyList(task);
logger.info("task {},id:{} depend result : {}",task.getName(), task.getId(), dependResult);
}
} catch (Exception e) {
e.printStackTrace();
}
}

124
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.Comparator;
import java.util.Iterator;
import java.util.PriorityQueue;
/**
 * Priority queue of task instances that belong to the same process instance.
 *
 * <p>Backed by a {@link PriorityQueue}, so:
 * <ul>
 *   <li>the queue is unbounded and is NOT synchronized; callers that share an
 *       instance between threads must provide their own locking;</li>
 *   <li>the head is the element whose {@code getTaskInstancePriority()} compares
 *       lowest according to that type's {@code compareTo};</li>
 *   <li>{@link #iterator()} does not visit elements in priority order;</li>
 *   <li>{@link #contains(TaskInstance)} and {@link #remove(TaskInstance)} match
 *       elements with {@code equals}.
 *       NOTE(review): confirm that TaskInstance defines equals/hashCode the way
 *       callers expect; otherwise matching falls back to object identity.</li>
 * </ul>
 */
public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInstance> {

    /**
     * Initial capacity of the backing queue. Despite the name this is not an
     * upper bound: the backing {@link PriorityQueue} grows as needed.
     */
    private static final int QUEUE_MAX_SIZE = 3000;

    /**
     * Backing queue, ordered by task instance priority.
     */
    private final PriorityQueue<TaskInstance> queue =
            new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());

    /**
     * Adds a task instance to the queue. Duplicates are not rejected.
     *
     * @param taskInstance task instance to add, must not be null
     * @throws Exception declared by the {@link TaskPriorityQueue} contract;
     *                   a null argument results in a NullPointerException
     */
    public void put(TaskInstance taskInstance) throws Exception {
        queue.add(taskInstance);
    }

    /**
     * Removes and returns the head of the queue without blocking.
     *
     * @return the head task instance, or {@code null} when the queue is empty
     * @throws Exception declared by the {@link TaskPriorityQueue} contract
     */
    @Override
    public TaskInstance take() throws Exception {
        return queue.poll();
    }

    /**
     * Returns the head of the queue without removing it.
     *
     * @return the head task instance, or {@code null} when the queue is empty
     * @throws Exception kept in the signature for existing callers
     */
    public TaskInstance peek() throws Exception {
        return queue.peek();
    }

    /**
     * Returns the number of queued task instances.
     *
     * @return current queue size
     */
    public int size() {
        return queue.size();
    }

    /**
     * Tells whether the queue holds an element equal to the given task instance.
     *
     * @param taskInstance task instance to look for
     * @return true if an equal element is queued
     */
    public boolean contains(TaskInstance taskInstance) {
        return queue.contains(taskInstance);
    }

    /**
     * Removes a single element equal to the given task instance, if present.
     *
     * @param taskInstance task instance to remove
     * @return true if an element was removed
     * @throws Exception kept in the signature for existing callers
     */
    public boolean remove(TaskInstance taskInstance) throws Exception {
        return queue.remove(taskInstance);
    }

    /**
     * Returns an iterator over the queued task instances.
     * The traversal order is unspecified; it is not the priority order.
     *
     * @return typed iterator over the backing queue
     */
    public Iterator<TaskInstance> iterator() {
        return queue.iterator();
    }

    /**
     * Orders task instances by their task instance priority.
     * Declared static because it needs no state from the enclosing queue.
     */
    private static final class TaskInfoComparator implements Comparator<TaskInstance> {

        /**
         * Compares two task instances by priority.
         *
         * @param o1 first task instance
         * @param o2 second task instance
         * @return negative, zero or positive as o1's priority compares
         *         lower than, equal to or higher than o2's
         */
        @Override
        public int compare(TaskInstance o1, TaskInstance o2) {
            return o1.getTaskInstancePriority().compareTo(o2.getTaskInstancePriority());
        }
    }
}

6
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.service.queue;
public interface TaskPriorityQueue {
public interface TaskPriorityQueue<T> {
/**
* put task info
@ -25,14 +25,14 @@ public interface TaskPriorityQueue {
* @param taskInfo taskInfo
* @throws Exception
*/
void put(String taskInfo) throws Exception;
void put(T taskInfo) throws Exception;
/**
* take taskInfo
* @return taskInfo
* @throws Exception
*/
String take()throws Exception;
T take()throws Exception;
/**
* size

7
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java

@ -19,17 +19,18 @@ package org.apache.dolphinscheduler.service.queue;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import static org.apache.dolphinscheduler.common.Constants.*;
import static org.apache.dolphinscheduler.common.Constants.TASK_INFO_LENGTH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
/**
* A singleton of a task queue implemented with zookeeper
* tasks queue implementation
*/
@Service
public class TaskPriorityQueueImpl implements TaskPriorityQueue {
public class TaskPriorityQueueImpl implements TaskPriorityQueue<String> {
/**
* queue size
*/

Loading…
Cancel
Save