diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java index 61ba7c3fd6..93bdae2431 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java @@ -28,8 +28,12 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; +import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; +import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; @@ -63,13 +67,17 @@ public class MasterFailoverService { private final ProcessService processService; private final String localAddress; + private final NettyExecutorManager nettyExecutorManager; + public MasterFailoverService(@NonNull RegistryClient registryClient, @NonNull MasterConfig masterConfig, - @NonNull ProcessService processService) { + @NonNull ProcessService processService, + @NonNull NettyExecutorManager nettyExecutorManager) { this.registryClient = registryClient; this.masterConfig = masterConfig; this.processService = processService; this.localAddress = NetUtils.getAddr(masterConfig.getListenPort()); + this.nettyExecutorManager = nettyExecutorManager; } @@ -221,6 +229,17 @@ public class MasterFailoverService { LOGGER.info("TaskInstance failover begin kill the task related yarn job"); ProcessUtils.killYarnJob(taskExecutionContext); } + // kill worker task, When the master failover and worker failover happened in the same time, + // the task may not be failover if we don't set NEED_FAULT_TOLERANCE. + // This can be improved if we can load all task when cache a workflowInstance in memory + try { + TaskKillRequestCommand killCommand = new TaskKillRequestCommand(taskInstance.getId()); + Host workerHost = Host.of(taskInstance.getHost()); + nettyExecutorManager.doExecute(workerHost, killCommand.convert2Command()); + LOGGER.info("Failover task success, has killed the task in worker: {}", taskInstance.getHost()); + } catch (ExecuteException e) { + LOGGER.error("Kill task failed", e); + } } else { LOGGER.info("The failover taskInstance is a master task"); } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java index 44e5a382f6..ab71cd75cd 100644 --- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/service/FailoverServiceTest.java @@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.event.StateEvent; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool; @@ -82,6 +83,9 @@ public class FailoverServiceTest { @Mock private ProcessInstanceExecCacheManager cacheManager; + @Mock + private NettyExecutorManager nettyExecutorManager; + private static int masterPort = 5678; private static int workerPort = 1234; @@ -100,7 +104,7 @@ public class FailoverServiceTest { given(masterConfig.getListenPort()).willReturn(masterPort); MasterFailoverService masterFailoverService = - new MasterFailoverService(registryClient, masterConfig, processService); + new MasterFailoverService(registryClient, masterConfig, processService, nettyExecutorManager); WorkerFailoverService workerFailoverService = new WorkerFailoverService(registryClient, masterConfig, processService, diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java index 155b31785e..7dc03d778c 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillRequestCommand.java @@ -21,9 +21,16 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import java.io.Serializable; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + /** * kill task request command */ +@Data +@NoArgsConstructor +@AllArgsConstructor public class TaskKillRequestCommand implements Serializable { /** @@ -31,14 +38,6 @@ public class TaskKillRequestCommand implements Serializable { */ private int taskInstanceId; - public int getTaskInstanceId() { - return taskInstanceId; - } - - public void setTaskInstanceId(int taskInstanceId) { - this.taskInstanceId = taskInstanceId; - } - /** * package request command * @@ -51,11 +50,4 @@ public class TaskKillRequestCommand implements Serializable { command.setBody(body); return command; } - - @Override - public String toString() { - return "TaskKillRequestCommand{" - + "taskInstanceId=" + taskInstanceId - + '}'; - } }