Browse Source

[Performance][PeerTaskInstancePriorityQueue]Nearly realization of poll with timeout

pull/3/MERGE
wuxiaofei 4 years ago
parent
commit
3a215cb952
  1. 29
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java
  2. 21
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java

29
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueue.java

@ -24,6 +24,8 @@ import java.util.Comparator;
import java.util.Iterator; import java.util.Iterator;
import java.util.PriorityQueue; import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/** /**
* Task instances priority queue implementation * Task instances priority queue implementation
@ -40,6 +42,16 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
*/ */
private PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator()); private PriorityQueue<TaskInstance> queue = new PriorityQueue<>(QUEUE_MAX_SIZE, new TaskInfoComparator());
/**
* Lock used for all public operations
*/
private final ReentrantLock lock = new ReentrantLock(true);
/**
* Condition for blocking when empty
*/
private final Condition notEmpty = lock.newCondition();
/** /**
* put task instance to priority queue * put task instance to priority queue
* *
@ -64,6 +76,9 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
/** /**
* poll task info with timeout * poll task info with timeout
* WARN: Please use PriorityBlockingQueue if you want to use poll(timeout, unit)
* because this method of override interface used without considering accuracy of timeout
*
* @param timeout * @param timeout
* @param unit * @param unit
* @return * @return
@ -72,10 +87,18 @@ public class PeerTaskInstancePriorityQueue implements TaskPriorityQueue<TaskInst
*/ */
@Override @Override
public TaskInstance poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException { public TaskInstance poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException {
if (queue.isEmpty()) { long nanos = unit.toNanos(timeout);
unit.sleep(timeout); final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
TaskInstance result;
try {
while ((result = queue.poll()) == null && nanos > 0) {
nanos--;
} }
return queue.poll(); } finally {
lock.unlock();
}
return result;
} }
/** /**

21
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/queue/PeerTaskInstancePriorityQueueTest.java

@ -52,10 +52,23 @@ public class PeerTaskInstancePriorityQueueTest {
int peekBeforeLength = queue.size(); int peekBeforeLength = queue.size();
queue.poll(1000, TimeUnit.MILLISECONDS); queue.poll(1000, TimeUnit.MILLISECONDS);
queue.poll(1000, TimeUnit.MILLISECONDS); queue.poll(1000, TimeUnit.MILLISECONDS);
Assert.assertTrue(queue.size() == 0); Assert.assertEquals(0, queue.size());
System.out.println(System.currentTimeMillis()); Thread producer = new Thread(() -> {
System.out.println(String.format("Ready to producing...,now time is %s ", System.currentTimeMillis()));
try {
Thread.sleep(100);
TaskInstance task = createTaskInstance("low_task", Priority.LOW);
queue.put(task);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(String.format("End to produce %s at time %s",
queue.peek() != null ? queue.peek().getName() : null, System.currentTimeMillis()));
});
producer.start();
System.out.println("Begin to consume at " + System.currentTimeMillis());
queue.poll(1000, TimeUnit.MILLISECONDS); queue.poll(1000, TimeUnit.MILLISECONDS);
System.out.println(System.currentTimeMillis()); System.out.println("End to consume at " + System.currentTimeMillis());
} }
@Test @Test
@ -68,7 +81,7 @@ public class PeerTaskInstancePriorityQueueTest {
@Test @Test
public void size() throws Exception { public void size() throws Exception {
Assert.assertTrue(getPeerTaskInstancePriorityQueue().size() == 2); Assert.assertEquals(2, getPeerTaskInstancePriorityQueue().size());
} }
@Test @Test

Loading…
Cancel
Save