Browse Source

refactor kill logic

Technoboy- 5 years ago
  1. 2
  2. 2
  3. 2
  4. 2
  5. 2
  6. 2
  7. 6
  8. 10
  9. 16
  10. 8
  11. 20
  12. 119
  13. 8
  14. 13
  15. 6
  16. 8
  17. 4
  18. 116
  19. 17
  20. 22
  21. 44
  22. 6


@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ EXECUTE_TASK_REQUEST, /** * execute task ack */ EXECUTE_TASK_ACK, /** * execute task response */ EXECUTE_TASK_RESPONSE, /** * kill task */ EXECUTE_TASK_REQUEST, /** EXECUTE_TASK_REQUEST, EXECUTE_TASK_REQUEST, */ EXECUTE_TASK_REQUEST, _RESPONSE, /** * ping */ PING, /** * pong */ PONG; }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ EXECUTE_TASK_REQUEST, /** EXECUTE_TASK_REQUEST, * execute task ack EXECUTE_TASK_REQUEST, */ EXECUTE_TASK_REQUEST, EXECUTE_TASK_ACK, */ TASK_EXECUTE_ACK, EXECUTE_TASK_REQUEST, */ * execute task response */ TASK_EXECUTE_RESPONSE, EXECUTE_TASK_REQUEST, */ EXECUTE_TASK_REQUEST, */ EXECUTE_TASK_REQUEST, * execute task ack EXECUTE_TASK_REQUEST, */ * kill task response EXECUTE_TASK_REQUEST, * execute task response * execute task ack _RESPONSE, /** * ping */ PING, /** * pong */ PONG; }

dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ → dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import; import java.util.Date; /** * execute task request command */ public class ExecuteTaskAckCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * startTime */ private Date startTime; /** * host */ private String host; /** * status */ private int status; /** * logPath */ private String logPath; /** * executePath */ private String executePath; public Date getStartTime() { return startTime; } public void setStartTime(Date startTime) { this.startTime = startTime; } public String getHost() { return host; } public void setHost(String host) { = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_ACK); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskAckCommand{" + "taskInstanceId=" + taskInstanceId + ", startTime=" + startTime + ", host='" + host + '\'' + ", status=" + status + ", logPath='" + logPath + '\'' + ", executePath='" + executePath + '\'' + '}'; } }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import; import java.util.Date; /** * execute task request command */ public class TaskExecuteAckCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * startTime */ private Date startTime; /** * host */ private String host; /** * status */ private int status; /** * logPath */ private String logPath; /** * executePath */ private String executePath; public Date getStartTime() { return startTime; } public void setStartTime(Date startTime) { this.startTime = startTime; } public String getHost() { return host; } public void setHost(String host) { = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_EXECUTE_ACK); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskExecuteAckCommand{" + "taskInstanceId=" + taskInstanceId + ", startTime=" + startTime + ", host='" + host + '\'' + ", status=" + status + ", logPath='" + logPath + '\'' + ", executePath='" + executePath + '\'' + '}'; } }

dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ → dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { /** * task execution context */ private String taskExecutionContext; public String getTaskExecutionContext() { return taskExecutionContext; } ExecuteTaskRequestCommand implements Serializable { ExecuteTaskRequestCommand implements Serializable { ExecuteTaskRequestCommand implements Serializable { } ExecuteTaskRequestCommand implements Serializable { } ExecuteTaskRequestCommand implements Serializable { /** this.taskExecutionContext = taskExecutionContext; } /** ExecuteTaskRequestCommand implements Serializable { * task execution context * ExecuteTaskRequestCommand implements Serializable { private String taskExecutionContext; */ ExecuteTaskRequestCommand implements Serializable { public String getTaskExecutionContext() { ExecuteTaskRequestCommand implements Serializable { return taskExecutionContext; ExecuteTaskRequestCommand implements Serializable { } ExecuteTaskRequestCommand implements Serializable { } /** * task execution context */ RequestCommand{" + "taskExecutionContext='" + taskExecutionContext + '\'' + '}'; } }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import; /** * execute task request command */ public class private String taskExecutionContext; public String getTaskExecutionContext() { return taskExecutionContext; } /** /** ExecuteTaskRequestCommand implements Serializable { public String getTaskExecutionContext() { /** return taskExecutionContext; } public String getTaskExecutionContext() { /** */ this.taskExecutionContext = taskExecutionContext; } public String getTaskExecutionContext() { /** public String getTaskExecutionContext() { /** * task execution context public TaskExecuteRequestCommand(String taskExecutionContext) { /** private String taskExecutionContext; } public String getTaskExecutionContext() { return taskExecutionContext; /** } * task execution context * task execution context ExecuteTaskRequestCommand implements Serializable { */ * task execution context Command command = new Command(); command.setType(CommandType.TASK_EXECUTE_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; /** * task execution context public String getTaskExecutionContext() { * task execution context return taskExecutionContext; public String toString() { */ RequestCommand{" + "taskExecutionContext='" + taskExecutionContext + '\'' + '}'; } }

dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ → dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import; import java.util.Date; /** * execute task response command */ public class ExecuteTaskResponseCommand implements Serializable { public ExecuteTaskResponseCommand() { } public ExecuteTaskResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; /** * processId */ private int processId; /** * appIds */ private String appIds; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public String getAppIds() { return appIds; } public void setAppIds(String appIds) { this.appIds = appIds; } /** * package response command * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + ", endTime=" + endTime + ", processId=" + processId + ", appIds='" + appIds + '\'' + '}'; } }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import; import java.util.Date; /** * execute task response command */ public class TaskExecuteResponseCommand implements Serializable { public TaskExecuteResponseCommand() { } public TaskExecuteResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; /** * processId */ private int processId; /** * appIds */ private String appIds; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public String getAppIds() { return appIds; } public void setAppIds(String appIds) { this.appIds = appIds; } /** * package response command * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_EXECUTE_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskExecuteResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + ", endTime=" + endTime + ", processId=" + processId + ", appIds='" + appIds + '\'' + '}'; } }

dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ → dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import; /** * kill task request command */ public class KillTaskRequestCommand implements Serializable { /** * task execution context */ private String taskExecutionContext; public String getTaskExecutionContext() { return taskExecutionContext; } KillTaskRequestCommand implements Serializable { KillTaskRequestCommand implements Serializable { KillTaskRequestCommand implements Serializable { } KillTaskRequestCommand implements Serializable { } KillTaskRequestCommand implements Serializable { /** this.taskExecutionContext = taskExecutionContext; } /** KillTaskRequestCommand implements Serializable { * task execution context * KillTaskRequestCommand implements Serializable { private String taskExecutionContext; */ KillTaskRequestCommand implements Serializable { public String getTaskExecutionContext() { KillTaskRequestCommand implements Serializable { return taskExecutionContext; KillTaskRequestCommand implements Serializable { } KillTaskRequestCommand implements Serializable { } /** * task execution context */ RequestCommand{" + "taskExecutionContext='" + taskExecutionContext + '\'' + '}'; } }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import; /** * kill task request command */ public class private String taskExecutionContext; public String getTaskExecutionContext() { return taskExecutionContext; } /** /** KillTaskRequestCommand implements Serializable { public String getTaskExecutionContext() { /** return taskExecutionContext; } public String getTaskExecutionContext() { /** */ this.taskExecutionContext = taskExecutionContext; } public String getTaskExecutionContext() { /** public String getTaskExecutionContext() { /** * task execution context public TaskKillRequestCommand(String taskExecutionContext) { /** private String taskExecutionContext; } public String getTaskExecutionContext() { return taskExecutionContext; /** } * task execution context * task execution context KillTaskRequestCommand implements Serializable { */ * task execution context Command command = new Command(); command.setType(CommandType.TASK_KILL_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; /** * task execution context public String getTaskExecutionContext() { * task execution context return taskExecutionContext; public String toString() { */ RequestCommand{" + "taskExecutionContext='" + taskExecutionContext + '\'' + '}'; } }

dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ → dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/

@ -1 +1 @@
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import; import java.util.Date; import java.util.List; /** * kill task response command */ public class KillTaskResponseCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * host */ private String host; /** * status */ private int status; /** * processId */ private int processId; /** * other resource manager appId , for example : YARN etc */ protected List<String> appIds; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getHost() { return host; } public void setHost(String host) { = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public List<String> getAppIds() { return appIds; } public void setAppIds(List<String> appIds) { this.appIds = appIds; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.KILL_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "KillTaskResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", host='" + host + '\'' + ", status=" + status + ", processId=" + processId + ", appIds=" + appIds + '}'; } }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import; import java.util.Date; import java.util.List; /** * kill task response command */ public class TaskKillResponseCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * host */ private String host; /** * status */ private int status; /** * processId */ private int processId; /** * other resource manager appId , for example : YARN etc */ protected List<String> appIds; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getHost() { return host; } public void setHost(String host) { = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public List<String> getAppIds() { return appIds; } public void setAppIds(List<String> appIds) { this.appIds = appIds; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_KILL_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskKillResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", host='" + host + '\'' + ", status=" + status + ", processId=" + processId + ", appIds=" + appIds + '}'; } }


@ -126,9 +126,9 @@ public class MasterServer implements IStoppable {
NettyServerConfig serverConfig = new NettyServerConfig();
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskKillResponseProcessor());

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/ vendored

@ -18,8 +18,8 @@
package org.apache.dolphinscheduler.server.master.cache;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
@ -47,14 +47,14 @@ public interface TaskInstanceCacheManager {
* @param taskAckCommand taskAckCommand
void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand);
void cacheTaskInstance(TaskExecuteAckCommand taskAckCommand);
* cache taskInstance
* @param executeTaskResponseCommand executeTaskResponseCommand
* @param taskExecuteResponseCommand taskExecuteResponseCommand
void cacheTaskInstance(ExecuteTaskResponseCommand executeTaskResponseCommand);
void cacheTaskInstance(TaskExecuteResponseCommand taskExecuteResponseCommand);
* remove taskInstance by taskInstanceId

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/ vendored

@ -18,8 +18,8 @@ package org.apache.dolphinscheduler.server.master.cache.impl;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -86,7 +86,7 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
* @param taskAckCommand taskAckCommand
public void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand) {
public void cacheTaskInstance(TaskExecuteAckCommand taskAckCommand) {
TaskInstance taskInstance = new TaskInstance();
@ -99,13 +99,13 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
* cache taskInstance
* @param executeTaskResponseCommand executeTaskResponseCommand
* @param taskExecuteResponseCommand taskExecuteResponseCommand
public void cacheTaskInstance(ExecuteTaskResponseCommand executeTaskResponseCommand) {
TaskInstance taskInstance = getByTaskInstanceId(executeTaskResponseCommand.getTaskInstanceId());
public void cacheTaskInstance(TaskExecuteResponseCommand taskExecuteResponseCommand) {
TaskInstance taskInstance = getByTaskInstanceId(taskExecuteResponseCommand.getTaskInstanceId());


@ -41,6 +41,14 @@ public interface ExecutorManager<T> {
T execute(ExecutionContext context) throws ExecuteException;
* execute task directly without retry
* @param context context
* @return T
* @throws ExecuteException
void executeDirectly(ExecutionContext context) throws ExecuteException;
* after execute
* @param context context


@ -21,7 +21,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.Host;
@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionConte
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.slf4j.Logger;
@ -68,8 +69,9 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
* register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor
* register EXECUTE_TASK_ACK command type TaskAckProcessor
this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskKillResponseProcessor());
@ -128,13 +130,19 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
return success;
public void executeDirectly(ExecutionContext context) throws ExecuteException {
Command command = buildCommand(context);
Host host = context.getHost();
* build command
* @param context context
* @return command
private Command buildCommand(ExecutionContext context) {
ExecuteTaskRequestCommand requestCommand = new ExecuteTaskRequestCommand();
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
@ -203,4 +211,8 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
return nodes;
public NettyRemotingClient getNettyRemotingClient() {
return nettyRemotingClient;


@ -1,119 +0,0 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.dolphinscheduler.server.master.dispatch.executor;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
* netty executor manager
public class NettyKillManager extends AbstractExecutorManager<Boolean>{
private final Logger logger = LoggerFactory.getLogger(NettyKillManager.class);
* netty remote client
private final NettyRemotingClient nettyRemotingClient;
public NettyKillManager(){
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
* register KILL_TASK_RESPONSE command type TaskKillResponseProcessor
this.nettyRemotingClient.registerProcessor(CommandType.KILL_TASK_RESPONSE, new TaskKillResponseProcessor());
* execute logic
* @param context context
* @return result
* @throws ExecuteException
public Boolean execute(ExecutionContext context) throws ExecuteException {
Host host = context.getHost();
Command command = buildCommand(context);
try {
doExecute(host, command);
return true;
}catch (ExecuteException ex) {
logger.error(String.format("execute context : %s error", context.getContext()), ex);
return false;
private Command buildCommand(ExecutionContext context) {
KillTaskRequestCommand requestCommand = new KillTaskRequestCommand();
TaskExecutionContext taskExecutionContext = context.getContext();
return requestCommand.convert2Command();
* execute logic
* @param host host
* @param command command
* @throws ExecuteException
private void doExecute(final Host host, final Command command) throws ExecuteException {
* retry countdefault retry 3
int retryCount = 3;
boolean success = false;
do {
try {
nettyRemotingClient.send(host, command);
success = true;
} catch (Exception ex) {
logger.error(String.format("send command : %s to %s error", command, host), ex);
try {
} catch (InterruptedException ignore) {}
} while (retryCount >= 0 && !success);
if (!success) {
throw new ExecuteException(String.format("send command : %s to %s error", command, host));


@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
@ -57,12 +57,12 @@ public class TaskAckProcessor implements NettyRequestProcessor {
* task ack process
* @param channel channel channel
* @param command command ExecuteTaskAckCommand
* @param command command TaskExecuteAckCommand
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class);
Preconditions.checkArgument(CommandType.TASK_EXECUTE_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskExecuteAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), TaskExecuteAckCommand.class);"taskAckCommand : {}", taskAckCommand);


@ -18,19 +18,12 @@
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -50,9 +43,9 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor {
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.KILL_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
KillTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskResponseCommand.class);
TaskKillResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillResponseCommand.class);"received command : {}", responseCommand);"已经接受到了worker杀任务的回应");


@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
@ -63,9 +63,9 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
ExecuteTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskResponseCommand.class);
TaskExecuteResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), TaskExecuteResponseCommand.class);"received command : {}", responseCommand);


@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyKillManager;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -57,7 +57,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
private TaskInstanceCacheManager taskInstanceCacheManager;
private NettyKillManager nettyKillManager;
private NettyExecutorManager nettyExecutorManager;
* constructor of MasterTaskExecThread
@ -67,7 +67,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
public MasterTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
super(taskInstance, processInstance);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
this.nettyKillManager = SpringApplicationContext.getBean(NettyKillManager.class);
this.nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
@ -191,7 +191,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
nettyExecutorManager.executeDirectly(executionContext);"master add kill task :{} id:{} to kill queue",
taskInstance.getName(), taskInstance.getId() );


@ -127,8 +127,8 @@ public class WorkerServer implements IStoppable {
//init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new TaskExecuteProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), workerConfig.getWorkerGroup());


@ -1,116 +0,0 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
* taks callback service
public class KillTaskCallbackService {
private final Logger logger = LoggerFactory.getLogger(KillTaskCallbackService.class);
* remote channels
private static final ConcurrentHashMap<Integer, NettyRemoteChannel> REMOTE_CHANNELS = new ConcurrentHashMap<>();
* netty remoting client
private final NettyRemotingClient nettyRemotingClient;
public KillTaskCallbackService(){
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
* add callback channel
* @param taskInstanceId taskInstanceId
* @param channel channel
public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel channel){
REMOTE_CHANNELS.put(taskInstanceId, channel);
* get callback channel
* @param taskInstanceId taskInstanceId
* @return callback channel
public NettyRemoteChannel getRemoteChannel(int taskInstanceId){
NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
return nettyRemoteChannel;
Channel newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
if(newChannel != null){
NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, nettyRemoteChannel.getOpaque());
addRemoteChannel(taskInstanceId, remoteChannel);
return remoteChannel;
return null;
* remove callback channels
* @param taskInstanceId taskInstanceId
public void remove(int taskInstanceId){
* send result
* @param taskInstanceId taskInstanceId
* @param killTaskResponseCommand killTaskResponseCommand
public void sendKillResult(int taskInstanceId, KillTaskResponseCommand killTaskResponseCommand){
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
if(nettyRemoteChannel == null){
} else{
nettyRemoteChannel.writeAndFlush(killTaskResponseCommand.convert2Command()).addListener(new ChannelFutureListener(){
public void operationComplete(ChannelFuture future) throws Exception {


@ -22,17 +22,18 @@ import;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;
* taks callback service
public class TaskCallbackService {
private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class);
@ -92,14 +93,14 @@ public class TaskCallbackService {
* send ack
* @param taskInstanceId taskInstanceId
* @param ackCommand ackCommand
* @param command command
public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){
public void sendAck(int taskInstanceId, Command command){
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
if(nettyRemoteChannel == null){
} else{
@ -107,14 +108,14 @@ public class TaskCallbackService {
* send result
* @param taskInstanceId taskInstanceId
* @param responseCommand responseCommand
* @param command command
public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){
public void sendResult(int taskInstanceId, Command command){
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
if(nettyRemoteChannel == null){
} else{
nettyRemoteChannel.writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){
nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener(){
public void operationComplete(ChannelFuture future) throws Exception {


@ -31,8 +31,8 @@ import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
@ -69,18 +69,18 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private final TaskCallbackService taskCallbackService;
public TaskExecuteProcessor(){
this.taskCallbackService = new TaskCallbackService();
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(),
Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));"received command : {}", command);
ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
command.getBody(), ExecuteTaskRequestCommand.class);
TaskExecuteRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
command.getBody(), TaskExecuteRequestCommand.class);
String contextJson = taskRequestCommand.getTaskExecutionContext();
@ -105,8 +105,8 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private void doAck(TaskExecutionContext taskExecutionContext){
// tell master that task is in executing
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext);
taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext);
taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
@ -134,10 +134,10 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
* build ack command
* @param taskExecutionContext taskExecutionContext
* @return ExecuteTaskAckCommand
* @return TaskExecuteAckCommand
private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
private TaskExecuteAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();


@ -27,8 +27,8 @@ import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand;
import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
@ -58,7 +58,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
* task callback service
private final KillTaskCallbackService killTaskCallbackService;
private final TaskCallbackService taskCallbackService;
* taskExecutionContextCacheManager
@ -72,7 +72,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
public TaskKillProcessor(){
this.killTaskCallbackService = new KillTaskCallbackService();
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
@ -115,41 +115,41 @@ public class TaskKillProcessor implements NettyRequestProcessor {
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.KILL_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
KillTaskRequestCommand killTaskRequestCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskRequestCommand.class);"received command : {}", killTaskRequestCommand);
Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskKillRequestCommand taskKillRequestCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class);"received command : {}", taskKillRequestCommand);
String contextJson = killTaskRequestCommand.getTaskExecutionContext();
String contextJson = taskKillRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
Boolean killStatus = doKill(taskExecutionContext);
new NettyRemoteChannel(channel, command.getOpaque()));
KillTaskResponseCommand killTaskResponseCommand = buildKillTaskResponseCommand(taskExecutionContext,killStatus);
TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskExecutionContext,killStatus);
taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
* build KillTaskResponseCommand
* build TaskKillResponseCommand
* @param taskExecutionContext taskExecutionContext
* @param killStatus killStatus
* @return build KillTaskResponseCommand
* @return build TaskKillResponseCommand
private KillTaskResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext,
private TaskKillResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext,
Boolean killStatus) {
KillTaskResponseCommand killTaskResponseCommand = new KillTaskResponseCommand();
killTaskResponseCommand.setStatus(killStatus ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
return killTaskResponseCommand;
TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
taskKillResponseCommand.setStatus(killStatus ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
return taskKillResponseCommand;


@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
@ -75,7 +75,7 @@ public class TaskExecuteThread implements Runnable {
public void run() {
ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskExecutionContext.getTaskInstanceId());
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId());
try {"script path : {}", taskExecutionContext.getExecutePath());
// task node
@ -134,7 +134,7 @@ public class TaskExecuteThread implements Runnable {
} finally {
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
