Browse Source

Change master/worker failover process.

pull/2/head
lenboo 5 years ago
parent
commit
b1fbd2170e
  1. 17
      escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java
  2. 7
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java
  3. 7
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapperProvider.java
  4. 17
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerServerMapper.java
  5. 15
      escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerServerMapperProvider.java
  6. 3
      escheduler-dao/src/main/java/cn/escheduler/dao/utils/DagHelper.java
  7. 27
      escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java
  8. 134
      escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java

17
escheduler-dao/src/main/java/cn/escheduler/dao/ProcessDao.java

@ -58,7 +58,7 @@ public class ProcessDao extends AbstractBaseDao {
private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.RUNNING_EXEUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
// ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
ExecutionStatus.READY_STOP.ordinal()};
@Autowired
@ -97,6 +97,9 @@ public class ProcessDao extends AbstractBaseDao {
@Autowired
private ErrorCommandMapper errorCommandMapper;
@Autowired
private WorkerServerMapper workerServerMapper;
/**
* task queue impl
*/
@ -122,6 +125,7 @@ public class ProcessDao extends AbstractBaseDao {
udfFuncMapper = getMapper(UdfFuncMapper.class);
resourceMapper = getMapper(ResourceMapper.class);
workerGroupMapper = getMapper(WorkerGroupMapper.class);
workerServerMapper = getMapper(WorkerServerMapper.class);
taskQueue = TaskQueueFactory.getTaskQueueInstance();
}
@ -1636,6 +1640,17 @@ public class ProcessDao extends AbstractBaseDao {
return workerGroupMapper.queryById(workerGroupId);
}
/**
 * query worker server records registered for the given host
 *
 * @param host worker host to look up
 * @return list of worker server rows whose host matches (empty list if none)
 */
public List<WorkerServer> queryWorkerServerByHost(String host){
return workerServerMapper.queryWorkerByHost(host);
}
}

7
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/ProcessInstanceMapperProvider.java

@ -402,7 +402,12 @@ public class ProcessInstanceMapperProvider {
FROM(TABLE_NAME);
WHERE("`host` = #{host} and `state` in (" + strStates.toString() +")");
Object host = parameter.get("host");
if(host != null && StringUtils.isNotEmpty(host.toString())){
WHERE("`host` = #{host} ");
}
WHERE("`state` in (" + strStates.toString() +")");
ORDER_BY("`id` asc");

7
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/TaskInstanceMapperProvider.java

@ -228,7 +228,12 @@ public class TaskInstanceMapperProvider {
SELECT("*, UNIX_TIMESTAMP(end_time)-UNIX_TIMESTAMP(start_time) as duration");
FROM(TABLE_NAME);
WHERE("`host` = #{host} and `state` in (" + strStates.toString() +")");
Object host = parameter.get("host");
if(host != null && StringUtils.isNotEmpty(host.toString())){
WHERE("`host` = #{host} ");
}
WHERE("`state` in (" + strStates.toString() +")");
ORDER_BY("`id` asc");
}
}.toString();

17
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerServerMapper.java

@ -42,6 +42,23 @@ public interface WorkerServerMapper {
@SelectProvider(type = WorkerServerMapperProvider.class, method = "queryAllWorker")
List<WorkerServer> queryAllWorker();
/**
 * query worker server list by host
 *
 * @param host worker host to match
 * @return worker servers registered on the given host
 */
@Results(value = {
@Result(property = "id", column = "id", javaType = Integer.class, jdbcType = JdbcType.INTEGER),
@Result(property = "host", column = "host", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "port", column = "port", javaType = int.class, jdbcType = JdbcType.INTEGER),
@Result(property = "zkDirectory", column = "zk_directory", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "resInfo", column = "res_info", javaType = String.class, jdbcType = JdbcType.VARCHAR),
@Result(property = "createTime", column = "create_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP),
@Result(property = "lastHeartbeatTime", column = "last_heartbeat_time", javaType = Date.class, jdbcType = JdbcType.TIMESTAMP)
})
@SelectProvider(type = WorkerServerMapperProvider.class, method = "queryWorkerByHost")
List<WorkerServer> queryWorkerByHost(@Param("host") String host);
/**
* insert worker server
*

15
escheduler-dao/src/main/java/cn/escheduler/dao/mapper/WorkerServerMapperProvider.java

@ -37,6 +37,21 @@ public class WorkerServerMapperProvider {
}}.toString();
}
/**
 * build the SQL statement that queries worker servers by host
 *
 * @param parameter map carrying the "host" value bound to #{host} by MyBatis
 * @return the generated SELECT statement
 */
public String queryWorkerByHost(Map<String, Object> parameter) {
return new SQL() {{
SELECT("*");
FROM(TABLE_NAME);
WHERE("host = #{host}");
}}.toString();
}
/**
* insert worker server
* @param parameter

3
escheduler-dao/src/main/java/cn/escheduler/dao/utils/DagHelper.java

@ -105,8 +105,7 @@ public class DagHelper {
}
for (TaskNode taskNode : tmpTaskNodeList) {
if ( !taskNode.isForbidden()
&& null == findNodeByName(destTaskNodeList, taskNode.getName())) {
if (null == findNodeByName(destTaskNodeList, taskNode.getName())) {
destTaskNodeList.add(taskNode);
}
}

27
escheduler-server/src/main/java/cn/escheduler/server/master/runner/MasterExecThread.java

@ -411,6 +411,25 @@ public class MasterExecThread implements Runnable {
return taskInstance;
}
/**
 * Resolve the vertex names from which task scheduling should start.
 *
 * @param parentNodeName name of the parent node; when empty, the DAG's
 *                       begin nodes are used as the starting vertices
 * @param dag            the process DAG
 * @return the names of the start vertices
 */
private Collection<String> getStartVertex(String parentNodeName, DAG<String, TaskNode, TaskNodeRelation> dag){
    Collection<String> startVertex = null;
    if(StringUtils.isNotEmpty(parentNodeName)){
        // a parent was given: start from its direct successors
        startVertex = dag.getSubsequentNodes(parentNodeName);
    }else{
        // no parent given: start from the DAG entry nodes
        startVertex = dag.getBeginNode();
    }
    // NOTE(review): the original code iterated the start vertices with an empty
    // `if(node.isForbidden()){}` branch — a no-op loop, removed here. Confirm
    // whether forbidden start vertices were intended to be skipped or replaced
    // by their successors; if so, that filtering still needs to be implemented.
    return startVertex;
}
/**
* get post task instance by node
*
@ -421,12 +440,8 @@ public class MasterExecThread implements Runnable {
private List<TaskInstance> getPostTaskInstanceByNode(DAG<String, TaskNode, TaskNodeRelation> dag, String parentNodeName){
List<TaskInstance> postTaskList = new ArrayList<>();
Collection<String> startVertex = null;
if(StringUtils.isNotEmpty(parentNodeName)){
startVertex = dag.getSubsequentNodes(parentNodeName);
}else{
startVertex = dag.getBeginNode();
}
Collection<String> startVertex = getStartVertex(parentNodeName, dag);
for (String nodeName : startVertex){
// encapsulation task instance

134
escheduler-server/src/main/java/cn/escheduler/server/zk/ZKMasterClient.java

@ -18,6 +18,7 @@ package cn.escheduler.server.zk;
import cn.escheduler.common.Constants;
import cn.escheduler.common.enums.ExecutionStatus;
import cn.escheduler.common.enums.ZKNodeType;
import cn.escheduler.common.utils.CollectionUtils;
import cn.escheduler.common.utils.DateUtils;
import cn.escheduler.common.utils.OSUtils;
@ -28,11 +29,11 @@ import cn.escheduler.dao.ProcessDao;
import cn.escheduler.dao.ServerDao;
import cn.escheduler.dao.model.ProcessInstance;
import cn.escheduler.dao.model.TaskInstance;
import cn.escheduler.dao.model.WorkerServer;
import cn.escheduler.server.ResInfo;
import cn.escheduler.server.utils.ProcessUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
@ -135,7 +136,8 @@ public class ZKMasterClient extends AbstractZKClient {
// check if fault tolerance is required,failure and tolerance
if (getActiveMasterNum() == 1) {
processDao.masterStartupFaultTolerant();
// processDao.masterStartupFaultTolerant();
failoverMaster(null);
}
}catch (Exception e){
@ -191,31 +193,20 @@ public class ZKMasterClient extends AbstractZKClient {
Date now = new Date();
createTime = now ;
try {
String osHost = OSUtils.getHost();
// encapsulate master znode
masterZNode = masterZNodeParentPath + "/" + OSUtils.getHost() + "_";
List<String> masterZNodeList = zkClient.getChildren().forPath(masterZNodeParentPath);
if (CollectionUtils.isNotEmpty(masterZNodeList)){
boolean flag = false;
for (String masterZNode : masterZNodeList){
if (masterZNode.startsWith(OSUtils.getHost())){
flag = true;
break;
}
}
if (flag){
logger.error("register failure , master already started on host : {}" , OSUtils.getHost());
// exit system
System.exit(-1);
}
// zookeeper node exists, cannot start a new one.
if(checkZKNodeExists(osHost, ZKNodeType.MASTER)){
logger.error("register failure , master already started on host : {}" , osHost);
// exit system
System.exit(-1);
}
// specify the format of stored data in ZK nodes
String heartbeatZKInfo = getOsInfo(now);
// create temporary sequence nodes for master znode
masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(masterZNode, heartbeatZKInfo.getBytes());
masterZNode = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(
masterZNodeParentPath + "/" + OSUtils.getHost() + "_", heartbeatZKInfo.getBytes());
logger.info("register master node {} success" , masterZNode);
@ -239,6 +230,46 @@ public class ZKMasterClient extends AbstractZKClient {
}
/**
 * Check whether a zookeeper child node for the given host already exists
 * under the parent path corresponding to the given node type.
 *
 * @param host       server host to look for
 * @param zkNodeType type of the ZK node (MASTER / WORKER / DEAD_SERVER)
 * @return true if a child node whose name starts with the host exists;
 *         false when no match is found or the node type is unknown
 * @throws Exception if the zookeeper children query fails
 */
private boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) throws Exception {
    String path = null;
    switch (zkNodeType){
        case MASTER:
            path = masterZNodeParentPath;
            break;
        case WORKER:
            path = workerZNodeParentPath;
            break;
        case DEAD_SERVER:
            path = deadServerZNodeParentPath;
            break;
        default:
            break;
    }
    if(StringUtils.isEmpty(path)){
        logger.error("check zk node exists error, host:{}, zk node type:{}", host, zkNodeType.toString());
        return false;
    }
    // merged declaration and assignment (was a redundant null initialization)
    List<String> childNodes = zkClient.getChildren().forPath(path);
    if (CollectionUtils.isNotEmpty(childNodes)){
        for (String childNode : childNodes){
            // NOTE(review): prefix matching can false-positive when one host is
            // a prefix of another (e.g. "10.0.0.1" vs "10.0.0.10"); confirm the
            // node-name format for every node type before tightening this to
            // startsWith(host + "_").
            if (childNode.startsWith(host)){
                return true;
            }
        }
    }
    return false;
}
/**
* monitor master
*/
@ -281,7 +312,7 @@ public class ZKMasterClient extends AbstractZKClient {
alertDao.sendServerStopedAlert(1, masterHost, "Master-Server");
}
if(StringUtils.isNotEmpty(masterHost)){
FailoverMaster(masterHost);
failoverMaster(masterHost);
}
}catch (Exception e){
logger.error("master failover failed : " + e.getMessage(),e);
@ -365,7 +396,7 @@ public class ZKMasterClient extends AbstractZKClient {
}
if(StringUtils.isNotEmpty(workerHost)){
FailoverWorker(workerHost);
failoverWorker(workerHost, true);
}
}catch (Exception e){
logger.error("worker failover failed : " + e.getMessage(),e);
@ -457,27 +488,76 @@ public class ZKMasterClient extends AbstractZKClient {
}
/**
 * Decide whether a task instance requires failover.
 * A task needs failover unless its worker is still registered in zookeeper
 * AND the task was started after that worker came up.
 *
 * @param taskInstance task instance to inspect
 * @return true if the task must be failed over
 * @throws Exception if the zookeeper lookup fails
 */
private boolean checkTaskInstanceNeedFailover(TaskInstance taskInstance) throws Exception {
    boolean workerAlive = checkZKNodeExists(taskInstance.getHost(), ZKNodeType.WORKER);
    if (workerAlive && checkTaskAfterWorkerStart(taskInstance)) {
        // the worker is registered and the task started after the worker did:
        // the task is still under that worker's control, no failover needed
        return false;
    }
    return true;
}
/**
 * Check whether the task started after its worker server registered.
 *
 * @param taskInstance task instance to check
 * @return true only when a worker record exists for the task's host, that
 *         record has a create time, and the task's start time is after it
 */
private boolean checkTaskAfterWorkerStart(TaskInstance taskInstance) {
    List<WorkerServer> workerServers = processDao.queryWorkerServerByHost(taskInstance.getHost());
    if (workerServers.isEmpty()) {
        // no worker row for this host: the task cannot have started after it
        return false;
    }
    Date workerStartTime = workerServers.get(0).getCreateTime();
    return workerStartTime != null && taskInstance.getStartTime().after(workerStartTime);
}
/**
 * failover worker tasks:
 * 1. kill yarn job if there are yarn jobs in tasks.
 * 2. change task state from running to need failover.
 *
 * @param workerHost host of the dead worker whose tasks need failover
 * @param needCheckWorkerAlive when true, skip tasks whose worker is still
 *                             registered in zookeeper and started before the task
 * @throws Exception if the zookeeper alive-check fails
 */
private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
logger.info("start worker[{}] failover ...", workerHost);
// all task instances on this host that are in a failover-eligible state
List<TaskInstance> needFailoverTaskInstanceList = processDao.queryNeedFailoverTaskInstances(workerHost);
for(TaskInstance taskInstance : needFailoverTaskInstanceList){
if(needCheckWorkerAlive){
// skip tasks still owned by a live worker (started after the worker registered)
if(!checkTaskInstanceNeedFailover(taskInstance)){
continue;
}
}
ProcessInstance instance = processDao.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
if(instance!=null){
taskInstance.setProcessInstance(instance);
}
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskInstance);
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
processDao.saveTaskInstance(taskInstance);
}
// bulk update of task instances on this host to NEED_FAULT_TOLERANCE
// NOTE(review): each instance is already saved individually in the loop above,
// and an identical call appears commented out below — confirm whether this
// bulk update is still required or is leftover from the old failover path
processDao.updateNeedFailoverTaskInstances(workerHost);
//update task Instance state value is NEED_FAULT_TOLERANCE
// processDao.updateNeedFailoverTaskInstances(workerHost);
logger.info("end worker[{}] failover ...", workerHost);
}
@ -485,7 +565,7 @@ public class ZKMasterClient extends AbstractZKClient {
* failover master tasks
* @param masterHost
*/
private void FailoverMaster(String masterHost) {
private void failoverMaster(String masterHost) {
logger.info("start master failover ...");
List<ProcessInstance> needFailoverProcessInstanceList = processDao.queryNeedFailoverProcessInstances(masterHost);

Loading…
Cancel
Save