diff --git a/dolphinscheduler-dao/src/main/resources/application.properties b/dolphinscheduler-dao/src/main/resources/application.properties
index 34bb9f916b..07abe37835 100644
--- a/dolphinscheduler-dao/src/main/resources/application.properties
+++ b/dolphinscheduler-dao/src/main/resources/application.properties
@@ -95,45 +95,6 @@ mybatis-plus.configuration.cache-enabled=false
mybatis-plus.configuration.call-setters-on-nulls=true
mybatis-plus.configuration.jdbc-type-for-null=null
-# master settings
-# master execute thread num
-master.exec.threads=100
-
-# master execute task number in parallel
-master.exec.task.num=20
-
-# master heartbeat interval
-master.heartbeat.interval=10
-
-# master commit task retry times
-master.task.commit.retryTimes=5
-
-# master commit task interval
-master.task.commit.interval=1000
-
-
-# only less than cpu avg load, master server can work. default value : the number of cpu cores * 2
-master.max.cpuload.avg=100
-
-# only larger than reserved memory, master server can work. default value : physical memory * 1/10, unit is G.
-master.reserved.memory=0.1
-
-# worker settings
-# worker execute thread num
-worker.exec.threads=100
-
-# worker heartbeat interval
-worker.heartbeat.interval=10
-
-# submit the number of tasks at a time
-worker.fetch.task.num = 3
-
-# only less than cpu avg load, worker server can work. default value : the number of cpu cores * 2
-worker.max.cpuload.avg=100
-
-# only larger than reserved memory, worker server can work. default value : physical memory * 1/6, unit is G.
-worker.reserved.memory=0.1
-
# data quality analysis is not currently in use. please ignore the following configuration
# task record
task.record.flag=false
diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml
index 8dfee12127..fc9b1484ae 100644
--- a/dolphinscheduler-server/pom.xml
+++ b/dolphinscheduler-server/pom.xml
@@ -128,7 +128,11 @@
mockito-core
test
-
+
+ org.springframework
+ spring-test
+
+
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index 86dd1c9083..b81e458a62 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -22,25 +22,25 @@ import org.springframework.stereotype.Component;
@Component
public class MasterConfig {
- @Value("${master.exec.threads}")
+ @Value("${master.exec.threads:100}")
private int masterExecThreads;
- @Value("${master.exec.task.num}")
+ @Value("${master.exec.task.num:20}")
private int masterExecTaskNum;
- @Value("${master.heartbeat.interval}")
+ @Value("${master.heartbeat.interval:10}")
private int masterHeartbeatInterval;
- @Value("${master.task.commit.retryTimes}")
+ @Value("${master.task.commit.retryTimes:5}")
private int masterTaskCommitRetryTimes;
- @Value("${master.task.commit.interval}")
+ @Value("${master.task.commit.interval:1000}")
private int masterTaskCommitInterval;
- @Value("${master.max.cpuload.avg}")
+ @Value("${master.max.cpuload.avg:100}")
private double masterMaxCpuloadAvg;
- @Value("${master.reserved.memory}")
+ @Value("${master.reserved.memory:0.1}")
private double masterReservedMemory;
@Value("${master.host.selector:lowerWeight}")
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index 792f9229d0..d2fd92665d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -22,19 +22,19 @@ import org.springframework.stereotype.Component;
@Component
public class WorkerConfig {
- @Value("${worker.exec.threads}")
+ @Value("${worker.exec.threads: 100}")
private int workerExecThreads;
- @Value("${worker.heartbeat.interval}")
+ @Value("${worker.heartbeat.interval: 10}")
private int workerHeartbeatInterval;
- @Value("${worker.fetch.task.num}")
+ @Value("${worker.fetch.task.num: 3}")
private int workerFetchTaskNum;
- @Value("${worker.max.cpuload.avg}")
+ @Value("${worker.max.cpuload.avg:100}")
private int workerMaxCpuloadAvg;
- @Value("${worker.reserved.memory}")
+ @Value("${worker.reserved.memory:0.1}")
private double workerReservedMemory;
@Value("${worker.group: default}")
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
new file mode 100644
index 0000000000..bff2e79fa4
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.registry;
+
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.apache.dolphinscheduler.server.zk.SpringZKServer;
+import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * master registry test
+ */
+@RunWith(SpringRunner.class)
+@ContextConfiguration(classes={SpringZKServer.class, MasterRegistry.class,ZookeeperRegistryCenter.class, MasterConfig.class, ZookeeperCachedOperator.class, ZookeeperConfig.class})
+public class MasterRegistryTest {
+
+ @Autowired
+ private MasterRegistry masterRegistry;
+
+ @Autowired
+ private ZookeeperRegistryCenter zookeeperRegistryCenter;
+
+ @Autowired
+ private MasterConfig masterConfig;
+
+ @Test
+ public void testRegistry() throws InterruptedException {
+ masterRegistry.registry();
+ String masterPath = zookeeperRegistryCenter.getMasterPath();
+ Assert.assertEquals(ZookeeperRegistryCenter.MASTER_PATH, masterPath);
+ TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node
+ String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort());
+ String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(masterNodePath);
+ Assert.assertEquals(5, heartbeat.split(",").length);
+ }
+
+ @Test
+ public void testUnRegistry() throws InterruptedException {
+ masterRegistry.registry();
+ TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node
+ masterRegistry.unRegistry();
+ String masterPath = zookeeperRegistryCenter.getMasterPath();
+ List childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(masterPath);
+ Assert.assertTrue(childrenKeys.isEmpty());
+ }
+}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
new file mode 100644
index 0000000000..cdb169fe78
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.registry;
+
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.zk.SpringZKServer;
+import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+
+/**
+ * worker registry test
+ */
+@RunWith(SpringRunner.class)
+@ContextConfiguration(classes={SpringZKServer.class, WorkerRegistry.class,ZookeeperRegistryCenter.class, WorkerConfig.class, ZookeeperCachedOperator.class, ZookeeperConfig.class})
+
+public class WorkerRegistryTest {
+
+ @Autowired
+ private WorkerRegistry workerRegistry;
+
+ @Autowired
+ private ZookeeperRegistryCenter zookeeperRegistryCenter;
+
+ @Autowired
+ private WorkerConfig workerConfig;
+
+ @Test
+ public void testRegistry() throws InterruptedException {
+ workerRegistry.registry();
+ String workerPath = zookeeperRegistryCenter.getWorkerPath();
+ Assert.assertEquals(ZookeeperRegistryCenter.WORKER_PATH, workerPath);
+ Assert.assertEquals(DEFAULT_WORKER_GROUP, workerConfig.getWorkerGroup().trim());
+ String instancePath = workerPath + "/" + workerConfig.getWorkerGroup().trim() + "/" + (Constants.LOCAL_ADDRESS + ":" + workerConfig.getListenPort());
+ TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); //wait heartbeat info write into zk node
+ String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(instancePath);
+ Assert.assertEquals(5, heartbeat.split(",").length);
+ }
+
+ @Test
+ public void testUnRegistry() throws InterruptedException {
+ workerRegistry.registry();
+ TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); //wait heartbeat info write into zk node
+ workerRegistry.unRegistry();
+ String workerPath = zookeeperRegistryCenter.getWorkerPath();
+ String workerGroupPath = workerPath + "/" + workerConfig.getWorkerGroup().trim();
+ List childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(workerGroupPath);
+ Assert.assertTrue(childrenKeys.isEmpty());
+ }
+}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/zk/SpringZKServer.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/zk/SpringZKServer.java
new file mode 100644
index 0000000000..ec42cad1ce
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/zk/SpringZKServer.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.zk;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.PriorityOrdered;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * just for test
+ */
+@Service
+public class SpringZKServer implements PriorityOrdered {
+
+ private static final Logger logger = LoggerFactory.getLogger(SpringZKServer.class);
+
+ private static volatile PublicZooKeeperServerMain zkServer = null;
+
+ public static final int DEFAULT_ZK_TEST_PORT = 2181;
+
+ public static final String DEFAULT_ZK_STR = "localhost:" + DEFAULT_ZK_TEST_PORT;
+
+ private static String dataDir = null;
+
+ private static final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+ @PostConstruct
+ public void start() {
+ try {
+ startLocalZkServer(DEFAULT_ZK_TEST_PORT);
+ } catch (Exception e) {
+ logger.error("Failed to start ZK: " + e);
+ }
+ }
+
+ public static boolean isStarted(){
+ return isStarted.get();
+ }
+
+
+ @Override
+ public int getOrder() {
+ return PriorityOrdered.HIGHEST_PRECEDENCE;
+ }
+
+ static class PublicZooKeeperServerMain extends ZooKeeperServerMain {
+
+ @Override
+ public void initializeAndRun(String[] args)
+ throws QuorumPeerConfig.ConfigException, IOException {
+ super.initializeAndRun(args);
+ }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ }
+ }
+
+ /**
+ * Starts a local Zk instance with a generated empty data directory
+ *
+ * @param port The port to listen on
+ */
+ public void startLocalZkServer(final int port) {
+ startLocalZkServer(port, org.apache.commons.io.FileUtils.getTempDirectoryPath() + File.separator + "test-" + System.currentTimeMillis());
+ }
+
+ /**
+ * Starts a local Zk instance
+ *
+ * @param port The port to listen on
+ * @param dataDirPath The path for the Zk data directory
+ */
+ private void startLocalZkServer(final int port, final String dataDirPath) {
+ if (zkServer != null) {
+ throw new RuntimeException("Zookeeper server is already started!");
+ }
+ try {
+ zkServer = new PublicZooKeeperServerMain();
+ logger.info("Zookeeper data path : {} ", dataDirPath);
+ dataDir = dataDirPath;
+ final String[] args = new String[]{Integer.toString(port), dataDirPath};
+ Thread init = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ System.setProperty("zookeeper.jmx.log4j.disable", "true");
+ zkServer.initializeAndRun(args);
+ } catch (QuorumPeerConfig.ConfigException e) {
+ logger.warn("Caught exception while starting ZK", e);
+ } catch (IOException e) {
+ logger.warn("Caught exception while starting ZK", e);
+ }
+ }
+ }, "init-zk-thread");
+ init.start();
+ } catch (Exception e) {
+ logger.warn("Caught exception while starting ZK", e);
+ throw new RuntimeException(e);
+ }
+
+ CuratorFramework zkClient = CuratorFrameworkFactory.builder()
+ .connectString(DEFAULT_ZK_STR)
+ .retryPolicy(new ExponentialBackoffRetry(10,100))
+ .sessionTimeoutMs(1000 * 30)
+ .connectionTimeoutMs(1000 * 30)
+ .build();
+
+ try {
+ zkClient.blockUntilConnected(10, TimeUnit.SECONDS);
+ zkClient.close();
+ } catch (InterruptedException ignore) {
+ }
+ isStarted.compareAndSet(false, true);
+ logger.info("zk server started");
+ }
+
+ @PreDestroy
+ public void stop() {
+ try {
+ stopLocalZkServer(true);
+ logger.info("zk server stopped");
+
+ } catch (Exception e) {
+ logger.error("Failed to stop ZK ",e);
+ }
+ }
+
+ /**
+ * Stops a local Zk instance.
+ *
+ * @param deleteDataDir Whether or not to delete the data directory
+ */
+ private void stopLocalZkServer(final boolean deleteDataDir) {
+ if (zkServer != null) {
+ try {
+ zkServer.shutdown();
+ zkServer = null;
+ if (deleteDataDir) {
+ org.apache.commons.io.FileUtils.deleteDirectory(new File(dataDir));
+ }
+ isStarted.compareAndSet(true, false);
+ } catch (Exception e) {
+ logger.warn("Caught exception while stopping ZK server", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/zk/StandaloneZKServerForTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/zk/StandaloneZKServerForTest.java
deleted file mode 100644
index 679862f102..0000000000
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/zk/StandaloneZKServerForTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dolphinscheduler.server.zk;
-
-import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
-import org.apache.zookeeper.server.ServerConfig;
-import org.apache.zookeeper.server.ZooKeeperServerMain;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Properties;
-
-
-/**
- * just for test
- */
-@Ignore
-public class StandaloneZKServerForTest {
-
- private static final Logger logger = LoggerFactory.getLogger(StandaloneZKServerForTest.class);
-
- private static volatile ZooKeeperServerMain zkServer = null;
-
-
- @Before
- public void before() {
- logger.info("standalone zookeeper server for test service start ");
-
- ThreadPoolExecutors.getInstance().execute(new Runnable() {
- @Override
- public void run() {
-
- //delete zk data dir ?
- File zkFile = new File(System.getProperty("java.io.tmpdir"), "zookeeper");
-
- startStandaloneServer("2000", zkFile.getAbsolutePath(), "2181", "10", "5");
- }
- });
-
- }
-
-
- /**
- * start zk server
- * @param tickTime zookeeper ticktime
- * @param dataDir zookeeper data dir
- * @param clientPort zookeeper client port
- * @param initLimit zookeeper init limit
- * @param syncLimit zookeeper sync limit
- */
- private void startStandaloneServer(String tickTime, String dataDir, String clientPort, String initLimit, String syncLimit) {
- Properties props = new Properties();
- props.setProperty("tickTime", tickTime);
- props.setProperty("dataDir", dataDir);
- props.setProperty("clientPort", clientPort);
- props.setProperty("initLimit", initLimit);
- props.setProperty("syncLimit", syncLimit);
-
- QuorumPeerConfig quorumConfig = new QuorumPeerConfig();
- try {
- quorumConfig.parseProperties(props);
-
- if(zkServer == null ){
-
- synchronized (StandaloneZKServerForTest.class){
- if(zkServer == null ){
- zkServer = new ZooKeeperServerMain();
- final ServerConfig config = new ServerConfig();
- config.readFrom(quorumConfig);
- zkServer.runFromConfig(config);
- }
- }
-
- }
-
- } catch (Exception e) {
- logger.error("start standalone server fail!", e);
- }
- }
-
-
-}
\ No newline at end of file