Browse Source

refactor zk client and zk config module (#1537)

* refactor zk client & zk config module

* refactor zk client & zk config module

* add license
pull/2/head
DK.Pino 5 years ago committed by qiaozhanwei
parent
commit
4bbf31f807
  1. 4
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java
  2. 7
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java
  3. 5
      dolphinscheduler-common/pom.xml
  4. 24
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  5. 47
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java
  6. 288
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/Preconditions.java
  7. 35
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractListener.java
  8. 121
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java
  9. 48
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/DefaultEnsembleProvider.java
  10. 82
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java
  11. 71
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperConfig.java
  12. 232
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java
  13. 19
      dolphinscheduler-common/src/main/resources/zookeeper.properties
  14. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  15. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  16. 148
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  17. 104
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java

4
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java

@ -39,6 +39,8 @@ import java.util.Map;
@Service @Service
public class MonitorService extends BaseService{ public class MonitorService extends BaseService{
@Autowired
private ZookeeperMonitor zookeeperMonitor;
@Autowired @Autowired
private MonitorDBDao monitorDBDao; private MonitorDBDao monitorDBDao;
@ -86,7 +88,7 @@ public class MonitorService extends BaseService{
public Map<String,Object> queryZookeeperState(User loginUser) { public Map<String,Object> queryZookeeperState(User loginUser) {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>(5);
List<ZookeeperRecord> zookeeperRecordList = ZookeeperMonitor.zookeeperInfoList(); List<ZookeeperRecord> zookeeperRecordList = zookeeperMonitor.zookeeperInfoList();
result.put(Constants.DATA_LIST, zookeeperRecordList); result.put(Constants.DATA_LIST, zookeeperRecordList);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);

7
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
@ -32,17 +33,17 @@ import java.util.List;
/** /**
* monitor zookeeper info * monitor zookeeper info
*/ */
@Component
public class ZookeeperMonitor extends AbstractZKClient{ public class ZookeeperMonitor extends AbstractZKClient{
private static final Logger LOG = LoggerFactory.getLogger(ZookeeperMonitor.class); private static final Logger LOG = LoggerFactory.getLogger(ZookeeperMonitor.class);
private static final String zookeeperList = AbstractZKClient.getZookeeperQuorum();
/** /**
* *
* @return zookeeper info list * @return zookeeper info list
*/ */
public static List<ZookeeperRecord> zookeeperInfoList(){ public List<ZookeeperRecord> zookeeperInfoList(){
String zookeeperServers = zookeeperList.replaceAll("[\\t\\n\\x0B\\f\\r]", ""); String zookeeperServers = getZookeeperQuorum().replaceAll("[\\t\\n\\x0B\\f\\r]", "");
try{ try{
return zookeeperInfoList(zookeeperServers); return zookeeperInfoList(zookeeperServers);
}catch(Exception e){ }catch(Exception e){

5
dolphinscheduler-common/pom.xml

@ -611,5 +611,10 @@
<version>${lombok.version}</version> <version>${lombok.version}</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

24
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -139,42 +139,50 @@ public final class Constants {
/** /**
* MasterServer directory registered in zookeeper * MasterServer directory registered in zookeeper
*/ */
public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = "zookeeper.dolphinscheduler.masters"; //public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = "zookeeper.dolphinscheduler.masters";
public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = "/masters";
/** /**
* WorkerServer directory registered in zookeeper * WorkerServer directory registered in zookeeper
*/ */
public static final String ZOOKEEPER_DOLPHINSCHEDULER_WORKERS = "zookeeper.dolphinscheduler.workers"; //public static final String ZOOKEEPER_DOLPHINSCHEDULER_WORKERS = "zookeeper.dolphinscheduler.workers";
public static final String ZOOKEEPER_DOLPHINSCHEDULER_WORKERS = "/workers";
/** /**
* all servers directory registered in zookeeper * all servers directory registered in zookeeper
*/ */
public static final String ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS = "zookeeper.dolphinscheduler.dead.servers"; //public static final String ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS = "zookeeper.dolphinscheduler.dead.servers";
public static final String ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS = "/dead-servers";
/** /**
* MasterServer lock directory registered in zookeeper * MasterServer lock directory registered in zookeeper
*/ */
public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS = "zookeeper.dolphinscheduler.lock.masters"; //public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS = "zookeeper.dolphinscheduler.lock.masters";
public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS = "/lock/masters";
/** /**
* WorkerServer lock directory registered in zookeeper * WorkerServer lock directory registered in zookeeper
*/ */
public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS = "zookeeper.dolphinscheduler.lock.workers"; //public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS = "zookeeper.dolphinscheduler.lock.workers";
public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS = "/lock/workers";
/** /**
* MasterServer failover directory registered in zookeeper * MasterServer failover directory registered in zookeeper
*/ */
public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "zookeeper.dolphinscheduler.lock.failover.masters"; //public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "zookeeper.dolphinscheduler.lock.failover.masters";
public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters";
/** /**
* WorkerServer failover directory registered in zookeeper * WorkerServer failover directory registered in zookeeper
*/ */
public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "zookeeper.dolphinscheduler.lock.failover.workers"; //public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "zookeeper.dolphinscheduler.lock.failover.workers";
public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "/lock/failover/workers";
/** /**
* MasterServer startup failover runing and fault tolerance process * MasterServer startup failover runing and fault tolerance process
*/ */
public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "zookeeper.dolphinscheduler.lock.failover.startup.masters"; //public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "zookeeper.dolphinscheduler.lock.failover.startup.masters";
public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters";
/** /**
* need send warn times when master server or worker server failover * need send warn times when master server or worker server failover

47
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java

@ -26,27 +26,44 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.Bytes; import org.apache.dolphinscheduler.common.utils.Bytes;
import org.apache.dolphinscheduler.common.utils.IpUtils; import org.apache.dolphinscheduler.common.utils.IpUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient; import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.dolphinscheduler.common.zk.DefaultEnsembleProvider;
import org.apache.dolphinscheduler.common.zk.ZookeeperConfig;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
/** /**
* A singleton of a task queue implemented with zookeeper * A singleton of a task queue implemented with zookeeper
* tasks queue implemention * tasks queue implemention
*/ */
public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { public class TaskQueueZkImpl implements ITaskQueue {
private static final Logger logger = LoggerFactory.getLogger(TaskQueueZkImpl.class); private static final Logger logger = LoggerFactory.getLogger(TaskQueueZkImpl.class);
private static volatile TaskQueueZkImpl instance; private static volatile TaskQueueZkImpl instance;
private CuratorFramework zkClient;
private ZookeeperConfig zookeeperConfig;
private CuratorFramework getZkClient() {
return zkClient;
}
private TaskQueueZkImpl(){ private TaskQueueZkImpl(){
init(); init();
} }
@ -376,6 +393,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* Init the task queue of zookeeper node * Init the task queue of zookeeper node
*/ */
private void init(){ private void init(){
initZkClient();
try { try {
String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
String tasksCancelPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL); String tasksCancelPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL);
@ -394,6 +412,31 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
} }
} }
private void initZkClient() {
Configuration conf = null;
try {
conf = new PropertiesConfiguration(Constants.ZOOKEEPER_PROPERTIES_PATH);
} catch (ConfigurationException ex) {
logger.error("load zookeeper properties file failed, system exit");
System.exit(-1);
}
zookeeperConfig = ZookeeperConfig.getFromConf(conf);
zkClient = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(), "zookeeper quorum can't be null")))
.retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs()))
.sessionTimeoutMs(zookeeperConfig.getSessionTimeoutMs())
.connectionTimeoutMs(zookeeperConfig.getConnectionTimeoutMs())
.build();
zkClient.start();
try {
zkClient.blockUntilConnected();
} catch (final Exception ex) {
throw new RuntimeException(ex);
}
}
/** /**
* Clear the task queue of zookeeper node * Clear the task queue of zookeeper node
@ -429,7 +472,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {
* @return * @return
*/ */
public String getTasksPath(String key){ public String getTasksPath(String key){
return conf.getString(Constants.ZOOKEEPER_DOLPHINSCHEDULER_ROOT) + Constants.SINGLE_SLASH + key; return zookeeperConfig.getDsRoot() + Constants.SINGLE_SLASH + key;
} }

288
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/Preconditions.java

@ -0,0 +1,288 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.springframework.lang.Nullable;
/**
* A collection of static utility methods to validate input.
*
* <p>This class is modelled after Google Guava's Preconditions class, and partly takes code
* from that class. We add this code to here base in order to reduce external
* dependencies.
*/
public final class Preconditions {
// ------------------------------------------------------------------------
// Null checks
// ------------------------------------------------------------------------
/**
* Ensures that the given object reference is not null.
* Upon violation, a {@code NullPointerException} with no message is thrown.
*
* @param reference The object reference
* @return The object reference itself (generically typed).
*
* @throws NullPointerException Thrown, if the passed reference was null.
*/
public static <T> T checkNotNull(T reference) {
if (reference == null) {
throw new NullPointerException();
}
return reference;
}
/**
* Ensures that the given object reference is not null.
* Upon violation, a {@code NullPointerException} with the given message is thrown.
*
* @param reference The object reference
* @param errorMessage The message for the {@code NullPointerException} that is thrown if the check fails.
* @return The object reference itself (generically typed).
*
* @throws NullPointerException Thrown, if the passed reference was null.
*/
public static <T> T checkNotNull(T reference, @Nullable String errorMessage) {
if (reference == null) {
throw new NullPointerException(String.valueOf(errorMessage));
}
return reference;
}
/**
* Ensures that the given object reference is not null.
* Upon violation, a {@code NullPointerException} with the given message is thrown.
*
* <p>The error message is constructed from a template and an arguments array, after
* a similar fashion as {@link String#format(String, Object...)}, but supporting only
* {@code %s} as a placeholder.
*
* @param reference The object reference
* @param errorMessageTemplate The message template for the {@code NullPointerException}
* that is thrown if the check fails. The template substitutes its
* {@code %s} placeholders with the error message arguments.
* @param errorMessageArgs The arguments for the error message, to be inserted into the
* message template for the {@code %s} placeholders.
*
* @return The object reference itself (generically typed).
*
* @throws NullPointerException Thrown, if the passed reference was null.
*/
public static <T> T checkNotNull(T reference,
@Nullable String errorMessageTemplate,
@Nullable Object... errorMessageArgs) {
if (reference == null) {
throw new NullPointerException(format(errorMessageTemplate, errorMessageArgs));
}
return reference;
}
// ------------------------------------------------------------------------
// Boolean Condition Checking (Argument)
// ------------------------------------------------------------------------
/**
* Checks the given boolean condition, and throws an {@code IllegalArgumentException} if
* the condition is not met (evaluates to {@code false}).
*
* @param condition The condition to check
*
* @throws IllegalArgumentException Thrown, if the condition is violated.
*/
public static void checkArgument(boolean condition) {
if (!condition) {
throw new IllegalArgumentException();
}
}
/**
* Checks the given boolean condition, and throws an {@code IllegalArgumentException} if
* the condition is not met (evaluates to {@code false}). The exception will have the
* given error message.
*
* @param condition The condition to check
* @param errorMessage The message for the {@code IllegalArgumentException} that is thrown if the check fails.
*
* @throws IllegalArgumentException Thrown, if the condition is violated.
*/
public static void checkArgument(boolean condition, @Nullable Object errorMessage) {
if (!condition) {
throw new IllegalArgumentException(String.valueOf(errorMessage));
}
}
/**
* Checks the given boolean condition, and throws an {@code IllegalArgumentException} if
* the condition is not met (evaluates to {@code false}).
*
* @param condition The condition to check
* @param errorMessageTemplate The message template for the {@code IllegalArgumentException}
* that is thrown if the check fails. The template substitutes its
* {@code %s} placeholders with the error message arguments.
* @param errorMessageArgs The arguments for the error message, to be inserted into the
* message template for the {@code %s} placeholders.
*
* @throws IllegalArgumentException Thrown, if the condition is violated.
*/
public static void checkArgument(boolean condition,
@Nullable String errorMessageTemplate,
@Nullable Object... errorMessageArgs) {
if (!condition) {
throw new IllegalArgumentException(format(errorMessageTemplate, errorMessageArgs));
}
}
// ------------------------------------------------------------------------
// Boolean Condition Checking (State)
// ------------------------------------------------------------------------
/**
* Checks the given boolean condition, and throws an {@code IllegalStateException} if
* the condition is not met (evaluates to {@code false}).
*
* @param condition The condition to check
*
* @throws IllegalStateException Thrown, if the condition is violated.
*/
public static void checkState(boolean condition) {
if (!condition) {
throw new IllegalStateException();
}
}
/**
* Checks the given boolean condition, and throws an {@code IllegalStateException} if
* the condition is not met (evaluates to {@code false}). The exception will have the
* given error message.
*
* @param condition The condition to check
* @param errorMessage The message for the {@code IllegalStateException} that is thrown if the check fails.
*
* @throws IllegalStateException Thrown, if the condition is violated.
*/
public static void checkState(boolean condition, @Nullable Object errorMessage) {
if (!condition) {
throw new IllegalStateException(String.valueOf(errorMessage));
}
}
/**
* Checks the given boolean condition, and throws an {@code IllegalStateException} if
* the condition is not met (evaluates to {@code false}).
*
* @param condition The condition to check
* @param errorMessageTemplate The message template for the {@code IllegalStateException}
* that is thrown if the check fails. The template substitutes its
* {@code %s} placeholders with the error message arguments.
* @param errorMessageArgs The arguments for the error message, to be inserted into the
* message template for the {@code %s} placeholders.
*
* @throws IllegalStateException Thrown, if the condition is violated.
*/
public static void checkState(boolean condition,
@Nullable String errorMessageTemplate,
@Nullable Object... errorMessageArgs) {
if (!condition) {
throw new IllegalStateException(format(errorMessageTemplate, errorMessageArgs));
}
}
/**
* Ensures that the given index is valid for an array, list or string of the given size.
*
* @param index index to check
* @param size size of the array, list or string
*
* @throws IllegalArgumentException Thrown, if size is negative.
* @throws IndexOutOfBoundsException Thrown, if the index negative or greater than or equal to size
*/
public static void checkElementIndex(int index, int size) {
checkArgument(size >= 0, "Size was negative.");
if (index < 0 || index >= size) {
throw new IndexOutOfBoundsException("Index: " + index + ", Size: " + size);
}
}
/**
* Ensures that the given index is valid for an array, list or string of the given size.
*
* @param index index to check
* @param size size of the array, list or string
* @param errorMessage The message for the {@code IndexOutOfBoundsException} that is thrown if the check fails.
*
* @throws IllegalArgumentException Thrown, if size is negative.
* @throws IndexOutOfBoundsException Thrown, if the index negative or greater than or equal to size
*/
public static void checkElementIndex(int index, int size, @Nullable String errorMessage) {
checkArgument(size >= 0, "Size was negative.");
if (index < 0 || index >= size) {
throw new IndexOutOfBoundsException(String.valueOf(errorMessage) + " Index: " + index + ", Size: " + size);
}
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
/**
* A simplified formatting method. Similar to {@link String#format(String, Object...)}, but
* with lower overhead (only String parameters, no locale, no format validation).
*
* <p>This method is taken quasi verbatim from the Guava Preconditions class.
*/
private static String format(@Nullable String template, @Nullable Object... args) {
final int numArgs = args == null ? 0 : args.length;
template = String.valueOf(template); // null -> "null"
// start substituting the arguments into the '%s' placeholders
StringBuilder builder = new StringBuilder(template.length() + 16 * numArgs);
int templateStart = 0;
int i = 0;
while (i < numArgs) {
int placeholderStart = template.indexOf("%s", templateStart);
if (placeholderStart == -1) {
break;
}
builder.append(template.substring(templateStart, placeholderStart));
builder.append(args[i++]);
templateStart = placeholderStart + 2;
}
builder.append(template.substring(templateStart));
// if we run out of placeholders, append the extra args in square braces
if (i < numArgs) {
builder.append(" [");
builder.append(args[i++]);
while (i < numArgs) {
builder.append(", ");
builder.append(args[i++]);
}
builder.append(']');
}
return builder.toString();
}
// ------------------------------------------------------------------------
/** Private constructor to prevent instantiation. */
private Preconditions() {}
}

35
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractListener.java

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.zk;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
public abstract class AbstractListener implements TreeCacheListener {
@Override
public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
String path = null == event.getData() ? "" : event.getData().getPath();
if (path.isEmpty()) {
return;
}
dataChanged(client, event, path);
}
protected abstract void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path);
}

121
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java

@ -47,98 +47,15 @@ import static org.apache.dolphinscheduler.common.Constants.*;
/** /**
* abstract zookeeper client * abstract zookeeper client
*/ */
public abstract class AbstractZKClient { public abstract class AbstractZKClient extends ZookeeperCachedOperator{
private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class); private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class);
/**
* load configuration file
*/
protected static Configuration conf;
protected CuratorFramework zkClient = null;
/** /**
* server stop or not * server stop or not
*/ */
protected IStoppable stoppable = null; protected IStoppable stoppable = null;
static {
try {
conf = new PropertiesConfiguration(Constants.ZOOKEEPER_PROPERTIES_PATH);
}catch (ConfigurationException e){
logger.error("load configuration failed : " + e.getMessage(),e);
System.exit(1);
}
}
public AbstractZKClient() {
// retry strategy
RetryPolicy retryPolicy = new ExponentialBackoffRetry(
conf.getInt(Constants.ZOOKEEPER_RETRY_SLEEP),
conf.getInt(Constants.ZOOKEEPER_RETRY_MAXTIME));
try{
// crate zookeeper client
zkClient = CuratorFrameworkFactory.builder()
.connectString(getZookeeperQuorum())
.retryPolicy(retryPolicy)
.sessionTimeoutMs(1000 * conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT))
.connectionTimeoutMs(1000 * conf.getInt(Constants.ZOOKEEPER_CONNECTION_TIMEOUT))
.build();
zkClient.start();
initStateLister();
}catch(Exception e){
logger.error("create zookeeper connect failed : " + e.getMessage(),e);
System.exit(-1);
}
}
/**
*
* register status monitoring events for zookeeper clients
*/
public void initStateLister(){
if(zkClient == null) {
return;
}
// add ConnectionStateListener monitoring zookeeper connection state
ConnectionStateListener csLister = new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
logger.info("state changed , current state : " + newState.name());
/**
* probably session expired
*/
if(newState == ConnectionState.LOST){
// if lost , then exit
logger.info("current zookeepr connection state : connection lost ");
}
}
};
zkClient.getConnectionStateListenable().addListener(csLister);
}
public void start() {
zkClient.start();
logger.info("zookeeper start ...");
}
public void close() {
zkClient.getZookeeperClient().close();
zkClient.close();
logger.info("zookeeper close ...");
}
/** /**
* heartbeat for zookeeper * heartbeat for zookeeper
* @param znode zookeeper node * @param znode zookeeper node
@ -328,18 +245,8 @@ public abstract class AbstractZKClient {
* *
* @return zookeeper quorum * @return zookeeper quorum
*/ */
public static String getZookeeperQuorum(){ public String getZookeeperQuorum(){
StringBuilder sb = new StringBuilder(); return getZookeeperConfig().getServerList();
String[] zookeeperParamslist = conf.getStringArray(Constants.ZOOKEEPER_QUORUM);
for (String param : zookeeperParamslist) {
sb.append(param).append(Constants.COMMA);
}
if(sb.length() > 0){
sb.deleteCharAt(sb.length() - 1);
}
return sb.toString();
} }
/** /**
@ -420,7 +327,7 @@ public abstract class AbstractZKClient {
* @return get worker node parent path * @return get worker node parent path
*/ */
protected String getWorkerZNodeParentPath(){ protected String getWorkerZNodeParentPath(){
return conf.getString(Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS); return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
} }
/** /**
@ -428,7 +335,7 @@ public abstract class AbstractZKClient {
* @return get master node parent path * @return get master node parent path
*/ */
protected String getMasterZNodeParentPath(){ protected String getMasterZNodeParentPath(){
return conf.getString(Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS); return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS;
} }
/** /**
@ -436,7 +343,15 @@ public abstract class AbstractZKClient {
* @return get master lock path * @return get master lock path
*/ */
public String getMasterLockPath(){ public String getMasterLockPath(){
return conf.getString(Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS); return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS;
}
/**
*
* @return get master lock path
*/
public String getWorkerLockPath(){
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS;
} }
/** /**
@ -464,7 +379,7 @@ public abstract class AbstractZKClient {
* @return get dead server node parent path * @return get dead server node parent path
*/ */
protected String getDeadZNodeParentPath(){ protected String getDeadZNodeParentPath(){
return conf.getString(ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS); return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
} }
/** /**
@ -472,7 +387,7 @@ public abstract class AbstractZKClient {
* @return get master start up lock path * @return get master start up lock path
*/ */
public String getMasterStartUpLockPath(){ public String getMasterStartUpLockPath(){
return conf.getString(Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS); return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
} }
/** /**
@ -480,7 +395,7 @@ public abstract class AbstractZKClient {
* @return get master failover lock path * @return get master failover lock path
*/ */
public String getMasterFailoverLockPath(){ public String getMasterFailoverLockPath(){
return conf.getString(Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS); return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
} }
/** /**
@ -488,7 +403,7 @@ public abstract class AbstractZKClient {
* @return get worker failover lock path * @return get worker failover lock path
*/ */
public String getWorkerFailoverLockPath(){ public String getWorkerFailoverLockPath(){
return conf.getString(Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS); return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
} }
/** /**

48
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/DefaultEnsembleProvider.java

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.zk;
import org.apache.curator.ensemble.EnsembleProvider;
import java.io.IOException;
/**
* default conf provider
*/
public class DefaultEnsembleProvider implements EnsembleProvider {
private final String serverList;
public DefaultEnsembleProvider(String serverList){
this.serverList = serverList;
}
@Override
public void start() throws Exception {
//NOP
}
@Override
public String getConnectionString() {
return serverList;
}
@Override
public void close() throws IOException {
//NOP
}
}

82
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.zk;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.dolphinscheduler.common.utils.Preconditions.*;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
@Component
public class ZookeeperCachedOperator extends ZookeeperOperator {
private final Logger logger = LoggerFactory.getLogger(ZookeeperCachedOperator.class);
//kay is zk path, value is TreeCache
private ConcurrentHashMap<String, TreeCache> allCaches = new ConcurrentHashMap<>();
/**
* @param cachePath zk path
* @param listener operator
*/
public void registerListener(final String cachePath, final TreeCacheListener listener) {
TreeCache newCache = new TreeCache(zkClient, cachePath);
logger.info("add listener to zk path: {}", cachePath);
try {
newCache.start();
} catch (Exception e) {
logger.error("add listener to zk path: {} failed", cachePath);
throw new RuntimeException(e);
}
newCache.getListenable().addListener(listener);
allCaches.put(cachePath, newCache);
}
public String getFromCache(final String cachePath, final String key) {
ChildData resultInCache = allCaches.get(checkNotNull(cachePath)).getCurrentData(key);
if (null != resultInCache) {
return null == resultInCache.getData() ? null : new String(resultInCache.getData(), StandardCharsets.UTF_8);
}
return null;
}
public TreeCache getTreeCache(final String cachePath) {
return allCaches.get(checkNotNull(cachePath));
}
public void close() {
allCaches.forEach((path, cache) -> {
cache.close();
try {
Thread.sleep(500);
} catch (InterruptedException ignore) {
}
});
super.close();
}
}

71
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperConfig.java

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.zk;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.configuration.Configuration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
/**
* zookeeper conf
*/
@Component
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@PropertySource("classpath:zookeeper.properties")
public class ZookeeperConfig {
//zk connect config
@Value("${zookeeper.quorum}")
private String serverList;
@Value("${zookeeper.retry.base.sleep:100}")
private int baseSleepTimeMs;
@Value("${zookeeper.retry.max.sleep:30000}")
private int maxSleepMs;
@Value("${zookeeper.retry.maxtime:10}")
private int maxRetries;
@Value("${zookeeper.session.timeout:60000}")
private int sessionTimeoutMs;
@Value("${zookeeper.connection.timeout:30000}")
private int connectionTimeoutMs;
@Value("${zookeeper.connection.digest: }")
private String digest;
//ds scheduler install config
@Value("${zookeeper.dolphinscheduler.root:/dolphinscheduler}")
private String dsRoot;
public static ZookeeperConfig getFromConf(Configuration conf){
return ZookeeperConfig.builder().serverList(conf.getString("zookeeper.quorum")).baseSleepTimeMs(conf.getInt("zookeeper.retry.base.sleep"))
.maxSleepMs(conf.getInt("zookeeper.retry.max.sleep")).maxRetries(conf.getInt("zookeeper.retry.maxtime"))
.sessionTimeoutMs(conf.getInt("zookeeper.session.timeout")).connectionTimeoutMs(conf.getInt("zookeeper.connection.timeout"))
.dsRoot(conf.getString("zookeeper.dolphinscheduler.root")).build();
}
}

232
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java

@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.zk;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.nio.charset.StandardCharsets;
import java.util.List;
import static org.apache.dolphinscheduler.common.utils.Preconditions.*;
import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull;
/**
* zk base operator
*/
@Component
public class ZookeeperOperator implements InitializingBean {
private final Logger logger = LoggerFactory.getLogger(ZookeeperOperator.class);
@Autowired
private ZookeeperConfig zookeeperConfig;
protected CuratorFramework zkClient;
@Override
public void afterPropertiesSet() throws Exception {
this.zkClient = buildClient();
initStateLister();
//init();
}
//for subclass
//protected void init(){}
public void initStateLister() {
checkNotNull(zkClient);
zkClient.getConnectionStateListenable().addListener((client, newState) -> {
if(newState == ConnectionState.LOST){
logger.error("connection lost from zookeeper");
} else if(newState == ConnectionState.RECONNECTED){
logger.info("reconnected to zookeeper");
} else if(newState == ConnectionState.SUSPENDED){
logger.warn("connection SUSPENDED to zookeeper");
}
});
}
private CuratorFramework buildClient() {
logger.info("zookeeper registry center init, server lists is: {}.", zookeeperConfig.getServerList());
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),"zookeeper quorum can't be null")))
.retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs()));
//these has default value
if (0 != zookeeperConfig.getSessionTimeoutMs()) {
builder.sessionTimeoutMs(zookeeperConfig.getSessionTimeoutMs());
}
if (0 != zookeeperConfig.getConnectionTimeoutMs()) {
builder.connectionTimeoutMs(zookeeperConfig.getConnectionTimeoutMs());
}
if (StringUtils.isNotBlank(zookeeperConfig.getDigest())) {
builder.authorization("digest", zookeeperConfig.getDigest().getBytes(StandardCharsets.UTF_8)).aclProvider(new ACLProvider() {
@Override
public List<ACL> getDefaultAcl() {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
@Override
public List<ACL> getAclForPath(final String path) {
return ZooDefs.Ids.CREATOR_ALL_ACL;
}
});
}
zkClient = builder.build();
zkClient.start();
try {
zkClient.blockUntilConnected();
} catch (final Exception ex) {
throw new RuntimeException(ex);
}
return zkClient;
}
public String get(final String key) {
try {
return new String(zkClient.getData().forPath(key), StandardCharsets.UTF_8);
} catch (Exception ex) {
logger.error("get key : {}", key, ex);
}
return null;
}
public List<String> getChildrenKeys(final String key) {
List<String> values;
try {
values = zkClient.getChildren().forPath(key);
if (CollectionUtils.isEmpty(values)) {
logger.warn("getChildrenKeys key : {} is empty", key);
}
return values;
} catch (InterruptedException ex) {
logger.error("getChildrenKeys key : {} InterruptedException", key);
throw new IllegalStateException(ex);
} catch (Exception ex) {
logger.error("getChildrenKeys key : {}", key, ex);
throw new RuntimeException(ex);
}
}
public boolean isExisted(final String key) {
try {
return zkClient.checkExists().forPath(key) != null;
} catch (Exception ex) {
logger.error("isExisted key : {}", key, ex);
}
return false;
}
public void persist(final String key, final String value) {
try {
if (!isExisted(key)) {
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8));
} else {
update(key, value);
}
} catch (Exception ex) {
logger.error("persist key : {} , value : {}", key, value, ex);
}
}
public void update(final String key, final String value) {
try {
zkClient.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(StandardCharsets.UTF_8)).and().commit();
} catch (Exception ex) {
logger.error("update key : {} , value : {}", key, value, ex);
}
}
public void persistEphemeral(final String key, final String value) {
try {
if (isExisted(key)) {
try {
zkClient.delete().deletingChildrenIfNeeded().forPath(key);
} catch (KeeperException.NoNodeException ignore) {
//NOP
}
}
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
} catch (final Exception ex) {
logger.error("persistEphemeral key : {} , value : {}", key, value, ex);
}
}
public void persistEphemeral(String key, String value, boolean overwrite) {
try {
if (overwrite) {
persistEphemeral(key, value);
} else {
if (!isExisted(key)) {
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
}
}
} catch (final Exception ex) {
logger.error("persistEphemeral key : {} , value : {}, overwrite : {}", key, value, overwrite, ex);
}
}
public void persistEphemeralSequential(final String key, String value) {
try {
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key, value.getBytes(StandardCharsets.UTF_8));
} catch (final Exception ex) {
logger.error("persistEphemeralSequential key : {}", key, ex);
}
}
public void remove(final String key) {
try {
if (isExisted(key)) {
zkClient.delete().deletingChildrenIfNeeded().forPath(key);
}
} catch (KeeperException.NoNodeException ignore) {
//NOP
} catch (final Exception ex) {
logger.error("remove key : {}", key, ex);
}
}
public CuratorFramework getZkClient() {
return zkClient;
}
public ZookeeperConfig getZookeeperConfig() {
return zookeeperConfig;
}
public void close() {
CloseableUtils.closeQuietly(zkClient);
}
}

19
dolphinscheduler-common/src/main/resources/zookeeper.properties

@ -22,21 +22,22 @@ zookeeper.quorum=localhost:2181
zookeeper.dolphinscheduler.root=/dolphinscheduler zookeeper.dolphinscheduler.root=/dolphinscheduler
#zookeeper server dirctory #zookeeper server dirctory
zookeeper.dolphinscheduler.dead.servers=/dolphinscheduler/dead-servers #zookeeper.dolphinscheduler.dead.servers=/dolphinscheduler/dead-servers
zookeeper.dolphinscheduler.masters=/dolphinscheduler/masters #zookeeper.dolphinscheduler.masters=/dolphinscheduler/masters
zookeeper.dolphinscheduler.workers=/dolphinscheduler/workers #zookeeper.dolphinscheduler.workers=/dolphinscheduler/workers
#zookeeper lock dirctory #zookeeper lock dirctory
zookeeper.dolphinscheduler.lock.masters=/dolphinscheduler/lock/masters #zookeeper.dolphinscheduler.lock.masters=/dolphinscheduler/lock/masters
zookeeper.dolphinscheduler.lock.workers=/dolphinscheduler/lock/workers #zookeeper.dolphinscheduler.lock.workers=/dolphinscheduler/lock/workers
#dolphinscheduler failover directory #dolphinscheduler failover directory
zookeeper.dolphinscheduler.lock.failover.masters=/dolphinscheduler/lock/failover/masters #zookeeper.dolphinscheduler.lock.failover.masters=/dolphinscheduler/lock/failover/masters
zookeeper.dolphinscheduler.lock.failover.workers=/dolphinscheduler/lock/failover/workers #zookeeper.dolphinscheduler.lock.failover.workers=/dolphinscheduler/lock/failover/workers
zookeeper.dolphinscheduler.lock.failover.startup.masters=/dolphinscheduler/lock/failover/startup-masters #zookeeper.dolphinscheduler.lock.failover.startup.masters=/dolphinscheduler/lock/failover/startup-masters
#dolphinscheduler failover directory #dolphinscheduler failover directory
zookeeper.session.timeout=300 zookeeper.session.timeout=300
zookeeper.connection.timeout=300 zookeeper.connection.timeout=300
zookeeper.retry.sleep=1000 zookeeper.retry.base.sleep=100
zookeeper.retry.max.sleep=30000
zookeeper.retry.maxtime=5 zookeeper.retry.maxtime=5

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -58,6 +58,7 @@ public class MasterServer implements IStoppable {
/** /**
* zk master client * zk master client
*/ */
@Autowired
private ZKMasterClient zkMasterClient = null; private ZKMasterClient zkMasterClient = null;
/** /**
@ -105,11 +106,10 @@ public class MasterServer implements IStoppable {
*/ */
@PostConstruct @PostConstruct
public void run(){ public void run(){
zkMasterClient.init();
masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread"); masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread");
zkMasterClient = ZKMasterClient.getZKMasterClient(processDao);
heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.defaulMasterHeartbeatThreadNum); heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.defaulMasterHeartbeatThreadNum);
// heartbeat thread implement // heartbeat thread implement

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -68,6 +68,7 @@ public class WorkerServer implements IStoppable {
/** /**
* zk worker client * zk worker client
*/ */
@Autowired
private ZKWorkerClient zkWorkerClient = null; private ZKWorkerClient zkWorkerClient = null;
@ -137,8 +138,7 @@ public class WorkerServer implements IStoppable {
*/ */
@PostConstruct @PostConstruct
public void run(){ public void run(){
zkWorkerClient.init();
zkWorkerClient = ZKWorkerClient.getZKWorkerClient();
this.taskQueue = TaskQueueFactory.getTaskQueueInstance(); this.taskQueue = TaskQueueFactory.getTaskQueueInstance();

148
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -16,10 +16,12 @@
*/ */
package org.apache.dolphinscheduler.server.zk; package org.apache.dolphinscheduler.server.zk;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.zk.AbstractListener;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient; import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory; import org.apache.dolphinscheduler.dao.DaoFactory;
@ -36,6 +38,8 @@ import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ThreadUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
@ -47,6 +51,7 @@ import java.util.concurrent.ThreadFactory;
* *
* single instance * single instance
*/ */
@Component
public class ZKMasterClient extends AbstractZKClient { public class ZKMasterClient extends AbstractZKClient {
/** /**
@ -71,53 +76,14 @@ public class ZKMasterClient extends AbstractZKClient {
/** /**
* flow database access * flow database access
*/ */
@Autowired
private ProcessDao processDao; private ProcessDao processDao;
/**
* zkMasterClient
*/
private static ZKMasterClient zkMasterClient = null;
/**
* master path children cache
*/
private PathChildrenCache masterPathChildrenCache;
/**
* worker path children cache
*/
private PathChildrenCache workerPathChildrenCache;
/**
* constructor
*
* @param processDao process dao
*/
private ZKMasterClient(ProcessDao processDao){
this.processDao = processDao;
init();
}
/** /**
* default constructor * default constructor
*/ */
private ZKMasterClient(){} private ZKMasterClient(){}
/**
* get zkMasterClient
*
* @param processDao process dao
* @return ZKMasterClient zookeeper master client
*/
public static synchronized ZKMasterClient getZKMasterClient(ProcessDao processDao){
if(zkMasterClient == null){
zkMasterClient = new ZKMasterClient(processDao);
}
zkMasterClient.processDao = processDao;
return zkMasterClient;
}
/** /**
* init * init
*/ */
@ -157,22 +123,6 @@ public class ZKMasterClient extends AbstractZKClient {
} }
} }
@Override
public void close(){
try {
if(masterPathChildrenCache != null){
masterPathChildrenCache.close();
}
if(workerPathChildrenCache != null){
workerPathChildrenCache.close();
}
super.close();
} catch (Exception ignore) {
}
}
/** /**
* init dao * init dao
@ -209,41 +159,29 @@ public class ZKMasterClient extends AbstractZKClient {
} }
/** /**
* monitor master * monitor master
*/ */
public void listenerMaster(){ public void listenerMaster(){
masterPathChildrenCache = new PathChildrenCache(zkClient, registerListener(getZNodeParentPath(ZKNodeType.MASTER), new AbstractListener() {
getZNodeParentPath(ZKNodeType.MASTER), true ,defaultThreadFactory); @Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
try { switch (event.getType()) {
masterPathChildrenCache.start(); case NODE_ADDED:
masterPathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { logger.info("master node added : {}", path);
@Override break;
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { case NODE_REMOVED:
switch (event.getType()) { String serverHost = getHostByEventDataPath(path);
case CHILD_ADDED: if (checkServerSelfDead(serverHost, ZKNodeType.MASTER)) {
logger.info("master node added : {}",event.getData().getPath()); return;
break; }
case CHILD_REMOVED: removeZKNodePath(path, ZKNodeType.MASTER, true);
String path = event.getData().getPath(); break;
String serverHost = getHostByEventDataPath(path); default:
if(checkServerSelfDead(serverHost, ZKNodeType.MASTER)){ break;
return;
}
removeZKNodePath(path, ZKNodeType.MASTER, true);
break;
case CHILD_UPDATED:
break;
default:
break;
}
} }
}); }
}catch (Exception e){ });
logger.error("monitor master failed : " + e.getMessage(),e);
}
} }
/** /**
@ -338,30 +276,22 @@ public class ZKMasterClient extends AbstractZKClient {
* monitor worker * monitor worker
*/ */
public void listenerWorker(){ public void listenerWorker(){
workerPathChildrenCache = new PathChildrenCache(zkClient, registerListener(getZNodeParentPath(ZKNodeType.WORKER), new AbstractListener() {
getZNodeParentPath(ZKNodeType.WORKER),true ,defaultThreadFactory); @Override
try { protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
workerPathChildrenCache.start(); switch (event.getType()) {
workerPathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { case NODE_ADDED:
@Override logger.info("worker node added : {}", path);
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) { break;
switch (event.getType()) { case NODE_REMOVED:
case CHILD_ADDED: logger.info("worker node deleted : {}", path);
logger.info("node added : {}" ,event.getData().getPath()); removeZKNodePath(path, ZKNodeType.WORKER, true);
break; break;
case CHILD_REMOVED: default:
String path = event.getData().getPath(); break;
logger.info("node deleted : {}",event.getData().getPath());
removeZKNodePath(path, ZKNodeType.WORKER, true);
break;
default:
break;
}
} }
}); }
}catch (Exception e){ });
logger.error("listener worker failed : " + e.getMessage(),e);
}
} }

104
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java

@ -16,8 +16,10 @@
*/ */
package org.apache.dolphinscheduler.server.zk; package org.apache.dolphinscheduler.server.zk;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.zk.AbstractListener;
import org.apache.dolphinscheduler.common.zk.AbstractZKClient; import org.apache.dolphinscheduler.common.zk.AbstractZKClient;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -27,6 +29,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ThreadUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
@ -35,6 +38,7 @@ import java.util.concurrent.ThreadFactory;
* zookeeper worker client * zookeeper worker client
* single instance * single instance
*/ */
@Component
public class ZKWorkerClient extends AbstractZKClient { public class ZKWorkerClient extends AbstractZKClient {
/** /**
@ -42,11 +46,6 @@ public class ZKWorkerClient extends AbstractZKClient {
*/ */
private static final Logger logger = LoggerFactory.getLogger(ZKWorkerClient.class); private static final Logger logger = LoggerFactory.getLogger(ZKWorkerClient.class);
/**
* thread factory
*/
private static final ThreadFactory defaultThreadFactory = ThreadUtils.newGenericThreadFactory("Worker-Main-Thread");
/** /**
* worker znode * worker znode
@ -54,24 +53,10 @@ public class ZKWorkerClient extends AbstractZKClient {
private String workerZNode = null; private String workerZNode = null;
/**
* zookeeper worker client
*/
private static ZKWorkerClient zkWorkerClient = null;
/**
* worker path children cache
*/
private PathChildrenCache workerPathChildrenCache;
private ZKWorkerClient(){
init();
}
/** /**
* init * init
*/ */
private void init(){ public void init(){
// init system znode // init system znode
this.initSystemZNode(); this.initSystemZNode();
@ -83,31 +68,6 @@ public class ZKWorkerClient extends AbstractZKClient {
this.registWorker(); this.registWorker();
} }
@Override
public void close(){
try {
if(workerPathChildrenCache != null){
workerPathChildrenCache.close();
}
super.close();
} catch (Exception ignore) {
}
}
/**
* get zookeeper worker client
*
* @return ZKWorkerClient
*/
public static synchronized ZKWorkerClient getZKWorkerClient(){
if(zkWorkerClient == null){
zkWorkerClient = new ZKWorkerClient();
}
return zkWorkerClient;
}
/** /**
* register worker * register worker
*/ */
@ -128,34 +88,25 @@ public class ZKWorkerClient extends AbstractZKClient {
* monitor worker * monitor worker
*/ */
private void listenerWorker(){ private void listenerWorker(){
workerPathChildrenCache = new PathChildrenCache(zkClient, getZNodeParentPath(ZKNodeType.WORKER), true, defaultThreadFactory); registerListener(getZNodeParentPath(ZKNodeType.WORKER), new AbstractListener() {
try { @Override
workerPathChildrenCache.start(); protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
workerPathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { switch (event.getType()) {
@Override case NODE_ADDED:
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { logger.info("worker node added : {}", path);
switch (event.getType()) { break;
case CHILD_ADDED: case NODE_REMOVED:
logger.info("node added : {}" ,event.getData().getPath()); //find myself dead
break; String serverHost = getHostByEventDataPath(path);
case CHILD_REMOVED: if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){
String path = event.getData().getPath(); return;
//find myself dead }
String serverHost = getHostByEventDataPath(path); break;
if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){ default:
return; break;
}
break;
case CHILD_UPDATED:
break;
default:
break;
}
} }
}); }
}catch (Exception e){ });
logger.error("monitor worker failed : " + e.getMessage(),e);
}
} }
@ -167,13 +118,4 @@ public class ZKWorkerClient extends AbstractZKClient {
return workerZNode; return workerZNode;
} }
/**
* get worker lock path
* @return worker lock path
*/
public String getWorkerLockPath(){
return conf.getString(Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS);
}
} }

Loading…
Cancel
Save