From d0ca95ead29da7755805997592152a91ef481faf Mon Sep 17 00:00:00 2001 From: lenboo Date: Sat, 8 Jun 2019 00:18:43 +0800 Subject: [PATCH] change the failover process. --- .../java/cn/escheduler/dao/ProcessDao.java | 2 +- .../escheduler/server/utils/ProcessUtils.java | 5 +- .../escheduler/server/zk/ZKMasterClient.java | 76 ++++++++++++------- 3 files changed, 52 insertions(+), 31 deletions(-) diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index d7dfa6225f..946bf3496b 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -58,6 +58,7 @@ public class ProcessDao extends AbstractBaseDao { private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), ExecutionStatus.RUNNING_EXEUTION.ordinal(), ExecutionStatus.READY_PAUSE.ordinal(), + ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(), ExecutionStatus.READY_STOP.ordinal()}; @Autowired @@ -1569,7 +1570,6 @@ public class ProcessDao extends AbstractBaseDao { for (ProcessInstance processInstance:processInstanceList){ processNeedFailoverProcessInstances(processInstance); } - } @Transactional(value = "TransactionManager",rollbackFor = Exception.class) diff --git a/escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java b/escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java index baf82de0df..4980ef3b41 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/utils/ProcessUtils.java @@ -273,9 +273,8 @@ public class ProcessUtils { /** * find logs and kill yarn tasks * @param taskInstance - * @throws IOException */ - public static void killYarnJob(TaskInstance taskInstance) throws Exception { + public static void killYarnJob(TaskInstance taskInstance) { try { Thread.sleep(Constants.SLEEP_TIME_MILLIS); LogClient logClient = new LogClient(taskInstance.getHost(), Constants.RPC_PORT); @@ -295,7 +294,7 @@ public class ProcessUtils { } catch (Exception e) { logger.error("kill yarn job failed : " + e.getMessage(),e); - throw new RuntimeException("kill yarn job fail"); +// throw new RuntimeException("kill yarn job fail"); } } } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java index 85f9f6a7d2..fbabb2a82b 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java @@ -30,6 +30,7 @@ import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; import cn.escheduler.server.ResInfo; import cn.escheduler.server.utils.ProcessUtils; +import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.cache.PathChildrenCache; @@ -279,17 +280,9 @@ public class ZKMasterClient extends AbstractZKClient { for (int i = 0; i < Constants.ESCHEDULER_WARN_TIMES_FAILOVER;i++) { alertDao.sendServerStopedAlert(1, masterHost, "Master-Server"); } - - logger.info("start master failover ..."); - - List needFailoverProcessInstanceList = processDao.queryNeedFailoverProcessInstances(masterHost); - - //updateProcessInstance host is null and insert into command - for(ProcessInstance processInstance : needFailoverProcessInstanceList){ - processDao.processNeedFailoverProcessInstances(processInstance); + if(StringUtils.isNotEmpty(masterHost)){ + FailoverMaster(masterHost); } - - logger.info("master failover end"); }catch (Exception e){ logger.error("master failover failed : " + e.getMessage(),e); }finally { @@ -331,6 +324,8 @@ public class ZKMasterClient extends AbstractZKClient { } + + /** * monitor worker */ @@ -369,23 +364,9 @@ public class ZKMasterClient extends AbstractZKClient { alertDao.sendServerStopedAlert(1, workerHost, "Worker-Server"); } - logger.info("start worker failover ..."); - - - List needFailoverTaskInstanceList = processDao.queryNeedFailoverTaskInstances(workerHost); - for(TaskInstance taskInstance : needFailoverTaskInstanceList){ - ProcessInstance instance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); - if(instance!=null){ - taskInstance.setProcessInstance(instance); - } - // only kill yarn job if exists , the local thread has exited - ProcessUtils.killYarnJob(taskInstance); - } - - //updateProcessInstance state value is NEED_FAULT_TOLERANCE - processDao.updateNeedFailoverTaskInstances(workerHost); - - logger.info("worker failover end"); + if(StringUtils.isNotEmpty(workerHost)){ + FailoverWorker(workerHost); + } }catch (Exception e){ logger.error("worker failover failed : " + e.getMessage(),e); } @@ -476,6 +457,46 @@ public class ZKMasterClient extends AbstractZKClient { } + /** + * failover worker tasks + * 1. kill yarn job if there are yarn jobs in tasks. + * 2. change task state from running to need failover. + * @param workerHost + */ + private void FailoverWorker(String workerHost){ + logger.info("start worker[{}] failover ...", workerHost); + + List needFailoverTaskInstanceList = processDao.queryNeedFailoverTaskInstances(workerHost); + for(TaskInstance taskInstance : needFailoverTaskInstanceList){ + ProcessInstance instance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); + if(instance!=null){ + taskInstance.setProcessInstance(instance); + } + // only kill yarn job if exists , the local thread has exited + ProcessUtils.killYarnJob(taskInstance); + } + + //updateProcessInstance state value is NEED_FAULT_TOLERANCE + processDao.updateNeedFailoverTaskInstances(workerHost); + logger.info("end worker[{}] failover ...", workerHost); + } + + /** + * failover master tasks + * @param masterHost + */ + private void FailoverMaster(String masterHost) { + logger.info("start master failover ..."); + + List needFailoverProcessInstanceList = processDao.queryNeedFailoverProcessInstances(masterHost); + + //updateProcessInstance host is null and insert into command + for(ProcessInstance processInstance : needFailoverProcessInstanceList){ + processDao.processNeedFailoverProcessInstances(processInstance); + } + + logger.info("master failover end"); + } /** * get host ip @@ -488,6 +509,7 @@ public class ZKMasterClient extends AbstractZKClient { if(startIndex >= endIndex){ logger.error("parse ip error"); + return ""; } return path.substring(startIndex, endIndex); }