From 6c7e00c4aa51172dd36d505885ebcd848eed4f3c Mon Sep 17 00:00:00 2001 From: caishunfeng Date: Wed, 27 Jul 2022 19:21:21 +0800 Subject: [PATCH] [Bug-11101] fix task failover NPE (#11168) --- .../master/service/MasterFailoverService.java | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java index f89c872784..856418b509 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java @@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactor import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.registry.RegistryClient; +import org.apache.dolphinscheduler.spi.utils.StringUtils; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.time.StopWatch; @@ -232,14 +233,7 @@ public class MasterFailoverService { // kill worker task, When the master failover and worker failover happened in the same time, // the task may not be failover if we don't set NEED_FAULT_TOLERANCE. // This can be improved if we can load all task when cache a workflowInstance in memory - try { - TaskKillRequestCommand killCommand = new TaskKillRequestCommand(taskInstance.getId()); - Host workerHost = Host.of(taskInstance.getHost()); - nettyExecutorManager.doExecute(workerHost, killCommand.convert2Command()); - LOGGER.info("Failover task success, has killed the task in worker: {}", taskInstance.getHost()); - } catch (ExecuteException e) { - LOGGER.error("Kill task failed", e); - } + sendKillCommandToWorker(taskInstance); } else { LOGGER.info("The failover taskInstance is a master task"); } @@ -249,6 +243,20 @@ public class MasterFailoverService { processService.saveTaskInstance(taskInstance); } + private void sendKillCommandToWorker(@NonNull TaskInstance taskInstance) { + if (StringUtils.isEmpty(taskInstance.getHost())) { + return; + } + try { + TaskKillRequestCommand killCommand = new TaskKillRequestCommand(taskInstance.getId()); + Host workerHost = Host.of(taskInstance.getHost()); + nettyExecutorManager.doExecute(workerHost, killCommand.convert2Command()); + LOGGER.info("Failover task success, has killed the task in worker: {}", taskInstance.getHost()); + } catch (ExecuteException e) { + LOGGER.error("Kill task failed", e); + } + } + private boolean checkTaskInstanceNeedFailover(@NonNull TaskInstance taskInstance) { if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) { // The task is already finished, so we don't need to failover this task instance