From deca0a7ea1e5e2a34f2eb59e550d02d4d2cf368e Mon Sep 17 00:00:00 2001 From: Wenjun Ruan Date: Wed, 24 May 2023 13:49:32 +0800 Subject: [PATCH] Fix master memory leak due to MasterTaskExecuteRunnableDoesn't be removed (#14162) * Fix master memory leak due to MasterTaskExecuteRunnableDoesn't be removed * Update dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java Co-authored-by: caishunfeng --- .../MasterTaskDispatchProcessor.java | 1 + .../processor/MasterTaskKillProcessor.java | 10 +-- .../processor/MasterTaskPauseProcessor.java | 8 +- ...ayTaskExecuteRunnableDelayQueueLooper.java | 3 + .../MasterTaskExecuteRunnableThreadPool.java | 89 ------------------- .../AsyncTaskCallbackFunctionImpl.java | 6 +- .../execute/MasterTaskExecuteRunnable.java | 6 +- .../MasterTaskExecuteRunnableHolder.java | 45 ++++++++++ .../MasterTaskExecuteRunnableThreadPool.java | 51 +++++++++++ .../SyncMasterDelayTaskExecuteRunnable.java | 2 + .../runner/operator/TaskTimeoutOperator.java | 10 +-- 11 files changed, 118 insertions(+), 113 deletions(-) delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecuteRunnableThreadPool.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java index 009a684e92..e70199152a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskDispatchProcessor.java @@ -99,6 +99,7 @@ public class MasterTaskDispatchProcessor implements MasterRpcProcessor { } } catch (Exception ex) { log.error("Handle task dispatch request error, command: {}", taskDispatchRequest, ex); + MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskExecutionContext.getTaskInstanceId()); sendDispatchFailedResult(channel, message, taskExecutionContext, ex); } finally { LogUtils.removeWorkflowAndTaskInstanceIdMDC(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskKillProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskKillProcessor.java index 1ad56cedd3..37a61e6246 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskKillProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskKillProcessor.java @@ -26,8 +26,8 @@ import org.apache.dolphinscheduler.remote.command.task.TaskKillRequest; import org.apache.dolphinscheduler.remote.processor.MasterRpcProcessor; import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException; import org.apache.dolphinscheduler.server.master.runner.MasterDelayTaskExecuteRunnableDelayQueue; -import org.apache.dolphinscheduler.server.master.runner.MasterTaskExecuteRunnableThreadPool; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder; import lombok.extern.slf4j.Slf4j; @@ -41,9 +41,6 @@ import io.netty.channel.Channel; @Component public class MasterTaskKillProcessor implements MasterRpcProcessor { - @Autowired - private MasterTaskExecuteRunnableThreadPool masterTaskExecuteRunnableThreadPool; - @Autowired private MasterDelayTaskExecuteRunnableDelayQueue masterDelayTaskExecuteRunnableDelayQueue; @@ -60,7 +57,7 @@ public class MasterTaskKillProcessor implements MasterRpcProcessor { return; } MasterTaskExecuteRunnable masterTaskExecuteRunnable = - masterTaskExecuteRunnableThreadPool.getMasterTaskExecuteRunnable(taskInstanceId); + MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskInstanceId); if (masterTaskExecuteRunnable == null) { log.error("Cannot find the MasterTaskExecuteRunnable, this task may already been killed"); return; @@ -71,6 +68,9 @@ public class MasterTaskKillProcessor implements MasterRpcProcessor { .removeMasterDelayTaskExecuteRunnable(masterTaskExecuteRunnable); } catch (MasterTaskExecuteException e) { log.error("Cancel MasterTaskExecuteRunnable failed ", e); + } finally { + MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskInstanceId); + MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskInstanceId); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java index 5b7176c18f..41c9c50d69 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/MasterTaskPauseProcessor.java @@ -25,12 +25,11 @@ import org.apache.dolphinscheduler.remote.command.MessageType; import org.apache.dolphinscheduler.remote.command.task.TaskPauseRequest; import org.apache.dolphinscheduler.remote.processor.MasterRpcProcessor; import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException; -import org.apache.dolphinscheduler.server.master.runner.MasterTaskExecuteRunnableThreadPool; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import io.netty.channel.Channel; @@ -39,14 +38,11 @@ import io.netty.channel.Channel; @Component public class MasterTaskPauseProcessor implements MasterRpcProcessor { - @Autowired - private MasterTaskExecuteRunnableThreadPool masterTaskExecuteRunnableThreadPool; - @Override public void process(Channel channel, Message message) { TaskPauseRequest taskPauseRequest = JSONUtils.parseObject(message.getBody(), TaskPauseRequest.class); MasterTaskExecuteRunnable masterTaskExecuteRunnable = - masterTaskExecuteRunnableThreadPool.getMasterTaskExecuteRunnable(taskPauseRequest.getTaskInstanceId()); + MasterTaskExecuteRunnableHolder.getMasterTaskExecuteRunnable(taskPauseRequest.getTaskInstanceId()); if (masterTaskExecuteRunnable == null) { log.info("Cannot find the MasterTaskExecuteRunnable"); return; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java index 3b429e5d76..557f2ca447 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterDelayTaskExecuteRunnableDelayQueueLooper.java @@ -19,6 +19,8 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.thread.BaseDaemonThread; import org.apache.dolphinscheduler.server.master.runner.execute.MasterDelayTaskExecuteRunnable; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableHolder; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnableThreadPool; import java.util.concurrent.atomic.AtomicBoolean; @@ -62,6 +64,7 @@ public class MasterDelayTaskExecuteRunnableDelayQueueLooper extends BaseDaemonTh final MasterDelayTaskExecuteRunnable masterDelayTaskExecuteRunnable = masterDelayTaskExecuteRunnableDelayQueue.takeMasterDelayTaskExecuteRunnable(); masterTaskExecuteRunnableThreadPool.submitMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable); + MasterTaskExecuteRunnableHolder.putMasterTaskExecuteRunnable(masterDelayTaskExecuteRunnable); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); log.warn("MasterDelayTaskExecuteRunnableDelayQueueLooper has been interrupted, will stop loop"); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecuteRunnableThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecuteRunnableThreadPool.java deleted file mode 100644 index 86c95308d8..0000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecuteRunnableThreadPool.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner; - -import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecuteRunnable; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import lombok.extern.slf4j.Slf4j; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; - -@Slf4j -@Component -public class MasterTaskExecuteRunnableThreadPool { - - @Autowired - private MasterConfig masterConfig; - - private static final Map SUBMITTED_MASTER_TASK_MAP = new ConcurrentHashMap<>(); - - private ListeningExecutorService listeningExecutorService; - - public synchronized void start() { - log.info("MasterTaskExecuteRunnableThreadPool starting..."); - this.listeningExecutorService = MoreExecutors.listeningDecorator(ThreadUtils.newDaemonFixedThreadExecutor( - "MasterTaskExecuteRunnableThread", masterConfig.getMasterTaskExecuteThreadPoolSize())); - log.info("MasterTaskExecuteRunnableThreadPool started..."); - } - - public void submitMasterTaskExecuteRunnable(MasterTaskExecuteRunnable masterTaskExecuteRunnable) { - ListenableFuture future = listeningExecutorService.submit(masterTaskExecuteRunnable); - Futures.addCallback(future, new MasterTaskExecuteCallback(masterTaskExecuteRunnable), - this.listeningExecutorService); - SUBMITTED_MASTER_TASK_MAP.put(masterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId(), - masterTaskExecuteRunnable); - } - - public MasterTaskExecuteRunnable getMasterTaskExecuteRunnable(Integer taskInstanceId) { - return SUBMITTED_MASTER_TASK_MAP.get(taskInstanceId); - } - - private static class MasterTaskExecuteCallback implements FutureCallback { - - private MasterTaskExecuteRunnable masterTaskExecuteRunnable; - - public MasterTaskExecuteCallback(MasterTaskExecuteRunnable masterTaskExecuteRunnable) { - this.masterTaskExecuteRunnable = masterTaskExecuteRunnable; - } - - @Override - public void onSuccess(Object result) { - log.info("MasterTaskExecuteRunnable execute success, will remove this task"); - SUBMITTED_MASTER_TASK_MAP.remove(masterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId()); - } - - @Override - public void onFailure(Throwable t) { - log.info("MasterTaskExecuteRunnable execute failed, will remove this task"); - SUBMITTED_MASTER_TASK_MAP.remove(masterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId()); - } - } - -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java index cb5d07fc20..e9d4fe4430 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/AsyncTaskCallbackFunctionImpl.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.master.runner.execute; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.log.TaskInstanceLogHeader; @@ -54,8 +53,9 @@ public class AsyncTaskCallbackFunctionImpl implements AsyncTaskCallbackFunction private void executeFinished() { TaskInstanceLogHeader.printFinalizeTaskHeader(); - TaskExecutionContextCacheManager.removeByTaskInstanceId( - asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId()); + int taskInstanceId = asyncMasterDelayTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId(); + MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskInstanceId); + MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskInstanceId); log.info("Task execute finished, removed the TaskExecutionContext"); asyncMasterDelayTaskExecuteRunnable.sendTaskResult(); log.info( diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java index 973baf978e..e9a9e8dae7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnable.java @@ -22,7 +22,6 @@ import static ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER; import org.apache.dolphinscheduler.common.log.remote.RemoteLogUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.log.TaskInstanceLogHeader; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; @@ -59,7 +58,7 @@ public abstract class MasterTaskExecuteRunnable implements Runnable { protected void afterThrowing(Throwable throwable) { try { cancelTask(); - log.info("Get a exception when execute the task, canceled the task"); + log.error("Get a exception when execute the task, canceled the task", throwable); } catch (Exception e) { log.error("Cancel task failed,", e); } @@ -68,7 +67,8 @@ public abstract class MasterTaskExecuteRunnable implements Runnable { log.info( "Get a exception when execute the task, sent the task execute result to master, the current task execute result is {}", taskExecutionContext.getCurrentExecutionStatus()); - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskExecutionContext.getTaskInstanceId()); + MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskExecutionContext.getTaskInstanceId()); log.info("Get a exception when execute the task, removed the TaskExecutionContext"); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java new file mode 100644 index 0000000000..6b29897611 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableHolder.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.execute; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import lombok.experimental.UtilityClass; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@UtilityClass +public class MasterTaskExecuteRunnableHolder { + + private static final Map SUBMITTED_MASTER_TASK_MAP = new ConcurrentHashMap<>(); + + public void putMasterTaskExecuteRunnable(MasterTaskExecuteRunnable masterTaskExecuteRunnable) { + SUBMITTED_MASTER_TASK_MAP.put(masterTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId(), + masterTaskExecuteRunnable); + } + + public MasterTaskExecuteRunnable getMasterTaskExecuteRunnable(Integer taskInstanceId) { + return SUBMITTED_MASTER_TASK_MAP.get(taskInstanceId); + } + + public void removeMasterTaskExecuteRunnable(Integer taskInstanceId) { + SUBMITTED_MASTER_TASK_MAP.remove(taskInstanceId); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java new file mode 100644 index 0000000000..4f542556e4 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecuteRunnableThreadPool.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.execute; + +import org.apache.dolphinscheduler.common.thread.ThreadUtils; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +@Slf4j +@Component +public class MasterTaskExecuteRunnableThreadPool { + + @Autowired + private MasterConfig masterConfig; + + private ListeningExecutorService listeningExecutorService; + + public synchronized void start() { + log.info("MasterTaskExecuteRunnableThreadPool starting..."); + this.listeningExecutorService = MoreExecutors.listeningDecorator(ThreadUtils.newDaemonFixedThreadExecutor( + "MasterTaskExecuteRunnableThread", masterConfig.getMasterTaskExecuteThreadPoolSize())); + log.info("MasterTaskExecuteRunnableThreadPool started..."); + } + + public void submitMasterTaskExecuteRunnable(MasterTaskExecuteRunnable masterTaskExecuteRunnable) { + listeningExecutorService.submit(masterTaskExecuteRunnable); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java index 03d3c0ea6f..04f9321c08 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/SyncMasterDelayTaskExecuteRunnable.java @@ -55,6 +55,8 @@ public class SyncMasterDelayTaskExecuteRunnable extends MasterDelayTaskExecuteRu "Execute task finished, will send the task execute result to master, the current task execute result is {}", taskExecutionContext.getCurrentExecutionStatus().name()); closeLogAppender(); + MasterTaskExecutionContextHolder.removeTaskExecutionContext(taskExecutionContext.getTaskInstanceId()); + MasterTaskExecuteRunnableHolder.removeMasterTaskExecuteRunnable(taskExecutionContext.getTaskInstanceId()); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java index 741a681d8a..17de1e9539 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/operator/TaskTimeoutOperator.java @@ -19,12 +19,9 @@ package org.apache.dolphinscheduler.server.master.runner.operator; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable; -import java.util.Date; - import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -51,9 +48,8 @@ public class TaskTimeoutOperator implements TaskOperator { taskInstance.getName(), taskTimeoutStrategy.name()); return; } - log.info("TaskInstance: {} timeout, will kill the task instance", taskInstance.getName()); - taskInstance.setState(TaskExecutionStatus.FAILURE); - taskInstance.setEndTime(new Date()); - taskInstanceDao.upsertTaskInstance(taskInstance); + taskExecuteRunnable.kill(); + log.info("TaskInstance: {} timeout, killed the task instance", taskInstance.getName()); + } }