diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java index e5c756acab..b8e02dd057 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * kill task request command */ public class TaskKillRequestCommand implements Serializable { /** * task execution context */ private String taskExecutionContext; public String getTaskExecutionContext() { return taskExecutionContext; } public void setTaskExecutionContext(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } public TaskKillRequestCommand() { } public TaskKillRequestCommand(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_KILL_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskKillRequestCommand{" + "taskExecutionContext='" + taskExecutionContext + '\'' + '}'; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * kill task request command */ public class TaskKillRequestCommand implements Serializable { /** * task id */ private int taskInstanceId; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_KILL_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskKillRequestCommand{" + "taskInstanceId=" + taskInstanceId + '}'; } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index cdaa0f0d36..0d88d6a129 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.entity; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; -import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; @@ -421,12 +420,6 @@ public class TaskExecutionContext implements Serializable{ return requestCommand.convert2Command(); } - public Command toKillCommand(){ - TaskKillRequestCommand requestCommand = new TaskKillRequestCommand(); - requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(this)); - return requestCommand.convert2Command(); - } - @Override public String toString() { return "TaskExecutionContext{" + diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index 172794e815..f220a09c8e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -16,6 +16,7 @@ */ package org.apache.dolphinscheduler.server.master.runner; +import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; @@ -25,9 +26,8 @@ import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; @@ -172,8 +172,6 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { /** - * TODO Kill TASK - * * task instance add queue , waiting worker to kill */ private void cancelTaskInstance() throws Exception{ @@ -182,14 +180,10 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { } alreadyKilled = true; - TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); - taskExecutionContext.setTaskInstanceId(taskInstance.getId()); - taskExecutionContext.setHost(taskInstance.getHost()); - taskExecutionContext.setLogPath(taskInstance.getLogPath()); - taskExecutionContext.setExecutePath(taskInstance.getExecutePath()); - taskExecutionContext.setProcessId(taskInstance.getPid()); + TaskKillRequestCommand killCommand = new TaskKillRequestCommand(); + killCommand.setTaskInstanceId(taskInstance.getId()); - ExecutionContext executionContext = new ExecutionContext(taskExecutionContext.toKillCommand(), ExecutorType.WORKER, taskExecutionContext.getWorkerGroup()); + ExecutionContext executionContext = new ExecutionContext(killCommand.convert2Command(), ExecutorType.WORKER); Host host = Host.of(taskInstance.getHost()); executionContext.setHost(host); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index 5a8c6686c6..b6f58279b1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.worker.processor; -import com.alibaba.fastjson.JSONObject; import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; @@ -32,6 +31,7 @@ import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.remote.utils.Pair; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager; @@ -42,6 +42,7 @@ import org.apache.dolphinscheduler.service.log.LogClientService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.List; /** @@ -66,11 +67,6 @@ public class TaskKillProcessor implements NettyRequestProcessor { */ private TaskExecutionContextCacheManager taskExecutionContextCacheManager; - /** - * appIds - */ - private List appIds; - public TaskKillProcessor(){ this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); @@ -79,86 +75,80 @@ public class TaskKillProcessor implements NettyRequestProcessor { } /** - * kill task logic + * task kill process * - * @param context context - * @return execute result + * @param channel channel channel + * @param command command command */ - private Boolean doKill(TaskExecutionContext context){ + @Override + public void process(Channel channel, Command command) { + Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); + TaskKillRequestCommand killCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class); + logger.info("received kill command : {}", killCommand); + + Pair> result = doKill(killCommand); + + taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(), + new NettyRemoteChannel(channel, command.getOpaque())); + + TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand,result); + taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command()); + } + + /** + * do kill + * @param killCommand + * @return kill result + */ + private Pair> doKill(TaskKillRequestCommand killCommand){ + List appIds = Collections.EMPTY_LIST; try { - TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(context.getTaskInstanceId()); - context.setProcessId(taskExecutionContext.getProcessId()); + TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId()); Integer processId = taskExecutionContext.getProcessId(); if (processId == null || processId.equals(0)){ - logger.error("process kill failed, process id :{}, task id:{}", processId, context.getTaskInstanceId()); - return false; + logger.error("process kill failed, process id :{}, task id:{}", processId, killCommand.getTaskInstanceId()); + return Pair.of(false, appIds); } + String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(taskExecutionContext.getProcessId())); - String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(context.getProcessId())); - - logger.info("process id:{}, cmd:{}", context.getProcessId(), cmd); + logger.info("process id:{}, cmd:{}", taskExecutionContext.getProcessId(), cmd); OSUtils.exeCmd(cmd); - // find log and kill yarn job - killYarnJob(Host.of(context.getHost()).getIp(), - context.getLogPath(), - context.getExecutePath(), - context.getTenantCode()); + appIds = killYarnJob(Host.of(taskExecutionContext.getHost()).getIp(), + taskExecutionContext.getLogPath(), + taskExecutionContext.getExecutePath(), + taskExecutionContext.getTenantCode()); - return true; + return Pair.of(true, appIds); } catch (Exception e) { logger.error("kill task error", e); - return false; } - } - - /** - * task kill process - * - * @param channel channel channel - * @param command command command - */ - @Override - public void process(Channel channel, Command command) { - Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); - TaskKillRequestCommand taskKillRequestCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class); - logger.info("received command : {}", taskKillRequestCommand); - - - String contextJson = taskKillRequestCommand.getTaskExecutionContext(); - - TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class); - - Boolean killStatus = doKill(taskExecutionContext); - - taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), - new NettyRemoteChannel(channel, command.getOpaque())); - - TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskExecutionContext,killStatus); - taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command()); + return Pair.of(false, appIds); } /** * build TaskKillResponseCommand * - * @param taskExecutionContext taskExecutionContext - * @param killStatus killStatus + * @param killCommand kill command + * @param result exe result * @return build TaskKillResponseCommand */ - private TaskKillResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext, - Boolean killStatus) { + private TaskKillResponseCommand buildKillTaskResponseCommand(TaskKillRequestCommand killCommand, + Pair> result) { TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand(); - taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - taskKillResponseCommand.setHost(taskExecutionContext.getHost()); - taskKillResponseCommand.setStatus(killStatus ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode()); - taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId()); - taskKillResponseCommand.setAppIds(appIds); - + taskKillResponseCommand.setStatus(result.getLeft() ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode()); + taskKillResponseCommand.setAppIds(result.getRight()); + TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId()); + if(taskExecutionContext != null){ + taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + taskKillResponseCommand.setHost(taskExecutionContext.getHost()); + taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId()); + } return taskKillResponseCommand; } @@ -169,8 +159,9 @@ public class TaskKillProcessor implements NettyRequestProcessor { * @param logPath logPath * @param executePath executePath * @param tenantCode tenantCode + * @return List appIds */ - private void killYarnJob(String host, String logPath, String executePath, String tenantCode) { + private List killYarnJob(String host, String logPath, String executePath, String tenantCode) { LogClientService logClient = null; try { logClient = new LogClientService(); @@ -185,9 +176,9 @@ public class TaskKillProcessor implements NettyRequestProcessor { } if (appIds.size() > 0) { ProcessUtils.cancelApplication(appIds, logger, tenantCode, executePath); + return appIds; } } - } catch (Exception e) { logger.error("kill yarn job error",e); } finally { @@ -195,6 +186,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { logClient.close(); } } + return Collections.EMPTY_LIST; } }