From c61c59ae54ac3df8a686c21521151d2759bb6664 Mon Sep 17 00:00:00 2001
From: Tboy
Date: Sun, 15 Mar 2020 19:05:04 +0800
Subject: [PATCH] Refactor worker (#2186)

* let quartz use the same datasource

* move master/worker config from dao.properties to each config
add master/worker registry test
---
Review notes (free-form text placed before the first "diff --git" line is not
part of the change itself):
 - This copy of the patch had its line breaks collapsed and every piece of
   angle-bracket text removed. The layout, the pom.xml tags and List<String>
   were restored; pom.xml indentation and its trailing context lines are a
   best-effort reconstruction, and the author address on the From: line is
   still missing.
 - SpringZKServer: the readiness probe now starts its Curator client, closes
   it on every path, targets the requested port, restores the interrupt flag
   and only reports "started" when a connection was actually made. start()
   logs the throwable itself so the stack trace is kept.
 - WorkerConfig: placeholder defaults no longer carry a leading space, to
   match MasterConfig.
 - The index lines of WorkerConfig.java and SpringZKServer.java still carry
   the blob ids of the original revision.

 .../src/main/resources/application.properties |  39 ----
 dolphinscheduler-server/pom.xml               |   6 +-
 .../server/master/config/MasterConfig.java    |  14 +-
 .../server/worker/config/WorkerConfig.java    |  10 +-
 .../master/registry/MasterRegistryTest.java   |  73 +++++++
 .../worker/registry/WorkerRegistryTest.java   |  77 ++++++++
 .../server/zk/SpringZKServer.java             | 187 ++++++++++++++++++
 7 files changed, 354 insertions(+), 52 deletions(-)
 create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
 create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
 create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/zk/SpringZKServer.java

diff --git a/dolphinscheduler-dao/src/main/resources/application.properties b/dolphinscheduler-dao/src/main/resources/application.properties
index c928c72df6..3c666090e6 100644
--- a/dolphinscheduler-dao/src/main/resources/application.properties
+++ b/dolphinscheduler-dao/src/main/resources/application.properties
@@ -99,45 +99,6 @@ mybatis-plus.configuration.cache-enabled=false
 mybatis-plus.configuration.call-setters-on-nulls=true
 mybatis-plus.configuration.jdbc-type-for-null=null
 
-# master settings
-# master execute thread num
-master.exec.threads=100
-
-# master execute task number in parallel
-master.exec.task.num=20
-
-# master heartbeat interval
-master.heartbeat.interval=10
-
-# master commit task retry times
-master.task.commit.retryTimes=5
-
-# master commit task interval
-master.task.commit.interval=1000
-
-
-# only less than cpu avg load, master server can work. default value : the number of cpu cores * 2
-master.max.cpuload.avg=100
-
-# only larger than reserved memory, master server can work. default value : physical memory * 1/10, unit is G.
-master.reserved.memory=0.1
-
-# worker settings
-# worker execute thread num
-worker.exec.threads=100
-
-# worker heartbeat interval
-worker.heartbeat.interval=10
-
-# submit the number of tasks at a time
-worker.fetch.task.num = 3
-
-# only less than cpu avg load, worker server can work. default value : the number of cpu cores * 2
-worker.max.cpuload.avg=100
-
-# only larger than reserved memory, worker server can work. default value : physical memory * 1/6, unit is G.
-worker.reserved.memory=0.1
-
 # data quality analysis is not currently in use. please ignore the following configuration
 # task record
 task.record.flag=false
diff --git a/dolphinscheduler-server/pom.xml b/dolphinscheduler-server/pom.xml
index e8e84297e2..3d5189484d 100644
--- a/dolphinscheduler-server/pom.xml
+++ b/dolphinscheduler-server/pom.xml
@@ -138,7 +138,11 @@
 			<artifactId>mockito-core</artifactId>
 			<scope>test</scope>
 		</dependency>
-
+		<dependency>
+			<groupId>org.springframework</groupId>
+			<artifactId>spring-test</artifactId>
+		</dependency>
+
 	</dependencies>
 
 	<build>
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
index efb7cff1a7..10b4078417 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
@@ -22,25 +22,25 @@ import org.springframework.stereotype.Component;
 @Component
 public class MasterConfig {
 
-    @Value("${master.exec.threads}")
+    @Value("${master.exec.threads:100}")
     private int masterExecThreads;
 
-    @Value("${master.exec.task.num}")
+    @Value("${master.exec.task.num:20}")
     private int masterExecTaskNum;
 
-    @Value("${master.heartbeat.interval}")
+    @Value("${master.heartbeat.interval:10}")
     private int masterHeartbeatInterval;
 
-    @Value("${master.task.commit.retryTimes}")
+    @Value("${master.task.commit.retryTimes:5}")
     private int masterTaskCommitRetryTimes;
 
-    @Value("${master.task.commit.interval}")
+    @Value("${master.task.commit.interval:1000}")
     private int masterTaskCommitInterval;
 
-    @Value("${master.max.cpuload.avg}")
+    @Value("${master.max.cpuload.avg:100}")
     private double masterMaxCpuloadAvg;
 
-    @Value("${master.reserved.memory}")
+    @Value("${master.reserved.memory:0.1}")
     private double masterReservedMemory;
 
     public int getMasterExecThreads() {
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index aaaf5c7805..1636da8121 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -22,19 +22,19 @@ import org.springframework.stereotype.Component;
 @Component
 public class WorkerConfig {
 
-    @Value("${worker.exec.threads}")
+    @Value("${worker.exec.threads:100}")
     private int workerExecThreads;
 
-    @Value("${worker.heartbeat.interval}")
+    @Value("${worker.heartbeat.interval:10}")
     private int workerHeartbeatInterval;
 
-    @Value("${worker.fetch.task.num}")
+    @Value("${worker.fetch.task.num:3}")
     private int workerFetchTaskNum;
 
-    @Value("${worker.max.cpuload.avg}")
+    @Value("${worker.max.cpuload.avg:100}")
     private int workerMaxCpuloadAvg;
 
-    @Value("${worker.reserved.memory}")
+    @Value("${worker.reserved.memory:0.1}")
     private double workerReservedMemory;
 
     public int getWorkerExecThreads() {
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
new file mode 100644
index 0000000000..bff2e79fa4
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.registry;
+
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.apache.dolphinscheduler.server.zk.SpringZKServer;
+import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * master registry test
+ */
+@RunWith(SpringRunner.class)
+@ContextConfiguration(classes={SpringZKServer.class, MasterRegistry.class,ZookeeperRegistryCenter.class, MasterConfig.class, ZookeeperCachedOperator.class, ZookeeperConfig.class})
+public class MasterRegistryTest {
+
+    @Autowired
+    private MasterRegistry masterRegistry;
+
+    @Autowired
+    private ZookeeperRegistryCenter zookeeperRegistryCenter;
+
+    @Autowired
+    private MasterConfig masterConfig;
+
+    @Test
+    public void testRegistry() throws InterruptedException {
+        masterRegistry.registry();
+        String masterPath = zookeeperRegistryCenter.getMasterPath();
+        Assert.assertEquals(ZookeeperRegistryCenter.MASTER_PATH, masterPath);
+        TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node
+        String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort());
+        String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(masterNodePath);
+        Assert.assertEquals(5, heartbeat.split(",").length);
+    }
+
+    @Test
+    public void testUnRegistry() throws InterruptedException {
+        masterRegistry.registry();
+        TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node
+        masterRegistry.unRegistry();
+        String masterPath = zookeeperRegistryCenter.getMasterPath();
+        List<String> childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(masterPath);
+        Assert.assertTrue(childrenKeys.isEmpty());
+    }
+}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
new file mode 100644
index 0000000000..cdb169fe78
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.registry;
+
+import org.apache.dolphinscheduler.remote.utils.Constants;
+import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
+import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
+import org.apache.dolphinscheduler.server.zk.SpringZKServer;
+import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
+
+/**
+ * worker registry test
+ */
+@RunWith(SpringRunner.class)
+@ContextConfiguration(classes={SpringZKServer.class, WorkerRegistry.class,ZookeeperRegistryCenter.class, WorkerConfig.class, ZookeeperCachedOperator.class, ZookeeperConfig.class})
+
+public class WorkerRegistryTest {
+
+    @Autowired
+    private WorkerRegistry workerRegistry;
+
+    @Autowired
+    private ZookeeperRegistryCenter zookeeperRegistryCenter;
+
+    @Autowired
+    private WorkerConfig workerConfig;
+
+    @Test
+    public void testRegistry() throws InterruptedException {
+        workerRegistry.registry();
+        String workerPath = zookeeperRegistryCenter.getWorkerPath();
+        Assert.assertEquals(ZookeeperRegistryCenter.WORKER_PATH, workerPath);
+        Assert.assertEquals(DEFAULT_WORKER_GROUP, workerConfig.getWorkerGroup().trim());
+        String instancePath = workerPath + "/" + workerConfig.getWorkerGroup().trim() + "/" + (Constants.LOCAL_ADDRESS + ":" + workerConfig.getListenPort());
+        TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); //wait heartbeat info write into zk node
+        String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(instancePath);
+        Assert.assertEquals(5, heartbeat.split(",").length);
+    }
+
+    @Test
+    public void testUnRegistry() throws InterruptedException {
+        workerRegistry.registry();
+        TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); //wait heartbeat info write into zk node
+        workerRegistry.unRegistry();
+        String workerPath = zookeeperRegistryCenter.getWorkerPath();
+        String workerGroupPath = workerPath + "/" + workerConfig.getWorkerGroup().trim();
+        List<String> childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(workerGroupPath);
+        Assert.assertTrue(childrenKeys.isEmpty());
+    }
+}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/zk/SpringZKServer.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/zk/SpringZKServer.java
new file mode 100644
index 0000000000..ec42cad1ce
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/zk/SpringZKServer.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.zk;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.PriorityOrdered;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * Embedded ZooKeeper server for tests: started after bean construction and
+ * stopped (data directory removed) before the bean is destroyed.
+ */
+@Service
+public class SpringZKServer implements PriorityOrdered {
+
+    private static final Logger logger = LoggerFactory.getLogger(SpringZKServer.class);
+
+    private static volatile PublicZooKeeperServerMain zkServer = null;
+
+    public static final int DEFAULT_ZK_TEST_PORT = 2181;
+
+    public static final String DEFAULT_ZK_STR = "localhost:" + DEFAULT_ZK_TEST_PORT;
+
+    private static String dataDir = null;
+
+    private static final AtomicBoolean isStarted = new AtomicBoolean(false);
+
+    @PostConstruct
+    public void start() {
+        try {
+            startLocalZkServer(DEFAULT_ZK_TEST_PORT);
+        } catch (Exception e) {
+            // pass the throwable itself so the stack trace is logged
+            logger.error("Failed to start ZK", e);
+        }
+    }
+
+    public static boolean isStarted(){
+        return isStarted.get();
+    }
+
+
+    @Override
+    public int getOrder() {
+        return PriorityOrdered.HIGHEST_PRECEDENCE;
+    }
+
+    static class PublicZooKeeperServerMain extends ZooKeeperServerMain {
+
+        @Override
+        public void initializeAndRun(String[] args)
+                throws QuorumPeerConfig.ConfigException, IOException {
+            super.initializeAndRun(args);
+        }
+
+        @Override
+        public void shutdown() {
+            super.shutdown();
+        }
+    }
+
+    /**
+     * Starts a local Zk instance with a generated empty data directory
+     *
+     * @param port The port to listen on
+     */
+    public void startLocalZkServer(final int port) {
+        startLocalZkServer(port, org.apache.commons.io.FileUtils.getTempDirectoryPath() + File.separator + "test-" + System.currentTimeMillis());
+    }
+
+    /**
+     * Starts a local Zk instance
+     *
+     * @param port        The port to listen on
+     * @param dataDirPath The path for the Zk data directory
+     */
+    private void startLocalZkServer(final int port, final String dataDirPath) {
+        if (zkServer != null) {
+            throw new RuntimeException("Zookeeper server is already started!");
+        }
+        try {
+            zkServer = new PublicZooKeeperServerMain();
+            logger.info("Zookeeper data path : {} ", dataDirPath);
+            dataDir = dataDirPath;
+            final String[] args = new String[]{Integer.toString(port), dataDirPath};
+            Thread init = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        System.setProperty("zookeeper.jmx.log4j.disable", "true");
+                        zkServer.initializeAndRun(args);
+                    } catch (QuorumPeerConfig.ConfigException e) {
+                        logger.warn("Caught exception while starting ZK", e);
+                    } catch (IOException e) {
+                        logger.warn("Caught exception while starting ZK", e);
+                    }
+                }
+            }, "init-zk-thread");
+            init.start();
+        } catch (Exception e) {
+            logger.warn("Caught exception while starting ZK", e);
+            throw new RuntimeException(e);
+        }
+
+        // Probe the server with a throwaway client. The client must be started
+        // explicitly, otherwise blockUntilConnected() can only run into its timeout.
+        boolean connected = false;
+        try (CuratorFramework zkClient = CuratorFrameworkFactory.builder()
+                .connectString("localhost:" + port)
+                .retryPolicy(new ExponentialBackoffRetry(10,100))
+                .sessionTimeoutMs(1000 * 30)
+                .connectionTimeoutMs(1000 * 30)
+                .build()) {
+            zkClient.start();
+            connected = zkClient.blockUntilConnected(10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the caller instead of swallowing it
+            Thread.currentThread().interrupt();
+        }
+        if (!connected) {
+            logger.error("zk server did not accept connections on port {}", port);
+            return;
+        }
+        isStarted.set(true);
+        logger.info("zk server started");
+    }
+
+    @PreDestroy
+    public void stop() {
+        try {
+            stopLocalZkServer(true);
+            logger.info("zk server stopped");
+
+        } catch (Exception e) {
+            logger.error("Failed to stop ZK ",e);
+        }
+    }
+
+    /**
+     * Stops a local Zk instance.
+     *
+     * @param deleteDataDir Whether or not to delete the data directory
+     */
+    private void stopLocalZkServer(final boolean deleteDataDir) {
+        if (zkServer != null) {
+            try {
+                zkServer.shutdown();
+                zkServer = null;
+                if (deleteDataDir) {
+                    org.apache.commons.io.FileUtils.deleteDirectory(new File(dataDir));
+                }
+                isStarted.compareAndSet(true, false);
+            } catch (Exception e) {
+                logger.warn("Caught exception while stopping ZK server", e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}