Browse Source

fix failover logic (#7276)

2.0.7-release
wind 3 years ago committed by GitHub
parent
commit
9a307dc71a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 140
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

140
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java

@ -47,7 +47,9 @@ import org.apache.commons.lang.StringUtils;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -194,7 +196,7 @@ public class MasterRegistryClient {
failoverMaster(serverHost); failoverMaster(serverHost);
break; break;
case WORKER: case WORKER:
failoverWorker(serverHost, true, true); failoverWorker(serverHost);
break; break;
default: default:
break; break;
@ -275,78 +277,114 @@ public class MasterRegistryClient {
* 3. failover all tasks when workerHost is null * 3. failover all tasks when workerHost is null
* *
* @param workerHost worker host * @param workerHost worker host
* @param needCheckWorkerAlive need check worker alive
* @param checkOwner need check process instance owner
*/ */
private void failoverWorker(String workerHost, boolean needCheckWorkerAlive, boolean checkOwner) { private void failoverWorker(String workerHost) {
logger.info("start worker[{}] failover ...", workerHost); if (StringUtils.isEmpty(workerHost)) {
return;
}
long startTime = System.currentTimeMillis();
List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
for (TaskInstance taskInstance : needFailoverTaskInstanceList) { Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
if (needCheckWorkerAlive) { logger.info("start worker[{}] failover, task list size:{}", workerHost, needFailoverTaskInstanceList.size());
if (!checkTaskInstanceNeedFailover(taskInstance)) {
continue;
}
}
ProcessInstance processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
if (workerHost == null ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
|| !checkOwner if (processInstance == null) {
|| processInstance.getHost().equalsIgnoreCase(getLocalAddress())) { processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
// only failover the task owned myself if worker down.
if (processInstance == null) { if (processInstance == null) {
logger.error("failover error, the process {} of task {} do not exists.", logger.error("failover task instance error, processInstance {} of taskInstance {} is null",
taskInstance.getProcessInstanceId(), taskInstance.getId()); taskInstance.getProcessInstanceId(), taskInstance.getId());
continue; continue;
} }
taskInstance.setProcessInstance(processInstance); processInstanceCacheMap.put(processInstance.getId(), processInstance);
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.create();
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext);
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
processService.saveTaskInstance(taskInstance);
if (!processInstanceExecMaps.containsKey(processInstance.getId())) {
return;
}
WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecMaps.get(processInstance.getId());
StateEvent stateEvent = new StateEvent();
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(processInstance.getId());
stateEvent.setExecutionStatus(taskInstance.getState());
workflowExecuteThreadNotify.addStateEvent(stateEvent);
} }
// only failover the task owned myself if worker down.
if (processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
failoverTaskInstance(processInstance, taskInstance);
}
} }
logger.info("end worker[{}] failover ...", workerHost); logger.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime);
} }
/** /**
* failover master tasks * failover master
* <p>
* failover process instance and associated task instance
* *
* @param masterHost master host * @param masterHost master host
*/ */
private void failoverMaster(String masterHost) { private void failoverMaster(String masterHost) {
logger.info("start master failover ..."); if (StringUtils.isEmpty(masterHost)) {
return;
}
long startTime = System.currentTimeMillis();
List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost); List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
logger.info("start master[{}] failover, process list size:{}", masterHost, needFailoverProcessInstanceList.size());
logger.info("failover process list size:{} ", needFailoverProcessInstanceList.size());
//updateProcessInstance host is null and insert into command
for (ProcessInstance processInstance : needFailoverProcessInstanceList) { for (ProcessInstance processInstance : needFailoverProcessInstanceList) {
logger.info("failover process instance id: {} host:{}", processInstance.getId(), processInstance.getHost());
if (Constants.NULL.equals(processInstance.getHost())) { if (Constants.NULL.equals(processInstance.getHost())) {
continue; continue;
} }
logger.info("failover process instance id: {}", processInstance.getId());
List<TaskInstance> validTaskInstanceList = processService.findValidTaskListByProcessId(processInstance.getId());
for (TaskInstance taskInstance : validTaskInstanceList) {
if (Constants.NULL.equals(taskInstance.getHost())) {
continue;
}
logger.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
failoverTaskInstance(processInstance, taskInstance);
}
//updateProcessInstance host is null and insert into command
processService.processNeedFailoverProcessInstances(processInstance); processService.processNeedFailoverProcessInstances(processInstance);
} }
failoverWorker(masterHost, true, false);
logger.info("master failover end"); logger.info("master[{}] failover end, useTime:{}ms", masterHost, System.currentTimeMillis() - startTime);
}
private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance) {
if (taskInstance == null) {
logger.error("failover task instance error, taskInstance is null");
return;
}
if (processInstance == null) {
logger.error("failover task instance error, processInstance {} of taskInstance {} is null",
taskInstance.getProcessInstanceId(), taskInstance.getId());
return;
}
if (!checkTaskInstanceNeedFailover(taskInstance)) {
return;
}
taskInstance.setProcessInstance(processInstance);
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
.create();
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext);
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
processService.saveTaskInstance(taskInstance);
WorkflowExecuteThread workflowExecuteThreadNotify = processInstanceExecMaps.get(processInstance.getId());
if (workflowExecuteThreadNotify == null) {
return;
}
StateEvent stateEvent = new StateEvent();
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(processInstance.getId());
stateEvent.setExecutionStatus(taskInstance.getState());
workflowExecuteThreadNotify.addStateEvent(stateEvent);
} }
/** /**
@ -357,11 +395,11 @@ public class MasterRegistryClient {
localNodePath = getMasterPath(); localNodePath = getMasterPath();
int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval(); int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime,
masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterMaxCpuloadAvg(),
masterConfig.getMasterReservedMemory(), masterConfig.getMasterReservedMemory(),
Sets.newHashSet(getMasterPath()), Sets.newHashSet(getMasterPath()),
Constants.MASTER_TYPE, Constants.MASTER_TYPE,
registryClient); registryClient);
registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo()); registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo());
registryClient.addConnectionStateListener(this::handleConnectionState); registryClient.addConnectionStateListener(this::handleConnectionState);

Loading…
Cancel
Save