Browse Source

[Improvement][master] Replace sleep&task with poll in TaskPriorityQueueConsumer#4137

pull/3/MERGE
wuxiaofei 3 years ago
parent
commit
b352c52f33
  1. 13
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  2. 13
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java
  3. 15
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java

13
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -58,11 +58,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import java.util.ArrayList; import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -124,12 +120,11 @@ public class TaskPriorityQueueConsumer extends Thread {
int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber(); int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();
failedDispatchTasks.clear(); failedDispatchTasks.clear();
for (int i = 0; i < fetchTaskNum; i++) { for (int i = 0; i < fetchTaskNum; i++) {
if (taskPriorityQueue.size() <= 0) { TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
Thread.sleep(Constants.SLEEP_TIME_MILLIS); if (Objects.isNull(taskPriority)) {
continue; continue;
} }
// if not task , blocking here
TaskPriority taskPriority = taskPriorityQueue.take();
boolean dispatchResult = dispatch(taskPriority); boolean dispatchResult = dispatch(taskPriority);
if (!dispatchResult) { if (!dispatchResult) {
failedDispatchTasks.add(taskPriority); failedDispatchTasks.add(taskPriority);

13
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueue.java

@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import java.util.concurrent.TimeUnit;
/** /**
* task priority queue * task priority queue
* @param <T> * @param <T>
@ -41,6 +43,17 @@ public interface TaskPriorityQueue<T> {
*/ */
T take() throws TaskPriorityQueueException, InterruptedException; T take() throws TaskPriorityQueueException, InterruptedException;
/**
* poll taskInfo with timeout
* @param timeout
* @param unit
* @return
* @throws TaskPriorityQueueException
* @throws InterruptedException
*/
T poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException;
/** /**
* size * size
* *

15
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/TaskPriorityQueueImpl.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException; import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -61,6 +62,20 @@ public class TaskPriorityQueueImpl implements TaskPriorityQueue<TaskPriority> {
return queue.take(); return queue.take();
} }
/**
* poll taskInfo with timeout
*
* @param timeout
* @param unit
* @return
* @throws TaskPriorityQueueException
* @throws InterruptedException
*/
@Override
public TaskPriority poll(long timeout, TimeUnit unit) throws TaskPriorityQueueException, InterruptedException {
return queue.poll(timeout,unit);
}
/** /**
* queue size * queue size
* *

Loading…
Cancel
Save