diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index d7de840dfa..69058a49b8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -62,6 +62,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -124,12 +125,11 @@ public class TaskPriorityQueueConsumer extends Thread { int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber(); failedDispatchTasks.clear(); for (int i = 0; i < fetchTaskNum; i++) { - if (taskPriorityQueue.size() <= 0) { - Thread.sleep(Constants.SLEEP_TIME_MILLIS); + TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS); + if (Objects.isNull(taskPriority)) { continue; } - // if not task , blocking here - TaskPriority taskPriority = taskPriorityQueue.take(); + boolean dispatchResult = dispatch(taskPriority); if (!dispatchResult) { failedDispatchTasks.add(taskPriority); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 3088080849..aafd7a1524 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -97,7 +97,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { private void setTaskCache(TaskExecutionContext taskExecutionContext) { TaskExecutionContext preTaskCache = new TaskExecutionContext(); preTaskCache.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - taskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); + taskExecutionContextCacheManager.cacheTaskExecutionContext(preTaskCache); } public TaskExecuteProcessor(AlertClientService alertClientService) { diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java index 5ce42242ee..aa278a624e 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java @@ -23,6 +23,8 @@ import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException import java.util.Comparator; import java.util.Iterator; import java.util.PriorityQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; /** * Task instances priority queue implementation @@ -39,6 +41,11 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator()); + /** + * Lock used for all public operations + */ + private final ReentrantLock lock = new ReentrantLock(true); + /** * put task instance to priority queue * @@ -61,6 +68,23 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue + * WARN: Please use PriorityBlockingQueue if you want to use poll(timeout, unit) + * because this method of override interface used without considering accuracy of timeout + * + * @param timeout + * @param unit + * @return + * @throws TaskPriorityQueueException + * @throws InterruptedException + */ + @Override + public TaskInstance poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException { + throw new TaskPriorityQueueException("This operation is not currently supported and suggest to use PriorityBlockingQueue if you want!"); + } + /** * peek taskInfo * diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java index 14c6b382d4..1325afe9b2 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.service.queue; import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; +import java.util.concurrent.TimeUnit; + /** * task priority queue * @param @@ -41,6 +43,17 @@ public interface TaskPriorityQueue { */ T take() throws TaskPriorityQueueException, InterruptedException; + + /** + * poll taskInfo with timeout + * @param timeout + * @param unit + * @return + * @throws TaskPriorityQueueException + * @throws InterruptedException + */ + T poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException; + /** * size * diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java index 694d4c4763..8775a272e5 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.service.queue; import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.TimeUnit; import org.springframework.stereotype.Service; @@ -61,6 +62,20 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue { return queue.take(); } + /** + * poll taskInfo with timeout + * + * @param timeout + * @param unit + * @return + * @throws TaskPriorityQueueException + * @throws InterruptedException + */ + @Override + public TaskPriority poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException { + return queue.poll(timeout,unit); + } + /** * queue size * diff --git a/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java similarity index 60% rename from dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java index cf39d57b8b..550c4bea3a 100644 --- a/dolphinscheduler-service/src/test/java/queue/PeerTaskInstancePriorityQueueTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java @@ -15,46 +15,76 @@ * limitations under the License. */ -package queue; +package org.apache.dolphinscheduler.service.queue; import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue; +import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; + +import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Test; -/** - * Task instances priority queue implementation - * All the task instances are in the same process instance. - */ public class PeerTaskInstancePriorityQueueTest { @Test - public void testPut() throws Exception { + public void put() throws TaskPriorityQueueException { PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH); TaskInstance taskInstanceMediumPriority = createTaskInstance("high", Priority.MEDIUM); queue.put(taskInstanceHigPriority); queue.put(taskInstanceMediumPriority); - Assert.assertEquals(2,queue.size()); + Assert.assertEquals(2, queue.size()); } @Test - public void testPeek() throws Exception { + public void take() throws Exception { PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); int peekBeforeLength = queue.size(); - queue.peek(); - Assert.assertEquals(peekBeforeLength,queue.size()); + queue.take(); + Assert.assertTrue(queue.size() < peekBeforeLength); + } + @Test + public void poll() throws Exception { + PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); + try { + queue.poll(1000, TimeUnit.MILLISECONDS); + } catch (TaskPriorityQueueException e) { + e.printStackTrace(); + } } @Test - public void testTake() throws Exception { + public void peek() throws Exception { PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); int peekBeforeLength = queue.size(); - queue.take(); - Assert.assertTrue(queue.size() < peekBeforeLength); + queue.peek(); + Assert.assertEquals(peekBeforeLength, queue.size()); + } + + @Test + public void size() throws Exception { + Assert.assertEquals(2, getPeerTaskInstancePriorityQueue().size()); + } + + @Test + public void contains() throws Exception { + PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); + TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM); + queue.put(taskInstanceMediumPriority); + Assert.assertTrue(queue.contains(taskInstanceMediumPriority)); + } + + @Test + public void remove() throws Exception { + PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); + TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM); + queue.put(taskInstanceMediumPriority); + int peekBeforeLength = queue.size(); + queue.remove(taskInstanceMediumPriority); + Assert.assertNotEquals(peekBeforeLength, queue.size()); } /** @@ -66,7 +96,7 @@ public class PeerTaskInstancePriorityQueueTest { private PeerTaskInstancePriorityQueue getPeerTaskInstancePriorityQueue() throws Exception { PeerTaskInstancePriorityQueue queue = new PeerTaskInstancePriorityQueue(); TaskInstance taskInstanceHigPriority = createTaskInstance("high", Priority.HIGH); - TaskInstance taskInstanceMediumPriority = createTaskInstance("high", Priority.MEDIUM); + TaskInstance taskInstanceMediumPriority = createTaskInstance("medium", Priority.MEDIUM); queue.put(taskInstanceHigPriority); queue.put(taskInstanceMediumPriority); return queue; @@ -75,8 +105,8 @@ public class PeerTaskInstancePriorityQueueTest { /** * create task instance * - * @param name name - * @param priority priority + * @param name name + * @param priority priority * @return */ private TaskInstance createTaskInstance(String name, Priority priority) { @@ -85,5 +115,4 @@ public class PeerTaskInstancePriorityQueueTest { taskInstance.setTaskInstancePriority(priority); return taskInstance; } - } \ No newline at end of file diff --git a/dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java similarity index 52% rename from dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java rename to dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java index 151177016f..d90011b847 100644 --- a/dolphinscheduler-service/src/test/java/queue/TaskPriorityTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImplTest.java @@ -15,18 +15,19 @@ * limitations under the License. */ -package queue; +package org.apache.dolphinscheduler.service.queue; -import org.apache.dolphinscheduler.service.queue.TaskPriority; +import org.apache.dolphinscheduler.common.enums.Priority; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Test; -public class TaskPriorityTest { +public class TaskPriorityQueueImplTest { @Test public void testSort() { @@ -36,8 +37,8 @@ public class TaskPriorityTest { List taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); Collections.sort(taskPrioritys); Assert.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPrioritys + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys ); priorityOne = new TaskPriority(0, 1, 0, 0, "default"); @@ -46,8 +47,8 @@ public class TaskPriorityTest { taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); Collections.sort(taskPrioritys); Assert.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPrioritys + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys ); priorityOne = new TaskPriority(0, 0, 1, 0, "default"); @@ -56,8 +57,8 @@ public class TaskPriorityTest { taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); Collections.sort(taskPrioritys); Assert.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPrioritys + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys ); priorityOne = new TaskPriority(0, 0, 0, 1, "default"); @@ -66,8 +67,8 @@ public class TaskPriorityTest { taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); Collections.sort(taskPrioritys); Assert.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPrioritys + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys ); priorityOne = new TaskPriority(0, 0, 0, 0, "default_1"); @@ -76,8 +77,66 @@ public class TaskPriorityTest { taskPrioritys = Arrays.asList(priorityOne, priorityThree, priorityTwo); Collections.sort(taskPrioritys); Assert.assertEquals( - Arrays.asList(priorityOne, priorityTwo, priorityThree), - taskPrioritys + Arrays.asList(priorityOne, priorityTwo, priorityThree), + taskPrioritys ); } -} + + @Test + public void put() throws Exception { + TaskPriorityQueue queue = getPriorityQueue(); + Assert.assertEquals(2, queue.size()); + } + + @Test + public void take() throws Exception { + TaskPriorityQueue queue = getPriorityQueue(); + int peekBeforeLength = queue.size(); + queue.take(); + Assert.assertTrue(queue.size() < peekBeforeLength); + } + + @Test + public void poll() throws Exception { + TaskPriorityQueue queue = getPriorityQueue(); + int peekBeforeLength = queue.size(); + queue.poll(1000, TimeUnit.MILLISECONDS); + queue.poll(1000, TimeUnit.MILLISECONDS); + Assert.assertTrue(queue.size() == 0); + System.out.println(System.currentTimeMillis()); + queue.poll(1000, TimeUnit.MILLISECONDS); + System.out.println(System.currentTimeMillis()); + } + + @Test + public void size() throws Exception { + Assert.assertTrue(getPriorityQueue().size() == 2); + } + + /** + * get queue + * + * @return queue + * @throws Exception + */ + private TaskPriorityQueue getPriorityQueue() throws Exception { + TaskPriorityQueue queue = new TaskPriorityQueueImpl(); + TaskPriority taskInstanceHigPriority = createTaskPriority(Priority.HIGH.getCode(), 1); + TaskPriority taskInstanceMediumPriority = createTaskPriority(Priority.MEDIUM.getCode(), 2); + queue.put(taskInstanceHigPriority); + queue.put(taskInstanceMediumPriority); + return queue; + } + + /** + * create task priority + * + * @param priority + * @param processInstanceId + * @return + */ + private TaskPriority createTaskPriority(Integer priority, Integer processInstanceId) { + TaskPriority priorityOne = new TaskPriority(priority, processInstanceId, 0, 0, "default"); + return priorityOne; + } +} \ No newline at end of file diff --git a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java b/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java deleted file mode 100644 index 2c13afa227..0000000000 --- a/dolphinscheduler-service/src/test/java/queue/TaskUpdateQueueTest.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package queue; - -import org.apache.dolphinscheduler.service.queue.TaskPriority; -import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; -import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl; -import org.junit.Test; - -import static org.junit.Assert.*; - -public class TaskUpdateQueueTest { - - /** - * test put - */ - @Test - public void testQueue() throws Exception{ - - /** - * 1_1_2_1_default - * 1_1_2_2_default - * 1_1_0_3_default - * 1_1_0_4_default - */ - TaskPriority taskInfo1 = new TaskPriority(1, 1, 2, 1, "default"); - TaskPriority taskInfo2 = new TaskPriority(1, 1, 2, 2, "default"); - TaskPriority taskInfo3 = new TaskPriority(1, 1, 0, 3, "default"); - TaskPriority taskInfo4 = new TaskPriority(1, 1, 0, 4, "default"); - - TaskPriorityQueue queue = new TaskPriorityQueueImpl(); - queue.put(taskInfo1); - queue.put(taskInfo2); - queue.put(taskInfo3); - queue.put(taskInfo4); - - assertEquals(taskInfo3, queue.take()); - assertEquals(taskInfo4, queue.take()); - assertEquals(taskInfo1, queue.take()); - assertEquals(taskInfo2, queue.take()); - } -}