From 1caac70215ee62a3bcd984ad05ce4a996f85dd24 Mon Sep 17 00:00:00 2001 From: bao liang <29528966+lenboo@users.noreply.github.com> Date: Wed, 27 May 2020 11:33:10 +0800 Subject: [PATCH] [bug fix]fix bug: After the master is fault-tolerant, it cannot resume operation (#2813) * feature: add number configuration for master dispatch tasks * fix bug(#2762) the master would be blocked when worker group not exists * fix bug(#2762) the master would be blocked when worker group not exists * fix ut * fix ut * fix bug(2781): cannot pause work flow when task state is "submit success" * fix code smell * add mysql other param blank judge * test * update comments * update comments * add ut * fix bug: Restart the worker service again, the previously submitted successful tasks are not executed * update comments * add sleep * add null point check * fix bug:After the master is fault-tolerant, it cannot resume operation * fix bug: do not failover the host is 'NULL' process Co-authored-by: baoliang --- .../apache/dolphinscheduler/server/zk/ZKMasterClient.java | 3 +++ .../dolphinscheduler/service/process/ProcessService.java | 6 +----- .../dolphinscheduler/service/zk/AbstractZKClient.java | 6 +++++- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 69aecee444..1b807a7278 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -335,6 +335,9 @@ public class ZKMasterClient extends AbstractZKClient { //updateProcessInstance host is null and insert into command for(ProcessInstance processInstance : needFailoverProcessInstanceList){ + if(Constants.NULL.equals(processInstance.getHost()) ){ + continue; + } processService.processNeedFailoverProcessInstances(processInstance); } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 36e2b60d18..0bab35a821 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -121,10 +121,6 @@ public class ProcessService { logger.info("there is not enough thread for this command: {}", command); return setWaitingThreadProcess(command, processInstance); } - if (processInstance.getCommandType().equals(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS)){ - delCommandByid(command.getId()); - return null; - } processInstance.setCommandType(command.getCommandType()); processInstance.addHistoryCmd(command.getCommandType()); saveProcessInstance(processInstance); @@ -1484,7 +1480,7 @@ public class ProcessService { @Transactional(rollbackFor = Exception.class) public void processNeedFailoverProcessInstances(ProcessInstance processInstance){ //1 update processInstance host is null - processInstance.setHost("null"); + processInstance.setHost(Constants.NULL); processInstanceMapper.updateById(processInstance); //2 insert into recover command diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java index acbbe76188..2960969d86 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java @@ -125,12 +125,16 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { List masterServers = new ArrayList<>(); for (Map.Entry entry : masterMap.entrySet()) { Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue()); + if(masterServer == null){ + continue; + } String key = entry.getKey(); masterServer.setZkDirectory(parentPath + "/"+ key); //set host and port String[] hostAndPort=key.split(COLON); String[] hosts=hostAndPort[0].split(DIVISION_STRING); - masterServer.setHost(hosts[hosts.length-1]);// fetch the last one + // fetch the last one + masterServer.setHost(hosts[hosts.length-1]); masterServer.setPort(Integer.parseInt(hostAndPort[1])); masterServers.add(masterServer); }