diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 69aecee444..1b807a7278 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -335,6 +335,9 @@ public class ZKMasterClient extends AbstractZKClient { //updateProcessInstance host is null and insert into command for(ProcessInstance processInstance : needFailoverProcessInstanceList){ + if(Constants.NULL.equals(processInstance.getHost()) ){ + continue; + } processService.processNeedFailoverProcessInstances(processInstance); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 36e2b60d18..0bab35a821 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -121,10 +121,6 @@ public class ProcessService { logger.info("there is not enough thread for this command: {}", command); return setWaitingThreadProcess(command, processInstance); } - if (processInstance.getCommandType().equals(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS)){ - delCommandByid(command.getId()); - return null; - } processInstance.setCommandType(command.getCommandType()); processInstance.addHistoryCmd(command.getCommandType()); saveProcessInstance(processInstance); @@ -1484,7 +1480,7 @@ public class ProcessService { @Transactional(rollbackFor = Exception.class) public void processNeedFailoverProcessInstances(ProcessInstance processInstance){ //1 update processInstance host is null - processInstance.setHost("null"); + processInstance.setHost(Constants.NULL); processInstanceMapper.updateById(processInstance); //2 insert into recover command diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java index acbbe76188..2960969d86 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java @@ -125,12 +125,16 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { List masterServers = new ArrayList<>(); for (Map.Entry entry : masterMap.entrySet()) { Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue()); + if(masterServer == null){ + continue; + } String key = entry.getKey(); masterServer.setZkDirectory(parentPath + "/"+ key); //set host and port String[] hostAndPort=key.split(COLON); String[] hosts=hostAndPort[0].split(DIVISION_STRING); - masterServer.setHost(hosts[hosts.length-1]);// fetch the last one + // fetch the last one + masterServer.setHost(hosts[hosts.length-1]); masterServer.setPort(Integer.parseInt(hostAndPort[1])); masterServers.add(masterServer); }