diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index a69533928f..dbba51b3dc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -78,14 +78,12 @@ public class ZKMasterClient extends AbstractZKClient { while (!checkZKNodeExists(OSUtils.getHost(), ZKNodeType.MASTER)){ ThreadUtils.sleep(SLEEP_TIME_MILLIS); } - - - // self tolerant + // startup tolerant if (getActiveMasterNum() == 1) { - failoverWorker(null, true); - failoverMaster(null); + removeZKNodePath(null, ZKNodeType.MASTER, true); + removeZKNodePath(null, ZKNodeType.WORKER, true); } - + registerListener(); }catch (Exception e){ logger.error("master start up exception",e); }finally { @@ -131,9 +129,16 @@ public class ZKMasterClient extends AbstractZKClient { mutex = new InterProcessMutex(getZkClient(), failoverPath); mutex.acquire(); - String serverHost = getHostByEventDataPath(path); - // handle dead server - handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP); + String serverHost = null; + if(StringUtils.isNotEmpty(path)){ + serverHost = getHostByEventDataPath(path); + if(StringUtils.isEmpty(serverHost)){ + logger.error("server down error: unknown path: {}", path); + return; + } + // handle dead server + handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP); + } //failover server if(failover){ failoverServerWhenDown(serverHost, zkNodeType); @@ -155,9 +160,6 @@ public class ZKMasterClient extends AbstractZKClient { * @throws Exception exception */ private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception { - if (StringUtils.isEmpty(serverHost)) { - return; - } switch (zkNodeType) { case MASTER: failoverMaster(serverHost); @@ -333,8 +335,11 @@ public class ZKMasterClient extends AbstractZKClient { List needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost); + logger.info("failover process list size:{} ", needFailoverProcessInstanceList.size()); //updateProcessInstance host is null and insert into command for(ProcessInstance processInstance : needFailoverProcessInstanceList){ + logger.info("failover process instance id: {} host:{}", + processInstance.getId(), processInstance.getHost()); if(Constants.NULL.equals(processInstance.getHost()) ){ continue; } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 77d4a6d97b..77f75a4c13 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -1521,10 +1521,13 @@ public class ProcessService { */ @Transactional(rollbackFor = Exception.class) public void processNeedFailoverProcessInstances(ProcessInstance processInstance){ + logger.info("set null host to process instance:{}", processInstance.getId()); //1 update processInstance host is null processInstance.setHost(Constants.NULL); processInstanceMapper.updateById(processInstance); + logger.info("create failover command for process instance:{}", processInstance.getId()); + //2 insert into recover command Command cmd = new Command(); cmd.setProcessDefinitionId(processInstance.getProcessDefinitionId()); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java index e71cb74e15..647a3c9189 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java @@ -39,14 +39,6 @@ public class ZookeeperCachedOperator extends ZookeeperOperator { */ @Override protected void registerListener() { - treeCache = new TreeCache(zkClient, getZookeeperConfig().getDsRoot() + "/nodes"); - logger.info("add listener to zk path: {}", getZookeeperConfig().getDsRoot()); - try { - treeCache.start(); - } catch (Exception e) { - logger.error("add listener to zk path: {} failed", getZookeeperConfig().getDsRoot()); - throw new RuntimeException(e); - } treeCache.getListenable().addListener((client, event) -> { String path = null == event.getData() ? "" : event.getData().getPath(); @@ -55,7 +47,18 @@ public class ZookeeperCachedOperator extends ZookeeperOperator { } dataChanged(client, event, path); }); + } + @Override + protected void treeCacheStart() { + treeCache = new TreeCache(zkClient, getZookeeperConfig().getDsRoot() + "/nodes"); + logger.info("add listener to zk path: {}", getZookeeperConfig().getDsRoot()); + try { + treeCache.start(); + } catch (Exception e) { + logger.error("add listener to zk path: {} failed", getZookeeperConfig().getDsRoot()); + throw new RuntimeException(e); + } } //for sub class diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java index a2cabce805..2396da485a 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java @@ -56,7 +56,7 @@ public class ZookeeperOperator implements InitializingBean { public void afterPropertiesSet() throws Exception { this.zkClient = buildClient(); initStateLister(); - registerListener(); + treeCacheStart(); } /** @@ -64,6 +64,8 @@ public class ZookeeperOperator implements InitializingBean { */ protected void registerListener(){} + protected void treeCacheStart(){} + public void initStateLister() { checkNotNull(zkClient);