diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index cf2cfe51bb..b2042678a5 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -98,6 +98,11 @@ public enum CommandType { */ TASK_KILL_RESPONSE, + /** + * kill task response ack + */ + TASK_KILL_RESPONSE_ACK, + /** * HEART_BEAT */ diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillAckCommand.java new file mode 100644 index 0000000000..61775d59f4 --- /dev/null +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillAckCommand.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.remote.command; + +import org.apache.dolphinscheduler.common.utils.JSONUtils; + +import java.io.Serializable; + +public class TaskKillAckCommand implements Serializable { + + private int taskInstanceId; + private int status; + + public TaskKillAckCommand() { + super(); + } + + public TaskKillAckCommand(int status, int taskInstanceId) { + this.status = status; + this.taskInstanceId = taskInstanceId; + } + + public int getTaskInstanceId() { + return taskInstanceId; + } + + public void setTaskInstanceId(int taskInstanceId) { + this.taskInstanceId = taskInstanceId; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + /** + * package response command + * + * @return command + */ + public Command convert2Command() { + Command command = new Command(); + command.setType(CommandType.TASK_KILL_RESPONSE_ACK); + byte[] body = JSONUtils.toJsonByteArray(this); + command.setBody(body); + return command; + } + + @Override + public String toString() { + return "KillTaskAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}'; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java index ca8ad0ac93..0e9a0b9349 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand; import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; +import org.apache.dolphinscheduler.remote.command.TaskKillAckCommand; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor; import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; @@ -157,6 +158,8 @@ public class TaskResponsePersistThread implements Runnable { logger.debug("ACTION_STOP: task instance id:{}, process instance id:{}", taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getProcessInstanceId()); } } + TaskKillAckCommand taskKillAckCommand = new TaskKillAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId()); + channel.writeAndFlush(taskKillAckCommand.convert2Command()); break; default: throw new IllegalArgumentException("invalid event type : " + event); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index c66718f21b..ff0067cb29 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor; import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor; import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; +import org.apache.dolphinscheduler.server.worker.processor.TaskKillAckProcessor; import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient; import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread; @@ -140,6 +141,7 @@ public class WorkerServer implements IStoppable { this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor(alertClientService, taskPluginManager)); this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE_ACK,new TaskKillAckProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, new HostUpdateProcessor()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java index 3639b8eba3..9ee91f2b70 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/cache/ResponceCache.java @@ -38,6 +38,7 @@ public class ResponceCache { private Map ackCache = new ConcurrentHashMap<>(); private Map responseCache = new ConcurrentHashMap<>(); + private final Map killResponseCache = new ConcurrentHashMap<>(); /** @@ -54,6 +55,9 @@ public class ResponceCache { case RESULT: responseCache.put(taskInstanceId,command); break; + case ACTION_STOP: + killResponseCache.put(taskInstanceId,command); + break; default: throw new IllegalArgumentException("invalid event type : " + event); } @@ -68,6 +72,19 @@ public class ResponceCache { ackCache.remove(taskInstanceId); } + /** + * remove kill response cache + * + * @param taskInstanceId taskInstanceId + */ + public void removeKillResponseCache(Integer taskInstanceId) { + killResponseCache.remove(taskInstanceId); + } + + public Map getKillResponseCache() { + return killResponseCache; + } + /** * remove reponse cache * @param taskInstanceId taskInstanceId diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillAckProcessor.java new file mode 100644 index 0000000000..bbb2c8f695 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillAckProcessor.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.processor; + +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.remote.command.Command; +import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.command.TaskKillAckCommand; +import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; +import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +import io.netty.channel.Channel; + +public class TaskKillAckProcessor implements NettyRequestProcessor { + + private final Logger logger = LoggerFactory.getLogger(TaskKillAckProcessor.class); + + @Override + public void process(Channel channel, Command command) { + Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE_ACK == command.getType(), + String.format("invalid command type : %s", command.getType())); + + TaskKillAckCommand taskKillAckCommand = JSONUtils.parseObject( + command.getBody(), TaskKillAckCommand.class); + + if (taskKillAckCommand == null) { + return; + } + + if (taskKillAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) { + ResponceCache.get().removeKillResponseCache(taskKillAckCommand.getTaskInstanceId()); + TaskExecutionContextCacheManager.removeByTaskInstanceId(taskKillAckCommand.getTaskInstanceId()); + logger.debug("removeKillResponseCache: taskinstance id:{}", taskKillAckCommand.getTaskInstanceId()); + TaskCallbackService.remove(taskKillAckCommand.getTaskInstanceId()); + logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskKillAckCommand.getTaskInstanceId()); + } + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index 4f235eaf78..2cf70aef7e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.worker.processor; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils; @@ -31,6 +32,7 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Pair; import org.apache.dolphinscheduler.server.utils.ProcessUtils; +import org.apache.dolphinscheduler.server.worker.cache.ResponceCache; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; @@ -93,16 +95,19 @@ public class TaskKillProcessor implements NettyRequestProcessor { TaskKillRequestCommand killCommand = JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class); logger.info("received kill command : {}", killCommand); - Pair> result = doKill(killCommand); - taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque())); - TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand, result); + Pair> result = doKill(killCommand); + + TaskRequest taskRequest = TaskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId()); + + if (taskRequest == null) { + return; + } + TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskRequest, result); + ResponceCache.get().cache(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command(), Event.ACTION_STOP); taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command()); - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId()); - TaskCallbackService.remove(killCommand.getTaskInstanceId()); - logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId()); } /** @@ -116,7 +121,6 @@ public class TaskKillProcessor implements NettyRequestProcessor { int taskInstanceId = killCommand.getTaskInstanceId(); TaskRequest taskRequest = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId); TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(JSONUtils.toJsonString(taskRequest), TaskExecutionContext.class); - try { Integer processId = taskExecutionContext.getProcessId(); if (processId.equals(0)) { @@ -161,15 +165,11 @@ public class TaskKillProcessor implements NettyRequestProcessor { * @param result exe result * @return build TaskKillResponseCommand */ - private TaskKillResponseCommand buildKillTaskResponseCommand(TaskKillRequestCommand killCommand, + private TaskKillResponseCommand buildKillTaskResponseCommand(TaskRequest taskRequest, Pair> result) { TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand(); taskKillResponseCommand.setStatus(result.getLeft() ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode()); taskKillResponseCommand.setAppIds(result.getRight()); - TaskRequest taskRequest = TaskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId()); - if (taskRequest == null) { - return taskKillResponseCommand; - } TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(JSONUtils.toJsonString(taskRequest), TaskExecutionContext.class); if (taskExecutionContext != null) { taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java index f52be9dd3b..d99c84d948 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/RetryReportTaskStatusThread.java @@ -88,6 +88,14 @@ public class RetryReportTaskStatusThread implements Runnable { taskCallbackService.sendResult(taskInstanceId,responseCommand); } } + if (!responceCache.getKillResponseCache().isEmpty()) { + Map killResponseCache = responceCache.getKillResponseCache(); + for (Map.Entry entry : killResponseCache.entrySet()) { + Integer taskInstanceId = entry.getKey(); + Command killResponseCommand = entry.getValue(); + taskCallbackService.sendResult(taskInstanceId, killResponseCommand); + } + } }catch (Exception e){ logger.warn("retry report task status error", e); }