Browse Source

[FIX-3929] Because no lock was held, startup failover could dispatch the same task twice. (#4004)

* fix #3966: sub process does not send alert mail after the process instance ends.

* fix bug 3964: the timeout warning does not take effect for sub_process;
add timeout warning for sub_process/dependent tasks.

* fix code smell

* fix code smell

* fix code smell

* update worker group to inherit from parent

* remove stdout in logback configuration

* fix bug #3929: condition task would post an error during failover.

* remove unused test

* add comments

* add skip node judge

* fix bug 3929: because no lock was held, startup failover could dispatch the same task twice.

Co-authored-by: baoliang <baoliang@analysys.com.cn>
pull/3/MERGE
bao liang 4 years ago committed by GitHub
parent
commit
026957d024
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 25
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  2. 3
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  3. 19
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
  4. 4
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java

25
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -78,14 +78,12 @@ public class ZKMasterClient extends AbstractZKClient {
while (!checkZKNodeExists(OSUtils.getHost(), ZKNodeType.MASTER)){ while (!checkZKNodeExists(OSUtils.getHost(), ZKNodeType.MASTER)){
ThreadUtils.sleep(SLEEP_TIME_MILLIS); ThreadUtils.sleep(SLEEP_TIME_MILLIS);
} }
// startup tolerant
// self tolerant
if (getActiveMasterNum() == 1) { if (getActiveMasterNum() == 1) {
failoverWorker(null, true); removeZKNodePath(null, ZKNodeType.MASTER, true);
failoverMaster(null); removeZKNodePath(null, ZKNodeType.WORKER, true);
} }
registerListener();
}catch (Exception e){ }catch (Exception e){
logger.error("master start up exception",e); logger.error("master start up exception",e);
}finally { }finally {
@ -131,9 +129,16 @@ public class ZKMasterClient extends AbstractZKClient {
mutex = new InterProcessMutex(getZkClient(), failoverPath); mutex = new InterProcessMutex(getZkClient(), failoverPath);
mutex.acquire(); mutex.acquire();
String serverHost = getHostByEventDataPath(path); String serverHost = null;
if(StringUtils.isNotEmpty(path)){
serverHost = getHostByEventDataPath(path);
if(StringUtils.isEmpty(serverHost)){
logger.error("server down error: unknown path: {}", path);
return;
}
// handle dead server // handle dead server
handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP); handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
}
//failover server //failover server
if(failover){ if(failover){
failoverServerWhenDown(serverHost, zkNodeType); failoverServerWhenDown(serverHost, zkNodeType);
@ -155,9 +160,6 @@ public class ZKMasterClient extends AbstractZKClient {
* @throws Exception exception * @throws Exception exception
*/ */
private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception { private void failoverServerWhenDown(String serverHost, ZKNodeType zkNodeType) throws Exception {
if (StringUtils.isEmpty(serverHost)) {
return;
}
switch (zkNodeType) { switch (zkNodeType) {
case MASTER: case MASTER:
failoverMaster(serverHost); failoverMaster(serverHost);
@ -333,8 +335,11 @@ public class ZKMasterClient extends AbstractZKClient {
List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost); List<ProcessInstance> needFailoverProcessInstanceList = processService.queryNeedFailoverProcessInstances(masterHost);
logger.info("failover process list size:{} ", needFailoverProcessInstanceList.size());
//updateProcessInstance host is null and insert into command //updateProcessInstance host is null and insert into command
for(ProcessInstance processInstance : needFailoverProcessInstanceList){ for(ProcessInstance processInstance : needFailoverProcessInstanceList){
logger.info("failover process instance id: {} host:{}",
processInstance.getId(), processInstance.getHost());
if(Constants.NULL.equals(processInstance.getHost()) ){ if(Constants.NULL.equals(processInstance.getHost()) ){
continue; continue;
} }

3
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -1521,10 +1521,13 @@ public class ProcessService {
*/ */
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public void processNeedFailoverProcessInstances(ProcessInstance processInstance){ public void processNeedFailoverProcessInstances(ProcessInstance processInstance){
logger.info("set null host to process instance:{}", processInstance.getId());
//1 update processInstance host is null //1 update processInstance host is null
processInstance.setHost(Constants.NULL); processInstance.setHost(Constants.NULL);
processInstanceMapper.updateById(processInstance); processInstanceMapper.updateById(processInstance);
logger.info("create failover command for process instance:{}", processInstance.getId());
//2 insert into recover command //2 insert into recover command
Command cmd = new Command(); Command cmd = new Command();
cmd.setProcessDefinitionId(processInstance.getProcessDefinitionId()); cmd.setProcessDefinitionId(processInstance.getProcessDefinitionId());

19
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java

@ -39,14 +39,6 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
*/ */
@Override @Override
protected void registerListener() { protected void registerListener() {
treeCache = new TreeCache(zkClient, getZookeeperConfig().getDsRoot() + "/nodes");
logger.info("add listener to zk path: {}", getZookeeperConfig().getDsRoot());
try {
treeCache.start();
} catch (Exception e) {
logger.error("add listener to zk path: {} failed", getZookeeperConfig().getDsRoot());
throw new RuntimeException(e);
}
treeCache.getListenable().addListener((client, event) -> { treeCache.getListenable().addListener((client, event) -> {
String path = null == event.getData() ? "" : event.getData().getPath(); String path = null == event.getData() ? "" : event.getData().getPath();
@ -55,7 +47,18 @@ public class ZookeeperCachedOperator extends ZookeeperOperator {
} }
dataChanged(client, event, path); dataChanged(client, event, path);
}); });
}
@Override
protected void treeCacheStart() {
treeCache = new TreeCache(zkClient, getZookeeperConfig().getDsRoot() + "/nodes");
logger.info("add listener to zk path: {}", getZookeeperConfig().getDsRoot());
try {
treeCache.start();
} catch (Exception e) {
logger.error("add listener to zk path: {} failed", getZookeeperConfig().getDsRoot());
throw new RuntimeException(e);
}
} }
//for sub class //for sub class

4
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java

@ -56,7 +56,7 @@ public class ZookeeperOperator implements InitializingBean {
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
this.zkClient = buildClient(); this.zkClient = buildClient();
initStateLister(); initStateLister();
registerListener(); treeCacheStart();
} }
/** /**
@ -64,6 +64,8 @@ public class ZookeeperOperator implements InitializingBean {
*/ */
protected void registerListener(){} protected void registerListener(){}
protected void treeCacheStart(){}
public void initStateLister() { public void initStateLister() {
checkNotNull(zkClient); checkNotNull(zkClient);

Loading…
Cancel
Save