From 239be31ab732f5bbe68fca3033245125e8388657 Mon Sep 17 00:00:00 2001 From: caishunfeng Date: Wed, 20 Apr 2022 22:46:15 +0800 Subject: [PATCH] [Bug] cancel application when kill task (#9624) * cancel application when kill task * add warn log * add cancel application --- .../worker/processor/TaskKillProcessor.java | 86 +++++++++++-------- .../worker/runner/TaskExecuteThread.java | 4 + .../worker/runner/WorkerExecService.java | 85 ++++++++++++++++++ .../worker/runner/WorkerManagerThread.java | 19 +++- 4 files changed, 154 insertions(+), 40 deletions(-) create mode 100644 dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index 9873f1eaf6..ecc1ffe755 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.processor; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; @@ -27,12 +28,12 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; -import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Pair; import org.apache.dolphinscheduler.server.utils.ProcessUtils; +import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.log.LogClientService; @@ -93,8 +94,9 @@ public class TaskKillProcessor implements NettyRequestProcessor { return; } - Integer processId = taskExecutionContext.getProcessId(); - if (processId.equals(0)) { + int processId = taskExecutionContext.getProcessId(); + if (processId == 0) { + this.cancelApplication(taskInstanceId); workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId); TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId); @@ -121,22 +123,8 @@ public class TaskKillProcessor implements NettyRequestProcessor { * @return kill result */ private Pair> doKill(TaskExecutionContext taskExecutionContext) { - boolean processFlag = true; - List appIds = Collections.emptyList(); - - try { - String pidsStr = ProcessUtils.getPidsStr(taskExecutionContext.getProcessId()); - if (!StringUtils.isEmpty(pidsStr)) { - String cmd = String.format("kill -9 %s", pidsStr); - cmd = OSUtils.getSudoCmd(taskExecutionContext.getTenantCode(), cmd); - logger.info("process id:{}, cmd:{}", taskExecutionContext.getProcessId(), cmd); - OSUtils.exeCmd(cmd); - } - - } catch (Exception e) { - processFlag = false; - logger.error("kill task error", e); - } + // kill system process + boolean processFlag = killProcess(taskExecutionContext.getTenantCode(), taskExecutionContext.getProcessId()); // find log and kill yarn job Pair> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()), taskExecutionContext.getLogPath(), @@ -146,27 +134,51 @@ public class TaskKillProcessor implements NettyRequestProcessor { } /** - * build TaskKillResponseCommand - * - * @param killCommand kill command - * @param result exe result - * @return build TaskKillResponseCommand + * kill task by cancel application + * @param taskInstanceId */ - private TaskKillResponseCommand buildKillTaskResponseCommand(TaskKillRequestCommand killCommand, - Pair> result) { - TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand(); - taskKillResponseCommand.setStatus(result.getLeft() ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode()); - taskKillResponseCommand.setAppIds(result.getRight()); - TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId()); - if (taskExecutionContext == null) { - return taskKillResponseCommand; + protected void cancelApplication(int taskInstanceId) { + TaskExecuteThread taskExecuteThread = workerManager.getTaskExecuteThread(taskInstanceId); + if (taskExecuteThread == null) { + logger.warn("taskExecuteThread not found, taskInstanceId:{}", taskInstanceId); + return; + } + AbstractTask task = taskExecuteThread.getTask(); + if (task == null) { + logger.warn("task not found, taskInstanceId:{}", taskInstanceId); + return; } - if (taskExecutionContext != null) { - taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - taskKillResponseCommand.setHost(taskExecutionContext.getHost()); - taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId()); + try { + task.cancelApplication(true); + } catch (Exception e) { + logger.error("kill task error", e); + } + logger.info("kill task by cancelApplication, task id:{}", taskInstanceId); + } + + /** + * kill system process + * @param tenantCode + * @param processId + */ + protected boolean killProcess(String tenantCode, Integer processId) { + boolean processFlag = true; + if (processId == null || processId.equals(0)) { + return true; + } + try { + String pidsStr = ProcessUtils.getPidsStr(processId); + if (!StringUtils.isEmpty(pidsStr)) { + String cmd = String.format("kill -9 %s", pidsStr); + cmd = OSUtils.getSudoCmd(tenantCode, cmd); + logger.info("process id:{}, cmd:{}", processId, cmd); + OSUtils.exeCmd(cmd); + } + } catch (Exception e) { + processFlag = false; + logger.error("kill task error", e); } - return taskKillResponseCommand; + return processFlag; } /** diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 9fffec7662..b58675b899 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -330,6 +330,10 @@ public class TaskExecuteThread implements Runnable, Delayed { return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); } + public AbstractTask getTask() { + return task; + } + private void preBuildBusinessParams() { Map paramsMap = new HashMap<>(); // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java new file mode 100644 index 0000000000..ff050370cc --- /dev/null +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +public class WorkerExecService { + /** + * logger of WorkerExecService + */ + private static final Logger logger = LoggerFactory.getLogger(WorkerExecService.class); + + private final ListeningExecutorService listeningExecutorService; + + /** + * thread executor service + */ + private final ExecutorService execService; + + /** + * running task + */ + private final ConcurrentHashMap taskExecuteThreadMap; + + public WorkerExecService(ExecutorService execService, ConcurrentHashMap taskExecuteThreadMap) { + this.execService = execService; + this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService); + this.taskExecuteThreadMap = taskExecuteThreadMap; + } + + public void submit(TaskExecuteThread taskExecuteThread) { + taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread); + ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread); + FutureCallback futureCallback = new FutureCallback() { + @Override + public void onSuccess(Object o) { + taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId()); + } + + @Override + public void onFailure(Throwable throwable) { + logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}", taskExecuteThread.getTaskExecutionContext().getProcessInstanceId() + , taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), throwable); + taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId()); + } + }; + Futures.addCallback(future, futureCallback, this.listeningExecutorService); + } + + /** + * get thread pool queue size + * + * @return queue size + */ + public int getThreadPoolQueueSize() { + return ((ThreadPoolExecutor) this.execService).getQueue().size(); + } + +} \ No newline at end of file diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java index 49ac9bc6d1..60f752401d 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.DelayQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -54,7 +55,7 @@ public class WorkerManagerThread implements Runnable { /** * thread executor service */ - private final ExecutorService workerExecService; + private final WorkerExecService workerExecService; /** * task callback service @@ -62,8 +63,20 @@ public class WorkerManagerThread implements Runnable { @Autowired private TaskCallbackService taskCallbackService; + /** + * running task + */ + private final ConcurrentHashMap taskExecuteThreadMap = new ConcurrentHashMap<>(); + public WorkerManagerThread(WorkerConfig workerConfig) { - workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()); + workerExecService = new WorkerExecService( + ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()), + taskExecuteThreadMap + ); + } + + public TaskExecuteThread getTaskExecuteThread(Integer taskInstanceId) { + return this.taskExecuteThreadMap.get(taskInstanceId); } /** @@ -81,7 +94,7 @@ public class WorkerManagerThread implements Runnable { * @return queue size */ public int getThreadPoolQueueSize() { - return ((ThreadPoolExecutor) workerExecService).getQueue().size(); + return this.workerExecService.getThreadPoolQueueSize(); } /**