From b1fbd2170e1635788337ea05326afcb5e3d7ca91 Mon Sep 17 00:00:00 2001 From: lenboo Date: Wed, 12 Jun 2019 19:52:00 +0800 Subject: [PATCH] chang master/worker failover process. --- .../java/cn/escheduler/dao/ProcessDao.java | 17 ++- .../mapper/ProcessInstanceMapperProvider.java | 7 +- .../mapper/TaskInstanceMapperProvider.java | 7 +- .../dao/mapper/WorkerServerMapper.java | 17 +++ .../mapper/WorkerServerMapperProvider.java | 15 ++ .../cn/escheduler/dao/utils/DagHelper.java | 3 +- .../master/runner/MasterExecThread.java | 27 +++- .../escheduler/server/zk/ZKMasterClient.java | 134 ++++++++++++++---- 8 files changed, 189 insertions(+), 38 deletions(-) diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java index 946bf3496b..0241016941 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java @@ -58,7 +58,7 @@ public class ProcessDao extends AbstractBaseDao { private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(), ExecutionStatus.RUNNING_EXEUTION.ordinal(), ExecutionStatus.READY_PAUSE.ordinal(), - ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(), +// ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(), ExecutionStatus.READY_STOP.ordinal()}; @Autowired @@ -97,6 +97,9 @@ public class ProcessDao extends AbstractBaseDao { @Autowired private ErrorCommandMapper errorCommandMapper; + @Autowired + private WorkerServerMapper workerServerMapper; + /** * task queue impl */ @@ -122,6 +125,7 @@ public class ProcessDao extends AbstractBaseDao { udfFuncMapper = getMapper(UdfFuncMapper.class); resourceMapper = getMapper(ResourceMapper.class); workerGroupMapper = getMapper(WorkerGroupMapper.class); + workerServerMapper = getMapper(WorkerServerMapper.class); taskQueue = TaskQueueFactory.getTaskQueueInstance(); } @@ -1636,6 +1640,17 @@ public class ProcessDao extends AbstractBaseDao { return workerGroupMapper.queryById(workerGroupId); } + /** + * query worker server by host + * @param host + * @return + */ + public List queryWorkerServerByHost(String host){ + + return workerServerMapper.queryWorkerByHost(host); + + } + } diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java index a6d09d431b..9ff57fc3c4 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java @@ -402,7 +402,12 @@ public class ProcessInstanceMapperProvider { FROM(TABLE_NAME); - WHERE("`host` = #{host} and `state` in (" + strStates.toString() +")"); + Object host = parameter.get("host"); + if(host != null && StringUtils.isNotEmpty(host.toString())){ + + WHERE("`host` = #{host} "); + } + WHERE("`state` in (" + strStates.toString() +")"); ORDER_BY("`id` asc"); diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapperProvider.java index 511b0970fe..ce1e69f197 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapperProvider.java @@ -228,7 +228,12 @@ public class TaskInstanceMapperProvider { SELECT("*, UNIX_TIMESTAMP(end_time)-UNIX_TIMESTAMP(start_time) as duration"); FROM(TABLE_NAME); - WHERE("`host` = #{host} and `state` in (" + strStates.toString() +")"); + Object host = parameter.get("host"); + if(host != null && StringUtils.isNotEmpty(host.toString())){ + + WHERE("`host` = #{host} "); + } + WHERE("`state` in (" + strStates.toString() +")"); ORDER_BY("`id` asc"); } }.toString(); diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerServerMapper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerServerMapper.java index b5ea3aa878..5e511a4edd 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerServerMapper.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerServerMapper.java @@ -42,6 +42,23 @@ public interface WorkerServerMapper { @SelectProvider(type = WorkerServerMapperProvider.class, method = "queryAllWorker") List queryAllWorker(); + /** + * query worker list + * + * @return + */ + @Results(value = { + @Result(property = "id", column = "id", javaType = Integer.class, jdbcType = JdbcType.INTEGER), + @Result(property = "host", column = "host", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "port", column = "port", javaType = int.class, jdbcType = JdbcType.INTEGER), + @Result(property = "zkDirectory", column = "zk_directory", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "resInfo", column = "res_info", javaType = String.class, jdbcType = JdbcType.VARCHAR), + @Result(property = "createTime", column = "create_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP), + @Result(property = "lastHeartbeatTime", column = "last_heartbeat_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP) + }) + @SelectProvider(type = WorkerServerMapperProvider.class, method = "queryWorkerByHost") + List queryWorkerByHost(@Param("host") String host); + /** * insert worker server * diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerServerMapperProvider.java b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerServerMapperProvider.java index bd5af7deda..15b330e077 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerServerMapperProvider.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerServerMapperProvider.java @@ -37,6 +37,21 @@ public class WorkerServerMapperProvider { }}.toString(); } + /** + * query worker list + * @return + */ + public String queryWorkerByHost(Map parameter) { + return new SQL() {{ + SELECT("*"); + + FROM(TABLE_NAME); + + WHERE("host = #{host}"); + }}.toString(); + } + + /** * insert worker server * @param parameter diff --git a/escheduler-dao/src/main/java/cn/escheduler/dao/utils/DagHelper.java b/escheduler-dao/src/main/java/cn/escheduler/dao/utils/DagHelper.java index bc52e85062..a80a74d2d2 100644 --- a/escheduler-dao/src/main/java/cn/escheduler/dao/utils/DagHelper.java +++ b/escheduler-dao/src/main/java/cn/escheduler/dao/utils/DagHelper.java @@ -105,8 +105,7 @@ public class DagHelper { } for (TaskNode taskNode : tmpTaskNodeList) { - if ( !taskNode.isForbidden() - && null == findNodeByName(destTaskNodeList, taskNode.getName())) { + if (null == findNodeByName(destTaskNodeList, taskNode.getName())) { destTaskNodeList.add(taskNode); } } diff --git a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java index f12726c3ab..359cff1ac2 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java @@ -411,6 +411,25 @@ public class MasterExecThread implements Runnable { return taskInstance; } + private Collection getStartVertex(String parentNodeName, DAG dag){ + Collection startVertex = null; + if(StringUtils.isNotEmpty(parentNodeName)){ + startVertex = dag.getSubsequentNodes(parentNodeName); + }else{ + startVertex = dag.getBeginNode(); + } + + for(String start : startVertex){ + TaskNode node = dag.getNode(start); + if(node.isForbidden()){ + + } + + } + + return startVertex; + } + /** * get post task instance by node * @@ -421,12 +440,8 @@ public class MasterExecThread implements Runnable { private List getPostTaskInstanceByNode(DAG dag, String parentNodeName){ List postTaskList = new ArrayList<>(); - Collection startVertex = null; - if(StringUtils.isNotEmpty(parentNodeName)){ - startVertex = dag.getSubsequentNodes(parentNodeName); - }else{ - startVertex = dag.getBeginNode(); - } + Collection startVertex = getStartVertex(parentNodeName, dag); + for (String nodeName : startVertex){ // encapsulation task instance diff --git a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java index fbabb2a82b..3066504a46 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java @@ -18,6 +18,7 @@ package cn.escheduler.server.zk; import cn.escheduler.common.Constants; import cn.escheduler.common.enums.ExecutionStatus; +import cn.escheduler.common.enums.ZKNodeType; import cn.escheduler.common.utils.CollectionUtils; import cn.escheduler.common.utils.DateUtils; import cn.escheduler.common.utils.OSUtils; @@ -28,11 +29,11 @@ import cn.escheduler.dao.ProcessDao; import cn.escheduler.dao.ServerDao; import cn.escheduler.dao.model.ProcessInstance; import cn.escheduler.dao.model.TaskInstance; +import cn.escheduler.dao.model.WorkerServer; import cn.escheduler.server.ResInfo; import cn.escheduler.server.utils.ProcessUtils; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; @@ -135,7 +136,8 @@ public class ZKMasterClient extends AbstractZKClient { // check if fault tolerance is required,failure and tolerance if (getActiveMasterNum() == 1) { - processDao.masterStartupFaultTolerant(); +// processDao.masterStartupFaultTolerant(); + failoverMaster(null); } }catch (Exception e){ @@ -191,31 +193,20 @@ public class ZKMasterClient extends AbstractZKClient { Date now = new Date(); createTime = now ; try { + String osHost = OSUtils.getHost(); - // encapsulation master znnode - masterZNode = masterZNodeParentPath + "/" + OSUtils.getHost() + "_"; - List masterZNodeList = zkClient.getChildren().forPath(masterZNodeParentPath); - - if (CollectionUtils.isNotEmpty(masterZNodeList)){ - boolean flag = false; - for (String masterZNode : masterZNodeList){ - if (masterZNode.startsWith(OSUtils.getHost())){ - flag = true; - break; - } - } - - if (flag){ - logger.error("register failure , master already started on host : {}" , OSUtils.getHost()); - // exit system - System.exit(-1); - } + // zookeeper node exists, cannot start a new one. + if(checkZKNodeExists(osHost, ZKNodeType.MASTER)){ + logger.error("register failure , master already started on host : {}" , osHost); + // exit system + System.exit(-1); } // specify the format of stored data in ZK nodes String heartbeatZKInfo = getOsInfo(now); // create temporary sequence nodes for master znode - masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(masterZNode, heartbeatZKInfo.getBytes()); + masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath( + masterZNodeParentPath + "/" + OSUtils.getHost() + "_", heartbeatZKInfo.getBytes()); logger.info("register master node {} success" , masterZNode); @@ -239,6 +230,46 @@ public class ZKMasterClient extends AbstractZKClient { } + /** + * check the zookeeper node already exists + * @param host + * @param zkNodeType + * @return + * @throws Exception + */ + private boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) throws Exception { + + String path = null; + switch (zkNodeType){ + case MASTER: + path = masterZNodeParentPath; + break; + case WORKER: + path = workerZNodeParentPath; + break; + case DEAD_SERVER: + path = deadServerZNodeParentPath; + break; + default: + break; + } + if(StringUtils.isEmpty(path)){ + logger.error("check zk node exists error, host:{}, zk node type:{}", host, zkNodeType.toString()); + return false; + } + + List masterZNodeList = null; + masterZNodeList = zkClient.getChildren().forPath(path); + if (CollectionUtils.isNotEmpty(masterZNodeList)){ + for (String masterZNode : masterZNodeList){ + if (masterZNode.startsWith(host)){ + return true; + } + } + } + return false; + } + /** * monitor master */ @@ -281,7 +312,7 @@ public class ZKMasterClient extends AbstractZKClient { alertDao.sendServerStopedAlert(1, masterHost, "Master-Server"); } if(StringUtils.isNotEmpty(masterHost)){ - FailoverMaster(masterHost); + failoverMaster(masterHost); } }catch (Exception e){ logger.error("master failover failed : " + e.getMessage(),e); @@ -365,7 +396,7 @@ public class ZKMasterClient extends AbstractZKClient { } if(StringUtils.isNotEmpty(workerHost)){ - FailoverWorker(workerHost); + failoverWorker(workerHost, true); } }catch (Exception e){ logger.error("worker failover failed : " + e.getMessage(),e); @@ -457,27 +488,76 @@ public class ZKMasterClient extends AbstractZKClient { } + /** + * task needs failover if task start before worker starts + * + * @param taskInstance + * @return + */ + private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception { + + boolean taskNeedFailover = true; + + // if the worker node exists in zookeeper, we must check the task starts after the worker + if(checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER)){ + //if task start after worker starts, there is no need to failover the task. + if(checkTaskAfterWorkerStart(taskInstance)){ + taskNeedFailover = false; + } + } + return taskNeedFailover; + } + + /** + * check task start after the worker server starts. + * @param taskInstance + * @return + */ + private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) { + Date workerServerStartDate = null; + List workerServers = processDao.queryWorkerServerByHost(taskInstance.getHost()); + if(workerServers.size() > 0){ + workerServerStartDate = workerServers.get(0).getCreateTime(); + } + + if(workerServerStartDate != null){ + return taskInstance.getStartTime().after(workerServerStartDate); + + }else{ + return false; + } + } + /** * failover worker tasks * 1. kill yarn job if there are yarn jobs in tasks. * 2. change task state from running to need failover. * @param workerHost */ - private void FailoverWorker(String workerHost){ + private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception { logger.info("start worker[{}] failover ...", workerHost); List needFailoverTaskInstanceList = processDao.queryNeedFailoverTaskInstances(workerHost); for(TaskInstance taskInstance : needFailoverTaskInstanceList){ + if(needCheckWorkerAlive){ + if(!checkTaskInstanceNeedFailover(taskInstance)){ + continue; + } + } + ProcessInstance instance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId()); if(instance!=null){ taskInstance.setProcessInstance(instance); } // only kill yarn job if exists , the local thread has exited ProcessUtils.killYarnJob(taskInstance); + + taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE); + processDao.saveTaskInstance(taskInstance); } - //updateProcessInstance state value is NEED_FAULT_TOLERANCE - processDao.updateNeedFailoverTaskInstances(workerHost); + //update task Instance state value is NEED_FAULT_TOLERANCE + // processDao.updateNeedFailoverTaskInstances(workerHost); logger.info("end worker[{}] failover ...", workerHost); } @@ -485,7 +565,7 @@ public class ZKMasterClient extends AbstractZKClient { * failover master tasks * @param masterHost */ - private void FailoverMaster(String masterHost) { + private void failoverMaster(String masterHost) { logger.info("start master failover ..."); List needFailoverProcessInstanceList = processDao.queryNeedFailoverProcessInstances(masterHost);