Browse Source

[DS-6961][MasterServer] batch dispatch (#6962)

* [DS-6961][MasterServer] batch dispatch

* fix test

Co-authored-by: caishunfeng <534328519@qq.com>
3.0.0/version-upgrade
wind 3 years ago committed by GitHub
parent
commit
e6fe39ea47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 9
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
  2. 58
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
  3. 1
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java
  4. 2
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

9
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@ -31,7 +31,6 @@ public class MasterConfig {
private int fetchCommandNum;
private int preExecThreads;
private int execThreads;
private int execTaskNum;
private int dispatchTaskNumber;
private HostSelector hostSelector;
private int heartbeatInterval;
@ -74,14 +73,6 @@ public class MasterConfig {
this.execThreads = execThreads;
}
public int getExecTaskNum() {
return execTaskNum;
}
public void setExecTaskNum(int execTaskNum) {
this.execTaskNum = execTaskNum;
}
public int getDispatchTaskNumber() {
return dispatchTaskNumber;
}

58
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java

@ -19,12 +19,14 @@ package org.apache.dolphinscheduler.server.master.consumer;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
@ -33,6 +35,8 @@ import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
@ -78,30 +82,24 @@ public class TaskPriorityQueueConsumer extends Thread {
@Autowired
private MasterConfig masterConfig;
/**
* consumer thread pool
*/
private ThreadPoolExecutor consumerThreadPoolExecutor;
@PostConstruct
public void init() {
super.setName("TaskUpdateQueueConsumerThread");
this.consumerThreadPoolExecutor = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("TaskUpdateQueueConsumerThread", masterConfig.getDispatchTaskNumber());
super.start();
}
@Override
public void run() {
List<TaskPriority> failedDispatchTasks = new ArrayList<>();
int fetchTaskNum = masterConfig.getDispatchTaskNumber();
while (Stopper.isRunning()) {
try {
int fetchTaskNum = masterConfig.getDispatchTaskNumber();
failedDispatchTasks.clear();
for (int i = 0; i < fetchTaskNum; i++) {
TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
if (Objects.isNull(taskPriority)) {
continue;
}
List<TaskPriority> failedDispatchTasks = this.batchDispatch(fetchTaskNum);
boolean dispatchResult = dispatch(taskPriority);
if (!dispatchResult) {
failedDispatchTasks.add(taskPriority);
}
}
if (!failedDispatchTasks.isEmpty()) {
for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
taskPriorityQueue.put(dispatchFailedTask);
@ -118,13 +116,41 @@ public class TaskPriorityQueueConsumer extends Thread {
}
}
/**
* batch dispatch with thread pool
*/
private List<TaskPriority> batchDispatch(int fetchTaskNum) throws TaskPriorityQueueException, InterruptedException {
List<TaskPriority> failedDispatchTasks = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(fetchTaskNum);
for (int i = 0; i < fetchTaskNum; i++) {
TaskPriority taskPriority = taskPriorityQueue.poll(Constants.SLEEP_TIME_MILLIS, TimeUnit.MILLISECONDS);
if (Objects.isNull(taskPriority)) {
latch.countDown();
continue;
}
consumerThreadPoolExecutor.submit(() -> {
boolean dispatchResult = this.dispatchTask(taskPriority);
if (!dispatchResult) {
failedDispatchTasks.add(taskPriority);
}
latch.countDown();
});
}
latch.await();
return failedDispatchTasks;
}
/**
* dispatch task
*
* @param taskPriority taskPriority
* @return result
*/
protected boolean dispatch(TaskPriority taskPriority) {
protected boolean dispatchTask(TaskPriority taskPriority) {
boolean result = false;
try {
TaskExecutionContext context = taskPriority.getTaskExecutionContext();
@ -158,8 +184,6 @@ public class TaskPriorityQueueConsumer extends Thread {
/**
* check if task need to check state, if true, refresh the checkpoint
* @param taskPriority
* @return
*/
private boolean isTaskNeedToCheck(TaskPriority taskPriority) {
long now = System.currentTimeMillis();

1
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java

@ -88,7 +88,6 @@ public class WorkflowExecuteThreadTest {
applicationContext = mock(ApplicationContext.class);
config = new MasterConfig();
config.setExecTaskNum(1);
Mockito.when(applicationContext.getBean(MasterConfig.class)).thenReturn(config);
processInstance = mock(ProcessInstance.class);

2
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@ -307,7 +307,7 @@ public class TaskPriorityQueueConsumerTest {
TaskPriority taskPriority = new TaskPriority();
taskPriority.setTaskId(1);
boolean res = taskPriorityQueueConsumer.dispatch(taskPriority);
boolean res = taskPriorityQueueConsumer.dispatchTask(taskPriority);
Assert.assertFalse(res);
}

Loading…
Cancel
Save