diff --git a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java index c4347d6bee..e2ab195a4b 100644 --- a/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java +++ b/dolphinscheduler-spi/src/main/java/org/apache/dolphinscheduler/spi/task/TaskExecutionContextCacheManager.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.spi.task; import org.apache.dolphinscheduler.spi.task.request.TaskRequest; +import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -66,4 +67,8 @@ public class TaskExecutionContextCacheManager { taskRequestContextCache.computeIfPresent(request.getTaskInstanceId(), (k, v) -> request); return taskRequestContextCache.containsKey(request.getTaskInstanceId()); } + + public static Collection getAllTaskRequestList() { + return taskRequestContextCache.values(); + } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 6296aeb89f..e51f1d2888 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -37,7 +37,12 @@ import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThr import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.alert.AlertClientService; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager; +import org.apache.dolphinscheduler.spi.task.request.TaskRequest; +import org.apache.commons.collections4.CollectionUtils; + +import java.util.Collection; import java.util.Set; import javax.annotation.PostConstruct; @@ -201,6 +206,11 @@ public class WorkerServer implements IStoppable { this.nettyRemotingServer.close(); this.workerRegistryClient.unRegistry(); this.alertClientService.close(); + + // kill running tasks + this.killAllRunningTasks(); + + // close the application context this.springApplicationContext.close(); } catch (Exception e) { logger.error("worker server stop exception ", e); @@ -211,4 +221,21 @@ public class WorkerServer implements IStoppable { public void stop(String cause) { close(cause); } + + /** + * kill all tasks which are running + */ + public void killAllRunningTasks() { + Collection taskRequests = TaskExecutionContextCacheManager.getAllTaskRequestList(); + logger.info("ready to kill all cache job, job size:{}", taskRequests.size()); + + if (CollectionUtils.isEmpty(taskRequests)) { + return; + } + + for (TaskRequest taskRequest : taskRequests) { + // kill task when it's not finished yet + org.apache.dolphinscheduler.plugin.task.api.ProcessUtils.kill(taskRequest); + } + } }