Browse Source

[BUG][PeerTaskInstancePriorityQueue]poll with timeout is not currently supported

pull/3/MERGE
wuxiaofei 4 years ago
parent
commit
8986c845a9
  1. 17
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
  2. 25
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java

17
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java

@ -70,6 +70,7 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
/** /**
* poll task info with timeout * poll task info with timeout
* <p>
* WARN: Please use PriorityBlockingQueue if you want to use poll(timeout, unit) * WARN: Please use PriorityBlockingQueue if you want to use poll(timeout, unit)
* because this method of override interface used without considering accuracy of timeout * because this method of override interface used without considering accuracy of timeout
* *
@ -80,19 +81,9 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
* @throws InterruptedException * @throws InterruptedException
*/ */
@Override @Override
public TaskInstance poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException { public TaskInstance poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException {
long nanos = unit.toNanos(timeout); throw new TaskPriorityQueueException("This operation is not currently supported," +
final ReentrantLock lock = this.lock; "and suggest to use PriorityBlockingQueue if you want!");
lock.lockInterruptibly();
TaskInstance result;
try {
while ((result = queue.poll()) == null && nanos > 0) {
nanos--;
}
} finally {
lock.unlock();
}
return result;
} }
/** /**

25
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java

@ -49,26 +49,11 @@ public class PeerTaskInstancePriorityQueueTest {
@Test @Test
public void poll() throws Exception { public void poll() throws Exception {
PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue(); PeerTaskInstancePriorityQueue queue = getPeerTaskInstancePriorityQueue();
int peekBeforeLength = queue.size(); try {
queue.poll(1000, TimeUnit.MILLISECONDS); queue.poll(1000, TimeUnit.MILLISECONDS);
queue.poll(1000, TimeUnit.MILLISECONDS); } catch (TaskPriorityQueueException e) {
Assert.assertEquals(0, queue.size()); e.printStackTrace();
Thread producer = new Thread(() -> { }
System.out.println(String.format("Ready to producing...,now time is %s ", System.currentTimeMillis()));
try {
Thread.sleep(100);
TaskInstance task = createTaskInstance("low_task", Priority.LOW);
queue.put(task);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(String.format("End to produce %s at time %s",
queue.peek() != null ? queue.peek().getName() : null, System.currentTimeMillis()));
});
producer.start();
System.out.println("Begin to consume at " + System.currentTimeMillis());
queue.poll(1000, TimeUnit.MILLISECONDS);
System.out.println("End to consume at " + System.currentTimeMillis());
} }
@Test @Test

Loading…
Cancel
Save