Browse Source

change the failover process.

pull/2/head
lenboo 6 years ago
parent
commit
d0ca95ead2
  1. 2
      escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
  2. 5
      escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java
  3. 76
      escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java

2
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java

@ -58,6 +58,7 @@ public class ProcessDao extends AbstractBaseDao {
private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.RUNNING_EXEUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
ExecutionStatus.READY_STOP.ordinal()};
@Autowired
@ -1569,7 +1570,6 @@ public class ProcessDao extends AbstractBaseDao {
for (ProcessInstance processInstance:processInstanceList){
processNeedFailoverProcessInstances(processInstance);
}
}
@Transactional(value = "TransactionManager",rollbackFor = Exception.class)

5
escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java

@ -273,9 +273,8 @@ public class ProcessUtils {
/**
* find logs and kill yarn tasks
* @param taskInstance
* @throws IOException
*/
public static void killYarnJob(TaskInstance taskInstance) throws Exception {
public static void killYarnJob(TaskInstance taskInstance) {
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
LogClient logClient = new LogClient(taskInstance.getHost(), Constants.RPC_PORT);
@ -295,7 +294,7 @@ public class ProcessUtils {
} catch (Exception e) {
logger.error("kill yarn job failed : " + e.getMessage(),e);
throw new RuntimeException("kill yarn job fail");
// throw new RuntimeException("kill yarn job fail");
}
}
}

76
escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java

@ -30,6 +30,7 @@ import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.server.ResInfo;
import cn.escheduler.server.utils.ProcessUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
@ -279,17 +280,9 @@ public class ZKMasterClient extends AbstractZKClient {
for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) {
alertDao.sendServerStopedAlert(1, masterHost, "Master-Server");
}
logger.info("start master failover ...");
List<ProcessInstance> needFailoverProcessInstanceList = processDao.queryNeedFailoverProcessInstances(masterHost);
//updateProcessInstance host is null and insert into command
for(ProcessInstance processInstance : needFailoverProcessInstanceList){
processDao.processNeedFailoverProcessInstances(processInstance);
if(StringUtils.isNotEmpty(masterHost)){
FailoverMaster(masterHost);
}
logger.info("master failover end");
}catch (Exception e){
logger.error("master failover failed : " + e.getMessage(),e);
}finally {
@ -331,6 +324,8 @@ public class ZKMasterClient extends AbstractZKClient {
}
/**
* monitor worker
*/
@ -369,23 +364,9 @@ public class ZKMasterClient extends AbstractZKClient {
alertDao.sendServerStopedAlert(1, workerHost, "Worker-Server");
}
logger.info("start worker failover ...");
List<TaskInstance> needFailoverTaskInstanceList = processDao.queryNeedFailoverTaskInstances(workerHost);
for(TaskInstance taskInstance : needFailoverTaskInstanceList){
ProcessInstance instance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
if(instance!=null){
taskInstance.setProcessInstance(instance);
}
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskInstance);
}
//updateProcessInstance state value is NEED_FAULT_TOLERANCE
processDao.updateNeedFailoverTaskInstances(workerHost);
logger.info("worker failover end");
if(StringUtils.isNotEmpty(workerHost)){
FailoverWorker(workerHost);
}
}catch (Exception e){
logger.error("worker failover failed : " + e.getMessage(),e);
}
@ -476,6 +457,46 @@ public class ZKMasterClient extends AbstractZKClient {
}
/**
* failover worker tasks
* 1. kill yarn job if there are yarn jobs in tasks.
* 2. change task state from running to need failover.
* @param workerHost
*/
private void FailoverWorker(String workerHost){
logger.info("start worker[{}] failover ...", workerHost);
List<TaskInstance> needFailoverTaskInstanceList = processDao.queryNeedFailoverTaskInstances(workerHost);
for(TaskInstance taskInstance : needFailoverTaskInstanceList){
ProcessInstance instance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
if(instance!=null){
taskInstance.setProcessInstance(instance);
}
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskInstance);
}
//updateProcessInstance state value is NEED_FAULT_TOLERANCE
processDao.updateNeedFailoverTaskInstances(workerHost);
logger.info("end worker[{}] failover ...", workerHost);
}
/**
* failover master tasks
* @param masterHost
*/
private void FailoverMaster(String masterHost) {
logger.info("start master failover ...");
List<ProcessInstance> needFailoverProcessInstanceList = processDao.queryNeedFailoverProcessInstances(masterHost);
//updateProcessInstance host is null and insert into command
for(ProcessInstance processInstance : needFailoverProcessInstanceList){
processDao.processNeedFailoverProcessInstances(processInstance);
}
logger.info("master failover end");
}
/**
* get host ip
@ -488,6 +509,7 @@ public class ZKMasterClient extends AbstractZKClient {
if(startIndex >= endIndex){
logger.error("parse ip error");
return "";
}
return path.substring(startIndex, endIndex);
}

Loading…
Cancel
Save