diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index d7de840dfa..c4af5c0b03 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -58,11 +58,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -124,12 +120,11 @@ public class TaskPriorityQueueConsumer extends Thread { int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber(); failedDispatchTasks.clear(); for (int i = 0; i < fetchTaskNum; i++) { - if (taskPriorityQueue.size() <= 0) { - Thread.sleep(Constants.SLEEP_TIME_MILLIS); + TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS); + if (Objects.isNull(taskPriority)) { continue; } - // if not task , blocking here - TaskPriority taskPriority = taskPriorityQueue.take(); + boolean dispatchResult = dispatch(taskPriority); if (!dispatchResult) { failedDispatchTasks.add(taskPriority); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java index 14c6b382d4..1325afe9b2 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.service.queue; import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; +import java.util.concurrent.TimeUnit; + /** * task priority queue * @param @@ -41,6 +43,17 @@ public interface TaskPriorityQueue { */ T take() throws TaskPriorityQueueException, InterruptedException; + + /** + * poll taskInfo with timeout + * @param timeout + * @param unit + * @return + * @throws TaskPriorityQueueException + * @throws InterruptedException + */ + T poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException; + /** * size * diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java index 694d4c4763..8775a272e5 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.service.queue; import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; import org.springframework.stereotype.Service; @@ -61,6 +62,20 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue { return queue.take(); } + /** + * poll taskInfo with timeout + * + * @param timeout + * @param unit + * @return + * @throws TaskPriorityQueueException + * @throws InterruptedException + */ + @Override + public TaskPriority poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException { + return queue.poll(timeout,unit); + } + /** * queue size *