diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 9c46ad66fd..6923cd02e1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -128,7 +128,7 @@ public class MasterServer implements IStoppable { this.nettyRemotingServer = new NettyRemotingServer(serverConfig); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskKillResponseProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); this.nettyRemotingServer.start(); // diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index 286b0e6391..7719cf82d2 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; +import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.remote.utils.Host; @@ -71,7 +72,7 @@ public class NettyExecutorManager extends AbstractExecutorManager{ */ this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); - this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskKillResponseProcessor()); + this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); } @@ -130,8 +131,9 @@ public class NettyExecutorManager extends AbstractExecutorManager{ return success; } + @Override public void executeDirectly(ExecutionContext context) throws ExecuteException { - Command command = buildCommand(context); + Command command = buildKillCommand(context); Host host = context.getHost(); doExecute(host,command); } @@ -158,6 +160,28 @@ public class NettyExecutorManager extends AbstractExecutorManager{ return requestCommand.convert2Command(); } + /** + * build command + * @param context context + * @return command + */ + private Command buildKillCommand(ExecutionContext context) { + TaskKillRequestCommand requestCommand = new TaskKillRequestCommand(); + ExecutorType executorType = context.getExecutorType(); + switch (executorType){ + case WORKER: + TaskExecutionContext taskExecutionContext = context.getContext(); + requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext)); + break; + case CLIENT: + break; + default: + throw new IllegalArgumentException("invalid executor type : " + executorType); + + } + return requestCommand.convert2Command(); + } + /** * execute logic * @param host host diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java index 4986e89e67..3e8cdfdadc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java @@ -46,8 +46,7 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor { Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); TaskKillResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillResponseCommand.class); - logger.info("received command : {}", responseCommand); - logger.info("已经接受到了worker杀任务的回应"); + logger.info("received task kill response command : {}", responseCommand); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index 995a4e8c21..51a4f44cda 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; @@ -183,7 +184,6 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { alreadyKilled = true; TaskExecutionContext taskExecutionContext = super.getTaskExecutionContext(taskInstance); - ExecutionContext executionContext = new ExecutionContext(taskExecutionContext, ExecutorType.WORKER); Host host = new Host();