From 5523a62825eb7168de8cb2fda564d5a923b5edd4 Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Wed, 27 Dec 2023 19:51:45 +0800 Subject: [PATCH] Remove taskQueue and looper in worker (#15292) --- .../common/thread/ThreadUtils.java | 6 +- .../server/master/config/MasterConfig.java | 4 +- ...TaskInstanceDispatchOperationFunction.java | 16 +- ...gicITaskInstanceKillOperationFunction.java | 8 +- .../GlobalMasterTaskExecuteRunnableQueue.java | 52 ----- ...lMasterTaskExecuteRunnableQueueLooper.java | 84 -------- .../GlobalTaskDispatchWaitingQueue.java | 8 +- .../GlobalTaskDispatchWaitingQueueLooper.java | 16 +- .../runner/MasterTaskExecutorBootstrap.java | 6 - .../dispatcher/TaskDispatchFactory.java | 5 + .../AsyncMasterTaskDelayQueueLooper.java | 15 +- .../IMasterTaskExecutorThreadPool.java | 25 +++ .../MasterAsyncTaskExecutorThreadPool.java | 58 ++++++ ... => MasterSyncTaskExecutorThreadPool.java} | 34 +-- .../MasterTaskExecutorThreadPoolManager.java | 59 ++++++ ...seTaskExecuteRunnableDispatchOperator.java | 2 +- .../task/api/AbstractCommandExecutor.java | 16 +- .../api/TaskExecutionContextCacheManager.java | 72 ------- .../task/api/k8s/impl/K8sTaskExecutor.java | 12 +- .../task/api/k8s/K8sTaskExecutorTest.java | 4 +- .../plugin/task/dvc/DvcTaskTest.java | 2 - .../plugin/kubeflow/KubeflowTaskTest.java | 2 - .../plugin/task/mlflow/MlflowTaskTest.java | 2 - .../plugin/task/pytorch/PytorchTaskTest.java | 2 - .../server/worker/WorkerServer.java | 38 ++-- .../worker/message/MessageRetryRunner.java | 13 +- .../worker/registry/WorkerRegistryClient.java | 6 +- .../registry/WorkerWaitingStrategy.java | 11 +- .../StreamingTaskInstanceOperatorImpl.java | 19 +- .../worker/rpc/WorkerLogServiceImpl.java | 14 +- .../GlobalTaskInstanceWaitingQueue.java | 70 ------- .../GlobalTaskInstanceWaitingQueueLooper.java | 101 --------- .../worker/runner/TaskCallbackImpl.java | 18 +- .../worker/runner/WorkerExecService.java | 91 -------- .../worker/runner/WorkerManagerThread.java | 150 
-------------- .../worker/runner/WorkerTaskExecutor.java | 9 +- .../WorkerTaskExecutorFactoryBuilder.java | 33 ++- .../runner/WorkerTaskExecutorHolder.java | 54 +++++ .../runner/WorkerTaskExecutorThreadPool.java | 96 +++++++++ ...TaskInstanceDispatchOperationFunction.java | 20 +- .../TaskInstanceKillOperationFunction.java | 37 ++-- .../UpdateWorkflowHostOperationFunction.java | 33 +-- .../registry/WorkerRegistryClientTest.java | 4 +- .../WorkerTaskExecutorThreadPoolTest.java | 194 ++++++++++++++++++ 44 files changed, 672 insertions(+), 849 deletions(-) delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/IMasterTaskExecutorThreadPool.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/{MasterTaskExecutorThreadPool.java => MasterSyncTaskExecutorThreadPool.java} (53%) create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java delete mode 100644 dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContextCacheManager.java delete mode 100644 dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java delete mode 100644 dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java delete mode 100644 
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java delete mode 100644 dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java create mode 100644 dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java create mode 100644 dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java create mode 100644 dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java index 3b66552274..5eef04ed82 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/thread/ThreadUtils.java @@ -17,10 +17,10 @@ package org.apache.dolphinscheduler.common.thread; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; @@ -38,9 +38,9 @@ public class ThreadUtils { * @param threadsNum threadsNum * @return ExecutorService */ - public static ExecutorService newDaemonFixedThreadExecutor(String threadName, int threadsNum) { + public static ThreadPoolExecutor newDaemonFixedThreadExecutor(String threadName, int threadsNum) { ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build(); - return Executors.newFixedThreadPool(threadsNum, threadFactory); + return (ThreadPoolExecutor) 
Executors.newFixedThreadPool(threadsNum, threadFactory); } public static ScheduledExecutorService newSingleDaemonScheduledExecutorService(String threadName) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index f1b8485c69..d7f10f03fb 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -64,9 +64,9 @@ public class MasterConfig implements Validator { private int execThreads = 10; // todo: change to sync thread pool/ async thread pool ? - private int masterTaskExecuteThreadPoolSize = Runtime.getRuntime().availableProcessors(); + private int masterSyncTaskExecutorThreadPoolSize = Runtime.getRuntime().availableProcessors(); - private int masterAsyncTaskStateCheckThreadPoolSize = Runtime.getRuntime().availableProcessors(); + private int masterAsyncTaskExecutorThreadPoolSize = Runtime.getRuntime().availableProcessors(); /** * The task dispatch thread pool size. 
*/ diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java index f69ff4fb77..7590708208 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceDispatchOperationFunction.java @@ -21,10 +21,10 @@ import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchR import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchResponse; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; -import org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorFactoryBuilder; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPoolManager; import lombok.extern.slf4j.Slf4j; @@ -41,7 +41,7 @@ public class LogicITaskInstanceDispatchOperationFunction private MasterTaskExecutorFactoryBuilder masterTaskExecutorFactoryBuilder; @Autowired - private GlobalMasterTaskExecuteRunnableQueue globalMasterTaskExecuteRunnableQueue; + private MasterTaskExecutorThreadPoolManager masterTaskExecutorThreadPool; @Override public LogicTaskDispatchResponse operate(LogicTaskDispatchRequest taskDispatchRequest) { @@ -62,16 +62,12 @@ public class LogicITaskInstanceDispatchOperationFunction MasterTaskExecutor masterTaskExecutor = 
masterTaskExecutorFactoryBuilder .createMasterTaskExecutorFactory(taskExecutionContext.getTaskType()) .createMasterTaskExecutor(taskExecutionContext); - if (globalMasterTaskExecuteRunnableQueue - .submitMasterTaskExecuteRunnable(masterTaskExecutor)) { - log.info("Submit LogicTask: {} to MasterDelayTaskExecuteRunnableDelayQueue success", taskInstanceName); + if (masterTaskExecutorThreadPool.submitMasterTaskExecutor(masterTaskExecutor)) { + log.info("Submit LogicTask: {} to MasterTaskExecutorThreadPool success", taskInstanceName); return LogicTaskDispatchResponse.success(taskInstanceId); } else { - log.error( - "Submit LogicTask: {} to MasterDelayTaskExecuteRunnableDelayQueue failed, current task waiting queue size: {} is full", - taskInstanceName, globalMasterTaskExecuteRunnableQueue.size()); - return LogicTaskDispatchResponse.failed(taskInstanceId, - "MasterDelayTaskExecuteRunnableDelayQueue is full"); + log.error("Submit LogicTask: {} to MasterTaskExecutorThreadPool failed", taskInstanceName); + return LogicTaskDispatchResponse.failed(taskInstanceId, "MasterTaskExecutorThreadPool is full"); } } finally { LogUtils.removeWorkflowAndTaskInstanceIdMDC(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java index 27fc52333d..121c5ae6e1 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceKillOperationFunction.java @@ -21,10 +21,10 @@ import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillReque import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse; import 
org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException; -import org.apache.dolphinscheduler.server.master.runner.GlobalMasterTaskExecuteRunnableQueue; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPoolManager; import lombok.extern.slf4j.Slf4j; @@ -38,7 +38,7 @@ public class LogicITaskInstanceKillOperationFunction ITaskInstanceOperationFunction { @Autowired - private GlobalMasterTaskExecuteRunnableQueue globalMasterTaskExecuteRunnableQueue; + private MasterTaskExecutorThreadPoolManager masterTaskExecutorThreadPool; @Override public LogicTaskKillResponse operate(LogicTaskKillRequest taskKillRequest) { @@ -54,8 +54,8 @@ public class LogicITaskInstanceKillOperationFunction } try { masterTaskExecutor.cancelTask(); - globalMasterTaskExecuteRunnableQueue - .removeMasterTaskExecuteRunnable(masterTaskExecutor); + // todo: if we remove success then we don't need to cancel? 
+ masterTaskExecutorThreadPool.removeMasterTaskExecutor(masterTaskExecutor); return LogicTaskKillResponse.success(); } catch (MasterTaskExecuteException e) { log.error("Cancel MasterTaskExecuteRunnable failed ", e); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java deleted file mode 100644 index 9005416b92..0000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueue.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.dolphinscheduler.server.master.runner; - -import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import org.springframework.stereotype.Component; - -/** - * - */ -@Component -public class GlobalMasterTaskExecuteRunnableQueue { - - private final BlockingQueue masterTaskExecutorBlockingQueue = - new LinkedBlockingQueue<>(); - - public boolean submitMasterTaskExecuteRunnable(MasterTaskExecutor masterTaskExecutor) { - return masterTaskExecutorBlockingQueue.offer(masterTaskExecutor); - } - - public MasterTaskExecutor takeMasterTaskExecuteRunnable() throws InterruptedException { - return masterTaskExecutorBlockingQueue.take(); - } - - public boolean removeMasterTaskExecuteRunnable(MasterTaskExecutor masterTaskExecutor) { - return masterTaskExecutorBlockingQueue.remove(masterTaskExecutor); - } - - public int size() { - return masterTaskExecutorBlockingQueue.size(); - } - -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java deleted file mode 100644 index e0b1f80704..0000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalMasterTaskExecuteRunnableQueueLooper.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner; - -import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorHolder; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPool; - -import java.util.concurrent.atomic.AtomicBoolean; - -import lombok.extern.slf4j.Slf4j; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Slf4j -@Component -public class GlobalMasterTaskExecuteRunnableQueueLooper extends BaseDaemonThread implements AutoCloseable { - - @Autowired - private GlobalMasterTaskExecuteRunnableQueue globalMasterTaskExecuteRunnableQueue; - - @Autowired - private MasterTaskExecutorThreadPool masterTaskExecutorThreadPool; - - private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false); - - public GlobalMasterTaskExecuteRunnableQueueLooper() { - super("MasterDelayTaskExecuteRunnableDelayQueueLooper"); - } - - @Override - public synchronized void start() { - if (!RUNNING_FLAG.compareAndSet(false, true)) { - log.error("The MasterDelayTaskExecuteRunnableDelayQueueLooper already started, will not start again"); - return; - } - log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper starting..."); - super.start(); - masterTaskExecutorThreadPool.start(); - log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper started..."); - } - - @Override - public void 
run() { - while (RUNNING_FLAG.get()) { - try { - final MasterTaskExecutor masterTaskExecutor = - globalMasterTaskExecuteRunnableQueue.takeMasterTaskExecuteRunnable(); - masterTaskExecutorThreadPool.submitMasterTaskExecutor(masterTaskExecutor); - MasterTaskExecutorHolder.putMasterTaskExecuteRunnable(masterTaskExecutor); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - log.warn("MasterDelayTaskExecuteRunnableDelayQueueLooper has been interrupted, will stop loop"); - break; - } - } - log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper stop loop..."); - } - - @Override - public void close() throws Exception { - if (RUNNING_FLAG.compareAndSet(true, false)) { - log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper stopping..."); - log.info("MasterDelayTaskExecuteRunnableDelayQueueLooper stopped..."); - } - } -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java index f4d50537c6..7e0d683571 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueue.java @@ -23,17 +23,21 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +/** + * The class is used to store {@link TaskExecuteRunnable} which needs to be dispatched. The {@link TaskExecuteRunnable} will be stored in a {@link DelayQueue}, + * if the {@link TaskExecuteRunnable}'s delay time is 0, then it will be consumed by {@link GlobalTaskDispatchWaitingQueueLooper}. 
+ */ @Slf4j @Component public class GlobalTaskDispatchWaitingQueue { private final DelayQueue queue = new DelayQueue<>(); - public void submitNeedToDispatchTaskExecuteRunnable(DefaultTaskExecuteRunnable priorityTaskExecuteRunnable) { + public void submitTaskExecuteRunnable(DefaultTaskExecuteRunnable priorityTaskExecuteRunnable) { queue.put(priorityTaskExecuteRunnable); } - public DefaultTaskExecuteRunnable takeNeedToDispatchTaskExecuteRunnable() throws InterruptedException { + public DefaultTaskExecuteRunnable takeTaskExecuteRunnable() throws InterruptedException { return queue.take(); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java index b496bea5a5..a1f4b28783 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/GlobalTaskDispatchWaitingQueueLooper.java @@ -42,7 +42,7 @@ public class GlobalTaskDispatchWaitingQueueLooper extends BaseDaemonThread imple private final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false); - private final AtomicInteger DISPATCHED_TIMES = new AtomicInteger(); + private final AtomicInteger DISPATCHED_CONSECUTIVE_FAILURE_TIMES = new AtomicInteger(); private static final Integer MAX_DISPATCHED_FAILED_TIMES = 100; @@ -66,24 +66,24 @@ public class GlobalTaskDispatchWaitingQueueLooper extends BaseDaemonThread imple DefaultTaskExecuteRunnable defaultTaskExecuteRunnable; while (RUNNING_FLAG.get()) { try { - defaultTaskExecuteRunnable = globalTaskDispatchWaitingQueue.takeNeedToDispatchTaskExecuteRunnable(); + defaultTaskExecuteRunnable = globalTaskDispatchWaitingQueue.takeTaskExecuteRunnable(); } catch (InterruptedException e) { log.warn("Get 
waiting dispatch task failed, the current thread has been interrupted, will stop loop"); Thread.currentThread().interrupt(); break; } try { - final TaskDispatcher taskDispatcher = taskDispatchFactory - .getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance().getTaskType()); + TaskDispatcher taskDispatcher = + taskDispatchFactory.getTaskDispatcher(defaultTaskExecuteRunnable.getTaskInstance()); taskDispatcher.dispatchTask(defaultTaskExecuteRunnable); - DISPATCHED_TIMES.set(0); + DISPATCHED_CONSECUTIVE_FAILURE_TIMES.set(0); } catch (Exception e) { defaultTaskExecuteRunnable.getTaskExecutionContext().increaseDispatchFailTimes(); - globalTaskDispatchWaitingQueue.submitNeedToDispatchTaskExecuteRunnable(defaultTaskExecuteRunnable); - if (DISPATCHED_TIMES.incrementAndGet() > MAX_DISPATCHED_FAILED_TIMES) { + globalTaskDispatchWaitingQueue.submitTaskExecuteRunnable(defaultTaskExecuteRunnable); + if (DISPATCHED_CONSECUTIVE_FAILURE_TIMES.incrementAndGet() > MAX_DISPATCHED_FAILED_TIMES) { ThreadUtils.sleep(10 * 1000L); } - log.error("Dispatch task failed", e); + log.error("Dispatch Task: {} failed", defaultTaskExecuteRunnable.getTaskInstance().getName(), e); } } log.info("GlobalTaskDispatchWaitingQueueLooper started..."); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java index 3e99d2141c..7d3bf6940b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecutorBootstrap.java @@ -31,16 +31,12 @@ public class MasterTaskExecutorBootstrap implements AutoCloseable { @Autowired private GlobalTaskDispatchWaitingQueueLooper globalTaskDispatchWaitingQueueLooper; - @Autowired - private 
GlobalMasterTaskExecuteRunnableQueueLooper globalMasterTaskExecuteRunnableQueueLooper; - @Autowired private AsyncMasterTaskDelayQueueLooper asyncMasterTaskDelayQueueLooper; public synchronized void start() { log.info("MasterTaskExecutorBootstrap starting..."); globalTaskDispatchWaitingQueueLooper.start(); - globalMasterTaskExecuteRunnableQueueLooper.start(); asyncMasterTaskDelayQueueLooper.start(); log.info("MasterTaskExecutorBootstrap started..."); } @@ -51,8 +47,6 @@ public class MasterTaskExecutorBootstrap implements AutoCloseable { try ( final GlobalTaskDispatchWaitingQueueLooper globalTaskDispatchWaitingQueueLooper1 = globalTaskDispatchWaitingQueueLooper; - final GlobalMasterTaskExecuteRunnableQueueLooper globalMasterTaskExecuteRunnableQueueLooper1 = - globalMasterTaskExecuteRunnableQueueLooper; final AsyncMasterTaskDelayQueueLooper asyncMasterTaskDelayQueueLooper1 = asyncMasterTaskDelayQueueLooper) { // closed the resource diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java index 1979b48de5..52469fb54c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/dispatcher/TaskDispatchFactory.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.runner.dispatcher; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.utils.TaskUtils; import lombok.extern.slf4j.Slf4j; @@ -38,4 +39,8 @@ public class TaskDispatchFactory { return TaskUtils.isMasterTask(taskType) ? 
masterTaskDispatcher : workerTaskDispatcher; } + public TaskDispatcher getTaskDispatcher(TaskInstance taskInstance) { + return getTaskDispatcher(taskInstance.getTaskType()); + } + } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java index 63b8636be6..86b711f245 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncMasterTaskDelayQueueLooper.java @@ -18,12 +18,9 @@ package org.apache.dolphinscheduler.server.master.runner.execute; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; @@ -39,12 +36,9 @@ public class AsyncMasterTaskDelayQueueLooper extends BaseDaemonThread implements private AsyncMasterTaskDelayQueue asyncMasterTaskDelayQueue; @Autowired - private MasterConfig masterConfig; - + private MasterAsyncTaskExecutorThreadPool masterAsyncTaskExecutorThreadPool; private static final AtomicBoolean RUNNING_FLAG = new AtomicBoolean(false); - private ExecutorService asyncTaskStateCheckThreadPool; - public AsyncMasterTaskDelayQueueLooper() { super("AsyncMasterTaskDelayQueueLooper"); } @@ -63,8 +57,6 @@ public class AsyncMasterTaskDelayQueueLooper extends BaseDaemonThread implements @Override public void run() { - 
asyncTaskStateCheckThreadPool = ThreadUtils.newDaemonFixedThreadExecutor("AsyncTaskStateCheckThreadPool", - masterConfig.getMasterAsyncTaskStateCheckThreadPoolSize()); while (RUNNING_FLAG.get()) { AsyncTaskExecutionContext asyncTaskExecutionContext; try { @@ -86,7 +78,7 @@ public class AsyncMasterTaskDelayQueueLooper extends BaseDaemonThread implements "Cannot find the taskInstance from TaskExecutionContextCacheManager, the task may already been killed, will stop the async master task"); continue; } - asyncTaskStateCheckThreadPool.submit(() -> { + masterAsyncTaskExecutorThreadPool.getThreadPool().submit(() -> { final AsyncTaskExecuteFunction asyncTaskExecuteFunction = asyncTaskExecutionContext.getAsyncTaskExecuteFunction(); final AsyncTaskCallbackFunction asyncTaskCallbackFunction = @@ -131,8 +123,5 @@ public class AsyncMasterTaskDelayQueueLooper extends BaseDaemonThread implements log.warn("The AsyncMasterTaskDelayQueueLooper is not started, will not close"); return; } - log.info("AsyncMasterTaskDelayQueueLooper closing..."); - asyncTaskStateCheckThreadPool.shutdown(); - log.info("AsyncMasterTaskDelayQueueLooper closed..."); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/IMasterTaskExecutorThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/IMasterTaskExecutorThreadPool.java new file mode 100644 index 0000000000..6c8e2caa73 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/IMasterTaskExecutorThreadPool.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.execute; + +public interface IMasterTaskExecutorThreadPool { + + boolean submitMasterTaskExecutor(T masterTaskExecutor); + + boolean removeMasterTaskExecutor(T masterTaskExecutor); +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java new file mode 100644 index 0000000000..868b66b6df --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterAsyncTaskExecutorThreadPool.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.execute; + +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; + +import java.util.concurrent.ThreadPoolExecutor; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class MasterAsyncTaskExecutorThreadPool implements IMasterTaskExecutorThreadPool { + + private final ThreadPoolExecutor threadPoolExecutor; + + public MasterAsyncTaskExecutorThreadPool(MasterConfig masterConfig) { + this.threadPoolExecutor = ThreadUtils.newDaemonFixedThreadExecutor("MasterAsyncTaskExecutorThreadPool", + masterConfig.getMasterAsyncTaskExecutorThreadPoolSize()); + } + + @Override + public boolean submitMasterTaskExecutor(AsyncMasterTaskExecutor asyncMasterTaskExecutor) { + synchronized (MasterAsyncTaskExecutorThreadPool.class) { + // todo: check if the thread pool is overloaded + threadPoolExecutor.submit(asyncMasterTaskExecutor); + return true; + } + } + + @Override + public boolean removeMasterTaskExecutor(AsyncMasterTaskExecutor asyncMasterTaskExecutor) { + return threadPoolExecutor.remove(asyncMasterTaskExecutor); + } + + // todo: remove this method, it's not a good idea to expose the ThreadPoolExecutor to the outside.
+ ThreadPoolExecutor getThreadPool() { + return threadPoolExecutor; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java similarity index 53% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPool.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java index d61d058d22..3f683076e6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPool.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterSyncTaskExecutorThreadPool.java @@ -20,32 +20,34 @@ package org.apache.dolphinscheduler.server.master.runner.execute; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import java.util.concurrent.ThreadPoolExecutor; + import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; - @Slf4j @Component -public class MasterTaskExecutorThreadPool { +public class MasterSyncTaskExecutorThreadPool implements IMasterTaskExecutorThreadPool<SyncMasterTaskExecutor> { - @Autowired - private MasterConfig masterConfig; + private final ThreadPoolExecutor threadPoolExecutor; - private ListeningExecutorService listeningExecutorService; - - public synchronized void start() { - log.info("MasterTaskExecuteRunnableThreadPool starting..."); - this.listeningExecutorService = 
MoreExecutors.listeningDecorator(ThreadUtils.newDaemonFixedThreadExecutor( - "MasterTaskExecuteRunnableThread", masterConfig.getMasterTaskExecuteThreadPoolSize())); - log.info("MasterTaskExecuteRunnableThreadPool started..."); + public MasterSyncTaskExecutorThreadPool(MasterConfig masterConfig) { + this.threadPoolExecutor = ThreadUtils.newDaemonFixedThreadExecutor("MasterSyncTaskExecutorThreadPool", + masterConfig.getMasterSyncTaskExecutorThreadPoolSize()); } - public void submitMasterTaskExecutor(MasterTaskExecutor masterTaskExecutor) { - listeningExecutorService.submit(masterTaskExecutor); + @Override + public boolean submitMasterTaskExecutor(SyncMasterTaskExecutor syncMasterTaskExecutor) { + synchronized (MasterSyncTaskExecutorThreadPool.class) { + // todo: check if the thread pool is overload + threadPoolExecutor.submit(syncMasterTaskExecutor); + return true; + } } + @Override + public boolean removeMasterTaskExecutor(SyncMasterTaskExecutor syncMasterTaskExecutor) { + return threadPoolExecutor.remove(syncMasterTaskExecutor); + } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java new file mode 100644 index 0000000000..1f4a916897 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.execute; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class MasterTaskExecutorThreadPoolManager { + + @Autowired + private MasterSyncTaskExecutorThreadPool masterSyncTaskExecutorThreadPool; + + @Autowired + private MasterAsyncTaskExecutorThreadPool masterAsyncTaskExecutorThreadPool; + + public boolean submitMasterTaskExecutor(MasterTaskExecutor masterTaskExecutor) { + if (masterTaskExecutor instanceof SyncMasterTaskExecutor) { + return masterSyncTaskExecutorThreadPool + .submitMasterTaskExecutor((SyncMasterTaskExecutor) masterTaskExecutor); + } + if (masterTaskExecutor instanceof AsyncMasterTaskExecutor) { + return masterAsyncTaskExecutorThreadPool + .submitMasterTaskExecutor((AsyncMasterTaskExecutor) masterTaskExecutor); + } + throw new IllegalArgumentException("Unknown type of MasterTaskExecutor: " + masterTaskExecutor); + } + + public boolean removeMasterTaskExecutor(MasterTaskExecutor masterTaskExecutor) { + if (masterTaskExecutor instanceof SyncMasterTaskExecutor) { + return masterSyncTaskExecutorThreadPool + .removeMasterTaskExecutor((SyncMasterTaskExecutor) masterTaskExecutor); + } + if (masterTaskExecutor instanceof AsyncMasterTaskExecutor) { + return masterAsyncTaskExecutorThreadPool + .removeMasterTaskExecutor((AsyncMasterTaskExecutor) masterTaskExecutor); + } + throw new IllegalArgumentException("Unknown type of MasterTaskExecutor: " + 
masterTaskExecutor); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java index 6f0419ae97..cb2c7a0e07 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/BaseTaskExecuteRunnableDispatchOperator.java @@ -52,6 +52,6 @@ public abstract class BaseTaskExecuteRunnableDispatchOperator implements TaskExe taskInstance.getName(), taskInstance.getDelayTime(), remainTime); } - globalTaskDispatchWaitingQueue.submitNeedToDispatchTaskExecuteRunnable(taskExecuteRunnable); + globalTaskDispatchWaitingQueue.submitTaskExecuteRunnable(taskExecuteRunnable); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index 8aa4c36af2..4203516f42 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -105,13 +105,7 @@ public abstract class AbstractCommandExecutor { TaskCallBack taskCallBack) throws Exception { TaskResponse result = new TaskResponse(); int taskInstanceId = taskRequest.getTaskInstanceId(); - if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) { - log.warn( - "Cannot find the taskInstance: {} from 
TaskExecutionContextCacheManager, the task might already been killed", - taskInstanceId); - result.setExitStatusCode(EXIT_CODE_KILL); - return result; - } + // todo: we need to use state like JDK Thread to make sure the killed task should not be executed iShellInterceptorBuilder = iShellInterceptorBuilder .shellDirectory(taskRequest.getExecutePath()) .shellName(taskRequest.getTaskAppId()); @@ -155,13 +149,7 @@ public abstract class AbstractCommandExecutor { // cache processId taskRequest.setProcessId(processId); - boolean updateTaskExecutionContextStatus = - TaskExecutionContextCacheManager.updateTaskExecutionContext(taskRequest); - if (Boolean.FALSE.equals(updateTaskExecutionContextStatus)) { - result.setExitStatusCode(EXIT_CODE_KILL); - cancelApplication(); - return result; - } + // print process id log.info("process start, process id is: {}", processId); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContextCacheManager.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContextCacheManager.java deleted file mode 100644 index dbe39d7042..0000000000 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContextCacheManager.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.plugin.task.api; - -import java.util.Collection; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class TaskExecutionContextCacheManager { - - private TaskExecutionContextCacheManager() { - throw new IllegalStateException("Utility class"); - } - - /** - * taskInstance cache - */ - private static final Map taskRequestContextCache = new ConcurrentHashMap<>(); - - /** - * get taskInstance by taskInstance id - * - * @param taskInstanceId taskInstanceId - * @return taskInstance - */ - - public static TaskExecutionContext getByTaskInstanceId(Integer taskInstanceId) { - return taskRequestContextCache.get(taskInstanceId); - } - - /** - * cache taskInstance - * - * @param request request - */ - public static void cacheTaskExecutionContext(TaskExecutionContext request) { - taskRequestContextCache.put(request.getTaskInstanceId(), request); - } - - /** - * remove taskInstance by taskInstanceId - * - * @param taskInstanceId taskInstanceId - */ - public static void removeByTaskInstanceId(Integer taskInstanceId) { - taskRequestContextCache.remove(taskInstanceId); - } - - public static boolean updateTaskExecutionContext(TaskExecutionContext request) { - taskRequestContextCache.computeIfPresent(request.getTaskInstanceId(), (k, v) -> request); - return taskRequestContextCache.containsKey(request.getTaskInstanceId()); - } - - public static Collection getAllTaskRequestList() { - return taskRequestContextCache.values(); - } -} diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index f5f794859b..1ce6b12c22 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -21,7 +21,6 @@ import static java.util.Collections.singletonList; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.API_VERSION; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.CPU; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.JOB_TTL_SECONDS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.LAYER_LABEL; @@ -39,7 +38,6 @@ import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters; @@ -284,12 +282,7 @@ public class K8sTaskExecutor extends 
AbstractK8sTaskExecutor { TaskResponse result = new TaskResponse(); int taskInstanceId = taskRequest.getTaskInstanceId(); try { - if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) { - result.setExitStatusCode(EXIT_CODE_KILL); - return result; - } if (StringUtils.isEmpty(k8sParameterStr)) { - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); return result; } K8sTaskExecutionContext k8sTaskExecutionContext = taskRequest.getK8sTaskExecutionContext(); @@ -371,10 +364,7 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { public void setTaskStatus(int jobStatus, String taskInstanceId, TaskResponse taskResponse) { if (jobStatus == EXIT_CODE_SUCCESS || jobStatus == EXIT_CODE_FAILURE) { - if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(Integer.valueOf(taskInstanceId))) { - log.info("[K8sJobExecutor-{}] killed", job.getMetadata().getName()); - taskResponse.setExitStatusCode(EXIT_CODE_KILL); - } else if (jobStatus == EXIT_CODE_SUCCESS) { + if (jobStatus == EXIT_CODE_SUCCESS) { log.info("[K8sJobExecutor-{}] succeed in k8s", job.getMetadata().getName()); taskResponse.setExitStatusCode(EXIT_CODE_SUCCESS); } else { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java index 4bbd29d18e..1e7629acce 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.plugin.task.api.k8s; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL; 
- import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor; @@ -90,7 +88,7 @@ public class K8sTaskExecutorTest { TaskResponse taskResponse = new TaskResponse(); k8sTaskExecutor.setJob(job); k8sTaskExecutor.setTaskStatus(jobStatus, String.valueOf(taskInstanceId), taskResponse); - Assertions.assertEquals(0, Integer.compare(EXIT_CODE_KILL, taskResponse.getExitStatusCode())); + Assertions.assertEquals(0, taskResponse.getExitStatusCode()); } @Test public void testWaitTimeoutNormal() { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java index 3f87271015..204b024e19 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/test/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTaskTest.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.dvc; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -35,7 +34,6 @@ public class DvcTaskTest { TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class); Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters); - TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); return taskExecutionContext; } diff --git 
a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskTest.java index e6df60491d..0f70b67f75 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/test/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTaskTest.java @@ -21,7 +21,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import java.io.File; import java.io.IOException; @@ -127,7 +126,6 @@ public class KubeflowTaskTest { } Mockito.when(taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml()) .thenReturn(kubeflowParameters.getClusterYAML()); - TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); return taskExecutionContext; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java index cf746799c6..4196e208e7 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/test/java/org/apache/dolphinler/plugin/task/mlflow/MlflowTaskTest.java @@ -20,7 +20,6 @@ package org.apache.dolphinler.plugin.task.mlflow; import 
org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowConstants; import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowParameters; import org.apache.dolphinscheduler.plugin.task.mlflow.MlflowTask; @@ -58,7 +57,6 @@ public class MlflowTaskTest { String parameters = JSONUtils.toJsonString(mlflowParameters); TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class); Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters); - TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); return taskExecutionContext; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/test/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/test/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskTest.java index 25b975b885..c213021607 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/test/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/test/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTaskTest.java @@ -21,7 +21,6 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.commons.lang3.SystemUtils; @@ -198,7 +197,6 @@ public class PytorchTaskTest { String parameters = JSONUtils.toJsonString(pytorchParameters); TaskExecutionContext 
taskExecutionContext = Mockito.mock(TaskExecutionContext.class); Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(parameters); - TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); return taskExecutionContext; } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index e61409c9d2..e8ae5381fd 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -22,15 +22,14 @@ import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer; -import org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueueLooper; -import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder; import org.apache.commons.collections4.CollectionUtils; @@ -52,9 +51,6 @@ import 
org.springframework.transaction.annotation.EnableTransactionManagement; @Slf4j public class WorkerServer implements IStoppable { - @Autowired - private WorkerManagerThread workerManagerThread; - @Autowired private WorkerRegistryClient workerRegistryClient; @@ -67,9 +63,6 @@ public class WorkerServer implements IStoppable { @Autowired private MessageRetryRunner messageRetryRunner; - @Autowired - private GlobalTaskInstanceWaitingQueueLooper globalTaskInstanceWaitingQueueLooper; - /** * worker server startup, not use web service * @@ -88,10 +81,7 @@ public class WorkerServer implements IStoppable { this.workerRegistryClient.setRegistryStoppable(this); this.workerRegistryClient.start(); - this.workerManagerThread.start(); - this.messageRetryRunner.start(); - this.globalTaskInstanceWaitingQueueLooper.start(); /* * registry hooks, which are called before the process exits @@ -114,7 +104,13 @@ public class WorkerServer implements IStoppable { WorkerRpcServer closedWorkerRpcServer = workerRpcServer; WorkerRegistryClient closedRegistryClient = workerRegistryClient) { log.info("Worker server is stopping, current cause : {}", cause); - // kill running tasks + // todo: we need to remove this method + // since for some tasks, we need to take over the remote task after the worker restarts, + // and if the worker crashes, `killAllRunningTasks` will not be executed. This leads to two + // kinds of situations: + // 1. If the worker is stopped by kill, the tasks will be killed. + // 2. If the worker is stopped by kill -9, the tasks will not be killed. + // So we don't need to kill the tasks. 
this.killAllRunningTasks(); } catch (Exception e) { log.error("Worker server stop failed, current cause: {}", cause, e); @@ -129,25 +125,25 @@ public class WorkerServer implements IStoppable { } public void killAllRunningTasks() { - Collection<TaskExecutionContext> taskRequests = TaskExecutionContextCacheManager.getAllTaskRequestList(); - if (CollectionUtils.isEmpty(taskRequests)) { + Collection<WorkerTaskExecutor> workerTaskExecutors = WorkerTaskExecutorHolder.getAllTaskExecutor(); + if (CollectionUtils.isEmpty(workerTaskExecutors)) { return; } - log.info("Worker begin to kill all cache task, task size: {}", taskRequests.size()); + log.info("Worker begin to kill all cache task, task size: {}", workerTaskExecutors.size()); int killNumber = 0; - for (TaskExecutionContext taskRequest : taskRequests) { + for (WorkerTaskExecutor workerTaskExecutor : workerTaskExecutors) { // kill task when it's not finished yet try { - LogUtils.setWorkflowAndTaskInstanceIDMDC(taskRequest.getProcessInstanceId(), - taskRequest.getTaskInstanceId()); - if (ProcessUtils.kill(taskRequest)) { + TaskExecutionContext taskExecutionContext = workerTaskExecutor.getTaskExecutionContext(); + LogUtils.setTaskInstanceIdMDC(taskExecutionContext.getTaskInstanceId()); + if (ProcessUtils.kill(taskExecutionContext)) { killNumber++; } } finally { LogUtils.removeWorkflowAndTaskInstanceIdMDC(); } } - log.info("Worker after kill all cache task, task size: {}, killed number: {}", taskRequests.size(), + log.info("Worker after kill all cache task, task size: {}, killed number: {}", workerTaskExecutors.size(), killNumber); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java index 2b9ce2ec5f..28a4d8b214 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java +++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.extract.master.transportor.ITaskInstanceExecutionEvent; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import java.time.Duration; @@ -92,13 +93,15 @@ public class MessageRetryRunner extends BaseDaemonThread { needToRetryMessages.remove(taskInstanceId); } - public void updateMessageHost(int taskInstanceId, String messageReceiverHost) { + public boolean updateMessageHost(int taskInstanceId, String messageReceiverHost) { List taskInstanceMessages = this.needToRetryMessages.get(taskInstanceId); - if (taskInstanceMessages != null) { - taskInstanceMessages.forEach(taskInstanceMessage -> { - taskInstanceMessage.getEvent().setWorkflowInstanceHost(messageReceiverHost); - }); + if (CollectionUtils.isEmpty(taskInstanceMessages)) { + return false; } + taskInstanceMessages.forEach(taskInstanceMessage -> { + taskInstanceMessage.getEvent().setWorkflowInstanceHost(messageReceiverHost); + }); + return true; } public void run() { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java index ab60de9a77..b383af752d 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java @@ -30,7 +30,7 @@ import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.RegistryException; import 
org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool; import org.apache.dolphinscheduler.server.worker.task.WorkerHeartBeatTask; import org.apache.commons.collections4.CollectionUtils; @@ -55,7 +55,7 @@ public class WorkerRegistryClient implements AutoCloseable { private WorkerConfig workerConfig; @Autowired - private WorkerManagerThread workerManagerThread; + private WorkerTaskExecutorThreadPool workerManagerThread; @Autowired private RegistryClient registryClient; @@ -71,7 +71,7 @@ public class WorkerRegistryClient implements AutoCloseable { this.workerHeartBeatTask = new WorkerHeartBeatTask( workerConfig, registryClient, - () -> workerManagerThread.getWaitSubmitQueueSize()); + () -> workerManagerThread.getWaitingTaskExecutorSize()); } public void start() { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java index dc97a2c9a4..b2c3d0b606 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerWaitingStrategy.java @@ -25,8 +25,8 @@ import org.apache.dolphinscheduler.registry.api.StrategyType; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcServer; -import org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueue; -import 
org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool; import java.time.Duration; @@ -54,10 +54,7 @@ public class WorkerWaitingStrategy implements WorkerConnectStrategy { private MessageRetryRunner messageRetryRunner; @Autowired - private WorkerManagerThread workerManagerThread; - - @Autowired - private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue; + private WorkerTaskExecutorThreadPool workerManagerThread; @Override public void disconnect() { @@ -121,7 +118,7 @@ public class WorkerWaitingStrategy implements WorkerConnectStrategy { workerRpcServer.close(); log.warn("Worker server close the RPC server due to lost connection from registry"); workerManagerThread.clearTask(); - globalTaskInstanceWaitingQueue.clearTask(); + WorkerTaskExecutorHolder.clear(); log.warn("Worker server clear the tasks due to lost connection from registry"); messageRetryRunner.clearMessage(); log.warn("Worker server clear the retry message due to lost connection from registry"); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java index c57ee2efca..51b75aa457 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/StreamingTaskInstanceOperatorImpl.java @@ -21,12 +21,11 @@ import org.apache.dolphinscheduler.extract.worker.IStreamingTaskInstanceOperator import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointRequest; import 
org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointResponse; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.stream.StreamTask; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; -import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool; import lombok.extern.slf4j.Slf4j; @@ -38,7 +37,7 @@ import org.springframework.stereotype.Component; public class StreamingTaskInstanceOperatorImpl implements IStreamingTaskInstanceOperator { @Autowired - private WorkerManagerThread workerManager; + private WorkerTaskExecutorThreadPool workerManager; @Override public TaskInstanceTriggerSavepointResponse triggerSavepoint(TaskInstanceTriggerSavepointRequest taskInstanceTriggerSavepointRequest) { @@ -47,16 +46,10 @@ public class StreamingTaskInstanceOperatorImpl implements IStreamingTaskInstance try { int taskInstanceId = taskInstanceTriggerSavepointRequest.getTaskInstanceId(); LogUtils.setTaskInstanceIdMDC(taskInstanceId); - TaskExecutionContext taskExecutionContext = - TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); - if (taskExecutionContext == null) { - log.error("Cannot find TaskExecutionContext for taskInstance: {}", taskInstanceId); - return TaskInstanceTriggerSavepointResponse.fail("Cannot find TaskExecutionContext"); - } - WorkerTaskExecutor workerTaskExecutor = workerManager.getTaskExecuteThread(taskInstanceId); + WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorHolder.get(taskInstanceId); if (workerTaskExecutor 
== null) { - log.error("Cannot find WorkerTaskExecuteRunnable for taskInstance: {}", taskInstanceId); - return TaskInstanceTriggerSavepointResponse.fail("Cannot find WorkerTaskExecuteRunnable"); + log.error("Cannot find WorkerTaskExecutor for taskInstance: {}", taskInstanceId); + return TaskInstanceTriggerSavepointResponse.fail("Cannot find TaskExecutionContext"); } AbstractTask task = workerTaskExecutor.getTask(); if (task == null) { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java index 24b5477cce..a8a0c5a2b7 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerLogServiceImpl.java @@ -30,8 +30,8 @@ import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFil import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse; import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest; import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder; import java.util.List; @@ -64,9 +64,13 @@ public class WorkerLogServiceImpl implements ILogService { @Override public GetAppIdResponse getAppId(GetAppIdRequest getAppIdRequest) { - TaskExecutionContext taskExecutionContext = - TaskExecutionContextCacheManager.getByTaskInstanceId(getAppIdRequest.getTaskInstanceId()); - String appInfoPath = 
taskExecutionContext.getAppInfoPath(); + String appInfoPath = null; + WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorHolder.get(getAppIdRequest.getTaskInstanceId()); + if (workerTaskExecutor != null) { + // todo: remove this kind of logic, and remove get appId method, the appId should be send by worker rather + // than query by master + appInfoPath = workerTaskExecutor.getTaskExecutionContext().getAppInfoPath(); + } String logPath = getAppIdRequest.getLogPath(); List appIds = org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils.getAppIds(logPath, appInfoPath, PropertyUtils.getString(APPID_COLLECT, DEFAULT_COLLECT_WAY)); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java deleted file mode 100644 index e1ac97b2d3..0000000000 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueue.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.dolphinscheduler.server.worker.runner; - -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy; -import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; - -import lombok.extern.slf4j.Slf4j; - -import org.springframework.stereotype.Component; - -@Slf4j -@Component -public class GlobalTaskInstanceWaitingQueue { - - private final WorkerConfig workerConfig; - - private final BlockingQueue blockingQueue; - - public GlobalTaskInstanceWaitingQueue(WorkerConfig workerConfig) { - this.workerConfig = workerConfig; - this.blockingQueue = new ArrayBlockingQueue<>(workerConfig.getExecThreads()); - } - - public boolean addDispatchTask(TaskExecutionContext taskExecutionContext) { - if (workerConfig.getTaskExecuteThreadsFullPolicy() == TaskExecuteThreadsFullPolicy.CONTINUE) { - return blockingQueue.offer(taskExecutionContext); - } - - if (blockingQueue.size() > getQueueSize()) { - log.warn("Wait submit queue is full, will retry submit task later"); - WorkerServerMetrics.incWorkerSubmitQueueIsFullCount(); - return false; - } - return blockingQueue.offer(taskExecutionContext); - } - - public TaskExecutionContext take() throws InterruptedException { - return blockingQueue.take(); - } - - public void clearTask() { - blockingQueue.clear(); - } - - public int getQueueSize() { - return workerConfig.getExecThreads(); - } - -} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java deleted file mode 100644 index 6e501a1be6..0000000000 --- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/GlobalTaskInstanceWaitingQueueLooper.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.worker.runner; - -import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; -import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; -import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; -import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; -import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; - -import lombok.extern.slf4j.Slf4j; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -@Slf4j -@Component -public class GlobalTaskInstanceWaitingQueueLooper extends BaseDaemonThread { - - @Autowired - private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue; - - @Autowired - private WorkerConfig 
workerConfig; - - @Autowired - private WorkerMessageSender workerMessageSender; - - @Autowired - private TaskPluginManager taskPluginManager; - - @Autowired - private WorkerManagerThread workerManager; - - @Autowired(required = false) - private StorageOperate storageOperate; - - @Autowired - private WorkerRegistryClient workerRegistryClient; - - protected GlobalTaskInstanceWaitingQueueLooper() { - super("GlobalTaskDispatchQueueLooper"); - } - - public synchronized void start() { - log.info("GlobalTaskDispatchQueueLooper starting"); - super.start(); - log.info("GlobalTaskDispatchQueueLooper started"); - } - - public void run() { - while (true) { - try { - TaskExecutionContext taskExecutionContext = globalTaskInstanceWaitingQueue.take(); - LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath()); - LogUtils.setTaskInstanceIdMDC(taskExecutionContext.getTaskInstanceId()); - - WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorFactoryBuilder - .createWorkerTaskExecutorFactory( - taskExecutionContext, - workerConfig, - workerMessageSender, - taskPluginManager, - storageOperate, - workerRegistryClient) - .createWorkerTaskExecutor(); - if (workerManager.offer(workerTaskExecutor)) { - log.info("Success submit WorkerDelayTaskExecuteRunnable to WorkerManagerThread's waiting queue"); - } - } catch (InterruptedException e) { - log.error("GlobalTaskDispatchQueueLooper interrupted"); - Thread.currentThread().interrupt(); - break; - } catch (Exception ex) { - log.error("GlobalTaskDispatchQueueLooper error", ex); - } finally { - LogUtils.removeTaskInstanceIdMDC(); - LogUtils.removeTaskInstanceLogFullPathMDC(); - } - } - } - -} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java index be8a62a6c8..aeb7d658d1 100644 --- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskCallbackImpl.java @@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.runner; import org.apache.dolphinscheduler.extract.master.transportor.ITaskInstanceExecutionEvent; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo; import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; @@ -42,15 +41,7 @@ public class TaskCallbackImpl implements TaskCallBack { @Override public void updateRemoteApplicationInfo(int taskInstanceId, ApplicationInfo applicationInfo) { - TaskExecutionContext taskExecutionContext = - TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); - if (taskExecutionContext == null) { - log.error("task execution context is empty, taskInstanceId: {}, applicationInfo:{}", taskInstanceId, - applicationInfo); - return; - } - - log.info("send remote application info {}", applicationInfo); + // todo: use listener taskExecutionContext.setAppIds(applicationInfo.getAppIds()); workerMessageSender.sendMessageWithRetry(taskExecutionContext, ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.RUNNING_INFO); @@ -58,13 +49,6 @@ public class TaskCallbackImpl implements TaskCallBack { @Override public void updateTaskInstanceInfo(int taskInstanceId) { - TaskExecutionContext taskExecutionContext = - TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); - if (taskExecutionContext == null) { - log.error("task execution context is empty, taskInstanceId: {}", taskInstanceId); - return; - } - workerMessageSender.sendMessageWithRetry(taskExecutionContext, 
ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.RUNNING_INFO); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java deleted file mode 100644 index 2a6b7feec2..0000000000 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.dolphinscheduler.server.worker.runner; - -import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; - -import lombok.extern.slf4j.Slf4j; - -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; - -@Slf4j -public class WorkerExecService { - - private final ListeningExecutorService listeningExecutorService; - - private final ExecutorService execService; - - private final ConcurrentHashMap taskExecuteThreadMap; - - public WorkerExecService(ExecutorService execService, - ConcurrentHashMap taskExecuteThreadMap) { - this.execService = execService; - this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService); - this.taskExecuteThreadMap = taskExecuteThreadMap; - WorkerServerMetrics.registerWorkerTaskTotalGauge(taskExecuteThreadMap::size); - } - - public void submit(final WorkerTaskExecutor taskExecuteThread) { - taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread); - ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread); - FutureCallback futureCallback = new FutureCallback() { - - @Override - public void onSuccess(Object o) { - taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId()); - } - - @Override - public void onFailure(Throwable throwable) { - log.error("task execute failed, processInstanceId:{}, taskInstanceId:{}", - taskExecuteThread.getTaskExecutionContext().getProcessInstanceId(), - taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), - throwable); - 
taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId()); - } - }; - Futures.addCallback(future, futureCallback, this.listeningExecutorService); - } - - /** - * get thread pool queue size - * - * @return queue size - */ - public int getThreadPoolQueueSize() { - return ((ThreadPoolExecutor) this.execService).getQueue().size(); - } - - public int getActiveExecThreadCount() { - return ((ThreadPoolExecutor) this.execService).getActiveCount(); - } - - public Map getTaskExecuteThreadMap() { - return taskExecuteThreadMap; - } - -} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java deleted file mode 100644 index 4b666cfb20..0000000000 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.dolphinscheduler.server.worker.runner; - -import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; -import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; - -import javax.annotation.Nullable; - -import lombok.extern.slf4j.Slf4j; - -import org.springframework.stereotype.Component; - -/** - * Manage tasks - */ -@Component -@Slf4j -public class WorkerManagerThread implements Runnable { - - private final BlockingQueue waitSubmitQueue; - private final WorkerExecService workerExecService; - - private final int workerExecThreads; - - private final ConcurrentHashMap taskExecuteThreadMap = new ConcurrentHashMap<>(); - - public WorkerManagerThread(WorkerConfig workerConfig) { - workerExecThreads = workerConfig.getExecThreads(); - this.waitSubmitQueue = new LinkedBlockingQueue<>(); - workerExecService = new WorkerExecService( - ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()), - taskExecuteThreadMap); - } - - public @Nullable WorkerTaskExecutor getTaskExecuteThread(Integer taskInstanceId) { - return taskExecuteThreadMap.get(taskInstanceId); - } - - /** - * get wait submit queue size - * - * @return queue size - */ - public int getWaitSubmitQueueSize() { - return waitSubmitQueue.size(); - } - - /** - * get thread pool queue size - * - * @return queue size - */ - public int getThreadPoolQueueSize() { - return workerExecService.getThreadPoolQueueSize(); - } - - /** - * Kill tasks that 
have not been executed, like delay task - * then send Response to Master, update the execution status of task instance - */ - public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) { - waitSubmitQueue.stream() - .filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext() - .getTaskInstanceId() == taskInstanceId) - .forEach(waitSubmitQueue::remove); - } - - public boolean offer(WorkerTaskExecutor workerDelayTaskExecuteRunnable) { - return waitSubmitQueue.add(workerDelayTaskExecuteRunnable); - } - - public void start() { - log.info("Worker manager thread starting"); - Thread thread = new Thread(this, this.getClass().getName()); - thread.setDaemon(true); - thread.start(); - log.info("Worker manager thread started"); - } - - @Override - public void run() { - WorkerServerMetrics.registerWorkerCpuUsageGauge(OSUtils::cpuUsagePercentage); - WorkerServerMetrics.registerWorkerMemoryAvailableGauge(OSUtils::availablePhysicalMemorySize); - WorkerServerMetrics.registerWorkerMemoryUsageGauge(OSUtils::memoryUsagePercentage); - WorkerServerMetrics.registerWorkerExecuteQueueSizeGauge(workerExecService::getThreadPoolQueueSize); - WorkerServerMetrics.registerWorkerActiveExecuteThreadGauge(workerExecService::getActiveExecThreadCount); - - Thread.currentThread().setName("Worker-Execute-Manager-Thread"); - while (!ServerLifeCycleManager.isStopped()) { - try { - if (!ServerLifeCycleManager.isRunning()) { - Thread.sleep(Constants.SLEEP_TIME_MILLIS); - } - if (this.getThreadPoolQueueSize() <= workerExecThreads) { - WorkerTaskExecutor workerTaskExecutor = waitSubmitQueue.take(); - workerExecService.submit(workerTaskExecutor); - } else { - WorkerServerMetrics.incWorkerOverloadCount(); - log.info("Exec queue is full, waiting submit queue {}, waiting exec queue size {}", - this.getWaitSubmitQueueSize(), this.getThreadPoolQueueSize()); - ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); - } - } catch (Exception e) { - log.error("An unexpected interrupt is happened, " - 
+ "the exception will be ignored and this thread will continue to run", e); - } - } - } - - public void clearTask() { - waitSubmitQueue.clear(); - workerExecService.getTaskExecuteThreadMap().values().forEach(workerTaskExecuteRunnable -> { - int taskInstanceId = workerTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId(); - try { - workerTaskExecuteRunnable.cancelTask(); - log.info("Cancel the taskInstance in worker {}", taskInstanceId); - } catch (Exception ex) { - log.error("Cancel the taskInstance error {}", taskInstanceId, ex); - } finally { - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); - } - }); - workerExecService.getTaskExecuteThreadMap().clear(); - } -} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java index e47a28d264..f713605a48 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutor.java @@ -39,7 +39,6 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException; import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager; import org.apache.dolphinscheduler.plugin.task.api.enums.Direct; @@ -108,7 +107,7 @@ public abstract class WorkerTaskExecutor implements Runnable { sendTaskResult(); - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + 
WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId()); log.info("Remove the current task execute context from worker cache"); clearTaskExecPathIfNeeded(); @@ -118,7 +117,7 @@ public abstract class WorkerTaskExecutor implements Runnable { if (cancelTask()) { log.info("Cancel the task successfully"); } - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId()); taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.FAILURE); taskExecutionContext.setEndTime(System.currentTimeMillis()); workerMessageSender.sendMessageWithRetry(taskExecutionContext, @@ -128,7 +127,7 @@ public abstract class WorkerTaskExecutor implements Runnable { } - public boolean cancelTask() { + protected boolean cancelTask() { // cancel the task if (task == null) { return true; @@ -157,7 +156,7 @@ public abstract class WorkerTaskExecutor implements Runnable { if (DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) { taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.SUCCESS); taskExecutionContext.setEndTime(System.currentTimeMillis()); - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId()); workerMessageSender.sendMessageWithRetry(taskExecutionContext, ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType.FINISH); log.info( diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java index c2efdc9c7a..a9c2948482 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java +++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorFactoryBuilder.java @@ -24,20 +24,31 @@ import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender; -import javax.annotation.Nullable; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; -import lombok.NonNull; -import lombok.experimental.UtilityClass; - -@UtilityClass +@Component public class WorkerTaskExecutorFactoryBuilder { - public static WorkerTaskExecutorFactory createWorkerTaskExecutorFactory(@NonNull TaskExecutionContext taskExecutionContext, - @NonNull WorkerConfig workerConfig, - @NonNull WorkerMessageSender workerMessageSender, - @NonNull TaskPluginManager taskPluginManager, - @Nullable StorageOperate storageOperate, - @NonNull WorkerRegistryClient workerRegistryClient) { + @Autowired + private WorkerConfig workerConfig; + + @Autowired + private WorkerMessageSender workerMessageSender; + + @Autowired + private TaskPluginManager taskPluginManager; + + @Autowired + private WorkerTaskExecutorThreadPool workerManager; + + @Autowired(required = false) + private StorageOperate storageOperate; + + @Autowired + private WorkerRegistryClient workerRegistryClient; + + public WorkerTaskExecutorFactory createWorkerTaskExecutorFactory(TaskExecutionContext taskExecutionContext) { return new DefaultWorkerTaskExecutorFactory(taskExecutionContext, workerConfig, workerMessageSender, diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java new file mode 100644 index 0000000000..fa07dbfde6 --- /dev/null +++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Used to store all running and waiting {@link WorkerTaskExecutor}. If the task has been finished, it will be removed from the map. 
+ */ +public class WorkerTaskExecutorHolder { + + private static final Map workerTaskExecutorMap = new HashMap<>(); + + public static void put(WorkerTaskExecutor workerTaskExecutor) { + int taskInstanceId = workerTaskExecutor.getTaskExecutionContext().getTaskInstanceId(); + if (workerTaskExecutorMap.containsKey(taskInstanceId)) { + throw new IllegalArgumentException("TaskInstance: " + taskInstanceId + " already exists"); + } + workerTaskExecutorMap.put(taskInstanceId, workerTaskExecutor); + } + + public static WorkerTaskExecutor get(int taskInstanceId) { + return workerTaskExecutorMap.get(taskInstanceId); + } + + public static WorkerTaskExecutor remove(int taskInstanceId) { + return workerTaskExecutorMap.remove(taskInstanceId); + } + + public static void clear() { + workerTaskExecutorMap.clear(); + } + + public static Collection getAllTaskExecutor() { + return workerTaskExecutorMap.values(); + } +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java new file mode 100644 index 0000000000..28588c0dbf --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.stereotype.Component;
+
+/**
+ * Thread pool that runs {@link WorkerTaskExecutor}s. Every accepted executor is also registered in
+ * {@link WorkerTaskExecutorHolder} so that it can be looked up by task instance id.
+ */
+@Component
+@Slf4j
+public class WorkerTaskExecutorThreadPool {
+
+    private final ThreadPoolExecutor threadPoolExecutor;
+
+    private final WorkerConfig workerConfig;
+
+    /**
+     * Creates the pool with {@code workerConfig.getExecThreads()} threads and registers the worker gauges.
+     */
+    public WorkerTaskExecutorThreadPool(WorkerConfig workerConfig) {
+        this.threadPoolExecutor =
+                ThreadUtils.newDaemonFixedThreadExecutor("WorkerTaskExecutorThreadPool", workerConfig.getExecThreads());
+        this.workerConfig = workerConfig;
+
+        WorkerServerMetrics.registerWorkerCpuUsageGauge(OSUtils::cpuUsagePercentage);
+        WorkerServerMetrics.registerWorkerMemoryAvailableGauge(OSUtils::availablePhysicalMemorySize);
+        WorkerServerMetrics.registerWorkerMemoryUsageGauge(OSUtils::memoryUsagePercentage);
+        // The queue only contains executors that are still waiting, so its size is the gauge value.
+        // Subtracting the active count would report negative numbers whenever the queue is empty.
+        WorkerServerMetrics.registerWorkerExecuteQueueSizeGauge(() -> threadPoolExecutor.getQueue().size());
+        WorkerServerMetrics.registerWorkerActiveExecuteThreadGauge(threadPoolExecutor::getActiveCount);
+    }
+
+    /**
+     * Registers the executor in {@link WorkerTaskExecutorHolder} and hands it to the pool.
+     * <p>
+     * With {@link TaskExecuteThreadsFullPolicy#CONTINUE} the executor is always accepted; with any other
+     * policy it is refused while {@link #isOverload()} is true.
+     *
+     * @return {@code true} if the executor was accepted, {@code false} if it was refused because of overload
+     * @throws IllegalArgumentException if an executor with the same task instance id is already registered
+     */
+    public boolean submitWorkerTaskExecutor(WorkerTaskExecutor workerTaskExecutor) {
+        synchronized (WorkerTaskExecutorThreadPool.class) {
+            boolean refuseWhenOverload =
+                    !TaskExecuteThreadsFullPolicy.CONTINUE.equals(workerConfig.getTaskExecuteThreadsFullPolicy());
+            if (refuseWhenOverload && isOverload()) {
+                log.warn("WorkerTaskExecutorThreadPool is overload, cannot submit new WorkerTaskExecutor");
+                WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
+                return false;
+            }
+            WorkerTaskExecutorHolder.put(workerTaskExecutor);
+            try {
+                // execute() enqueues the executor itself. submit() would wrap it in a FutureTask, and
+                // ThreadPoolExecutor#remove(Runnable) could then never find it in the queue.
+                threadPoolExecutor.execute(workerTaskExecutor);
+            } catch (RuntimeException e) {
+                // e.g. RejectedExecutionException: do not keep a registry entry for a task that will never run
+                WorkerTaskExecutorHolder.remove(workerTaskExecutor.getTaskExecutionContext().getTaskInstanceId());
+                throw e;
+            }
+            return true;
+        }
+    }
+
+    /**
+     * @return {@code true} if at least one executor is waiting in the queue
+     */
+    public boolean isOverload() {
+        return threadPoolExecutor.getQueue().size() > 0;
+    }
+
+    /**
+     * @return the number of executors waiting in the queue
+     */
+    public int getWaitingTaskExecutorSize() {
+        return threadPoolExecutor.getQueue().size();
+    }
+
+    /**
+     * @return the pool's active count, i.e. the approximate number of executors currently running
+     */
+    public int getRunningTaskExecutorSize() {
+        return threadPoolExecutor.getActiveCount();
+    }
+
+    /**
+     * Kill tasks that have not been executed, e.g. waiting in the queue.
+     * <p>
+     * Only the queue entry is removed; the registration in {@link WorkerTaskExecutorHolder} is left to the
+     * caller. Does nothing if the id is {@code null} or no executor is registered for it.
+     */
+    public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
+        if (taskInstanceId == null) {
+            return;
+        }
+        synchronized (WorkerTaskExecutorThreadPool.class) {
+            WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorHolder.get(taskInstanceId);
+            if (workerTaskExecutor == null) {
+                return;
+            }
+            threadPoolExecutor.remove(workerTaskExecutor);
+        }
+    }
+
+    /**
+     * Drops every executor that is still waiting in the queue. Running executors are not affected.
+     */
+    public void clearTask() {
+        threadPoolExecutor.getQueue().clear();
+    }
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
index 4d3ebc9aa9..8c6825df63 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceDispatchOperationFunction.java
@@ -20,11 +20,12 @@ package
org.apache.dolphinscheduler.server.worker.runner.operator; import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchRequest; import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceDispatchResponse; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics; -import org.apache.dolphinscheduler.server.worker.runner.GlobalTaskInstanceWaitingQueue; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorFactoryBuilder; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool; import lombok.extern.slf4j.Slf4j; @@ -41,15 +42,16 @@ public class TaskInstanceDispatchOperationFunction private WorkerConfig workerConfig; @Autowired - private GlobalTaskInstanceWaitingQueue globalTaskInstanceWaitingQueue; + private WorkerTaskExecutorFactoryBuilder workerTaskExecutorFactoryBuilder; + + @Autowired + private WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool; @Override public TaskInstanceDispatchResponse operate(TaskInstanceDispatchRequest taskInstanceDispatchRequest) { log.info("Receive TaskInstanceDispatchRequest: {}", taskInstanceDispatchRequest); TaskExecutionContext taskExecutionContext = taskInstanceDispatchRequest.getTaskExecutionContext(); try { - // set cache, it will be used when kill task - TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext); taskExecutionContext.setHost(workerConfig.getWorkerAddress()); taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext)); @@ -57,9 +59,11 @@ public class TaskInstanceDispatchOperationFunction 
taskExecutionContext.getTaskInstanceId()); TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType()); - if (!globalTaskInstanceWaitingQueue.addDispatchTask(taskExecutionContext)) { - log.error("Submit task: {} to wait queue error, current queue size: {} is full", - taskExecutionContext.getTaskName(), workerConfig.getExecThreads()); + WorkerTaskExecutor workerTaskExecutor = workerTaskExecutorFactoryBuilder + .createWorkerTaskExecutorFactory(taskExecutionContext).createWorkerTaskExecutor(); + // todo: hold the workerTaskExecutor + if (!workerTaskExecutorThreadPool.submitWorkerTaskExecutor(workerTaskExecutor)) { + log.info("Submit task: {} to wait queue failed", taskExecutionContext.getTaskName()); return TaskInstanceDispatchResponse.failed(taskExecutionContext.getTaskInstanceId(), "WorkerManagerThread is full"); } else { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java index c5f4ffb78b..69e3994a90 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java @@ -23,13 +23,13 @@ import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceKillRe import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import 
org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; -import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool; import lombok.extern.slf4j.Slf4j; @@ -45,7 +45,7 @@ public class TaskInstanceKillOperationFunction ITaskInstanceOperationFunction { @Autowired - private WorkerManagerThread workerManager; + private WorkerTaskExecutorThreadPool workerManager; @Autowired private MessageRetryRunner messageRetryRunner; @@ -57,22 +57,24 @@ public class TaskInstanceKillOperationFunction int taskInstanceId = taskInstanceKillRequest.getTaskInstanceId(); try { LogUtils.setTaskInstanceIdMDC(taskInstanceId); - TaskExecutionContext taskExecutionContext = - TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); - if (taskExecutionContext == null) { - log.error("Cannot find TaskExecutionContext for taskInstance: {}", taskInstanceId); - return TaskInstanceKillResponse.fail("Cannot find TaskExecutionContext"); + WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorHolder.get(taskInstanceId); + if (workerTaskExecutor == null) { + log.error("Cannot find WorkerTaskExecutor for taskInstance: {}", taskInstanceId); + return TaskInstanceKillResponse.fail("Cannot find WorkerTaskExecutor"); } + TaskExecutionContext taskExecutionContext = workerTaskExecutor.getTaskExecutionContext(); + LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath()); boolean result = doKill(taskExecutionContext); - this.cancelApplication(taskInstanceId); + this.cancelApplication(workerTaskExecutor); int processId = taskExecutionContext.getProcessId(); if (processId == 0) { workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId); 
taskExecutionContext.setCurrentExecutionStatus(TaskExecutionStatus.KILL); - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); + // todo: the task might be executed, but the processId is 0 + WorkerTaskExecutorHolder.remove(taskInstanceId); log.info("The task has not been executed and has been cancelled, task id:{}", taskInstanceId); return TaskInstanceKillResponse.success(taskExecutionContext); } @@ -80,7 +82,7 @@ public class TaskInstanceKillOperationFunction taskExecutionContext .setCurrentExecutionStatus(result ? TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE); - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId()); messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId()); return TaskInstanceKillResponse.success(taskExecutionContext); } finally { @@ -102,15 +104,11 @@ public class TaskInstanceKillOperationFunction return processFlag; } - protected void cancelApplication(int taskInstanceId) { - WorkerTaskExecutor workerTaskExecutor = workerManager.getTaskExecuteThread(taskInstanceId); - if (workerTaskExecutor == null) { - log.warn("taskExecuteThread not found, taskInstanceId:{}", taskInstanceId); - return; - } + protected void cancelApplication(WorkerTaskExecutor workerTaskExecutor) { AbstractTask task = workerTaskExecutor.getTask(); if (task == null) { - log.warn("task not found, taskInstanceId:{}", taskInstanceId); + log.warn("task not found, taskInstanceId: {}", + workerTaskExecutor.getTaskExecutionContext().getTaskInstanceId()); return; } try { @@ -118,7 +116,8 @@ public class TaskInstanceKillOperationFunction } catch (Exception e) { log.error("kill task error", e); } - log.info("kill task by cancelApplication, task id:{}", taskInstanceId); + log.info("kill task by cancelApplication, taskInstanceId: {}", + workerTaskExecutor.getTaskExecutionContext().getTaskInstanceId()); } protected 
boolean killProcess(String tenantCode, Integer processId) { diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java index 6d31feff7e..7485b9230f 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/UpdateWorkflowHostOperationFunction.java @@ -20,9 +20,10 @@ package org.apache.dolphinscheduler.server.worker.runner.operator; import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostRequest; import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostResponse; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutor; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorHolder; import lombok.extern.slf4j.Slf4j; @@ -47,21 +48,29 @@ public class UpdateWorkflowHostOperationFunction LogUtils.setTaskInstanceIdMDC(taskInstanceId); log.info("Received UpdateWorkflowHostRequest: {}", updateWorkflowHostRequest); - TaskExecutionContext taskExecutionContext = - TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); - if (taskExecutionContext == null) { - log.error("Cannot find the taskExecutionContext for taskInstance : {}", taskInstanceId); - return UpdateWorkflowHostResponse.failed("Cannot find the taskExecutionContext"); + boolean updateWorkerTaskExecutor = 
updateHostInWorkflowTaskExecutor(taskInstanceId, workflowHost); + boolean updateMessage = updateHostInMessage(taskInstanceId, workflowHost); + if (updateWorkerTaskExecutor || updateMessage) { + return UpdateWorkflowHostResponse.success(); } - - LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath()); - taskExecutionContext.setWorkflowInstanceHost(workflowHost); - messageRetryRunner.updateMessageHost(taskInstanceId, workflowHost); - log.info("Success update workflow host: {} for taskInstance: {}", workflowHost, taskInstanceId); - return UpdateWorkflowHostResponse.success(); + return UpdateWorkflowHostResponse.failed("The taskInstance is not in the worker"); } finally { LogUtils.removeTaskInstanceIdMDC(); LogUtils.removeTaskInstanceLogFullPathMDC(); } } + + private boolean updateHostInWorkflowTaskExecutor(int taskInstanceId, String workflowHost) { + WorkerTaskExecutor workerTaskExecutor = WorkerTaskExecutorHolder.get(taskInstanceId); + if (workerTaskExecutor == null) { + return false; + } + TaskExecutionContext taskExecutionContext = workerTaskExecutor.getTaskExecutionContext(); + taskExecutionContext.setWorkflowInstanceHost(workflowHost); + return true; + } + + private boolean updateHostInMessage(int taskInstanceId, String workflowHost) { + return messageRetryRunner.updateMessageHost(taskInstanceId, workflowHost); + } } diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java index d55ccc25af..e6043486f5 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java @@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.common.utils.NetUtils; import 
org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; +import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecutorThreadPool; import java.time.Duration; import java.util.Set; @@ -67,7 +67,7 @@ public class WorkerRegistryClientTest { private ScheduledExecutorService heartBeatExecutor; @Mock - private WorkerManagerThread workerManagerThread; + private WorkerTaskExecutorThreadPool workerManagerThread; @Mock private WorkerConnectStrategy workerConnectStrategy; diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java new file mode 100644 index 0000000000..988f1f7fec --- /dev/null +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.dolphinscheduler.server.worker.runner;
+
+import org.apache.dolphinscheduler.common.enums.ResUploadType;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
+import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.TaskPluginManager;
+import org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
+import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
+import org.apache.dolphinscheduler.spi.enums.ResourceType;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class WorkerTaskExecutorThreadPoolTest {
+
+    @Test
+    public void testIsOverload() {
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setExecThreads(1);
+        workerConfig.setTaskExecuteThreadsFullPolicy(TaskExecuteThreadsFullPolicy.CONTINUE);
+        WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool = new WorkerTaskExecutorThreadPool(workerConfig);
+        try {
+            // submit 100 task, the thread pool size is 1
+            // assert the overload should be true
+            // assert the submitQueue should be 99
+            for (int i = 0; i < 100; i++) {
+                boolean submitResult =
+                        workerTaskExecutorThreadPool.submitWorkerTaskExecutor(new MockWorkerTaskExecutor(() -> {
+                            try {
+                                Thread.sleep(10_000L);
+                            } catch (InterruptedException e) {
+                                // keep the interrupt visible to whoever owns the thread
+                                Thread.currentThread().interrupt();
+                                throw new RuntimeException(e);
+                            }
+                        }));
+                Assertions.assertTrue(submitResult);
+            }
+            Assertions.assertTrue(workerTaskExecutorThreadPool.isOverload());
+            Assertions.assertEquals(99, workerTaskExecutorThreadPool.getWaitingTaskExecutorSize());
+            Assertions.assertEquals(1, workerTaskExecutorThreadPool.getRunningTaskExecutorSize());
+        } finally {
+            // Drop the queued sleepers and the static registry entries so that other tests start clean
+            workerTaskExecutorThreadPool.clearTask();
+            WorkerTaskExecutorHolder.clear();
+        }
+    }
+
+    /**
+     * Executor whose task body is the given {@link Runnable}; all collaborators are no-op stubs.
+     */
+    static class MockWorkerTaskExecutor extends WorkerTaskExecutor {
+
+        // WorkerTaskExecutorHolder rejects duplicate task instance ids, and (int) System.nanoTime()
+        // can repeat, so ids are taken from a sequence instead.
+        private static final AtomicInteger TASK_INSTANCE_ID_GENERATOR = new AtomicInteger();
+
+        private final Runnable runnable;
+
+        // NOTE(review): the List parameters/returns of the stub below are raw types; their type arguments
+        // appear to have been lost in this copy of the patch - confirm against StorageOperate.
+        protected MockWorkerTaskExecutor(Runnable runnable) {
+            super(TaskExecutionContext.builder().taskInstanceId(TASK_INSTANCE_ID_GENERATOR.incrementAndGet()).build(),
+                    new WorkerConfig(),
+                    new WorkerMessageSender(), new TaskPluginManager(), new StorageOperate() {
+
+                        @Override
+                        public void createTenantDirIfNotExists(String tenantCode) {
+
+                        }
+
+                        @Override
+                        public String getResDir(String tenantCode) {
+                            return null;
+                        }
+
+                        @Override
+                        public String getUdfDir(String tenantCode) {
+                            return null;
+                        }
+
+                        @Override
+                        public boolean mkdir(String tenantCode, String path) throws IOException {
+                            return false;
+                        }
+
+                        @Override
+                        public String getResourceFullName(String tenantCode, String fileName) {
+                            return null;
+                        }
+
+                        @Override
+                        public String getResourceFileName(String tenantCode, String fullName) {
+                            return null;
+                        }
+
+                        @Override
+                        public String getFileName(ResourceType resourceType, String tenantCode, String fileName) {
+                            return null;
+                        }
+
+                        @Override
+                        public boolean exists(String fullName) {
+                            return false;
+                        }
+
+                        @Override
+                        public boolean delete(String filePath, boolean recursive) {
+                            return false;
+                        }
+
+                        @Override
+                        public boolean delete(String filePath, List childrenPathArray,
+                                              boolean recursive) {
+                            return false;
+                        }
+
+                        @Override
+                        public boolean copy(String srcPath, String dstPath, boolean deleteSource,
+                                            boolean overwrite) {
+                            return false;
+                        }
+
+                        @Override
+                        public String getDir(ResourceType resourceType, String tenantCode) {
+                            return null;
+                        }
+
+                        @Override
+                        public boolean upload(String tenantCode, String srcFile, String dstPath, boolean deleteSource,
+                                              boolean overwrite) {
+                            return false;
+                        }
+
+                        @Override
+                        public void download(String srcFilePath, String dstFile, boolean overwrite) {
+
+                        }
+
+                        @Override
+                        public List vimFile(String tenantCode, String filePath, int skipLineNums,
+                                            int limit) {
+                            return null;
+                        }
+
+                        @Override
+                        public void deleteTenant(String tenantCode) {
+
+                        }
+
+                        @Override
+                        public ResUploadType returnStorageType() {
+                            return null;
+                        }
+
+                        @Override
+                        public List listFilesStatusRecursively(String path, String defaultPath,
+                                                               String tenantCode, ResourceType type) {
+                            return null;
+                        }
+
+                        @Override
+                        public List listFilesStatus(String path, String defaultPath, String tenantCode,
+                                                    ResourceType type) throws Exception {
+                            return null;
+                        }
+
+                        @Override
+                        public StorageEntity getFileStatus(String path, String defaultPath, String tenantCode,
+                                                           ResourceType type) throws Exception {
+                            return null;
+                        }
+                    }, new WorkerRegistryClient());
+            this.runnable = runnable;
+        }
+
+        @Override
+        public void run() {
+            executeTask(new TaskCallbackImpl(null, null));
+        }
+
+        @Override
+        protected void executeTask(TaskCallBack taskCallBack) {
+            runnable.run();
+        }
+    }
+
+}