|
|
|
@ -40,6 +40,7 @@ import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactor
|
|
|
|
|
import org.apache.dolphinscheduler.server.utils.ProcessUtils; |
|
|
|
|
import org.apache.dolphinscheduler.service.process.ProcessService; |
|
|
|
|
import org.apache.dolphinscheduler.service.registry.RegistryClient; |
|
|
|
|
import org.apache.dolphinscheduler.spi.utils.StringUtils; |
|
|
|
|
|
|
|
|
|
import org.apache.commons.collections4.CollectionUtils; |
|
|
|
|
import org.apache.commons.lang3.time.StopWatch; |
|
|
|
@ -232,14 +233,7 @@ public class MasterFailoverService {
|
|
|
|
|
// kill worker task, When the master failover and worker failover happened in the same time,
|
|
|
|
|
// the task may not be failover if we don't set NEED_FAULT_TOLERANCE.
|
|
|
|
|
// This can be improved if we can load all task when cache a workflowInstance in memory
|
|
|
|
|
try { |
|
|
|
|
TaskKillRequestCommand killCommand = new TaskKillRequestCommand(taskInstance.getId()); |
|
|
|
|
Host workerHost = Host.of(taskInstance.getHost()); |
|
|
|
|
nettyExecutorManager.doExecute(workerHost, killCommand.convert2Command()); |
|
|
|
|
LOGGER.info("Failover task success, has killed the task in worker: {}", taskInstance.getHost()); |
|
|
|
|
} catch (ExecuteException e) { |
|
|
|
|
LOGGER.error("Kill task failed", e); |
|
|
|
|
} |
|
|
|
|
sendKillCommandToWorker(taskInstance); |
|
|
|
|
} else { |
|
|
|
|
LOGGER.info("The failover taskInstance is a master task"); |
|
|
|
|
} |
|
|
|
@ -249,6 +243,20 @@ public class MasterFailoverService {
|
|
|
|
|
processService.saveTaskInstance(taskInstance); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private void sendKillCommandToWorker(@NonNull TaskInstance taskInstance) { |
|
|
|
|
if (StringUtils.isEmpty(taskInstance.getHost())) { |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
try { |
|
|
|
|
TaskKillRequestCommand killCommand = new TaskKillRequestCommand(taskInstance.getId()); |
|
|
|
|
Host workerHost = Host.of(taskInstance.getHost()); |
|
|
|
|
nettyExecutorManager.doExecute(workerHost, killCommand.convert2Command()); |
|
|
|
|
LOGGER.info("Failover task success, has killed the task in worker: {}", taskInstance.getHost()); |
|
|
|
|
} catch (ExecuteException e) { |
|
|
|
|
LOGGER.error("Kill task failed", e); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private boolean checkTaskInstanceNeedFailover(@NonNull TaskInstance taskInstance) { |
|
|
|
|
if (taskInstance.getState() != null && taskInstance.getState().typeIsFinished()) { |
|
|
|
|
// The task is already finished, so we don't need to failover this task instance
|
|
|
|
|