diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/threadutils/ThreadUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/threadutils/ThreadUtilsTest.java deleted file mode 100644 index 2c76f40c0b..0000000000 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/threadutils/ThreadUtilsTest.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.common.threadutils; - -import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; -import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Calendar; -import java.util.concurrent.*; - -import static org.junit.Assert.*; - - -public class ThreadUtilsTest { - private static final Logger logger = LoggerFactory.getLogger(ThreadUtilsTest.class); - /** - * create a naming thread - */ - @Test - public void testNewDaemonFixedThreadExecutor() { - // create core size and max size are all 3 - ExecutorService testExec = ThreadUtils.newDaemonFixedThreadExecutor("test-exec-thread",10); - - for (int i = 0; i < 19; i++) { - final int index = i; - testExec.submit(() -> { - System.out.println("do some work index " + index); - }); - } - assertFalse(testExec.isShutdown()); - testExec.shutdownNow(); - assertTrue(testExec.isShutdown()); - - } - - /** - * test schedulerThreadExecutor as for print time in scheduler - * default check thread is 1 - */ - @Test - public void testNewDaemonScheduleThreadExecutor() { - - ScheduledExecutorService scheduleService = ThreadUtils.newDaemonThreadScheduledExecutor("scheduler-thread", 1); - Calendar start = Calendar.getInstance(); - Calendar globalTimer = Calendar.getInstance(); - globalTimer.set(2019, Calendar.DECEMBER, 1, 0, 0, 0); - // current - Calendar end = Calendar.getInstance(); - end.set(2019, Calendar.DECEMBER, 1, 0, 0, 3); - Runnable schedulerTask = new Runnable() { - @Override - public void run() { - start.set(2019, Calendar.DECEMBER, 1, 0, 0, 0); - int index = 0; - // send heart beat work - while (start.getTime().getTime() <= end.getTime().getTime()) { - System.out.println("worker here"); - System.out.println(index ++); - start.add(Calendar.SECOND, 1); - globalTimer.add(Calendar.SECOND, 1); - } - System.out.println("time is " + System.currentTimeMillis()); - } - }; - scheduleService.scheduleAtFixedRate(schedulerTask, 2, 10, TimeUnit.SECONDS); - assertFalse(scheduleService.isShutdown()); - try { - Thread.sleep(60000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - scheduleService.shutdownNow(); - assertTrue(scheduleService.isShutdown()); - } - - /** - * test stopper is working normal - */ - @Test - public void testStopper() { - assertTrue(Stopper.isRunning()); - Stopper.stop(); - assertTrue(Stopper.isStopped()); - } - - /** - * test threadPoolExecutors with 3 workers and current each 5 tasks - * @throws InterruptedException - */ - @Test - public void testThreadInfo() throws InterruptedException { - ThreadPoolExecutors workers = ThreadPoolExecutors.getInstance("worker", 3); - for (int i = 0; i < 5; ++i ) { - int index = i; - workers.execute(() -> { - for (int j = 0; j < 10; ++j) { - try { - Thread.sleep(1000); - System.out.printf("worker %d is doing the task", index); - System.out.println(); - workers.printStatus(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - }); - workers.submit(() -> { - for (int j = 0; j < 10; ++j) { - try { - Thread.sleep(1000); - System.out.printf("worker_2 %d is doing the task", index); - System.out.println(); - workers.printStatus(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - }); - } - Thread.sleep(50001); - workers.shutdown(); - } - - /** - * test a single daemon thread pool - */ - @Test - public void testNewDaemonSingleThreadExecutor() { - ExecutorService threadTest = ThreadUtils.newDaemonSingleThreadExecutor("thread_test"); - threadTest.execute(() -> { - for (int i = 0; i < 100; ++i) { - System.out.println("daemon working "); - } - - }); - assertFalse(threadTest.isShutdown()); - threadTest.shutdownNow(); - assertTrue(threadTest.isShutdown()); - } - - @Test - public void testNewDaemonCachedThreadPool() { - - ThreadPoolExecutor threadPoolExecutor = ThreadUtils.newDaemonCachedThreadPool("threadTest-"); - Thread thread1 = threadPoolExecutor.getThreadFactory().newThread(() -> { - for (int i = 0; i < 10; ++i) { - System.out.println("this task is with index " + i ); - } - }); - assertTrue(thread1.getName().startsWith("threadTest-")); - assertFalse(threadPoolExecutor.isShutdown()); - threadPoolExecutor.shutdown(); - assertTrue(threadPoolExecutor.isShutdown()); - } - - @Test - public void testNewDaemonCachedThreadPoolWithThreadNumber() { - ThreadPoolExecutor threadPoolExecutor = ThreadUtils.newDaemonCachedThreadPool("threadTest--", 3, 10); - for (int i = 0; i < 10; ++ i) { - threadPoolExecutor.getThreadFactory().newThread(() -> { - assertEquals(3, threadPoolExecutor.getActiveCount()); - System.out.println("this task is first work to do"); - }); - } - assertFalse(threadPoolExecutor.isShutdown()); - threadPoolExecutor.shutdown(); - assertTrue(threadPoolExecutor.isShutdown()); - } - - - -} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java index c08da0ef72..5a04c5a23b 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java @@ -24,6 +24,7 @@ import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -32,6 +33,7 @@ import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.concurrent.TimeUnit; import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull; @@ -55,9 +57,10 @@ public class CuratorZookeeperClient implements InitializingBean { } private CuratorFramework buildClient() { - logger.info("zookeeper registry center init, server lists is: {}.", zookeeperConfig.getServerList()); + logger.info("zookeeper registry center init, server lists is: [{}]", zookeeperConfig.getServerList()); - CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),"zookeeper quorum can't be null"))) + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() + .ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(), "zookeeper quorum can't be null"))) .retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs())); //these has default value @@ -84,7 +87,9 @@ public class CuratorZookeeperClient implements InitializingBean { zkClient = builder.build(); zkClient.start(); try { - zkClient.blockUntilConnected(); + logger.info("trying to connect zookeeper server list:{}", zookeeperConfig.getServerList()); + zkClient.blockUntilConnected(30, TimeUnit.SECONDS); + } catch (final Exception ex) { throw new RuntimeException(ex); } @@ -95,12 +100,14 @@ public class CuratorZookeeperClient implements InitializingBean { checkNotNull(zkClient); zkClient.getConnectionStateListenable().addListener((client, newState) -> { - if(newState == ConnectionState.LOST){ + if (newState == ConnectionState.LOST) { logger.error("connection lost from zookeeper"); - } else if(newState == ConnectionState.RECONNECTED){ + } else if (newState == ConnectionState.RECONNECTED) { logger.info("reconnected to zookeeper"); - } else if(newState == ConnectionState.SUSPENDED){ + } else if (newState == ConnectionState.SUSPENDED) { logger.warn("connection SUSPENDED to zookeeper"); + } else if (newState == ConnectionState.CONNECTED) { + logger.info("connected to zookeeper server list:[{}]", zookeeperConfig.getServerList()); } }); } diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClientTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClientTest.java index c0297799ea..b1c2ec5e25 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClientTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClientTest.java @@ -59,9 +59,8 @@ public class CuratorZookeeperClientTest { zookeeperConfig.setDsRoot("/dolphinscheduler"); zookeeperConfig.setMaxWaitTime(30000); zookeeperClient.setZookeeperConfig(zookeeperConfig); - System.out.println("start"); zookeeperClient.afterPropertiesSet(); - System.out.println("end"); + Assert.assertNotNull(zookeeperClient.getZkClient()); } } \ No newline at end of file