diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java index c7e5b4ea13..61ba7c3fd6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; @@ -114,16 +115,18 @@ public class MasterFailoverService { * @param masterHost master host */ private void doFailoverMaster(@NonNull String masterHost) { - LOGGER.info("Master[{}] failover starting, need to failover process", masterHost); StopWatch failoverTimeCost = StopWatch.createStarted(); - Optional masterStartupTimeOptional = - getServerStartupTime(registryClient.getServerList(NodeType.MASTER), masterHost); - List needFailoverProcessInstanceList = - processService.queryNeedFailoverProcessInstances(masterHost); + Optional masterStartupTimeOptional = getServerStartupTime(registryClient.getServerList(NodeType.MASTER), + masterHost); + List needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances( + masterHost); + if (CollectionUtils.isEmpty(needFailoverProcessInstanceList)) { + return; + } LOGGER.info( - "Master[{}] failover there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}", + "Master[{}] failover starting there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}", masterHost, needFailoverProcessInstanceList.size(), needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList())); @@ -136,6 +139,11 @@ public class MasterFailoverService { LOGGER.info("WorkflowInstance doesn't need to failover"); continue; } + // todo: use batch query + ProcessDefinition processDefinition + = processService.findProcessDefinition(processInstance.getProcessDefinitionCode(), + processInstance.getProcessDefinitionVersion()); + processInstance.setProcessDefinition(processDefinition); int processInstanceId = processInstance.getId(); List taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId); for (TaskInstance taskInstance : taskInstanceList) { @@ -205,6 +213,7 @@ public class MasterFailoverService { TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() .buildTaskInstanceRelatedInfo(taskInstance) .buildProcessInstanceRelatedInfo(processInstance) + .buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition()) .create(); if (masterConfig.isKillYarnJobWhenTaskFailover()) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java index ec126a3ec3..402ec43354 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java @@ -115,10 +115,10 @@ public class WorkerFailoverService { for (TaskInstance taskInstance : needFailoverTaskInstanceList) { LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId()); try { - ProcessInstance processInstance = - processInstanceCacheMap.computeIfAbsent(taskInstance.getProcessInstanceId(), k -> { - WorkflowExecuteRunnable workflowExecuteRunnable = - cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId()); + ProcessInstance processInstance = processInstanceCacheMap.computeIfAbsent( + taskInstance.getProcessInstanceId(), k -> { + WorkflowExecuteRunnable workflowExecuteRunnable = cacheManager.getByProcessInstanceId( + taskInstance.getProcessInstanceId()); if (workflowExecuteRunnable == null) { return null; } @@ -167,6 +167,7 @@ public class WorkerFailoverService { TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get() .buildTaskInstanceRelatedInfo(taskInstance) .buildProcessInstanceRelatedInfo(processInstance) + .buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition()) .create(); if (masterConfig.isKillYarnJobWhenTaskFailover()) {