diff --git a/.github/workflows/ci_ut.yml b/.github/workflows/ci_ut.yml index 583874c81a..5512d2f8b5 100644 --- a/.github/workflows/ci_ut.yml +++ b/.github/workflows/ci_ut.yml @@ -48,7 +48,7 @@ jobs: - name: Bootstrap database run: | sed -i "s/: root/: test/g" $(pwd)/docker/docker-swarm/docker-compose.yml - docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml create --force-recreate dolphinscheduler-zookeeper dolphinscheduler-postgresql + docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml up --no-start --force-recreate dolphinscheduler-zookeeper dolphinscheduler-postgresql sudo cp $(pwd)/sql/dolphinscheduler_postgre.sql $(docker volume inspect docker-swarm_dolphinscheduler-postgresql-initdb | grep "Mountpoint" | awk -F "\"" '{print $4}') docker-compose -f $(pwd)/docker/docker-swarm/docker-compose.yml up -d dolphinscheduler-zookeeper dolphinscheduler-postgresql - name: Set up JDK 1.8 diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index c2ea2c4073..5e00784f83 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; -import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; import org.apache.dolphinscheduler.server.worker.WorkerServer; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; @@ -73,12 +72,6 @@ public class MasterServer implements IStoppable { */ private NettyRemotingServer nettyRemotingServer; - /** - * master registry - */ - @Autowired - private MasterRegistry masterRegistry; - /** * zk master client */ @@ -108,25 +101,17 @@ public class MasterServer implements IStoppable { */ @PostConstruct public void run() { - try { - //init remoting server - NettyServerConfig serverConfig = new NettyServerConfig(); - serverConfig.setListenPort(masterConfig.getListenPort()); - this.nettyRemotingServer = new NettyRemotingServer(serverConfig); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); - this.nettyRemotingServer.start(); - - this.masterRegistry.getZookeeperRegistryCenter().setStoppable(this); - - } catch (Exception e) { - logger.error(e.getMessage(), e); - throw new RuntimeException(e); - } + // init remoting server + NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(masterConfig.getListenPort()); + this.nettyRemotingServer = new NettyRemotingServer(serverConfig); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); + this.nettyRemotingServer.start(); // self tolerant - this.zkMasterClient.start(); + this.zkMasterClient.start(this); // scheduler start this.masterSchedulerService.start(); @@ -183,10 +168,9 @@ public class MasterServer implements IStoppable { } catch (Exception e) { logger.warn("thread sleep exception ", e); } - // + //close this.masterSchedulerService.close(); this.nettyRemotingServer.close(); - this.masterRegistry.unRegistry(); this.zkMasterClient.close(); //close quartz try { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 10880bf94f..28345e7932 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -105,37 +105,35 @@ public class WorkerServer implements IStoppable { new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args); } - /** * worker server run */ @PostConstruct public void run() { + // alert-server client registry + alertClientService = new AlertClientService(workerConfig.getAlertListenHost(), Constants.ALERT_RPC_PORT); + + // init remoting server + NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(workerConfig.getListenPort()); + this.nettyRemotingServer = new NettyRemotingServer(serverConfig); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor()); + this.nettyRemotingServer.start(); + + // worker registry try { - logger.info("start worker server..."); - - //init remoting server - NettyServerConfig serverConfig = new NettyServerConfig(); - serverConfig.setListenPort(workerConfig.getListenPort()); - this.nettyRemotingServer = new NettyRemotingServer(serverConfig); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor()); - this.nettyRemotingServer.start(); - + this.workerRegistry.registry(); this.workerRegistry.getZookeeperRegistryCenter().setStoppable(this); Set workerZkPaths = this.workerRegistry.getWorkerZkPaths(); this.workerRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(workerZkPaths, ZKNodeType.WORKER, Constants.DELETE_ZK_OP); - // worker registry - this.workerRegistry.registry(); - - // retry report task status - this.retryReportTaskStatusThread.start(); } catch (Exception e) { logger.error(e.getMessage(), e); throw new RuntimeException(e); } + // task execute manager this.workerManagerThread.start(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 665a62c26d..9274a6c690 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.master.MasterServer; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -73,8 +74,7 @@ public class ZKMasterClient extends AbstractZKClient { @Autowired private MasterRegistry masterRegistry; - public void start() { - + public void start(MasterServer masterServer) { InterProcessMutex mutex = null; try { // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master @@ -82,9 +82,9 @@ public class ZKMasterClient extends AbstractZKClient { mutex = new InterProcessMutex(getZkClient(), znodeLock); mutex.acquire(); - // Master registry + // master registry masterRegistry.registry(); - + masterRegistry.getZookeeperRegistryCenter().setStoppable(masterServer); String registPath = this.masterRegistry.getMasterPath(); masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registPath, ZKNodeType.MASTER, Constants.DELETE_ZK_OP); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java index 8ea9ea5ba3..e441986279 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperOperator.java @@ -29,12 +29,13 @@ import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.List; import org.slf4j.Logger; @@ -136,10 +137,10 @@ public class ZookeeperOperator implements InitializingBean { } public List getChildrenKeys(final String key) { - List values; try { - values = zkClient.getChildren().forPath(key); - return values; + return zkClient.getChildren().forPath(key); + } catch (NoNodeException ex) { + return new ArrayList<>(); } catch (InterruptedException ex) { logger.error("getChildrenKeys key : {} InterruptedException", key); throw new IllegalStateException(ex); @@ -193,7 +194,7 @@ public class ZookeeperOperator implements InitializingBean { if (isExisted(key)) { try { zkClient.delete().deletingChildrenIfNeeded().forPath(key); - } catch (KeeperException.NoNodeException ignore) { + } catch (NoNodeException ignore) { //NOP } } @@ -230,7 +231,7 @@ public class ZookeeperOperator implements InitializingBean { if (isExisted(key)) { zkClient.delete().deletingChildrenIfNeeded().forPath(key); } - } catch (KeeperException.NoNodeException ignore) { + } catch (NoNodeException ignore) { //NOP } catch (final Exception ex) { logger.error("remove key : {}", key, ex); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java index f828c0772f..7823c9b1af 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java @@ -113,4 +113,19 @@ public class RegisterOperatorTest { Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE))); } + @Test + public void testGetChildrenKeysWithNoNodeException() throws Exception { + testAfterPropertiesSet(); + String path = registerOperator.getDeadZNodeParentPath(); + Assert.assertEquals(0, registerOperator.getChildrenKeys(path).size()); + } + + @Test + public void testNoNodeException() throws Exception { + testAfterPropertiesSet(); + String path = registerOperator.getDeadZNodeParentPath(); + registerOperator.persistEphemeral(path, "test"); + registerOperator.remove(path); + } + } \ No newline at end of file