Browse Source

Fix WorkerTaskExecutorThreadPool#isOverload is incorrect (#16027)

upstream-dev
Wenjun Ruan 5 months ago committed by GitHub
parent
commit
b184364561
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 4
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java
  2. 11
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
  3. 4
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
  4. 252
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java

4
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java

@ -48,6 +48,10 @@ public class WorkerTaskExecutorHolder {
workerTaskExecutorMap.clear();
}
public static int size() {
return workerTaskExecutorMap.size();
}
public static Collection<WorkerTaskExecutor> getAllTaskExecutor() {
return workerTaskExecutorMap.values();
}

11
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java

@ -39,6 +39,7 @@ public class WorkerTaskExecutorThreadPool {
public WorkerTaskExecutorThreadPool(WorkerConfig workerConfig) {
this.threadPoolExecutor =
ThreadUtils.newDaemonFixedThreadExecutor("WorkerTaskExecutorThreadPool", workerConfig.getExecThreads());
threadPoolExecutor.prestartAllCoreThreads();
this.workerConfig = workerConfig;
WorkerServerMetrics.registerWorkerExecuteQueueSizeGauge(this::getWaitingTaskExecutorSize);
@ -64,15 +65,19 @@ public class WorkerTaskExecutorThreadPool {
}
public boolean isOverload() {
return threadPoolExecutor.getQueue().size() > 0;
return WorkerTaskExecutorHolder.size() >= workerConfig.getExecThreads();
}
public int getWaitingTaskExecutorSize() {
return threadPoolExecutor.getQueue().size();
if (WorkerTaskExecutorHolder.size() <= workerConfig.getExecThreads()) {
return 0;
} else {
return WorkerTaskExecutorHolder.size() - workerConfig.getExecThreads();
}
}
public int getRunningTaskExecutorSize() {
return threadPoolExecutor.getActiveCount();
return Math.min(WorkerTaskExecutorHolder.size(), workerConfig.getExecThreads());
}
/**

4
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java

@ -89,8 +89,8 @@ public class TaskInstanceKillOperationFunction
taskExecutionContext
.setCurrentExecutionStatus(result ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
WorkerTaskExecutorHolder.remove(taskInstanceId);
messageRetryRunner.removeRetryMessages(taskInstanceId);
return TaskInstanceKillResponse.success(taskExecutionContext);
} finally {
LogUtils.removeTaskInstanceIdMDC();

252
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java

@ -17,48 +17,127 @@
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import java.io.IOException;
import java.util.List;
import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import com.google.common.truth.Truth;
class WorkerTaskExecutorThreadPoolTest {
@BeforeEach
public void setUp() {
WorkerTaskExecutorHolder.clear();
}
@Test
public void testIsOverload() {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setExecThreads(1);
workerConfig.setTaskExecuteThreadsFullPolicy(TaskExecuteThreadsFullPolicy.CONTINUE);
final int execThreadCount = RandomUtils.nextInt(1, 100);
final int totalTaskCount = RandomUtils.nextInt(1, 10000);
final WorkerConfig workerConfig = createWorkerConfig(execThreadCount, TaskExecuteThreadsFullPolicy.CONTINUE);
final WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool =
new WorkerTaskExecutorThreadPool(workerConfig);
// submit totalTaskCount task, the thread pool size is execThreadCount, reject policy is CONTINUE
// after submit execThreadCount task, the thread pool is overload
for (int i = 1; i <= totalTaskCount; i++) {
MockWorkerTaskExecutor mockWorkerTaskExecutor =
new MockWorkerTaskExecutor(() -> ThreadUtils.sleep(10_000L));
workerTaskExecutorThreadPool.submitWorkerTaskExecutor(mockWorkerTaskExecutor);
if (i >= execThreadCount) {
Truth.assertThat(workerTaskExecutorThreadPool.isOverload()).isTrue();
} else {
Truth.assertThat(workerTaskExecutorThreadPool.isOverload()).isFalse();
}
}
}
@Test
public void testSubmitWorkerTaskExecutorWithContinuePolicy() {
final int execThreadCount = RandomUtils.nextInt(1, 100);
final int totalTaskCount = RandomUtils.nextInt(1, 10000);
final WorkerConfig workerConfig = createWorkerConfig(execThreadCount, TaskExecuteThreadsFullPolicy.CONTINUE);
final WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool =
new WorkerTaskExecutorThreadPool(workerConfig);
// submit totalTaskCount task, the thread pool size is execThreadCount, reject policy is CONTINUE
// all task will be submitted success
for (int i = 1; i <= totalTaskCount; i++) {
MockWorkerTaskExecutor mockWorkerTaskExecutor =
new MockWorkerTaskExecutor(() -> ThreadUtils.sleep(10_000L));
Truth.assertThat(workerTaskExecutorThreadPool.submitWorkerTaskExecutor(mockWorkerTaskExecutor)).isTrue();
}
}
@Test
public void testSubmitWorkerTaskExecutorWithRejectPolicy() {
final int execThreadCount = RandomUtils.nextInt(1, 100);
final int totalTaskCount = RandomUtils.nextInt(1, 10000);
final WorkerConfig workerConfig = createWorkerConfig(execThreadCount, TaskExecuteThreadsFullPolicy.REJECT);
final WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool =
new WorkerTaskExecutorThreadPool(workerConfig);
// submit totalTaskCount task, the thread pool size is execThreadCount, reject policy is REJECT
// only the front execThreadCount task will be submitted success
for (int i = 1; i <= totalTaskCount; i++) {
MockWorkerTaskExecutor mockWorkerTaskExecutor =
new MockWorkerTaskExecutor(() -> ThreadUtils.sleep(10_000L));
boolean submitResult = workerTaskExecutorThreadPool.submitWorkerTaskExecutor(mockWorkerTaskExecutor);
if (i <= execThreadCount) {
Assertions.assertTrue(submitResult, "The " + i + " task should submit success");
} else {
Assertions.assertFalse(submitResult, "The " + i + " task should submit failed");
}
}
}
@Test
public void testGetWaitingTaskExecutorSize() {
final int execThreadCount = RandomUtils.nextInt(1, 100);
final int totalTaskCount = RandomUtils.nextInt(1, 10000);
final WorkerConfig workerConfig = createWorkerConfig(execThreadCount, TaskExecuteThreadsFullPolicy.CONTINUE);
final WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool =
new WorkerTaskExecutorThreadPool(workerConfig);
Truth.assertThat(workerTaskExecutorThreadPool.getWaitingTaskExecutorSize()).isEqualTo(0);
for (int i = 1; i <= totalTaskCount; i++) {
MockWorkerTaskExecutor mockWorkerTaskExecutor =
new MockWorkerTaskExecutor(() -> ThreadUtils.sleep(10_000L));
workerTaskExecutorThreadPool.submitWorkerTaskExecutor(mockWorkerTaskExecutor);
if (i <= execThreadCount) {
Truth.assertThat(workerTaskExecutorThreadPool.getWaitingTaskExecutorSize()).isEqualTo(0);
} else {
Truth.assertThat(workerTaskExecutorThreadPool.getWaitingTaskExecutorSize())
.isEqualTo(i - execThreadCount);
}
}
}
@Test
public void testGetRunningTaskExecutorSize() {
final int execThreadCount = RandomUtils.nextInt(1, 100);
final int totalTaskCount = RandomUtils.nextInt(1, 10000);
WorkerConfig workerConfig = createWorkerConfig(execThreadCount, TaskExecuteThreadsFullPolicy.CONTINUE);
WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool = new WorkerTaskExecutorThreadPool(workerConfig);
// submit 100 task, the thread pool size is 1
// assert the overload should be true
// assert the submitQueue should be 99
for (int i = 0; i < 100; i++) {
boolean submitResult =
workerTaskExecutorThreadPool.submitWorkerTaskExecutor(new MockWorkerTaskExecutor(() -> {
try {
Thread.sleep(10_000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}));
Assertions.assertTrue(submitResult);
Truth.assertThat(workerTaskExecutorThreadPool.getRunningTaskExecutorSize()).isEqualTo(0);
for (int i = 1; i <= totalTaskCount; i++) {
MockWorkerTaskExecutor mockWorkerTaskExecutor =
new MockWorkerTaskExecutor(() -> ThreadUtils.sleep(10_000L));
workerTaskExecutorThreadPool.submitWorkerTaskExecutor(mockWorkerTaskExecutor);
if (i <= execThreadCount) {
Truth.assertThat(workerTaskExecutorThreadPool.getRunningTaskExecutorSize()).isEqualTo(i);
} else {
Truth.assertThat(workerTaskExecutorThreadPool.getRunningTaskExecutorSize()).isEqualTo(execThreadCount);
}
}
Assertions.assertTrue(workerTaskExecutorThreadPool.isOverload());
Assertions.assertEquals(99, workerTaskExecutorThreadPool.getWaitingTaskExecutorSize());
Assertions.assertEquals(1, workerTaskExecutorThreadPool.getRunningTaskExecutorSize());
}
static class MockWorkerTaskExecutor extends WorkerTaskExecutor {
@ -66,116 +145,11 @@ class WorkerTaskExecutorThreadPoolTest {
private final Runnable runnable;
protected MockWorkerTaskExecutor(Runnable runnable) {
super(TaskExecutionContext.builder().taskInstanceId((int) System.nanoTime()).build(), new WorkerConfig(),
new WorkerMessageSender(), new StorageOperate() {
@Override
public void createTenantDirIfNotExists(String tenantCode) {
}
@Override
public String getResDir(String tenantCode) {
return null;
}
@Override
public String getUdfDir(String tenantCode) {
return null;
}
@Override
public boolean mkdir(String tenantCode, String path) throws IOException {
return false;
}
@Override
public String getResourceFullName(String tenantCode, String fileName) {
return null;
}
@Override
public String getResourceFileName(String tenantCode, String fullName) {
return null;
}
@Override
public String getFileName(ResourceType resourceType, String tenantCode, String fileName) {
return null;
}
@Override
public boolean exists(String fullName) {
return false;
}
@Override
public boolean delete(String filePath, boolean recursive) {
return false;
}
@Override
public boolean delete(String filePath, List<String> childrenPathArray,
boolean recursive) {
return false;
}
@Override
public boolean copy(String srcPath, String dstPath, boolean deleteSource,
boolean overwrite) {
return false;
}
@Override
public String getDir(ResourceType resourceType, String tenantCode) {
return null;
}
@Override
public boolean upload(String tenantCode, String srcFile, String dstPath, boolean deleteSource,
boolean overwrite) {
return false;
}
@Override
public void download(String srcFilePath, String dstFile, boolean overwrite) {
}
@Override
public List<String> vimFile(String tenantCode, String filePath, int skipLineNums,
int limit) {
return null;
}
@Override
public void deleteTenant(String tenantCode) {
}
@Override
public ResUploadType returnStorageType() {
return null;
}
@Override
public List<StorageEntity> listFilesStatusRecursively(String path, String defaultPath,
String tenantCode, ResourceType type) {
return null;
}
@Override
public List<StorageEntity> listFilesStatus(String path, String defaultPath, String tenantCode,
ResourceType type) throws Exception {
return null;
}
@Override
public StorageEntity getFileStatus(String path, String defaultPath, String tenantCode,
ResourceType type) throws Exception {
return null;
}
}, new WorkerRegistryClient());
super(TaskExecutionContext.builder().taskInstanceId((int) System.nanoTime()).build(),
new WorkerConfig(),
new WorkerMessageSender(),
null,
new WorkerRegistryClient());
this.runnable = runnable;
}
@ -190,4 +164,12 @@ class WorkerTaskExecutorThreadPoolTest {
}
}
private WorkerConfig createWorkerConfig(int execThreads,
TaskExecuteThreadsFullPolicy taskExecuteThreadsFullPolicy) {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setExecThreads(execThreads);
workerConfig.setTaskExecuteThreadsFullPolicy(taskExecuteThreadsFullPolicy);
return workerConfig;
}
}

Loading…
Cancel
Save