Browse Source

[bug fix] fix bug: after the master is fault-tolerant, it cannot resume operation (#2813)

* feature: add number configuration for master dispatch tasks

* fix bug (#2762): the master would be blocked when the worker group does not exist

* fix bug (#2762): the master would be blocked when the worker group does not exist

* fix ut

* fix ut

* fix bug (#2781): cannot pause workflow when task state is "submit success"

* fix code smell

* add blank check for the MySQL "other" param

* test

* update comments

* update comments

* add ut

* fix bug: after the worker service is restarted, the previously successfully submitted tasks are not executed

* update comments

* add sleep

* add null pointer check

* fix bug: after the master is fault-tolerant, it cannot resume operation

* fix bug: do not fail over a process whose host is 'NULL'

Co-authored-by: baoliang <baoliang@analysys.com.cn>
pull/3/MERGE
bao liang 5 years ago committed by GitHub
parent
commit
1caac70215
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  2. 6
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  3. 6
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java

3
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -335,6 +335,9 @@ public class ZKMasterClient extends AbstractZKClient {
//updateProcessInstance host is null and insert into command
for(ProcessInstance processInstance : needFailoverProcessInstanceList){
if(Constants.NULL.equals(processInstance.getHost()) ){
continue;
}
processService.processNeedFailoverProcessInstances(processInstance);
}

6
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -121,10 +121,6 @@ public class ProcessService {
logger.info("there is not enough thread for this command: {}", command);
return setWaitingThreadProcess(command, processInstance);
}
if (processInstance.getCommandType().equals(CommandType.RECOVER_TOLERANCE_FAULT_PROCESS)){
delCommandByid(command.getId());
return null;
}
processInstance.setCommandType(command.getCommandType());
processInstance.addHistoryCmd(command.getCommandType());
saveProcessInstance(processInstance);
@ -1484,7 +1480,7 @@ public class ProcessService {
@Transactional(rollbackFor = Exception.class)
public void processNeedFailoverProcessInstances(ProcessInstance processInstance){
//1 update processInstance host is null
processInstance.setHost("null");
processInstance.setHost(Constants.NULL);
processInstanceMapper.updateById(processInstance);
//2 insert into recover command

6
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java

@ -125,12 +125,16 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
List<Server> masterServers = new ArrayList<>();
for (Map.Entry<String, String> entry : masterMap.entrySet()) {
Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue());
if(masterServer == null){
continue;
}
String key = entry.getKey();
masterServer.setZkDirectory(parentPath + "/"+ key);
//set host and port
String[] hostAndPort=key.split(COLON);
String[] hosts=hostAndPort[0].split(DIVISION_STRING);
masterServer.setHost(hosts[hosts.length-1]);// fetch the last one
// fetch the last one
masterServer.setHost(hosts[hosts.length-1]);
masterServer.setPort(Integer.parseInt(hostAndPort[1]));
masterServers.add(masterServer);
}

Loading…
Cancel
Save