Browse Source

Remove taskQueue and looper in worker (#15292)

augit-log
Wenjun Ruan 5 months ago committed by GitHub
parent
commit
5523a62825
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java
  2. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
  3. 16
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java
  4. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java
  5. 52
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java
  6. 84
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java
  7. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java
  8. 16
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java
  9. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java
  10. 5
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java
  11. 15
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java
  12. 25
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/IMasterTaskExecutorThreadPool.java
  13. 58
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java
  14. 34
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java
  15. 59
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java
  16. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java
  17. 16
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
  18. 72
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContextCacheManager.java
  19. 12
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
  20. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
  21. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java
  22. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskTest.java
  23. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTaskTest.java
  24. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/test/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskTest.java
  25. 38
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  26. 13
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java
  27. 6
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
  28. 11
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java
  29. 19
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java
  30. 14
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java
  31. 70
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java
  32. 101
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java
  33. 18
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java
  34. 91
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java
  35. 150
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
  36. 9
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java
  37. 33
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java
  38. 54
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java
  39. 96
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
  40. 20
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
  41. 37
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
  42. 33
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java
  43. 4
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
  44. 194
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java

6
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java

@ -17,10 +17,10 @@
package org.apache.dolphinscheduler.common.thread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
@ -38,9 +38,9 @@ public class ThreadUtils {
* @param threadsNum threadsNum
* @return ExecutorService
*/
public static ExecutorService newDaemonFixedThreadExecutor(String threadName, int threadsNum) {
public static ThreadPoolExecutor newDaemonFixedThreadExecutor(String threadName, int threadsNum) {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build();
return Executors.newFixedThreadPool(threadsNum, threadFactory);
return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadsNum, threadFactory);
}
public static ScheduledExecutorService newSingleDaemonScheduledExecutorService(String threadName) {

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@ -64,9 +64,9 @@ public class MasterConfig implements Validator {
private int execThreads = 10;
// todo: change to sync thread pool/ async thread pool ?
private int masterTaskExecuteThreadPoolSize = Runtime.getRuntime().availableProcessors();
private int masterSyncTaskExecutorThreadPoolSize = Runtime.getRuntime().availableProcessors();
private int masterAsyncTaskStateCheckThreadPoolSize = Runtime.getRuntime().availableProcessors();
private int masterAsyncTaskExecutorThreadPoolSize = Runtime.getRuntime().availableProcessors();
/**
* The task dispatch thread pool size.
*/

16
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java

@ -21,10 +21,10 @@ import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchR
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorFactoryBuilder;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPoolManager;
import lombok.extern.slf4j.Slf4j;
@ -41,7 +41,7 @@ public class LogicITaskInstanceDispatchOperationFunction
private MasterTaskExecutorFactoryBuilder masterTaskExecutorFactoryBuilder;
@Autowired
private GlobalMasterTaskExecuteRunnableQueue globalMasterTaskExecuteRunnableQueue;
private MasterTaskExecutorThreadPoolManager masterTaskExecutorThreadPool;
@Override
public LogicTaskDispatchResponse operate(LogicTaskDispatchRequest taskDispatchRequest) {
@ -62,16 +62,12 @@ public class LogicITaskInstanceDispatchOperationFunction
MasterTaskExecutor masterTaskExecutor = masterTaskExecutorFactoryBuilder
.createMasterTaskExecutorFactory(taskExecutionContext.getTaskType())
.createMasterTaskExecutor(taskExecutionContext);
if (globalMasterTaskExecuteRunnableQueue
.submitMasterTaskExecuteRunnable(masterTaskExecutor)) {
log.info("Submit LogicTask: {} to MasterDelayTaskExecuteRunnableDelayQueue success", taskInstanceName);
if (masterTaskExecutorThreadPool.submitMasterTaskExecutor(masterTaskExecutor)) {
log.info("Submit LogicTask: {} to MasterTaskExecutorThreadPool success", taskInstanceName);
return LogicTaskDispatchResponse.success(taskInstanceId);
} else {
log.error(
"Submit LogicTask: {} to MasterDelayTaskExecuteRunnableDelayQueue failed, current task waiting queue size: {} is full",
taskInstanceName, globalMasterTaskExecuteRunnableQueue.size());
return LogicTaskDispatchResponse.failed(taskInstanceId,
"MasterDelayTaskExecuteRunnableDelayQueue is full");
log.error("Submit LogicTask: {} to MasterTaskExecutorThreadPool failed", taskInstanceName);
return LogicTaskDispatchResponse.failed(taskInstanceId, "MasterTaskExecutorThreadPool is full");
}
} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java

@ -21,10 +21,10 @@ import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillReque
import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException;
import org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPoolManager;
import lombok.extern.slf4j.Slf4j;
@ -38,7 +38,7 @@ public class LogicITaskInstanceKillOperationFunction
ITaskInstanceOperationFunction<LogicTaskKillRequest, LogicTaskKillResponse> {
@Autowired
private GlobalMasterTaskExecuteRunnableQueue globalMasterTaskExecuteRunnableQueue;
private MasterTaskExecutorThreadPoolManager masterTaskExecutorThreadPool;
@Override
public LogicTaskKillResponse operate(LogicTaskKillRequest taskKillRequest) {
@ -54,8 +54,8 @@ public class LogicITaskInstanceKillOperationFunction
}
try {
masterTaskExecutor.cancelTask();
globalMasterTaskExecuteRunnableQueue
.removeMasterTaskExecuteRunnable(masterTaskExecutor);
// todo: if we remove success then we don't need to cancel?
masterTaskExecutorThreadPool.removeMasterTaskExecutor(masterTaskExecutor);
return LogicTaskKillResponse.success();
} catch (MasterTaskExecuteException e) {
log.error("Cancel MasterTaskExecuteRunnable failed ", e);

52
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java

@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.springframework.stereotype.Component;
/**
 * Queue of {@link MasterTaskExecutor}s that have been submitted but not yet taken.
 * <p>
 * The backing {@link LinkedBlockingQueue} is created without a capacity argument.
 */
@Component
public class GlobalMasterTaskExecuteRunnableQueue {

    private final BlockingQueue<MasterTaskExecutor> pendingExecutors = new LinkedBlockingQueue<>();

    /**
     * Enqueues the given executor without blocking.
     *
     * @param masterTaskExecutor the executor to enqueue
     * @return the result of {@link BlockingQueue#offer(Object)}
     */
    public boolean submitMasterTaskExecuteRunnable(MasterTaskExecutor masterTaskExecutor) {
        final boolean accepted = pendingExecutors.offer(masterTaskExecutor);
        return accepted;
    }

    /**
     * Removes and returns the head of the queue, waiting until an element is available.
     *
     * @return the executor at the head of the queue
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public MasterTaskExecutor takeMasterTaskExecuteRunnable() throws InterruptedException {
        final MasterTaskExecutor head = pendingExecutors.take();
        return head;
    }

    /**
     * Removes the given executor from the queue if it is still queued.
     *
     * @param masterTaskExecutor the executor to remove
     * @return the result of {@link BlockingQueue#remove(Object)}
     */
    public boolean removeMasterTaskExecuteRunnable(MasterTaskExecutor masterTaskExecutor) {
        final boolean removed = pendingExecutors.remove(masterTaskExecutor);
        return removed;
    }

    /**
     * @return the number of executors currently queued
     */
    public int size() {
        final int queued = pendingExecutors.size();
        return queued;
    }
}

84
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java

@ -1,84 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder;
import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPool;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Daemon thread that repeatedly takes a {@link MasterTaskExecutor} from the
 * {@link GlobalMasterTaskExecuteRunnableQueue}, submits it to the
 * {@link MasterTaskExecutorThreadPool} and registers it in {@link MasterTaskExecutorHolder}.
 */
@Slf4j
@Component
public class GlobalMasterTaskExecuteRunnableQueueLooper extends BaseDaemonThread implements AutoCloseable {

    @Autowired
    private GlobalMasterTaskExecuteRunnableQueue globalMasterTaskExecuteRunnableQueue;

    @Autowired
    private MasterTaskExecutorThreadPool masterTaskExecutorThreadPool;

    private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);

    public GlobalMasterTaskExecuteRunnableQueueLooper() {
        super("MasterDelayTaskExecuteRunnableDelayQueueLooper");
    }

    /**
     * Starts the thread pool and then the looper thread. A second call is rejected with an error log.
     */
    @Override
    public synchronized void start() {
        if (!RUNNING_FLAG.compareAndSet(false, true)) {
            log.error("The MasterDelayTaskExecuteRunnableDelayQueueLooper already started, will not start again");
            return;
        }
        log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper starting...");
        // The pool must be started before the looper thread: run() submits to the pool as soon as
        // it takes a task, and an unchecked failure there is not caught by the loop and would
        // terminate the looper thread.
        masterTaskExecutorThreadPool.start();
        super.start();
        log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper started...");
    }

    /**
     * Loops until the running flag is cleared or the thread is interrupted while waiting on the queue.
     */
    @Override
    public void run() {
        while (RUNNING_FLAG.get()) {
            try {
                final MasterTaskExecutor masterTaskExecutor =
                        globalMasterTaskExecuteRunnableQueue.takeMasterTaskExecuteRunnable();
                masterTaskExecutorThreadPool.submitMasterTaskExecutor(masterTaskExecutor);
                MasterTaskExecutorHolder.putMasterTaskExecuteRunnable(masterTaskExecutor);
            } catch (InterruptedException ex) {
                // Restore the interrupt status so code further up the stack can observe it.
                Thread.currentThread().interrupt();
                log.warn("MasterDelayTaskExecuteRunnableDelayQueueLooper has been interrupted, will stop loop");
                break;
            }
        }
        log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper stop loop...");
    }

    /**
     * Clears the running flag so the loop exits after its current iteration.
     * <p>
     * NOTE(review): this does not interrupt the thread, so a looper blocked in the queue's take()
     * keeps waiting until another element arrives or it is interrupted elsewhere - confirm this is intended.
     */
    @Override
    public void close() throws Exception {
        if (RUNNING_FLAG.compareAndSet(true, false)) {
            log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper stopping...");
            log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper stopped...");
        }
    }
}

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java

@ -23,17 +23,21 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* The class is used to store {@link TaskExecuteRunnable} which needs to be dispatched. The {@link TaskExecuteRunnable} will be stored in a {@link DelayQueue},
* if the {@link TaskExecuteRunnable}'s delay time is 0, then it will be consumed by {@link GlobalTaskDispatchWaitingQueueLooper}.
*/
@Slf4j
@Component
public class GlobalTaskDispatchWaitingQueue {
private final DelayQueue<DefaultTaskExecuteRunnable> queue = new DelayQueue<>();
public void submitNeedToDispatchTaskExecuteRunnable(DefaultTaskExecuteRunnable priorityTaskExecuteRunnable) {
public void submitTaskExecuteRunnable(DefaultTaskExecuteRunnable priorityTaskExecuteRunnable) {
queue.put(priorityTaskExecuteRunnable);
}
public DefaultTaskExecuteRunnable takeNeedToDispatchTaskExecuteRunnable() throws InterruptedException {
public DefaultTaskExecuteRunnable takeTaskExecuteRunnable() throws InterruptedException {
return queue.take();
}

16
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java

@ -42,7 +42,7 @@ public class GlobalTaskDispatchWaitingQueueLooper extends BaseDaemonThread imple
private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
private final AtomicInteger DISPATCHED_TIMES = new AtomicInteger();
private final AtomicInteger DISPATCHED_CONSECUTIVE_FAILURE_TIMES = new AtomicInteger();
private static final Integer MAX_DISPATCHED_FAILED_TIMES = 100;
@ -66,24 +66,24 @@ public class GlobalTaskDispatchWaitingQueueLooper extends BaseDaemonThread imple
DefaultTaskExecuteRunnable defaultTaskExecuteRunnable;
while (RUNNING_FLAG.get()) {
try {
defaultTaskExecuteRunnable = globalTaskDispatchWaitingQueue.takeNeedToDispatchTaskExecuteRunnable();
defaultTaskExecuteRunnable = globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable();
} catch (InterruptedException e) {
log.warn("Get waiting dispatch task failed, the current thread has been interrupted, will stop loop");
Thread.currentThread().interrupt();
break;
}
try {
final TaskDispatcher taskDispatcher = taskDispatchFactory
.getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance().getTaskType());
TaskDispatcher taskDispatcher =
taskDispatchFactory.getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance());
taskDispatcher.dispatchTask(defaultTaskExecuteRunnable);
DISPATCHED_TIMES.set(0);
DISPATCHED_CONSECUTIVE_FAILURE_TIMES.set(0);
} catch (Exception e) {
defaultTaskExecuteRunnable.getTaskExecutionContext().increaseDispatchFailTimes();
globalTaskDispatchWaitingQueue.submitNeedToDispatchTaskExecuteRunnable(defaultTaskExecuteRunnable);
if (DISPATCHED_TIMES.incrementAndGet() > MAX_DISPATCHED_FAILED_TIMES) {
globalTaskDispatchWaitingQueue.submitTaskExecuteRunnable(defaultTaskExecuteRunnable);
if (DISPATCHED_CONSECUTIVE_FAILURE_TIMES.incrementAndGet() > MAX_DISPATCHED_FAILED_TIMES) {
ThreadUtils.sleep(10 * 1000L);
}
log.error("Dispatch task failed", e);
log.error("Dispatch Task: {} failed", defaultTaskExecuteRunnable.getTaskInstance().getName(), e);
}
}
log.info("GlobalTaskDispatchWaitingQueueLooper started...");

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java

@ -31,16 +31,12 @@ public class MasterTaskExecutorBootstrap implements AutoCloseable {
@Autowired
private GlobalTaskDispatchWaitingQueueLooper globalTaskDispatchWaitingQueueLooper;
@Autowired
private GlobalMasterTaskExecuteRunnableQueueLooper globalMasterTaskExecuteRunnableQueueLooper;
@Autowired
private AsyncMasterTaskDelayQueueLooper asyncMasterTaskDelayQueueLooper;
public synchronized void start() {
log.info("MasterTaskExecutorBootstrap starting...");
globalTaskDispatchWaitingQueueLooper.start();
globalMasterTaskExecuteRunnableQueueLooper.start();
asyncMasterTaskDelayQueueLooper.start();
log.info("MasterTaskExecutorBootstrap started...");
}
@ -51,8 +47,6 @@ public class MasterTaskExecutorBootstrap implements AutoCloseable {
try (
final GlobalTaskDispatchWaitingQueueLooper globalTaskDispatchWaitingQueueLooper1 =
globalTaskDispatchWaitingQueueLooper;
final GlobalMasterTaskExecuteRunnableQueueLooper globalMasterTaskExecuteRunnableQueueLooper1 =
globalMasterTaskExecuteRunnableQueueLooper;
final AsyncMasterTaskDelayQueueLooper asyncMasterTaskDelayQueueLooper1 =
asyncMasterTaskDelayQueueLooper) {
// closed the resource

5
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.runner.dispatcher;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.utils.TaskUtils;
import lombok.extern.slf4j.Slf4j;
@ -38,4 +39,8 @@ public class TaskDispatchFactory {
return TaskUtils.isMasterTask(taskType) ? masterTaskDispatcher : workerTaskDispatcher;
}
/**
* Convenience overload that resolves the dispatcher from the task type of the given task instance.
*
* @param taskInstance the task instance whose task type selects the dispatcher; must not be null,
*                     since its task type is read unconditionally
* @return the dispatcher returned by the task-type based overload
*/
public TaskDispatcher getTaskDispatcher(TaskInstance taskInstance) {
return getTaskDispatcher(taskInstance.getTaskType());
}
}

15
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java

@ -18,12 +18,9 @@
package org.apache.dolphinscheduler.server.master.runner.execute;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
@ -39,12 +36,9 @@ public class AsyncMasterTaskDelayQueueLooper extends BaseDaemonThread implements
private AsyncMasterTaskDelayQueue asyncMasterTaskDelayQueue;
@Autowired
private MasterConfig masterConfig;
private MasterAsyncTaskExecutorThreadPool masterAsyncTaskExecutorThreadPool;
private static final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false);
private ExecutorService asyncTaskStateCheckThreadPool;
public AsyncMasterTaskDelayQueueLooper() {
super("AsyncMasterTaskDelayQueueLooper");
}
@ -63,8 +57,6 @@ public class AsyncMasterTaskDelayQueueLooper extends BaseDaemonThread implements
@Override
public void run() {
asyncTaskStateCheckThreadPool = ThreadUtils.newDaemonFixedThreadExecutor("AsyncTaskStateCheckThreadPool",
masterConfig.getMasterAsyncTaskStateCheckThreadPoolSize());
while (RUNNING_FLAG.get()) {
AsyncTaskExecutionContext asyncTaskExecutionContext;
try {
@ -86,7 +78,7 @@ public class AsyncMasterTaskDelayQueueLooper extends BaseDaemonThread implements
"Cannot find the taskInstance from TaskExecutionContextCacheManager, the task may already been killed, will stop the async master task");
continue;
}
asyncTaskStateCheckThreadPool.submit(() -> {
masterAsyncTaskExecutorThreadPool.getThreadPool().submit(() -> {
final AsyncTaskExecuteFunction asyncTaskExecuteFunction =
asyncTaskExecutionContext.getAsyncTaskExecuteFunction();
final AsyncTaskCallbackFunction asyncTaskCallbackFunction =
@ -131,8 +123,5 @@ public class AsyncMasterTaskDelayQueueLooper extends BaseDaemonThread implements
log.warn("The AsyncMasterTaskDelayQueueLooper is not started, will not close");
return;
}
log.info("AsyncMasterTaskDelayQueueLooper closing...");
asyncTaskStateCheckThreadPool.shutdown();
log.info("AsyncMasterTaskDelayQueueLooper closed...");
}
}

25
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/IMasterTaskExecutorThreadPool.java

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.execute;
/**
* Contract for a thread pool that runs one kind of {@link MasterTaskExecutor}.
*
* @param <T> the concrete {@link MasterTaskExecutor} subtype this pool accepts
*/
public interface IMasterTaskExecutorThreadPool<T extends MasterTaskExecutor> {
/**
* Hands the given executor to the pool for execution.
*
* @param masterTaskExecutor the executor to run
* @return {@code true} if the pool accepted the executor, {@code false} otherwise
*/
boolean submitMasterTaskExecutor(T masterTaskExecutor);
/**
* Asks the pool to drop the given executor.
*
* @param masterTaskExecutor the executor to remove
* @return {@code true} if the executor was removed
*/
boolean removeMasterTaskExecutor(T masterTaskExecutor);
}

58
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.execute;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import java.util.concurrent.ThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * Fixed-size daemon thread pool that runs {@link AsyncMasterTaskExecutor}s.
 * <p>
 * The pool size comes from {@code MasterConfig#masterAsyncTaskExecutorThreadPoolSize}.
 */
@Slf4j
@Component
public class MasterAsyncTaskExecutorThreadPool implements IMasterTaskExecutorThreadPool<AsyncMasterTaskExecutor> {

    private final ThreadPoolExecutor threadPoolExecutor;

    /**
     * Creates the pool.
     *
     * @param masterConfig master configuration supplying the async executor pool size
     */
    public MasterAsyncTaskExecutorThreadPool(MasterConfig masterConfig) {
        // Size this pool from the async setting; the sync setting belongs to the sync executor pool.
        this.threadPoolExecutor = ThreadUtils.newDaemonFixedThreadExecutor("MasterAsyncTaskExecutorThreadPool",
                masterConfig.getMasterAsyncTaskExecutorThreadPoolSize());
    }

    /**
     * Submits the executor to the underlying pool.
     *
     * @param asyncMasterTaskExecutor the executor to run
     * @return always {@code true}; no overload check is performed yet (see the todo below)
     */
    @Override
    public boolean submitMasterTaskExecutor(AsyncMasterTaskExecutor asyncMasterTaskExecutor) {
        synchronized (MasterAsyncTaskExecutorThreadPool.class) {
            // todo: check if the thread pool is overload
            threadPoolExecutor.submit(asyncMasterTaskExecutor);
            return true;
        }
    }

    /**
     * Asks the underlying pool to remove the executor from its work queue.
     * <p>
     * NOTE(review): {@link ThreadPoolExecutor#remove(Runnable)} documents that it may fail to remove
     * tasks entered through {@code submit}, because those are wrapped before being queued. Since
     * {@link #submitMasterTaskExecutor} uses {@code submit}, this may return {@code false} even for a
     * task that is still queued - confirm whether callers rely on the result.
     *
     * @param asyncMasterTaskExecutor the executor to remove
     * @return the result of {@link ThreadPoolExecutor#remove(Runnable)}
     */
    @Override
    public boolean removeMasterTaskExecutor(AsyncMasterTaskExecutor asyncMasterTaskExecutor) {
        return threadPoolExecutor.remove(asyncMasterTaskExecutor);
    }

    // todo: remove this method, it's not a good idea to expose the ThreadPoolExecutor to out side.
    ThreadPoolExecutor getThreadPool() {
        return threadPoolExecutor;
    }
}

34
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPool.java → dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java

@ -20,32 +20,34 @@ package org.apache.dolphinscheduler.server.master.runner.execute;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import java.util.concurrent.ThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@Slf4j
@Component
public class MasterTaskExecutorThreadPool {
public class MasterSyncTaskExecutorThreadPool implements IMasterTaskExecutorThreadPool<SyncMasterTaskExecutor> {
@Autowired
private MasterConfig masterConfig;
private final ThreadPoolExecutor threadPoolExecutor;
private ListeningExecutorService listeningExecutorService;
public synchronized void start() {
log.info("MasterTaskExecuteRunnableThreadPool starting...");
this.listeningExecutorService = MoreExecutors.listeningDecorator(ThreadUtils.newDaemonFixedThreadExecutor(
"MasterTaskExecuteRunnableThread", masterConfig.getMasterTaskExecuteThreadPoolSize()));
log.info("MasterTaskExecuteRunnableThreadPool started...");
public MasterSyncTaskExecutorThreadPool(MasterConfig masterConfig) {
this.threadPoolExecutor = ThreadUtils.newDaemonFixedThreadExecutor("MasterSyncTaskExecutorThreadPool",
masterConfig.getMasterSyncTaskExecutorThreadPoolSize());
}
public void submitMasterTaskExecutor(MasterTaskExecutor masterTaskExecutor) {
listeningExecutorService.submit(masterTaskExecutor);
@Override
public boolean submitMasterTaskExecutor(SyncMasterTaskExecutor syncMasterTaskExecutor) {
synchronized (MasterSyncTaskExecutorThreadPool.class) {
// todo: check if the thread pool is overload
threadPoolExecutor.submit(syncMasterTaskExecutor);
return true;
}
}
@Override
public boolean removeMasterTaskExecutor(SyncMasterTaskExecutor syncMasterTaskExecutor) {
return threadPoolExecutor.remove(syncMasterTaskExecutor);
}
}

59
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner.execute;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Facade that forwards a {@link MasterTaskExecutor} to the thread pool matching its concrete kind:
 * synchronous executors go to {@link MasterSyncTaskExecutorThreadPool}, asynchronous ones to
 * {@link MasterAsyncTaskExecutorThreadPool}.
 */
@Slf4j
@Component
public class MasterTaskExecutorThreadPoolManager {

    @Autowired
    private MasterSyncTaskExecutorThreadPool masterSyncTaskExecutorThreadPool;

    @Autowired
    private MasterAsyncTaskExecutorThreadPool masterAsyncTaskExecutorThreadPool;

    /**
     * Submits the executor to the pool that handles its type.
     *
     * @param masterTaskExecutor the executor to submit
     * @return the result reported by the selected pool
     * @throws IllegalArgumentException if the executor is neither a sync nor an async executor
     */
    public boolean submitMasterTaskExecutor(MasterTaskExecutor masterTaskExecutor) {
        // The sync check comes first, so it wins should a type ever implement both kinds.
        if (masterTaskExecutor instanceof SyncMasterTaskExecutor) {
            SyncMasterTaskExecutor syncExecutor = (SyncMasterTaskExecutor) masterTaskExecutor;
            return masterSyncTaskExecutorThreadPool.submitMasterTaskExecutor(syncExecutor);
        } else if (masterTaskExecutor instanceof AsyncMasterTaskExecutor) {
            AsyncMasterTaskExecutor asyncExecutor = (AsyncMasterTaskExecutor) masterTaskExecutor;
            return masterAsyncTaskExecutorThreadPool.submitMasterTaskExecutor(asyncExecutor);
        }
        throw unknownExecutorType(masterTaskExecutor);
    }

    /**
     * Removes the executor from the pool that handles its type.
     *
     * @param masterTaskExecutor the executor to remove
     * @return the result reported by the selected pool
     * @throws IllegalArgumentException if the executor is neither a sync nor an async executor
     */
    public boolean removeMasterTaskExecutor(MasterTaskExecutor masterTaskExecutor) {
        if (masterTaskExecutor instanceof SyncMasterTaskExecutor) {
            SyncMasterTaskExecutor syncExecutor = (SyncMasterTaskExecutor) masterTaskExecutor;
            return masterSyncTaskExecutorThreadPool.removeMasterTaskExecutor(syncExecutor);
        } else if (masterTaskExecutor instanceof AsyncMasterTaskExecutor) {
            AsyncMasterTaskExecutor asyncExecutor = (AsyncMasterTaskExecutor) masterTaskExecutor;
            return masterAsyncTaskExecutorThreadPool.removeMasterTaskExecutor(asyncExecutor);
        }
        throw unknownExecutorType(masterTaskExecutor);
    }

    /** Builds the exception raised for an executor that matches no known pool. */
    private static IllegalArgumentException unknownExecutorType(MasterTaskExecutor masterTaskExecutor) {
        return new IllegalArgumentException("Unknown type of MasterTaskExecutor: " + masterTaskExecutor);
    }
}

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java

@ -52,6 +52,6 @@ public abstract class BaseTaskExecuteRunnableDispatchOperator implements TaskExe
taskInstance.getName(),
taskInstance.getDelayTime(), remainTime);
}
globalTaskDispatchWaitingQueue.submitNeedToDispatchTaskExecuteRunnable(taskExecuteRunnable);
globalTaskDispatchWaitingQueue.submitTaskExecuteRunnable(taskExecuteRunnable);
}
}

16
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

@ -105,13 +105,7 @@ public abstract class AbstractCommandExecutor {
TaskCallBack taskCallBack) throws Exception {
TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId();
if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
log.warn(
"Cannot find the taskInstance: {} from TaskExecutionContextCacheManager, the task might already been killed",
taskInstanceId);
result.setExitStatusCode(EXIT_CODE_KILL);
return result;
}
// todo: we need to track task state (like JDK Thread does) to make sure a killed task is never executed
iShellInterceptorBuilder = iShellInterceptorBuilder
.shellDirectory(taskRequest.getExecutePath())
.shellName(taskRequest.getTaskAppId());
@ -155,13 +149,7 @@ public abstract class AbstractCommandExecutor {
// cache processId
taskRequest.setProcessId(processId);
boolean updateTaskExecutionContextStatus =
TaskExecutionContextCacheManager.updateTaskExecutionContext(taskRequest);
if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) {
result.setExitStatusCode(EXIT_CODE_KILL);
cancelApplication();
return result;
}
// print process id
log.info("process start, process id is: {}", processId);

72
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContextCacheManager.java

@ -1,72 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Static, process-wide cache of {@link TaskExecutionContext} objects keyed by task instance id.
 * <p>
 * The cache is backed by a {@link ConcurrentHashMap}, so individual operations are safe to call from
 * multiple threads.
 */
public class TaskExecutionContextCacheManager {

    /**
     * taskInstance cache
     */
    private static final Map<Integer, TaskExecutionContext> taskRequestContextCache = new ConcurrentHashMap<>();

    private TaskExecutionContextCacheManager() {
        throw new IllegalStateException("Utility class");
    }

    /**
     * get taskInstance by taskInstance id
     *
     * @param taskInstanceId taskInstanceId, must not be null
     * @return the cached context, or {@code null} if nothing is cached for this id
     */
    public static TaskExecutionContext getByTaskInstanceId(Integer taskInstanceId) {
        return taskRequestContextCache.get(taskInstanceId);
    }

    /**
     * cache taskInstance, replacing any context already cached under the same task instance id
     *
     * @param request request
     */
    public static void cacheTaskExecutionContext(TaskExecutionContext request) {
        taskRequestContextCache.put(request.getTaskInstanceId(), request);
    }

    /**
     * remove taskInstance by taskInstanceId
     *
     * @param taskInstanceId taskInstanceId, must not be null
     */
    public static void removeByTaskInstanceId(Integer taskInstanceId) {
        taskRequestContextCache.remove(taskInstanceId);
    }

    /**
     * Replaces the cached context for the request's task instance id, but only if one is already cached.
     *
     * @param request the new context
     * @return {@code true} if an entry existed and was replaced, {@code false} if no entry was cached
     */
    public static boolean updateTaskExecutionContext(TaskExecutionContext request) {
        // computeIfPresent checks for the key and replaces the value in one atomic step; deriving the
        // result from its return value avoids the race of a separate containsKey lookup, which could
        // observe a concurrent put/remove and report a result that does not match what happened here.
        return taskRequestContextCache.computeIfPresent(request.getTaskInstanceId(),
                (taskInstanceId, cached) -> request) != null;
    }

    /**
     * Returns all cached contexts.
     * <p>
     * The returned collection is the map's live {@code values()} view, not a snapshot: it reflects
     * later changes to the cache.
     *
     * @return live view of the cached contexts
     */
    public static Collection<TaskExecutionContext> getAllTaskRequestList() {
        return taskRequestContextCache.values();
    }
}

12
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java

@ -21,7 +21,6 @@ import static java.util.Collections.singletonList;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.API_VERSION;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CPU;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JOB_TTL_SECONDS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LAYER_LABEL;
@ -39,7 +38,6 @@ import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters;
@ -284,12 +282,7 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId();
try {
if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
result.setExitStatusCode(EXIT_CODE_KILL);
return result;
}
if (StringUtils.isEmpty(k8sParameterStr)) {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
return result;
}
K8sTaskExecutionContext k8sTaskExecutionContext = taskRequest.getK8sTaskExecutionContext();
@ -371,10 +364,7 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
public void setTaskStatus(int jobStatus, String taskInstanceId, TaskResponse taskResponse) {
if (jobStatus == EXIT_CODE_SUCCESS || jobStatus == EXIT_CODE_FAILURE) {
if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(Integer.valueOf(taskInstanceId))) {
log.info("[K8sJobExecutor-{}] killed", job.getMetadata().getName());
taskResponse.setExitStatusCode(EXIT_CODE_KILL);
} else if (jobStatus == EXIT_CODE_SUCCESS) {
if (jobStatus == EXIT_CODE_SUCCESS) {
log.info("[K8sJobExecutor-{}] succeed in k8s", job.getMetadata().getName());
taskResponse.setExitStatusCode(EXIT_CODE_SUCCESS);
} else {

4
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java

@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.plugin.task.api.k8s;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor;
@ -90,7 +88,7 @@ public class K8sTaskExecutorTest {
TaskResponse taskResponse = new TaskResponse();
k8sTaskExecutor.setJob(job);
k8sTaskExecutor.setTaskStatus(jobStatus, String.valueOf(taskInstanceId), taskResponse);
Assertions.assertEquals(0, Integer.compare(EXIT_CODE_KILL, taskResponse.getExitStatusCode()));
Assertions.assertEquals(0, taskResponse.getExitStatusCode());
}
@Test
public void testWaitTimeoutNormal() {

2
dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.dvc;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -35,7 +34,6 @@ public class DvcTaskTest {
TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
return taskExecutionContext;
}

2
dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskTest.java

@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import java.io.File;
import java.io.IOException;
@ -127,7 +126,6 @@ public class KubeflowTaskTest {
}
Mockito.when(taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml())
.thenReturn(kubeflowParameters.getClusterYAML());
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
return taskExecutionContext;
}

2
dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java

@ -20,7 +20,6 @@ package org.apache.dolphinler.plugin.task.mlflow;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowConstants;
import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowParameters;
import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowTask;
@ -58,7 +57,6 @@ public class MlflowTaskTest {
String parameters = JSONUtils.toJsonString(mlflowParameters);
TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
return taskExecutionContext;
}

2
dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/test/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskTest.java

@ -21,7 +21,6 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.commons.lang3.SystemUtils;
@ -198,7 +197,6 @@ public class PytorchTaskTest {
String parameters = JSONUtils.toJsonString(pytorchParameters);
TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters);
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
return taskExecutionContext;
}

38
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -22,15 +22,14 @@ import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
import org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueueLooper;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
import org.apache.commons.collections4.CollectionUtils;
@ -52,9 +51,6 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
@Slf4j
public class WorkerServer implements IStoppable {
@Autowired
private WorkerManagerThread workerManagerThread;
@Autowired
private WorkerRegistryClient workerRegistryClient;
@ -67,9 +63,6 @@ public class WorkerServer implements IStoppable {
@Autowired
private MessageRetryRunner messageRetryRunner;
@Autowired
private GlobalTaskInstanceWaitingQueueLooper globalTaskInstanceWaitingQueueLooper;
/**
* worker server startup, not use web service
*
@ -88,10 +81,7 @@ public class WorkerServer implements IStoppable {
this.workerRegistryClient.setRegistryStoppable(this);
this.workerRegistryClient.start();
this.workerManagerThread.start();
this.messageRetryRunner.start();
this.globalTaskInstanceWaitingQueueLooper.start();
/*
* registry hooks, which are called before the process exits
@ -114,7 +104,13 @@ public class WorkerServer implements IStoppable {
WorkerRpcServer closedWorkerRpcServer = workerRpcServer;
WorkerRegistryClient closedRegistryClient = workerRegistryClient) {
log.info("Worker server is stopping, current cause : {}", cause);
// kill running tasks
// todo: we need to remove this method,
// since for some tasks we need to take over the remote task after the worker restarts.
// Also, if the worker crashes, `killAllRunningTasks` will not be executed, which leads to two
// inconsistent situations:
// 1. If the worker is stopped by kill, the tasks will be killed.
// 2. If the worker is stopped by kill -9, the tasks will not be killed.
// So we don't need to kill the tasks.
this.killAllRunningTasks();
} catch (Exception e) {
log.error("Worker server stop failed, current cause: {}", cause, e);
@ -129,25 +125,25 @@ public class WorkerServer implements IStoppable {
}
public void killAllRunningTasks() {
Collection<TaskExecutionContext> taskRequests = TaskExecutionContextCacheManager.getAllTaskRequestList();
if (CollectionUtils.isEmpty(taskRequests)) {
Collection<WorkerTaskExecutor> workerTaskExecutors = WorkerTaskExecutorHolder.getAllTaskExecutor();
if (CollectionUtils.isEmpty(workerTaskExecutors)) {
return;
}
log.info("Worker begin to kill all cache task, task size: {}", taskRequests.size());
log.info("Worker begin to kill all cache task, task size: {}", workerTaskExecutors.size());
int killNumber = 0;
for (TaskExecutionContext taskRequest : taskRequests) {
for (WorkerTaskExecutor workerTaskExecutor : workerTaskExecutors) {
// kill task when it's not finished yet
try {
LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(),
taskRequest.getTaskInstanceId());
if (ProcessUtils.kill(taskRequest)) {
TaskExecutionContext taskExecutionContext = workerTaskExecutor.getTaskExecutionContext();
LogUtils.setTaskInstanceIdMDC(taskExecutionContext.getTaskInstanceId());
if (ProcessUtils.kill(taskExecutionContext)) {
killNumber++;
}
} finally {
LogUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
log.info("Worker after kill all cache task, task size: {}, killed number: {}", taskRequests.size(),
log.info("Worker after kill all cache task, task size: {}, killed number: {}", workerTaskExecutors.size(),
killNumber);
}
}

13
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.extract.master.transportor.ITaskInstanceExecutionEvent;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import java.time.Duration;
@ -92,13 +93,15 @@ public class MessageRetryRunner extends BaseDaemonThread {
needToRetryMessages.remove(taskInstanceId);
}
public void updateMessageHost(int taskInstanceId, String messageReceiverHost) {
public boolean updateMessageHost(int taskInstanceId, String messageReceiverHost) {
List<TaskInstanceMessage> taskInstanceMessages = this.needToRetryMessages.get(taskInstanceId);
if (taskInstanceMessages != null) {
taskInstanceMessages.forEach(taskInstanceMessage -> {
taskInstanceMessage.getEvent().setWorkflowInstanceHost(messageReceiverHost);
});
if (CollectionUtils.isEmpty(taskInstanceMessages)) {
return false;
}
taskInstanceMessages.forEach(taskInstanceMessage -> {
taskInstanceMessage.getEvent().setWorkflowInstanceHost(messageReceiverHost);
});
return true;
}
public void run() {

6
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java

@ -30,7 +30,7 @@ import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import org.apache.dolphinscheduler.server.worker.task.WorkerHeartBeatTask;
import org.apache.commons.collections4.CollectionUtils;
@ -55,7 +55,7 @@ public class WorkerRegistryClient implements AutoCloseable {
private WorkerConfig workerConfig;
@Autowired
private WorkerManagerThread workerManagerThread;
private WorkerTaskExecutorThreadPool workerManagerThread;
@Autowired
private RegistryClient registryClient;
@ -71,7 +71,7 @@ public class WorkerRegistryClient implements AutoCloseable {
this.workerHeartBeatTask = new WorkerHeartBeatTask(
workerConfig,
registryClient,
() -> workerManagerThread.getWaitSubmitQueueSize());
() -> workerManagerThread.getWaitingTaskExecutorSize());
}
public void start() {

11
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java

@ -25,8 +25,8 @@ import org.apache.dolphinscheduler.registry.api.StrategyType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer;
import org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueue;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import java.time.Duration;
@ -54,10 +54,7 @@ public class WorkerWaitingStrategy implements WorkerConnectStrategy {
private MessageRetryRunner messageRetryRunner;
@Autowired
private WorkerManagerThread workerManagerThread;
@Autowired
private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue;
private WorkerTaskExecutorThreadPool workerManagerThread;
@Override
public void disconnect() {
@ -121,7 +118,7 @@ public class WorkerWaitingStrategy implements WorkerConnectStrategy {
workerRpcServer.close();
log.warn("Worker server close the RPC server due to lost connection from registry");
workerManagerThread.clearTask();
globalTaskInstanceWaitingQueue.clearTask();
WorkerTaskExecutorHolder.clear();
log.warn("Worker server clear the tasks due to lost connection from registry");
messageRetryRunner.clearMessage();
log.warn("Worker server clear the retry message due to lost connection from registry");

19
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java

@ -21,12 +21,11 @@ import org.apache.dolphinscheduler.extract.worker.IStreamingTaskInstanceOperator
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointResponse;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.stream.StreamTask;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import lombok.extern.slf4j.Slf4j;
@ -38,7 +37,7 @@ import org.springframework.stereotype.Component;
public class StreamingTaskInstanceOperatorImpl implements IStreamingTaskInstanceOperator {
@Autowired
private WorkerManagerThread workerManager;
private WorkerTaskExecutorThreadPool workerManager;
@Override
public TaskInstanceTriggerSavepointResponse triggerSavepoint(TaskInstanceTriggerSavepointRequest taskInstanceTriggerSavepointRequest) {
@ -47,16 +46,10 @@ public class StreamingTaskInstanceOperatorImpl implements IStreamingTaskInstance
try {
int taskInstanceId = taskInstanceTriggerSavepointRequest.getTaskInstanceId();
LogUtils.setTaskInstanceIdMDC(taskInstanceId);
TaskExecutionContext taskExecutionContext =
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
if (taskExecutionContext == null) {
log.error("Cannot find TaskExecutionContext for taskInstance: {}", taskInstanceId);
return TaskInstanceTriggerSavepointResponse.fail("Cannot find TaskExecutionContext");
}
WorkerTaskExecutor workerTaskExecutor = workerManager.getTaskExecuteThread(taskInstanceId);
WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorHolder.get(taskInstanceId);
if (workerTaskExecutor == null) {
log.error("Cannot find WorkerTaskExecuteRunnable for taskInstance: {}", taskInstanceId);
return TaskInstanceTriggerSavepointResponse.fail("Cannot find WorkerTaskExecuteRunnable");
log.error("Cannot find WorkerTaskExecutor for taskInstance: {}", taskInstanceId);
return TaskInstanceTriggerSavepointResponse.fail("Cannot find TaskExecutionContext");
}
AbstractTask task = workerTaskExecutor.getTask();
if (task == null) {

14
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java

@ -30,8 +30,8 @@ import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFil
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest;
import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
import java.util.List;
@ -64,9 +64,13 @@ public class WorkerLogServiceImpl implements ILogService {
@Override
public GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest) {
TaskExecutionContext taskExecutionContext =
TaskExecutionContextCacheManager.getByTaskInstanceId(getAppIdRequest.getTaskInstanceId());
String appInfoPath = taskExecutionContext.getAppInfoPath();
String appInfoPath = null;
WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorHolder.get(getAppIdRequest.getTaskInstanceId());
if (workerTaskExecutor != null) {
// todo: remove this kind of logic and the getAppId method; the appId should be sent by the worker
// rather than queried by the master
appInfoPath = workerTaskExecutor.getTaskExecutionContext().getAppInfoPath();
}
String logPath = getAppIdRequest.getLogPath();
List<String> appIds = org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils.getAppIds(logPath, appInfoPath,
PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY));

70
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java

@ -1,70 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * Bounded waiting queue of {@link TaskExecutionContext}s on the worker.
 * <p>
 * The capacity is fixed at construction time to {@code workerConfig.getExecThreads()}.
 */
@Slf4j
@Component
public class GlobalTaskInstanceWaitingQueue {

    private final WorkerConfig workerConfig;

    private final BlockingQueue<TaskExecutionContext> blockingQueue;

    public GlobalTaskInstanceWaitingQueue(WorkerConfig workerConfig) {
        this.workerConfig = workerConfig;
        this.blockingQueue = new ArrayBlockingQueue<>(workerConfig.getExecThreads());
    }

    /**
     * Tries to enqueue a task without blocking.
     * <p>
     * Under the {@code CONTINUE} policy the task is offered directly. Under any other policy a full
     * queue is additionally logged and counted in the worker metrics before the task is rejected.
     *
     * @param taskExecutionContext the task to enqueue
     * @return {@code true} if the task was enqueued, {@code false} if the queue had no room
     */
    public boolean addDispatchTask(TaskExecutionContext taskExecutionContext) {
        if (workerConfig.getTaskExecuteThreadsFullPolicy() == TaskExecuteThreadsFullPolicy.CONTINUE) {
            return blockingQueue.offer(taskExecutionContext);
        }
        // The queue is created with a capacity of getExecThreads(), so its size cannot exceed that
        // value. The check must therefore be ">=": with ">" this branch was unreachable and a full
        // queue was rejected by offer() without the warning or the metric.
        if (blockingQueue.size() >= getQueueSize()) {
            log.warn("Wait submit queue is full, will retry submit task later");
            WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
            return false;
        }
        return blockingQueue.offer(taskExecutionContext);
    }

    /**
     * Retrieves and removes the head of the queue, waiting until a task is available.
     *
     * @return the next task
     * @throws InterruptedException if interrupted while waiting
     */
    public TaskExecutionContext take() throws InterruptedException {
        return blockingQueue.take();
    }

    /** Drops every task currently waiting in the queue. */
    public void clearTask() {
        blockingQueue.clear();
    }

    /**
     * @return the configured queue limit, i.e. {@code workerConfig.getExecThreads()}
     */
    public int getQueueSize() {
        return workerConfig.getExecThreads();
    }
}

101
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java

@ -1,101 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class GlobalTaskInstanceWaitingQueueLooper extends BaseDaemonThread {
@Autowired
private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue;
@Autowired
private WorkerConfig workerConfig;
@Autowired
private WorkerMessageSender workerMessageSender;
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private WorkerManagerThread workerManager;
@Autowired(required = false)
private StorageOperate storageOperate;
@Autowired
private WorkerRegistryClient workerRegistryClient;
protected GlobalTaskInstanceWaitingQueueLooper() {
super("GlobalTaskDispatchQueueLooper");
}
public synchronized void start() {
log.info("GlobalTaskDispatchQueueLooper starting");
super.start();
log.info("GlobalTaskDispatchQueueLooper started");
}
public void run() {
while (true) {
try {
TaskExecutionContext taskExecutionContext = globalTaskInstanceWaitingQueue.take();
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
LogUtils.setTaskInstanceIdMDC(taskExecutionContext.getTaskInstanceId());
WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorFactoryBuilder
.createWorkerTaskExecutorFactory(
taskExecutionContext,
workerConfig,
workerMessageSender,
taskPluginManager,
storageOperate,
workerRegistryClient)
.createWorkerTaskExecutor();
if (workerManager.offer(workerTaskExecutor)) {
log.info("Success submit WorkerDelayTaskExecuteRunnable to WorkerManagerThread's waiting queue");
}
} catch (InterruptedException e) {
log.error("GlobalTaskDispatchQueueLooper interrupted");
Thread.currentThread().interrupt();
break;
} catch (Exception ex) {
log.error("GlobalTaskDispatchQueueLooper error", ex);
} finally {
LogUtils.removeTaskInstanceIdMDC();
LogUtils.removeTaskInstanceLogFullPathMDC();
}
}
}
}

18
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java

@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.extract.master.transportor.ITaskInstanceExecutionEvent;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
@ -42,15 +41,7 @@ public class TaskCallbackImpl implements TaskCallBack {
@Override
public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo applicationInfo) {
TaskExecutionContext taskExecutionContext =
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
if (taskExecutionContext == null) {
log.error("task execution context is empty, taskInstanceId: {}, applicationInfo:{}", taskInstanceId,
applicationInfo);
return;
}
log.info("send remote application info {}", applicationInfo);
// todo: use listener
taskExecutionContext.setAppIds(applicationInfo.getAppIds());
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.RUNNING_INFO);
@ -58,13 +49,6 @@ public class TaskCallbackImpl implements TaskCallBack {
@Override
public void updateTaskInstanceInfo(int taskInstanceId) {
TaskExecutionContext taskExecutionContext =
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
if (taskExecutionContext == null) {
log.error("task execution context is empty, taskInstanceId: {}", taskInstanceId);
return;
}
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.RUNNING_INFO);
}

91
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java

@ -1,91 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
/**
 * Submits {@link WorkerTaskExecutor}s to the wrapped executor and keeps each one registered in
 * the supplied map, keyed by task instance id, until its run completes or fails.
 */
@Slf4j
public class WorkerExecService {

    private final ListeningExecutorService listeningExecutorService;

    private final ExecutorService execService;

    private final ConcurrentHashMap<Integer, WorkerTaskExecutor> taskExecuteThreadMap;

    /**
     * @param execService          executor that runs the tasks; the size accessors below cast it to
     *                             {@link ThreadPoolExecutor}, so it must be one
     * @param taskExecuteThreadMap map used to track submitted executors; its size is registered as
     *                             the worker task total gauge
     */
    public WorkerExecService(ExecutorService execService,
                             ConcurrentHashMap<Integer, WorkerTaskExecutor> taskExecuteThreadMap) {
        this.execService = execService;
        this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService);
        this.taskExecuteThreadMap = taskExecuteThreadMap;
        WorkerServerMetrics.registerWorkerTaskTotalGauge(taskExecuteThreadMap::size);
    }

    /**
     * Registers the executor in the tracking map, submits it, and removes it from the map again
     * once the submitted run has finished (successfully or not).
     *
     * @param taskExecuteThread the executor to run
     */
    public void submit(final WorkerTaskExecutor taskExecuteThread) {
        taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread);
        // Parameterized instead of raw types: submit(Runnable) yields a ListenableFuture<?>,
        // and a FutureCallback<Object> is a valid consumer for any result type.
        ListenableFuture<?> future = this.listeningExecutorService.submit(taskExecuteThread);
        FutureCallback<Object> futureCallback = new FutureCallback<Object>() {

            @Override
            public void onSuccess(Object o) {
                taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
            }

            @Override
            public void onFailure(Throwable throwable) {
                log.error("task execute failed, processInstanceId:{}, taskInstanceId:{}",
                        taskExecuteThread.getTaskExecutionContext().getProcessInstanceId(),
                        taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(),
                        throwable);
                taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
            }
        };
        // The callback is scheduled on the same executor that runs the tasks.
        Futures.addCallback(future, futureCallback, this.listeningExecutorService);
    }

    /**
     * get thread pool queue size
     *
     * @return queue size
     */
    public int getThreadPoolQueueSize() {
        return ((ThreadPoolExecutor) this.execService).getQueue().size();
    }

    /**
     * @return the active thread count reported by the underlying {@link ThreadPoolExecutor}
     */
    public int getActiveExecThreadCount() {
        return ((ThreadPoolExecutor) this.execService).getActiveCount();
    }

    /**
     * @return the live tracking map (not a copy)
     */
    public Map<Integer, WorkerTaskExecutor> getTaskExecuteThreadMap() {
        return taskExecuteThreadMap;
    }
}

150
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java

@ -1,150 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * Manage tasks: holds submitted {@link WorkerTaskExecutor}s in a waiting queue and moves them
 * into the {@link WorkerExecService} from a dedicated manager thread.
 */
@Component
@Slf4j
public class WorkerManagerThread implements Runnable {

    private final BlockingQueue<WorkerTaskExecutor> waitSubmitQueue;

    private final WorkerExecService workerExecService;

    private final int workerExecThreads;

    private final ConcurrentHashMap<Integer, WorkerTaskExecutor> taskExecuteThreadMap = new ConcurrentHashMap<>();

    public WorkerManagerThread(WorkerConfig workerConfig) {
        this.waitSubmitQueue = new LinkedBlockingQueue<>();
        this.workerExecThreads = workerConfig.getExecThreads();
        this.workerExecService = new WorkerExecService(
                ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()),
                taskExecuteThreadMap);
    }

    /**
     * Looks up a tracked executor.
     *
     * @return the executor registered under the id, or null when there is none
     */
    public @Nullable WorkerTaskExecutor getTaskExecuteThread(Integer taskInstanceId) {
        return taskExecuteThreadMap.get(taskInstanceId);
    }

    /**
     * get wait submit queue size
     *
     * @return queue size
     */
    public int getWaitSubmitQueueSize() {
        return waitSubmitQueue.size();
    }

    /**
     * get thread pool queue size
     *
     * @return queue size
     */
    public int getThreadPoolQueueSize() {
        return workerExecService.getThreadPoolQueueSize();
    }

    /**
     * Kill tasks that have not been executed, like delay task:
     * every executor still in the waiting queue with the given task instance id is removed.
     */
    public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
        for (WorkerTaskExecutor queued : waitSubmitQueue) {
            if (queued.getTaskExecutionContext().getTaskInstanceId() == taskInstanceId) {
                waitSubmitQueue.remove(queued);
            }
        }
    }

    /**
     * Appends the executor to the waiting queue.
     *
     * @return the result of the queue's add operation
     */
    public boolean offer(WorkerTaskExecutor workerDelayTaskExecuteRunnable) {
        return waitSubmitQueue.add(workerDelayTaskExecuteRunnable);
    }

    /**
     * Launches this manager on a new daemon thread.
     */
    public void start() {
        log.info("Worker manager thread starting");
        final Thread managerThread = new Thread(this, this.getClass().getName());
        managerThread.setDaemon(true);
        managerThread.start();
        log.info("Worker manager thread started");
    }

    /**
     * Registers the worker gauges, then loops until the server is stopped: while the pool's queue
     * is no longer than the configured thread count it forwards the next waiting executor,
     * otherwise it records an overload and sleeps.
     */
    @Override
    public void run() {
        WorkerServerMetrics.registerWorkerCpuUsageGauge(OSUtils::cpuUsagePercentage);
        WorkerServerMetrics.registerWorkerMemoryAvailableGauge(OSUtils::availablePhysicalMemorySize);
        WorkerServerMetrics.registerWorkerMemoryUsageGauge(OSUtils::memoryUsagePercentage);
        WorkerServerMetrics.registerWorkerExecuteQueueSizeGauge(workerExecService::getThreadPoolQueueSize);
        WorkerServerMetrics.registerWorkerActiveExecuteThreadGauge(workerExecService::getActiveExecThreadCount);
        Thread.currentThread().setName("Worker-Execute-Manager-Thread");

        while (!ServerLifeCycleManager.isStopped()) {
            try {
                if (!ServerLifeCycleManager.isRunning()) {
                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                }
                if (this.getThreadPoolQueueSize() > workerExecThreads) {
                    WorkerServerMetrics.incWorkerOverloadCount();
                    log.info("Exec queue is full, waiting submit queue {}, waiting exec queue size {}",
                            this.getWaitSubmitQueueSize(), this.getThreadPoolQueueSize());
                    ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
                    continue;
                }
                // Blocks until an executor is waiting, then hands it to the pool.
                workerExecService.submit(waitSubmitQueue.take());
            } catch (Exception e) {
                // Deliberately swallowed (interrupts included) so the manager keeps running.
                log.error("An unexpected interrupt is happened, "
                        + "the exception will be ignored and this thread will continue to run", e);
            }
        }
    }

    /**
     * Empties the waiting queue, cancels every tracked executor, drops its cached execution
     * context and finally clears the tracking map.
     */
    public void clearTask() {
        waitSubmitQueue.clear();
        for (WorkerTaskExecutor tracked : workerExecService.getTaskExecuteThreadMap().values()) {
            final int taskInstanceId = tracked.getTaskExecutionContext().getTaskInstanceId();
            try {
                tracked.cancelTask();
                log.info("Cancel the taskInstance in worker {}", taskInstanceId);
            } catch (Exception ex) {
                log.error("Cancel the taskInstance error {}", taskInstanceId, ex);
            } finally {
                TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
            }
        }
        workerExecService.getTaskExecuteThreadMap().clear();
    }
}

9
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java

@ -39,7 +39,6 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
@ -108,7 +107,7 @@ public abstract class WorkerTaskExecutor implements Runnable {
sendTaskResult();
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
log.info("Remove the current task execute context from worker cache");
clearTaskExecPathIfNeeded();
@ -118,7 +117,7 @@ public abstract class WorkerTaskExecutor implements Runnable {
if (cancelTask()) {
log.info("Cancel the task successfully");
}
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE);
taskExecutionContext.setEndTime(System.currentTimeMillis());
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
@ -128,7 +127,7 @@ public abstract class WorkerTaskExecutor implements Runnable {
}
public boolean cancelTask() {
protected boolean cancelTask() {
// cancel the task
if (task == null) {
return true;
@ -157,7 +156,7 @@ public abstract class WorkerTaskExecutor implements Runnable {
if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS);
taskExecutionContext.setEndTime(System.currentTimeMillis());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH);
log.info(

33
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java

@ -24,20 +24,31 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import javax.annotation.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import lombok.NonNull;
import lombok.experimental.UtilityClass;
@UtilityClass
@Component
public class WorkerTaskExecutorFactoryBuilder {
public static WorkerTaskExecutorFactory<? extends WorkerTaskExecutor> createWorkerTaskExecutorFactory(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull WorkerConfig workerConfig,
@NonNull WorkerMessageSender workerMessageSender,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate,
@NonNull WorkerRegistryClient workerRegistryClient) {
@Autowired
private WorkerConfig workerConfig;
@Autowired
private WorkerMessageSender workerMessageSender;
@Autowired
private TaskPluginManager taskPluginManager;
@Autowired
private WorkerTaskExecutorThreadPool workerManager;
@Autowired(required = false)
private StorageOperate storageOperate;
@Autowired
private WorkerRegistryClient workerRegistryClient;
public WorkerTaskExecutorFactory<? extends WorkerTaskExecutor> createWorkerTaskExecutorFactory(TaskExecutionContext taskExecutionContext) {
return new DefaultWorkerTaskExecutorFactory(taskExecutionContext,
workerConfig,
workerMessageSender,

54
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Used to store all running and waiting {@link WorkerTaskExecutor}. If the task has been finished, it will be removed from the map.
 *
 * <p>The registry is static and is used by more than one thread (for example the submit path and
 * the executors themselves when they finish), so it is backed by a {@link ConcurrentHashMap}.
 */
public class WorkerTaskExecutorHolder {

    private static final Map<Integer, WorkerTaskExecutor> workerTaskExecutorMap = new ConcurrentHashMap<>();

    /**
     * Registers the executor under its task instance id.
     *
     * @param workerTaskExecutor the executor to register
     * @throws IllegalArgumentException if an executor is already registered for the same task instance id
     */
    public static void put(WorkerTaskExecutor workerTaskExecutor) {
        int taskInstanceId = workerTaskExecutor.getTaskExecutionContext().getTaskInstanceId();
        // putIfAbsent makes the "already exists" check and the insert a single atomic step.
        if (workerTaskExecutorMap.putIfAbsent(taskInstanceId, workerTaskExecutor) != null) {
            throw new IllegalArgumentException("TaskInstance: " + taskInstanceId + " already exists");
        }
    }

    /**
     * @return the executor registered for the id, or null when there is none
     */
    public static WorkerTaskExecutor get(int taskInstanceId) {
        return workerTaskExecutorMap.get(taskInstanceId);
    }

    /**
     * @return the executor that was registered for the id, or null when there was none
     */
    public static WorkerTaskExecutor remove(int taskInstanceId) {
        return workerTaskExecutorMap.remove(taskInstanceId);
    }

    /**
     * Removes every registered executor.
     */
    public static void clear() {
        workerTaskExecutorMap.clear();
    }

    /**
     * @return a live view of the registered executors (not a copy)
     */
    public static Collection<WorkerTaskExecutor> getAllTaskExecutor() {
        return workerTaskExecutorMap.values();
    }
}

96
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import java.util.concurrent.ThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * Thread pool that runs {@link WorkerTaskExecutor}s. Every accepted executor is registered in
 * {@link WorkerTaskExecutorHolder} before it is handed to the pool.
 */
@Component
@Slf4j
public class WorkerTaskExecutorThreadPool {

    private final ThreadPoolExecutor threadPoolExecutor;

    private final WorkerConfig workerConfig;

    /**
     * Creates the pool with {@code workerConfig.getExecThreads()} threads and registers the
     * worker resource and pool gauges.
     */
    public WorkerTaskExecutorThreadPool(WorkerConfig workerConfig) {
        this.threadPoolExecutor =
                ThreadUtils.newDaemonFixedThreadExecutor("WorkerTaskExecutorThreadPool", workerConfig.getExecThreads());
        this.workerConfig = workerConfig;

        WorkerServerMetrics.registerWorkerCpuUsageGauge(OSUtils::cpuUsagePercentage);
        WorkerServerMetrics.registerWorkerMemoryAvailableGauge(OSUtils::availablePhysicalMemorySize);
        WorkerServerMetrics.registerWorkerMemoryUsageGauge(OSUtils::memoryUsagePercentage);
        // NOTE(review): this value goes negative whenever more tasks are running than waiting;
        // confirm whether the plain queue size was intended for the "execute queue size" gauge.
        WorkerServerMetrics.registerWorkerExecuteQueueSizeGauge(
                () -> threadPoolExecutor.getQueue().size() - threadPoolExecutor.getActiveCount());
        WorkerServerMetrics.registerWorkerActiveExecuteThreadGauge(threadPoolExecutor::getActiveCount);
    }

    /**
     * Registers the executor and queues it for execution.
     *
     * <p>With the {@link TaskExecuteThreadsFullPolicy#CONTINUE} policy the executor is always
     * accepted; otherwise it is rejected while the pool {@link #isOverload() is overloaded}.
     *
     * @param workerTaskExecutor the executor to run
     * @return true when the executor was accepted, false when it was rejected because of overload
     * @throws IllegalArgumentException propagated from {@link WorkerTaskExecutorHolder#put} when the
     *                                  task instance is already registered
     */
    public boolean submitWorkerTaskExecutor(WorkerTaskExecutor workerTaskExecutor) {
        synchronized (WorkerTaskExecutorThreadPool.class) {
            boolean continueWhenFull =
                    TaskExecuteThreadsFullPolicy.CONTINUE.equals(workerConfig.getTaskExecuteThreadsFullPolicy());
            if (!continueWhenFull && isOverload()) {
                log.warn("WorkerTaskExecutorThreadPool is overload, cannot submit new WorkerTaskExecutor");
                WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
                return false;
            }
            WorkerTaskExecutorHolder.put(workerTaskExecutor);
            // execute(), not submit(): submit() wraps the runnable in a FutureTask, so the object in
            // the pool's queue would no longer be the executor itself and
            // killTaskBeforeExecuteByInstanceId could never remove it.
            // NOTE(review): with execute() an exception escaping the executor's run() reaches the
            // pool thread instead of being captured in a discarded Future - confirm run() handles
            // its own failures.
            threadPoolExecutor.execute(workerTaskExecutor);
            return true;
        }
    }

    /**
     * @return true when at least one executor is waiting in the pool's queue
     */
    public boolean isOverload() {
        return threadPoolExecutor.getQueue().size() > 0;
    }

    /**
     * @return the number of executors waiting in the pool's queue
     */
    public int getWaitingTaskExecutorSize() {
        return threadPoolExecutor.getQueue().size();
    }

    /**
     * @return the active thread count reported by the pool
     */
    public int getRunningTaskExecutorSize() {
        return threadPoolExecutor.getActiveCount();
    }

    /**
     * Kill tasks that have not been executed, e.g. waiting in the queue.
     * Does nothing when no executor is registered for the id. The holder entry itself is not
     * removed here.
     */
    public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
        synchronized (WorkerTaskExecutorThreadPool.class) {
            WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorHolder.get(taskInstanceId);
            if (workerTaskExecutor == null) {
                return;
            }
            threadPoolExecutor.remove(workerTaskExecutor);
        }
    }

    /**
     * Discards every executor still waiting in the pool's queue.
     */
    public void clearTask() {
        threadPoolExecutor.getQueue().clear();
    }
}

20
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java

@ -20,11 +20,12 @@ package org.apache.dolphinscheduler.server.worker.runner.operator;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueue;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorFactoryBuilder;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import lombok.extern.slf4j.Slf4j;
@ -41,15 +42,16 @@ public class TaskInstanceDispatchOperationFunction
private WorkerConfig workerConfig;
@Autowired
private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue;
private WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder;
@Autowired
private WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool;
@Override
public TaskInstanceDispatchResponse operate(TaskInstanceDispatchRequest taskInstanceDispatchRequest) {
log.info("Receive TaskInstanceDispatchRequest: {}", taskInstanceDispatchRequest);
TaskExecutionContext taskExecutionContext = taskInstanceDispatchRequest.getTaskExecutionContext();
try {
// set cache, it will be used when kill task
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
taskExecutionContext.setHost(workerConfig.getWorkerAddress());
taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext));
@ -57,9 +59,11 @@ public class TaskInstanceDispatchOperationFunction
taskExecutionContext.getTaskInstanceId());
TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
if (!globalTaskInstanceWaitingQueue.addDispatchTask(taskExecutionContext)) {
log.error("Submit task: {} to wait queue error, current queue size: {} is full",
taskExecutionContext.getTaskName(), workerConfig.getExecThreads());
WorkerTaskExecutor workerTaskExecutor = workerTaskExecutorFactoryBuilder
.createWorkerTaskExecutorFactory(taskExecutionContext).createWorkerTaskExecutor();
// todo: hold the workerTaskExecutor
if (!workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)) {
log.info("Submit task: {} to wait queue failed", taskExecutionContext.getTaskName());
return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(),
"WorkerManagerThread is full");
} else {

37
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java

@ -23,13 +23,13 @@ import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillRe
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import lombok.extern.slf4j.Slf4j;
@ -45,7 +45,7 @@ public class TaskInstanceKillOperationFunction
ITaskInstanceOperationFunction<TaskInstanceKillRequest, TaskInstanceKillResponse> {
@Autowired
private WorkerManagerThread workerManager;
private WorkerTaskExecutorThreadPool workerManager;
@Autowired
private MessageRetryRunner messageRetryRunner;
@ -57,22 +57,24 @@ public class TaskInstanceKillOperationFunction
int taskInstanceId = taskInstanceKillRequest.getTaskInstanceId();
try {
LogUtils.setTaskInstanceIdMDC(taskInstanceId);
TaskExecutionContext taskExecutionContext =
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
if (taskExecutionContext == null) {
log.error("Cannot find TaskExecutionContext for taskInstance: {}", taskInstanceId);
return TaskInstanceKillResponse.fail("Cannot find TaskExecutionContext");
WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorHolder.get(taskInstanceId);
if (workerTaskExecutor == null) {
log.error("Cannot find WorkerTaskExecutor for taskInstance: {}", taskInstanceId);
return TaskInstanceKillResponse.fail("Cannot find WorkerTaskExecutor");
}
TaskExecutionContext taskExecutionContext = workerTaskExecutor.getTaskExecutionContext();
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
boolean result = doKill(taskExecutionContext);
this.cancelApplication(taskInstanceId);
this.cancelApplication(workerTaskExecutor);
int processId = taskExecutionContext.getProcessId();
if (processId == 0) {
workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.KILL);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
// todo: the task might be executed, but the processId is 0
WorkerTaskExecutorHolder.remove(taskInstanceId);
log.info("The task has not been executed and has been cancelled, task id:{}", taskInstanceId);
return TaskInstanceKillResponse.success(taskExecutionContext);
}
@ -80,7 +82,7 @@ public class TaskInstanceKillOperationFunction
taskExecutionContext
.setCurrentExecutionStatus(result ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
return TaskInstanceKillResponse.success(taskExecutionContext);
} finally {
@ -102,15 +104,11 @@ public class TaskInstanceKillOperationFunction
return processFlag;
}
protected void cancelApplication(int taskInstanceId) {
WorkerTaskExecutor workerTaskExecutor = workerManager.getTaskExecuteThread(taskInstanceId);
if (workerTaskExecutor == null) {
log.warn("taskExecuteThread not found, taskInstanceId:{}", taskInstanceId);
return;
}
protected void cancelApplication(WorkerTaskExecutor workerTaskExecutor) {
AbstractTask task = workerTaskExecutor.getTask();
if (task == null) {
log.warn("task not found, taskInstanceId:{}", taskInstanceId);
log.warn("task not found, taskInstanceId: {}",
workerTaskExecutor.getTaskExecutionContext().getTaskInstanceId());
return;
}
try {
@ -118,7 +116,8 @@ public class TaskInstanceKillOperationFunction
} catch (Exception e) {
log.error("kill task error", e);
}
log.info("kill task by cancelApplication, task id:{}", taskInstanceId);
log.info("kill task by cancelApplication, taskInstanceId: {}",
workerTaskExecutor.getTaskExecutionContext().getTaskInstanceId());
}
protected boolean killProcess(String tenantCode, Integer processId) {

33
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java

@ -20,9 +20,10 @@ package org.apache.dolphinscheduler.server.worker.runner.operator;
import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest;
import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostResponse;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder;
import lombok.extern.slf4j.Slf4j;
@ -47,21 +48,29 @@ public class UpdateWorkflowHostOperationFunction
LogUtils.setTaskInstanceIdMDC(taskInstanceId);
log.info("Received UpdateWorkflowHostRequest: {}", updateWorkflowHostRequest);
TaskExecutionContext taskExecutionContext =
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
if (taskExecutionContext == null) {
log.error("Cannot find the taskExecutionContext for taskInstance : {}", taskInstanceId);
return UpdateWorkflowHostResponse.failed("Cannot find the taskExecutionContext");
boolean updateWorkerTaskExecutor = updateHostInWorkflowTaskExecutor(taskInstanceId, workflowHost);
boolean updateMessage = updateHostInMessage(taskInstanceId, workflowHost);
if (updateWorkerTaskExecutor || updateMessage) {
return UpdateWorkflowHostResponse.success();
}
LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath());
taskExecutionContext.setWorkflowInstanceHost(workflowHost);
messageRetryRunner.updateMessageHost(taskInstanceId, workflowHost);
log.info("Success update workflow host: {} for taskInstance: {}", workflowHost, taskInstanceId);
return UpdateWorkflowHostResponse.success();
return UpdateWorkflowHostResponse.failed("The taskInstance is not in the worker");
} finally {
LogUtils.removeTaskInstanceIdMDC();
LogUtils.removeTaskInstanceLogFullPathMDC();
}
}
/**
 * Sets the workflow instance host on the execution context of the executor registered for the
 * given task instance.
 *
 * @return true when an executor was found and updated, false when none is registered
 */
private boolean updateHostInWorkflowTaskExecutor(int taskInstanceId, String workflowHost) {
    final WorkerTaskExecutor executor = WorkerTaskExecutorHolder.get(taskInstanceId);
    if (executor != null) {
        executor.getTaskExecutionContext().setWorkflowInstanceHost(workflowHost);
        return true;
    }
    return false;
}
/**
 * Delegates the host update for the task instance to the message retry runner.
 *
 * @return whatever the retry runner reports for the update
 */
private boolean updateHostInMessage(int taskInstanceId, String workflowHost) {
    final boolean messageUpdated = messageRetryRunner.updateMessageHost(taskInstanceId, workflowHost);
    return messageUpdated;
}
}

4
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java

@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool;
import java.time.Duration;
import java.util.Set;
@ -67,7 +67,7 @@ public class WorkerRegistryClientTest {
private ScheduledExecutorService heartBeatExecutor;
@Mock
private WorkerManagerThread workerManagerThread;
private WorkerTaskExecutorThreadPool workerManagerThread;
@Mock
private WorkerConnectStrategy workerConnectStrategy;

194
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java

@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import java.io.IOException;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
/**
 * Tests for {@link WorkerTaskExecutorThreadPool}.
 */
class WorkerTaskExecutorThreadPoolTest {

    /**
     * Submits 100 long-sleeping tasks to a pool configured with a single exec thread and the
     * {@code CONTINUE} full policy, then checks that every submit was accepted, that the pool
     * reports overload, and that it reports 99 waiting and 1 running task executors.
     */
    @Test
    public void testIsOverload() {
        WorkerConfig workerConfig = new WorkerConfig();
        workerConfig.setExecThreads(1);
        workerConfig.setTaskExecuteThreadsFullPolicy(TaskExecuteThreadsFullPolicy.CONTINUE);
        WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool = new WorkerTaskExecutorThreadPool(workerConfig);
        // submit 100 task, the thread pool size is 1
        // assert the overload should be true
        // assert the submitQueue should be 99
        for (int i = 0; i < 100; i++) {
            boolean submitResult =
                    workerTaskExecutorThreadPool.submitWorkerTaskExecutor(new MockWorkerTaskExecutor(() -> {
                        try {
                            Thread.sleep(10_000L);
                        } catch (InterruptedException e) {
                            // Keep the interrupt visible to whoever owns this thread before bailing out.
                            Thread.currentThread().interrupt();
                            throw new RuntimeException(e);
                        }
                    }));
            Assertions.assertTrue(submitResult);
        }
        Assertions.assertTrue(workerTaskExecutorThreadPool.isOverload());
        Assertions.assertEquals(99, workerTaskExecutorThreadPool.getWaitingTaskExecutorSize());
        Assertions.assertEquals(1, workerTaskExecutorThreadPool.getRunningTaskExecutorSize());
    }

    /**
     * {@link WorkerTaskExecutor} whose task body is just the supplied {@link Runnable}.
     */
    static class MockWorkerTaskExecutor extends WorkerTaskExecutor {

        /**
         * Source of task instance ids. A counter is used instead of {@code (int) System.nanoTime()}
         * because truncating the clock is non-deterministic and may repeat when the timer is coarse.
         */
        private static final java.util.concurrent.atomic.AtomicInteger TASK_INSTANCE_ID_GENERATOR =
                new java.util.concurrent.atomic.AtomicInteger();

        private final Runnable runnable;

        /**
         * @param runnable work performed when the executor's task is executed
         */
        protected MockWorkerTaskExecutor(Runnable runnable) {
            super(TaskExecutionContext.builder()
                    .taskInstanceId(TASK_INSTANCE_ID_GENERATOR.incrementAndGet())
                    .build(),
                    new WorkerConfig(),
                    new WorkerMessageSender(),
                    new TaskPluginManager(),
                    new NoopStorageOperate(),
                    new WorkerRegistryClient());
            this.runnable = runnable;
        }

        @Override
        public void run() {
            executeTask(new TaskCallbackImpl(null, null));
        }

        @Override
        protected void executeTask(TaskCallBack taskCallBack) {
            runnable.run();
        }
    }

    /**
     * {@link StorageOperate} stub: every method is a no-op returning {@code null} or {@code false}.
     */
    private static final class NoopStorageOperate implements StorageOperate {

        @Override
        public void createTenantDirIfNotExists(String tenantCode) {
        }

        @Override
        public String getResDir(String tenantCode) {
            return null;
        }

        @Override
        public String getUdfDir(String tenantCode) {
            return null;
        }

        @Override
        public boolean mkdir(String tenantCode, String path) throws IOException {
            return false;
        }

        @Override
        public String getResourceFullName(String tenantCode, String fileName) {
            return null;
        }

        @Override
        public String getResourceFileName(String tenantCode, String fullName) {
            return null;
        }

        @Override
        public String getFileName(ResourceType resourceType, String tenantCode, String fileName) {
            return null;
        }

        @Override
        public boolean exists(String fullName) {
            return false;
        }

        @Override
        public boolean delete(String filePath, boolean recursive) {
            return false;
        }

        @Override
        public boolean delete(String filePath, List<String> childrenPathArray,
                              boolean recursive) {
            return false;
        }

        @Override
        public boolean copy(String srcPath, String dstPath, boolean deleteSource,
                            boolean overwrite) {
            return false;
        }

        @Override
        public String getDir(ResourceType resourceType, String tenantCode) {
            return null;
        }

        @Override
        public boolean upload(String tenantCode, String srcFile, String dstPath, boolean deleteSource,
                              boolean overwrite) {
            return false;
        }

        @Override
        public void download(String srcFilePath, String dstFile, boolean overwrite) {
        }

        @Override
        public List<String> vimFile(String tenantCode, String filePath, int skipLineNums,
                                    int limit) {
            return null;
        }

        @Override
        public void deleteTenant(String tenantCode) {
        }

        @Override
        public ResUploadType returnStorageType() {
            return null;
        }

        @Override
        public List<StorageEntity> listFilesStatusRecursively(String path, String defaultPath,
                                                              String tenantCode, ResourceType type) {
            return null;
        }

        @Override
        public List<StorageEntity> listFilesStatus(String path, String defaultPath, String tenantCode,
                                                   ResourceType type) throws Exception {
            return null;
        }

        @Override
        public StorageEntity getFileStatus(String path, String defaultPath, String tenantCode,
                                           ResourceType type) throws Exception {
            return null;
        }
    }
}
Loading…
Cancel
Save