Browse Source

[Bug-11101] fix task failover NPE (#11168)

3.1.0-release
caishunfeng 2 years ago committed by GitHub
parent
commit
6c7e00c4aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java

24
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java

@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactor
import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.time.StopWatch; import org.apache.commons.lang3.time.StopWatch;
@ -232,14 +233,7 @@ public class MasterFailoverService {
// kill worker task, When the master failover and worker failover happened in the same time, // kill worker task, When the master failover and worker failover happened in the same time,
// the task may not be failover if we don't set NEED_FAULT_TOLERANCE. // the task may not be failover if we don't set NEED_FAULT_TOLERANCE.
// This can be improved if we can load all task when cache a workflowInstance in memory // This can be improved if we can load all task when cache a workflowInstance in memory
try { sendKillCommandToWorker(taskInstance);
TaskKillRequestCommand killCommand = new TaskKillRequestCommand(taskInstance.getId());
Host workerHost = Host.of(taskInstance.getHost());
nettyExecutorManager.doExecute(workerHost, killCommand.convert2Command());
LOGGER.info("Failover task success, has killed the task in worker: {}", taskInstance.getHost());
} catch (ExecuteException e) {
LOGGER.error("Kill task failed", e);
}
} else { } else {
LOGGER.info("The failover taskInstance is a master task"); LOGGER.info("The failover taskInstance is a master task");
} }
@ -249,6 +243,20 @@ public class MasterFailoverService {
processService.saveTaskInstance(taskInstance); processService.saveTaskInstance(taskInstance);
} }
/**
 * Sends a kill request for the given task instance to the worker host recorded on it.
 *
 * <p>If the task instance has no host (null or empty), the method returns without
 * sending anything. An {@link ExecuteException} raised while sending is logged and
 * not rethrown, so the caller's failover flow continues either way.
 *
 * @param taskInstance the task instance whose worker-side task should be killed
 */
private void sendKillCommandToWorker(@NonNull TaskInstance taskInstance) {
    final String workerAddress = taskInstance.getHost();
    if (StringUtils.isEmpty(workerAddress)) {
        // No worker host is recorded, so there is no target for the kill request.
        return;
    }

    try {
        TaskKillRequestCommand killRequest = new TaskKillRequestCommand(taskInstance.getId());
        nettyExecutorManager.doExecute(Host.of(workerAddress), killRequest.convert2Command());
        LOGGER.info("Failover task success, has killed the task in worker: {}", workerAddress);
    } catch (ExecuteException e) {
        // Best effort: a failed kill is only logged.
        LOGGER.error("Kill task failed", e);
    }
}
private boolean checkTaskInstanceNeedFailover(@NonNull TaskInstance taskInstance) { private boolean checkTaskInstanceNeedFailover(@NonNull TaskInstance taskInstance) {
if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) { if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) {
// The task is already finished, so we don't need to failover this task instance // The task is already finished, so we don't need to failover this task instance

Loading…
Cancel
Save