Browse Source

Fix kill yarn job error when failover caused by doesn't set ProcessDefinition (#10948)

(cherry picked from commit b245e7c973)
3.0.0/version-upgrade
Wenjun Ruan 2 years ago
parent
commit
44ddb6908e
  1. 21
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java
  2. 9
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java

21
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/MasterFailoverService.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -114,16 +115,18 @@ public class MasterFailoverService {
* @param masterHost master host
*/
private void doFailoverMaster(@NonNull String masterHost) {
LOGGER.info("Master[{}] failover starting, need to failover process", masterHost);
StopWatch failoverTimeCost = StopWatch.createStarted();
Optional<Date> masterStartupTimeOptional =
getServerStartupTime(registryClient.getServerList(NodeType.MASTER), masterHost);
List<ProcessInstance> needFailoverProcessInstanceList =
processService.queryNeedFailoverProcessInstances(masterHost);
Optional<Date> masterStartupTimeOptional = getServerStartupTime(registryClient.getServerList(NodeType.MASTER),
masterHost);
List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(
masterHost);
if (CollectionUtils.isEmpty(needFailoverProcessInstanceList)) {
return;
}
LOGGER.info(
"Master[{}] failover there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}",
"Master[{}] failover starting there are {} workflowInstance may need to failover, will do a deep check, workflowInstanceIds: {}",
masterHost,
needFailoverProcessInstanceList.size(),
needFailoverProcessInstanceList.stream().map(ProcessInstance::getId).collect(Collectors.toList()));
@ -136,6 +139,11 @@ public class MasterFailoverService {
LOGGER.info("WorkflowInstance doesn't need to failover");
continue;
}
// todo: use batch query
ProcessDefinition processDefinition
= processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
processInstance.setProcessDefinition(processDefinition);
int processInstanceId = processInstance.getId();
List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(processInstanceId);
for (TaskInstance taskInstance : taskInstanceList) {
@ -205,6 +213,7 @@ public class MasterFailoverService {
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
.create();
if (masterConfig.isKillYarnJobWhenTaskFailover()) {

9
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/service/WorkerFailoverService.java

@ -115,10 +115,10 @@ public class WorkerFailoverService {
for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
try {
ProcessInstance processInstance =
processInstanceCacheMap.computeIfAbsent(taskInstance.getProcessInstanceId(), k -> {
WorkflowExecuteRunnable workflowExecuteRunnable =
cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId());
ProcessInstance processInstance = processInstanceCacheMap.computeIfAbsent(
taskInstance.getProcessInstanceId(), k -> {
WorkflowExecuteRunnable workflowExecuteRunnable = cacheManager.getByProcessInstanceId(
taskInstance.getProcessInstanceId());
if (workflowExecuteRunnable == null) {
return null;
}
@ -167,6 +167,7 @@ public class WorkerFailoverService {
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.buildProcessDefinitionRelatedInfo(processInstance.getProcessDefinition())
.create();
if (masterConfig.isKillYarnJobWhenTaskFailover()) {

Loading…
Cancel
Save