From 8986c845a91069ff998728f9bd22ed29f817eecf Mon Sep 17 00:00:00 2001 From: wuxiaofei Date: Wed, 24 Feb 2021 11:47:19 +0800 Subject: [PATCH] [BUG][PeerTaskInstancePriorityQueue]poll with timeout is not currently supported --- .../queue/PeerTaskInstancePriorityQueue.java | 17 +++---------- .../PeerTaskInstancePriorityQueueTest.java | 25 ++++--------------- 2 files changed, 9 insertions(+), 33 deletions(-) diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java index 7a1bc799ed..40edfc36a2 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java @@ -70,6 +70,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue * WARN: Please use PriorityBlockingQueue if you want to use poll(timeout, unit) * because this method of override interface used without considering accuracy of timeout * @@ -80,19 +81,9 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue 0) { - nanos--; - } - } finally { - lock.unlock(); - } - return result; + public TaskInstance poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException { + throw new TaskPriorityQueueException("This operation is not currently supported," + + "and suggest to use PriorityBlockingQueue if you want!"); } /** diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java index 0d9833354e..550c4bea3a 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java @@ -49,26 +49,11 @@ public class PeerTaskInstancePriorityQueueTest { @Test public void poll() throws Exception { PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); - int peekBeforeLength = queue.size(); - queue.poll(1000, TimeUnit.MILLISECONDS); - queue.poll(1000, TimeUnit.MILLISECONDS); - Assert.assertEquals(0, queue.size()); - Thread producer = new Thread(() -> { - System.out.println(String.format("Ready to producing...,now time is %s ", System.currentTimeMillis())); - try { - Thread.sleep(100); - TaskInstance task = createTaskInstance("low_task", Priority.LOW); - queue.put(task); - } catch (Exception e) { - e.printStackTrace(); - } - System.out.println(String.format("End to produce %s at time %s", - queue.peek() != null ? queue.peek().getName() : null, System.currentTimeMillis())); - }); - producer.start(); - System.out.println("Begin to consume at " + System.currentTimeMillis()); - queue.poll(1000, TimeUnit.MILLISECONDS); - System.out.println("End to consume at " + System.currentTimeMillis()); + try { + queue.poll(1000, TimeUnit.MILLISECONDS); + } catch (TaskPriorityQueueException e) { + e.printStackTrace(); + } } @Test