Browse Source

Merge pull request #3402 from dailidong/dev

[fix-#3680][service]  Zookeeper does not start, printed logs have no error messages
pull/3/MERGE
Kirs 4 years ago committed by GitHub
parent
commit
8afc0cc815
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 190
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/threadutils/ThreadUtilsTest.java
  2. 19
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java
  3. 3
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClientTest.java

190
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/threadutils/ThreadUtilsTest.java

@ -1,190 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.threadutils;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Calendar;
import java.util.concurrent.*;
import static org.junit.Assert.*;
public class ThreadUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(ThreadUtilsTest.class);
/**
* create a naming thread
*/
@Test
public void testNewDaemonFixedThreadExecutor() {
// create core size and max size are all 3
ExecutorService testExec = ThreadUtils.newDaemonFixedThreadExecutor("test-exec-thread",10);
for (int i = 0; i < 19; i++) {
final int index = i;
testExec.submit(() -> {
System.out.println("do some work index " + index);
});
}
assertFalse(testExec.isShutdown());
testExec.shutdownNow();
assertTrue(testExec.isShutdown());
}
/**
* test schedulerThreadExecutor as for print time in scheduler
* default check thread is 1
*/
@Test
public void testNewDaemonScheduleThreadExecutor() {
ScheduledExecutorService scheduleService = ThreadUtils.newDaemonThreadScheduledExecutor("scheduler-thread", 1);
Calendar start = Calendar.getInstance();
Calendar globalTimer = Calendar.getInstance();
globalTimer.set(2019, Calendar.DECEMBER, 1, 0, 0, 0);
// current
Calendar end = Calendar.getInstance();
end.set(2019, Calendar.DECEMBER, 1, 0, 0, 3);
Runnable schedulerTask = new Runnable() {
@Override
public void run() {
start.set(2019, Calendar.DECEMBER, 1, 0, 0, 0);
int index = 0;
// send heart beat work
while (start.getTime().getTime() <= end.getTime().getTime()) {
System.out.println("worker here");
System.out.println(index ++);
start.add(Calendar.SECOND, 1);
globalTimer.add(Calendar.SECOND, 1);
}
System.out.println("time is " + System.currentTimeMillis());
}
};
scheduleService.scheduleAtFixedRate(schedulerTask, 2, 10, TimeUnit.SECONDS);
assertFalse(scheduleService.isShutdown());
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
scheduleService.shutdownNow();
assertTrue(scheduleService.isShutdown());
}
/**
* test stopper is working normal
*/
@Test
public void testStopper() {
assertTrue(Stopper.isRunning());
Stopper.stop();
assertTrue(Stopper.isStopped());
}
/**
* test threadPoolExecutors with 3 workers and current each 5 tasks
* @throws InterruptedException
*/
@Test
public void testThreadInfo() throws InterruptedException {
ThreadPoolExecutors workers = ThreadPoolExecutors.getInstance("worker", 3);
for (int i = 0; i < 5; ++i ) {
int index = i;
workers.execute(() -> {
for (int j = 0; j < 10; ++j) {
try {
Thread.sleep(1000);
System.out.printf("worker %d is doing the task", index);
System.out.println();
workers.printStatus();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
workers.submit(() -> {
for (int j = 0; j < 10; ++j) {
try {
Thread.sleep(1000);
System.out.printf("worker_2 %d is doing the task", index);
System.out.println();
workers.printStatus();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
Thread.sleep(50001);
workers.shutdown();
}
/**
* test a single daemon thread pool
*/
@Test
public void testNewDaemonSingleThreadExecutor() {
ExecutorService threadTest = ThreadUtils.newDaemonSingleThreadExecutor("thread_test");
threadTest.execute(() -> {
for (int i = 0; i < 100; ++i) {
System.out.println("daemon working ");
}
});
assertFalse(threadTest.isShutdown());
threadTest.shutdownNow();
assertTrue(threadTest.isShutdown());
}
@Test
public void testNewDaemonCachedThreadPool() {
ThreadPoolExecutor threadPoolExecutor = ThreadUtils.newDaemonCachedThreadPool("threadTest-");
Thread thread1 = threadPoolExecutor.getThreadFactory().newThread(() -> {
for (int i = 0; i < 10; ++i) {
System.out.println("this task is with index " + i );
}
});
assertTrue(thread1.getName().startsWith("threadTest-"));
assertFalse(threadPoolExecutor.isShutdown());
threadPoolExecutor.shutdown();
assertTrue(threadPoolExecutor.isShutdown());
}
@Test
public void testNewDaemonCachedThreadPoolWithThreadNumber() {
ThreadPoolExecutor threadPoolExecutor = ThreadUtils.newDaemonCachedThreadPool("threadTest--", 3, 10);
for (int i = 0; i < 10; ++ i) {
threadPoolExecutor.getThreadFactory().newThread(() -> {
assertEquals(3, threadPoolExecutor.getActiveCount());
System.out.println("this task is first work to do");
});
}
assertFalse(threadPoolExecutor.isShutdown());
threadPoolExecutor.shutdown();
assertTrue(threadPoolExecutor.isShutdown());
}
}

19
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClient.java

@ -24,6 +24,7 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@ -32,6 +33,7 @@ import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
@ -55,9 +57,10 @@ public class CuratorZookeeperClient implements InitializingBean {
}
private CuratorFramework buildClient() {
logger.info("zookeeper registry center init, server lists is: {}.", zookeeperConfig.getServerList());
logger.info("zookeeper registry center init, server lists is: [{}]", zookeeperConfig.getServerList());
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),"zookeeper quorum can't be null")))
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(), "zookeeper quorum can't be null")))
.retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs()));
//these has default value
@ -84,7 +87,9 @@ public class CuratorZookeeperClient implements InitializingBean {
zkClient = builder.build();
zkClient.start();
try {
zkClient.blockUntilConnected();
logger.info("trying to connect zookeeper server list:{}", zookeeperConfig.getServerList());
zkClient.blockUntilConnected(30, TimeUnit.SECONDS);
} catch (final Exception ex) {
throw new RuntimeException(ex);
}
@ -95,12 +100,14 @@ public class CuratorZookeeperClient implements InitializingBean {
checkNotNull(zkClient);
zkClient.getConnectionStateListenable().addListener((client, newState) -> {
if(newState == ConnectionState.LOST){
if (newState == ConnectionState.LOST) {
logger.error("connection lost from zookeeper");
} else if(newState == ConnectionState.RECONNECTED){
} else if (newState == ConnectionState.RECONNECTED) {
logger.info("reconnected to zookeeper");
} else if(newState == ConnectionState.SUSPENDED){
} else if (newState == ConnectionState.SUSPENDED) {
logger.warn("connection SUSPENDED to zookeeper");
} else if (newState == ConnectionState.CONNECTED) {
logger.info("connected to zookeeper server list:[{}]", zookeeperConfig.getServerList());
}
});
}

3
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/CuratorZookeeperClientTest.java

@ -59,9 +59,8 @@ public class CuratorZookeeperClientTest {
zookeeperConfig.setDsRoot("/dolphinscheduler");
zookeeperConfig.setMaxWaitTime(30000);
zookeeperClient.setZookeeperConfig(zookeeperConfig);
System.out.println("start");
zookeeperClient.afterPropertiesSet();
System.out.println("end");
Assert.assertNotNull(zookeeperClient.getZkClient());
}
}
Loading…
Cancel
Save