From 71c11bd59474787da9ad49d2a30c312afad41440 Mon Sep 17 00:00:00 2001 From: Tboy Date: Mon, 2 Mar 2020 22:15:08 +0800 Subject: [PATCH] refactor kill logic (#2060) --- .../remote/command/CommandType.java | 2 +- ...ommand.java => TaskExecuteAckCommand.java} | 2 +- ...nd.java => TaskExecuteRequestCommand.java} | 2 +- ...d.java => TaskExecuteResponseCommand.java} | 2 +- ...mmand.java => TaskKillRequestCommand.java} | 2 +- ...mand.java => TaskKillResponseCommand.java} | 2 +- .../server/master/MasterServer.java | 6 +- .../cache/TaskInstanceCacheManager.java | 10 +- .../impl/TaskInstanceCacheManagerImpl.java | 16 +-- .../dispatch/executor/ExecutorManager.java | 8 ++ .../executor/NettyExecutorManager.java | 20 ++- .../dispatch/executor/NettyKillManager.java | 119 ------------------ .../master/processor/TaskAckProcessor.java | 8 +- .../processor/TaskKillResponseProcessor.java | 13 +- .../processor/TaskResponseProcessor.java | 6 +- .../master/runner/MasterTaskExecThread.java | 8 +- .../server/worker/WorkerServer.java | 4 +- .../processor/KillTaskCallbackService.java | 116 ----------------- .../worker/processor/TaskCallbackService.java | 17 +-- .../processor/TaskExecuteProcessor.java | 22 ++-- .../worker/processor/TaskKillProcessor.java | 44 +++---- .../worker/runner/TaskExecuteThread.java | 6 +- 22 files changed, 107 insertions(+), 328 deletions(-) rename dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/{ExecuteTaskAckCommand.java => TaskExecuteAckCommand.java} (92%) rename dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/{ExecuteTaskRequestCommand.java => TaskExecuteRequestCommand.java} (82%) rename dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/{ExecuteTaskResponseCommand.java => TaskExecuteResponseCommand.java} (87%) rename dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/{KillTaskRequestCommand.java => TaskKillRequestCommand.java} (83%) rename dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/{KillTaskResponseCommand.java => TaskKillResponseCommand.java} (93%) delete mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyKillManager.java delete mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/KillTaskCallbackService.java diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java index 053b38f5a2..c8d56597ee 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/CommandType.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ EXECUTE_TASK_REQUEST, /** * execute task ack */ EXECUTE_TASK_ACK, /** * execute task response */ EXECUTE_TASK_RESPONSE, /** * kill task */ KILL_TASK_REQUEST, /** * kill task response */ KILL_TASK_RESPONSE, /** * ping */ PING, /** * pong */ PONG; } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ TASK_EXECUTE_REQUEST, /** * execute task ack */ TASK_EXECUTE_ACK, /** * execute task response */ TASK_EXECUTE_RESPONSE, /** * kill task */ TASK_KILL_REQUEST, /** * kill task response */ TASK_KILL_RESPONSE, /** * ping */ PING, /** * pong */ PONG; } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java similarity index 92% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java index 52d0a5dd6d..0b3d901a3c 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskAckCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteAckCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; /** * execute task request command */ public class ExecuteTaskAckCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * startTime */ private Date startTime; /** * host */ private String host; /** * status */ private int status; /** * logPath */ private String logPath; /** * executePath */ private String executePath; public Date getStartTime() { return startTime; } public void setStartTime(Date startTime) { this.startTime = startTime; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_ACK); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskAckCommand{" + "taskInstanceId=" + taskInstanceId + ", startTime=" + startTime + ", host='" + host + '\'' + ", status=" + status + ", logPath='" + logPath + '\'' + ", executePath='" + executePath + '\'' + '}'; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; /** * execute task request command */ public class TaskExecuteAckCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * startTime */ private Date startTime; /** * host */ private String host; /** * status */ private int status; /** * logPath */ private String logPath; /** * executePath */ private String executePath; public Date getStartTime() { return startTime; } public void setStartTime(Date startTime) { this.startTime = startTime; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_EXECUTE_ACK); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskExecuteAckCommand{" + "taskInstanceId=" + taskInstanceId + ", startTime=" + startTime + ", host='" + host + '\'' + ", status=" + status + ", logPath='" + logPath + '\'' + ", executePath='" + executePath + '\'' + '}'; } } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java similarity index 82% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java index e7564edf3d..637724f49d 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteRequestCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class ExecuteTaskRequestCommand implements Serializable { /** * task execution context */ private String taskExecutionContext; public String getTaskExecutionContext() { return taskExecutionContext; } public void setTaskExecutionContext(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } public ExecuteTaskRequestCommand() { } public ExecuteTaskRequestCommand(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskRequestCommand{" + "taskExecutionContext='" + taskExecutionContext + '\'' + '}'; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */ public class TaskExecuteRequestCommand implements Serializable { /** * task execution context */ private String taskExecutionContext; public String getTaskExecutionContext() { return taskExecutionContext; } public void setTaskExecutionContext(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } public TaskExecuteRequestCommand() { } public TaskExecuteRequestCommand(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_EXECUTE_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskExecuteRequestCommand{" + "taskExecutionContext='" + taskExecutionContext + '\'' + '}'; } } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java similarity index 87% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java index 707bf07550..deb6f5dd8f 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/ExecuteTaskResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; /** * execute task response command */ public class ExecuteTaskResponseCommand implements Serializable { public ExecuteTaskResponseCommand() { } public ExecuteTaskResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; /** * processId */ private int processId; /** * appIds */ private String appIds; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public String getAppIds() { return appIds; } public void setAppIds(String appIds) { this.appIds = appIds; } /** * package response command * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + ", endTime=" + endTime + ", processId=" + processId + ", appIds='" + appIds + '\'' + '}'; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; /** * execute task response command */ public class TaskExecuteResponseCommand implements Serializable { public TaskExecuteResponseCommand() { } public TaskExecuteResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; /** * processId */ private int processId; /** * appIds */ private String appIds; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public String getAppIds() { return appIds; } public void setAppIds(String appIds) { this.appIds = appIds; } /** * package response command * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_EXECUTE_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskExecuteResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + ", endTime=" + endTime + ", processId=" + processId + ", appIds='" + appIds + '\'' + '}'; } } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java similarity index 83% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java index b8cfd89fc2..e5c756acab 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * kill task request command */ public class KillTaskRequestCommand implements Serializable { /** * task execution context */ private String taskExecutionContext; public String getTaskExecutionContext() { return taskExecutionContext; } public void setTaskExecutionContext(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } public KillTaskRequestCommand() { } public KillTaskRequestCommand(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.KILL_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "KillTaskRequestCommand{" + "taskExecutionContext='" + taskExecutionContext + '\'' + '}'; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * kill task request command */ public class TaskKillRequestCommand implements Serializable { /** * task execution context */ private String taskExecutionContext; public String getTaskExecutionContext() { return taskExecutionContext; } public void setTaskExecutionContext(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } public TaskKillRequestCommand() { } public TaskKillRequestCommand(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_KILL_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskKillRequestCommand{" + "taskExecutionContext='" + taskExecutionContext + '\'' + '}'; } } \ No newline at end of file diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java similarity index 93% rename from dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskResponseCommand.java rename to dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java index 515bd07f1d..2ca2330c80 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/KillTaskResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java @@ -1 +1 @@ -/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; import java.util.List; /** * kill task response command */ public class KillTaskResponseCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * host */ private String host; /** * status */ private int status; /** * processId */ private int processId; /** * other resource manager appId , for example : YARN etc */ protected List appIds; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public List getAppIds() { return appIds; } public void setAppIds(List appIds) { this.appIds = appIds; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.KILL_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "KillTaskResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", host='" + host + '\'' + ", status=" + status + ", processId=" + processId + ", appIds=" + appIds + '}'; } } \ No newline at end of file +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; import java.util.Date; import java.util.List; /** * kill task response command */ public class TaskKillResponseCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * host */ private String host; /** * status */ private int status; /** * processId */ private int processId; /** * other resource manager appId , for example : YARN etc */ protected List appIds; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public List getAppIds() { return appIds; } public void setAppIds(List appIds) { this.appIds = appIds; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_KILL_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskKillResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", host='" + host + '\'' + ", status=" + status + ", processId=" + processId + ", appIds=" + appIds + '}'; } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 4c0c3e8ce2..9c46ad66fd 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -126,9 +126,9 @@ public class MasterServer implements IStoppable { NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(45678); this.nettyRemotingServer = new NettyRemotingServer(serverConfig); - this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_RESPONSE, new TaskKillResponseProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskKillResponseProcessor()); this.nettyRemotingServer.start(); // diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java index a62ee49f8e..031d8b2b94 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/TaskInstanceCacheManager.java @@ -18,8 +18,8 @@ package org.apache.dolphinscheduler.server.master.cache; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; /** @@ -47,14 +47,14 @@ public interface TaskInstanceCacheManager { * * @param taskAckCommand taskAckCommand */ - void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand); + void cacheTaskInstance(TaskExecuteAckCommand taskAckCommand); /** * cache taskInstance * - * @param executeTaskResponseCommand executeTaskResponseCommand + * @param taskExecuteResponseCommand taskExecuteResponseCommand */ - void cacheTaskInstance(ExecuteTaskResponseCommand executeTaskResponseCommand); + void cacheTaskInstance(TaskExecuteResponseCommand taskExecuteResponseCommand); /** * remove taskInstance by taskInstanceId diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java index dc775d8d67..c149ac3335 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/cache/impl/TaskInstanceCacheManagerImpl.java @@ -18,8 +18,8 @@ package org.apache.dolphinscheduler.server.master.cache.impl; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -86,7 +86,7 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager { * @param taskAckCommand taskAckCommand */ @Override - public void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand) { + public void cacheTaskInstance(TaskExecuteAckCommand taskAckCommand) { TaskInstance taskInstance = new TaskInstance(); taskInstance.setState(ExecutionStatus.of(taskAckCommand.getStatus())); taskInstance.setStartTime(taskAckCommand.getStartTime()); @@ -99,13 +99,13 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager { /** * cache taskInstance * - * @param executeTaskResponseCommand executeTaskResponseCommand + * @param taskExecuteResponseCommand taskExecuteResponseCommand */ @Override - public void cacheTaskInstance(ExecuteTaskResponseCommand executeTaskResponseCommand) { - TaskInstance taskInstance = getByTaskInstanceId(executeTaskResponseCommand.getTaskInstanceId()); - taskInstance.setState(ExecutionStatus.of(executeTaskResponseCommand.getStatus())); - taskInstance.setEndTime(executeTaskResponseCommand.getEndTime()); + public void cacheTaskInstance(TaskExecuteResponseCommand taskExecuteResponseCommand) { + TaskInstance taskInstance = getByTaskInstanceId(taskExecuteResponseCommand.getTaskInstanceId()); + taskInstance.setState(ExecutionStatus.of(taskExecuteResponseCommand.getStatus())); + taskInstance.setEndTime(taskExecuteResponseCommand.getEndTime()); } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java index f1707df66b..c8cd9d1b2f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/ExecutorManager.java @@ -41,6 +41,14 @@ public interface ExecutorManager { */ T execute(ExecutionContext context) throws ExecuteException; + /** + * execute task directly without retry + * @param context context + * @return T + * @throws ExecuteException + */ + void executeDirectly(ExecutionContext context) throws ExecuteException; + /** * after execute * @param context context diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index 544a958cc2..286b0e6391 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -21,7 +21,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.remote.utils.Host; @@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionConte import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; +import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; import org.slf4j.Logger; @@ -68,8 +69,9 @@ public class NettyExecutorManager extends AbstractExecutorManager{ * register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor * register EXECUTE_TASK_ACK command type TaskAckProcessor */ - this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor()); - this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor()); + this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); + this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); + this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskKillResponseProcessor()); } @@ -128,13 +130,19 @@ public class NettyExecutorManager extends AbstractExecutorManager{ return success; } + public void executeDirectly(ExecutionContext context) throws ExecuteException { + Command command = buildCommand(context); + Host host = context.getHost(); + doExecute(host,command); + } + /** * build command * @param context context * @return command */ private Command buildCommand(ExecutionContext context) { - ExecuteTaskRequestCommand requestCommand = new ExecuteTaskRequestCommand(); + TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(); ExecutorType executorType = context.getExecutorType(); switch (executorType){ case WORKER: @@ -203,4 +211,8 @@ public class NettyExecutorManager extends AbstractExecutorManager{ } return nodes; } + + public NettyRemotingClient getNettyRemotingClient() { + return nettyRemotingClient; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyKillManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyKillManager.java deleted file mode 100644 index 54d0022cfe..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyKillManager.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.dispatch.executor; - -import org.apache.dolphinscheduler.remote.NettyRemotingClient; -import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand; -import org.apache.dolphinscheduler.remote.config.NettyClientConfig; -import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; -import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; -import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; -import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Service; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -/** - * netty executor manager - */ -@Service -public class NettyKillManager extends AbstractExecutorManager{ - - private final Logger logger = LoggerFactory.getLogger(NettyKillManager.class); - /** - * netty remote client - */ - private final NettyRemotingClient nettyRemotingClient; - - public NettyKillManager(){ - final NettyClientConfig clientConfig = new NettyClientConfig(); - this.nettyRemotingClient = new NettyRemotingClient(clientConfig); - /** - * register KILL_TASK_RESPONSE command type TaskKillResponseProcessor - */ - this.nettyRemotingClient.registerProcessor(CommandType.KILL_TASK_RESPONSE, new TaskKillResponseProcessor()); - } - - /** - * execute logic - * - * @param context context - * @return result - * @throws ExecuteException - */ - @Override - public Boolean execute(ExecutionContext context) throws ExecuteException { - Host host = context.getHost(); - Command command = buildCommand(context); - try { - doExecute(host, command); - return true; - }catch (ExecuteException ex) { - logger.error(String.format("execute context : %s error", context.getContext()), ex); - return false; - } - } - - - private Command buildCommand(ExecutionContext context) { - KillTaskRequestCommand requestCommand = new KillTaskRequestCommand(); - TaskExecutionContext taskExecutionContext = context.getContext(); - - requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext)); - return requestCommand.convert2Command(); - } - - /** - * execute logic - * @param host host - * @param command command - * @throws ExecuteException - */ - private void doExecute(final Host host, final Command command) throws ExecuteException { - /** - * retry count,default retry 3 - */ - int retryCount = 3; - boolean success = false; - do { - try { - nettyRemotingClient.send(host, command); - success = true; - } catch (Exception ex) { - logger.error(String.format("send command : %s to %s error", command, host), ex); - retryCount--; - try { - Thread.sleep(100); - } catch (InterruptedException ignore) {} - } - } while (retryCount >= 0 && !success); - - if (!success) { - throw new ExecuteException(String.format("send command : %s to %s error", command, host)); - } - } -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java index cf3857995c..ef2cb67840 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessor.java @@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; @@ -57,12 +57,12 @@ public class TaskAckProcessor implements NettyRequestProcessor { /** * task ack process * @param channel channel channel - * @param command command ExecuteTaskAckCommand + * @param command command TaskExecuteAckCommand */ @Override public void process(Channel channel, Command command) { - Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK == command.getType(), String.format("invalid command type : %s", command.getType())); - ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class); + Preconditions.checkArgument(CommandType.TASK_EXECUTE_ACK == command.getType(), String.format("invalid command type : %s", command.getType())); + TaskExecuteAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), TaskExecuteAckCommand.class); logger.info("taskAckCommand : {}", taskAckCommand); taskInstanceCacheManager.cacheTaskInstance(taskAckCommand); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java index d6c3f69c1f..4986e89e67 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java @@ -18,19 +18,12 @@ package org.apache.dolphinscheduler.server.master.processor; import io.netty.channel.Channel; -import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; -import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand; +import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; -import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; -import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; -import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +43,9 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor { */ @Override public void process(Channel channel, Command command) { - Preconditions.checkArgument(CommandType.KILL_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); + Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); - KillTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskResponseCommand.class); + TaskKillResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillResponseCommand.class); logger.info("received command : {}", responseCommand); logger.info("已经接受到了worker杀任务的回应"); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java index ed761539f9..93ca4abd6d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java @@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; @@ -63,9 +63,9 @@ public class TaskResponseProcessor implements NettyRequestProcessor { */ @Override public void process(Channel channel, Command command) { - Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); + Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); - ExecuteTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskResponseCommand.class); + TaskExecuteResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), TaskExecuteResponseCommand.class); logger.info("received command : {}", responseCommand); taskInstanceCacheManager.cacheTaskInstance(responseCommand); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java index 07f9168594..995a4e8c21 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThread.java @@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; -import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyKillManager; +import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +57,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { private TaskInstanceCacheManager taskInstanceCacheManager; - private NettyKillManager nettyKillManager; + private NettyExecutorManager nettyExecutorManager; /** * constructor of MasterTaskExecThread @@ -67,7 +67,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { public MasterTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){ super(taskInstance, processInstance); this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class); - this.nettyKillManager = SpringApplicationContext.getBean(NettyKillManager.class); + this.nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class); } /** @@ -191,7 +191,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { host.setPort(12346); executionContext.setHost(host); - nettyKillManager.execute(executionContext); + nettyExecutorManager.executeDirectly(executionContext); logger.info("master add kill task :{} id:{} to kill queue", taskInstance.getName(), taskInstance.getId() ); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index ec43dd875c..cfb94b550e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -127,8 +127,8 @@ public class WorkerServer implements IStoppable { //init remoting server NettyServerConfig serverConfig = new NettyServerConfig(); this.nettyRemotingServer = new NettyRemotingServer(serverConfig); - this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new TaskExecuteProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_REQUEST, new TaskKillProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor()); this.nettyRemotingServer.start(); this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), workerConfig.getWorkerGroup()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/KillTaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/KillTaskCallbackService.java deleted file mode 100644 index 65342bd04d..0000000000 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/KillTaskCallbackService.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.worker.processor; - - -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import org.apache.dolphinscheduler.remote.NettyRemotingClient; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; -import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand; -import org.apache.dolphinscheduler.remote.config.NettyClientConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ConcurrentHashMap; - -/** - * taks callback service - */ -public class KillTaskCallbackService { - - private final Logger logger = LoggerFactory.getLogger(KillTaskCallbackService.class); - - /** - * remote channels - */ - private static final ConcurrentHashMap REMOTE_CHANNELS = new ConcurrentHashMap<>(); - - /** - * netty remoting client - */ - private final NettyRemotingClient nettyRemotingClient; - - - public KillTaskCallbackService(){ - final NettyClientConfig clientConfig = new NettyClientConfig(); - this.nettyRemotingClient = new NettyRemotingClient(clientConfig); - } - - /** - * add callback channel - * @param taskInstanceId taskInstanceId - * @param channel channel - */ - public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel channel){ - REMOTE_CHANNELS.put(taskInstanceId, channel); - } - - /** - * get callback channel - * @param taskInstanceId taskInstanceId - * @return callback channel - */ - public NettyRemoteChannel getRemoteChannel(int taskInstanceId){ - NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId); - if(nettyRemoteChannel.isActive()){ - return nettyRemoteChannel; - } - Channel newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost()); - if(newChannel != null){ - NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, nettyRemoteChannel.getOpaque()); - addRemoteChannel(taskInstanceId, remoteChannel); - return remoteChannel; - } - return null; - } - - /** - * remove callback channels - * @param taskInstanceId taskInstanceId - */ - public void remove(int taskInstanceId){ - REMOTE_CHANNELS.remove(taskInstanceId); - } - - /** - * send result - * - * @param taskInstanceId taskInstanceId - * @param killTaskResponseCommand killTaskResponseCommand - */ - public void sendKillResult(int taskInstanceId, KillTaskResponseCommand killTaskResponseCommand){ - NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId); - if(nettyRemoteChannel == null){ - //TODO - } else{ - nettyRemoteChannel.writeAndFlush(killTaskResponseCommand.convert2Command()).addListener(new ChannelFutureListener(){ - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if(future.isSuccess()){ - remove(taskInstanceId); - return; - } - } - }); - } - } -} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java index 23ac7e2ddc..02d889ba4d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java @@ -22,17 +22,18 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import org.apache.dolphinscheduler.remote.NettyRemotingClient; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; +import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; import java.util.concurrent.ConcurrentHashMap; /** * taks callback service */ +@Service public class TaskCallbackService { private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class); @@ -92,14 +93,14 @@ public class TaskCallbackService { /** * send ack * @param taskInstanceId taskInstanceId - * @param ackCommand ackCommand + * @param command command */ - public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){ + public void sendAck(int taskInstanceId, Command command){ NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId); if(nettyRemoteChannel == null){ //TODO } else{ - nettyRemoteChannel.writeAndFlush(ackCommand.convert2Command()); + nettyRemoteChannel.writeAndFlush(command); } } @@ -107,14 +108,14 @@ public class TaskCallbackService { * send result * * @param taskInstanceId taskInstanceId - * @param responseCommand responseCommand + * @param command command */ - public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){ + public void sendResult(int taskInstanceId, Command command){ NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId); if(nettyRemoteChannel == null){ //TODO } else{ - nettyRemoteChannel.writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){ + nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener(){ @Override public void operationComplete(ChannelFuture future) throws Exception { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 98e4e92235..b53ef1720b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -31,8 +31,8 @@ import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; @@ -69,18 +69,18 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { private final TaskCallbackService taskCallbackService; public TaskExecuteProcessor(){ - this.taskCallbackService = new TaskCallbackService(); + this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads()); } @Override public void process(Channel channel, Command command) { - Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(), + Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); logger.info("received command : {}", command); - ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize( - command.getBody(), ExecuteTaskRequestCommand.class); + TaskExecuteRequestCommand taskRequestCommand = FastJsonSerializer.deserialize( + command.getBody(), TaskExecuteRequestCommand.class); String contextJson = taskRequestCommand.getTaskExecutionContext(); @@ -105,8 +105,8 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { private void doAck(TaskExecutionContext taskExecutionContext){ // tell master that task is in executing - ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext); - taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand); + TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext); + taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command()); } /** @@ -134,10 +134,10 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { /** * build ack command * @param taskExecutionContext taskExecutionContext - * @return ExecuteTaskAckCommand + * @return TaskExecuteAckCommand */ - private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) { - ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand(); + private TaskExecuteAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) { + TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode()); ackCommand.setLogPath(getTaskLogPath(taskExecutionContext)); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index c910aed279..c23f1995e4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -27,8 +27,8 @@ import org.apache.dolphinscheduler.common.utils.Preconditions; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; -import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand; -import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand; +import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; +import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; @@ -58,7 +58,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { /** * task callback service */ - private final KillTaskCallbackService killTaskCallbackService; + private final TaskCallbackService taskCallbackService; /** * taskExecutionContextCacheManager @@ -72,7 +72,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { public TaskKillProcessor(){ - this.killTaskCallbackService = new KillTaskCallbackService(); + this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class); } @@ -115,41 +115,41 @@ public class TaskKillProcessor implements NettyRequestProcessor { @Override public void process(Channel channel, Command command) { - Preconditions.checkArgument(CommandType.KILL_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); - KillTaskRequestCommand killTaskRequestCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskRequestCommand.class); - logger.info("received command : {}", killTaskRequestCommand); + Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); + TaskKillRequestCommand taskKillRequestCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class); + logger.info("received command : {}", taskKillRequestCommand); - String contextJson = killTaskRequestCommand.getTaskExecutionContext(); + String contextJson = taskKillRequestCommand.getTaskExecutionContext(); TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class); Boolean killStatus = doKill(taskExecutionContext); - killTaskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), + taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(), new NettyRemoteChannel(channel, command.getOpaque())); - KillTaskResponseCommand killTaskResponseCommand = buildKillTaskResponseCommand(taskExecutionContext,killStatus); - killTaskCallbackService.sendKillResult(killTaskResponseCommand.getTaskInstanceId(),killTaskResponseCommand); + TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskExecutionContext,killStatus); + taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command()); } /** - * build KillTaskResponseCommand + * build TaskKillResponseCommand * * @param taskExecutionContext taskExecutionContext * @param killStatus killStatus - * @return build KillTaskResponseCommand + * @return build TaskKillResponseCommand */ - private KillTaskResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext, + private TaskKillResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext, Boolean killStatus) { - KillTaskResponseCommand killTaskResponseCommand = new KillTaskResponseCommand(); - killTaskResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); - killTaskResponseCommand.setHost(taskExecutionContext.getHost()); - killTaskResponseCommand.setStatus(killStatus ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode()); - killTaskResponseCommand.setProcessId(taskExecutionContext.getProcessId()); - killTaskResponseCommand.setAppIds(appIds); - - return killTaskResponseCommand; + TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand(); + taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); + taskKillResponseCommand.setHost(taskExecutionContext.getHost()); + taskKillResponseCommand.setStatus(killStatus ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode()); + taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId()); + taskKillResponseCommand.setAppIds(appIds); + + return taskKillResponseCommand; } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index d3161eca59..347dfb620a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.utils.*; -import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; +import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; @@ -75,7 +75,7 @@ public class TaskExecuteThread implements Runnable { @Override public void run() { - ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskExecutionContext.getTaskInstanceId()); + TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId()); try { logger.info("script path : {}", taskExecutionContext.getExecutePath()); // task node @@ -134,7 +134,7 @@ public class TaskExecuteThread implements Runnable { responseCommand.setProcessId(task.getProcessId()); responseCommand.setAppIds(task.getAppIds()); } finally { - taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand); + taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command()); } }