diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java index 68f3f12f6c..91e58ef6cc 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java @@ -39,6 +39,8 @@ import java.util.Map; @Service public class MonitorService extends BaseService{ + @Autowired + private ZookeeperMonitor zookeeperMonitor; @Autowired private MonitorDBDao monitorDBDao; @@ -86,7 +88,7 @@ public class MonitorService extends BaseService{ public Map queryZookeeperState(User loginUser) { Map result = new HashMap<>(5); - List zookeeperRecordList = ZookeeperMonitor.zookeeperInfoList(); + List zookeeperRecordList = zookeeperMonitor.zookeeperInfoList(); result.put(Constants.DATA_LIST, zookeeperRecordList); putMsg(result, Status.SUCCESS); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java index a6a47b2ce3..040d00ee2c 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.Date; @@ -32,17 +33,17 @@ import java.util.List; /** * monitor zookeeper info */ +@Component public class ZookeeperMonitor extends AbstractZKClient{ private static final Logger LOG = LoggerFactory.getLogger(ZookeeperMonitor.class); - private static final String zookeeperList = AbstractZKClient.getZookeeperQuorum(); /** * * @return zookeeper info list */ - public static List zookeeperInfoList(){ - String zookeeperServers = zookeeperList.replaceAll("[\\t\\n\\x0B\\f\\r]", ""); + public List zookeeperInfoList(){ + String zookeeperServers = getZookeeperQuorum().replaceAll("[\\t\\n\\x0B\\f\\r]", ""); try{ return zookeeperInfoList(zookeeperServers); }catch(Exception e){ diff --git a/dolphinscheduler-common/pom.xml b/dolphinscheduler-common/pom.xml index 1481190f9e..1bfac571ba 100644 --- a/dolphinscheduler-common/pom.xml +++ b/dolphinscheduler-common/pom.xml @@ -611,5 +611,10 @@ ${lombok.version} compile + + + org.springframework + spring-context + diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index c675ad55bd..339cb6c548 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -139,42 +139,50 @@ public final class Constants { /** * MasterServer directory registered in zookeeper */ - public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = "zookeeper.dolphinscheduler.masters"; + //public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = "zookeeper.dolphinscheduler.masters"; + public static final String ZOOKEEPER_DOLPHINSCHEDULER_MASTERS = "/masters"; /** * WorkerServer directory registered in zookeeper */ - public static final String ZOOKEEPER_DOLPHINSCHEDULER_WORKERS = "zookeeper.dolphinscheduler.workers"; + //public static final String ZOOKEEPER_DOLPHINSCHEDULER_WORKERS = "zookeeper.dolphinscheduler.workers"; + public static final String ZOOKEEPER_DOLPHINSCHEDULER_WORKERS = "/workers"; /** * all servers directory registered in zookeeper */ - public static final String ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS = "zookeeper.dolphinscheduler.dead.servers"; + //public static final String ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS = "zookeeper.dolphinscheduler.dead.servers"; + public static final String ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS = "/dead-servers"; /** * MasterServer lock directory registered in zookeeper */ - public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS = "zookeeper.dolphinscheduler.lock.masters"; + //public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS = "zookeeper.dolphinscheduler.lock.masters"; + public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS = "/lock/masters"; /** * WorkerServer lock directory registered in zookeeper */ - public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS = "zookeeper.dolphinscheduler.lock.workers"; + //public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS = "zookeeper.dolphinscheduler.lock.workers"; + public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS = "/lock/workers"; /** * MasterServer failover directory registered in zookeeper */ - public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "zookeeper.dolphinscheduler.lock.failover.masters"; + //public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "zookeeper.dolphinscheduler.lock.failover.masters"; + public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters"; /** * WorkerServer failover directory registered in zookeeper */ - public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "zookeeper.dolphinscheduler.lock.failover.workers"; + //public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "zookeeper.dolphinscheduler.lock.failover.workers"; + public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "/lock/failover/workers"; /** * MasterServer startup failover runing and fault tolerance process */ - public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "zookeeper.dolphinscheduler.lock.failover.startup.masters"; + //public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "zookeeper.dolphinscheduler.lock.failover.startup.masters"; + public static final String ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters"; /** * need send warn times when master server or worker server failover diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java index d2096f6cd1..9bc7359174 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/queue/TaskQueueZkImpl.java @@ -26,27 +26,44 @@ import java.util.List; import java.util.Set; import java.util.TreeSet; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.Bytes; import org.apache.dolphinscheduler.common.utils.IpUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.zk.AbstractZKClient; import org.apache.curator.framework.CuratorFramework; +import org.apache.dolphinscheduler.common.zk.DefaultEnsembleProvider; +import org.apache.dolphinscheduler.common.zk.ZookeeperConfig; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull; + /** * A singleton of a task queue implemented with zookeeper * tasks queue implemention */ -public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { +public class TaskQueueZkImpl implements ITaskQueue { private static final Logger logger = LoggerFactory.getLogger(TaskQueueZkImpl.class); private static volatile TaskQueueZkImpl instance; + private CuratorFramework zkClient; + + private ZookeeperConfig zookeeperConfig; + + private CuratorFramework getZkClient() { + return zkClient; + } + private TaskQueueZkImpl(){ init(); } @@ -376,6 +393,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { * Init the task queue of zookeeper node */ private void init(){ + initZkClient(); try { String tasksQueuePath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_QUEUE); String tasksCancelPath = getTasksPath(Constants.DOLPHINSCHEDULER_TASKS_KILL); @@ -394,6 +412,31 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { } } + private void initZkClient() { + + Configuration conf = null; + try { + conf = new PropertiesConfiguration(Constants.ZOOKEEPER_PROPERTIES_PATH); + } catch (ConfigurationException ex) { + logger.error("load zookeeper properties file failed, system exit"); + System.exit(-1); + } + zookeeperConfig = ZookeeperConfig.getFromConf(conf); + + zkClient = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(), "zookeeper quorum can't be null"))) + .retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs())) + .sessionTimeoutMs(zookeeperConfig.getSessionTimeoutMs()) + .connectionTimeoutMs(zookeeperConfig.getConnectionTimeoutMs()) + .build(); + + zkClient.start(); + try { + zkClient.blockUntilConnected(); + } catch (final Exception ex) { + throw new RuntimeException(ex); + } + } + /** * Clear the task queue of zookeeper node @@ -429,7 +472,7 @@ public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue { * @return */ public String getTasksPath(String key){ - return conf.getString(Constants.ZOOKEEPER_DOLPHINSCHEDULER_ROOT) + Constants.SINGLE_SLASH + key; + return zookeeperConfig.getDsRoot() + Constants.SINGLE_SLASH + key; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/Preconditions.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/Preconditions.java new file mode 100644 index 0000000000..92337f5de6 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/Preconditions.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.utils; + +import org.springframework.lang.Nullable; + +/** + * A collection of static utility methods to validate input. + * + *

This class is modelled after Google Guava's Preconditions class, and partly takes code + * from that class. We add this code to here base in order to reduce external + * dependencies. + */ +public final class Preconditions { + + // ------------------------------------------------------------------------ + // Null checks + // ------------------------------------------------------------------------ + + /** + * Ensures that the given object reference is not null. + * Upon violation, a {@code NullPointerException} with no message is thrown. + * + * @param reference The object reference + * @return The object reference itself (generically typed). + * + * @throws NullPointerException Thrown, if the passed reference was null. + */ + public static T checkNotNull(T reference) { + if (reference == null) { + throw new NullPointerException(); + } + return reference; + } + + /** + * Ensures that the given object reference is not null. + * Upon violation, a {@code NullPointerException} with the given message is thrown. + * + * @param reference The object reference + * @param errorMessage The message for the {@code NullPointerException} that is thrown if the check fails. + * @return The object reference itself (generically typed). + * + * @throws NullPointerException Thrown, if the passed reference was null. + */ + public static T checkNotNull(T reference, @Nullable String errorMessage) { + if (reference == null) { + throw new NullPointerException(String.valueOf(errorMessage)); + } + return reference; + } + + /** + * Ensures that the given object reference is not null. + * Upon violation, a {@code NullPointerException} with the given message is thrown. + * + *

The error message is constructed from a template and an arguments array, after + * a similar fashion as {@link String#format(String, Object...)}, but supporting only + * {@code %s} as a placeholder. + * + * @param reference The object reference + * @param errorMessageTemplate The message template for the {@code NullPointerException} + * that is thrown if the check fails. The template substitutes its + * {@code %s} placeholders with the error message arguments. + * @param errorMessageArgs The arguments for the error message, to be inserted into the + * message template for the {@code %s} placeholders. + * + * @return The object reference itself (generically typed). + * + * @throws NullPointerException Thrown, if the passed reference was null. + */ + public static T checkNotNull(T reference, + @Nullable String errorMessageTemplate, + @Nullable Object... errorMessageArgs) { + + if (reference == null) { + throw new NullPointerException(format(errorMessageTemplate, errorMessageArgs)); + } + return reference; + } + + // ------------------------------------------------------------------------ + // Boolean Condition Checking (Argument) + // ------------------------------------------------------------------------ + + /** + * Checks the given boolean condition, and throws an {@code IllegalArgumentException} if + * the condition is not met (evaluates to {@code false}). + * + * @param condition The condition to check + * + * @throws IllegalArgumentException Thrown, if the condition is violated. + */ + public static void checkArgument(boolean condition) { + if (!condition) { + throw new IllegalArgumentException(); + } + } + + /** + * Checks the given boolean condition, and throws an {@code IllegalArgumentException} if + * the condition is not met (evaluates to {@code false}). The exception will have the + * given error message. + * + * @param condition The condition to check + * @param errorMessage The message for the {@code IllegalArgumentException} that is thrown if the check fails. + * + * @throws IllegalArgumentException Thrown, if the condition is violated. + */ + public static void checkArgument(boolean condition, @Nullable Object errorMessage) { + if (!condition) { + throw new IllegalArgumentException(String.valueOf(errorMessage)); + } + } + + /** + * Checks the given boolean condition, and throws an {@code IllegalArgumentException} if + * the condition is not met (evaluates to {@code false}). + * + * @param condition The condition to check + * @param errorMessageTemplate The message template for the {@code IllegalArgumentException} + * that is thrown if the check fails. The template substitutes its + * {@code %s} placeholders with the error message arguments. + * @param errorMessageArgs The arguments for the error message, to be inserted into the + * message template for the {@code %s} placeholders. + * + * @throws IllegalArgumentException Thrown, if the condition is violated. + */ + public static void checkArgument(boolean condition, + @Nullable String errorMessageTemplate, + @Nullable Object... errorMessageArgs) { + + if (!condition) { + throw new IllegalArgumentException(format(errorMessageTemplate, errorMessageArgs)); + } + } + + // ------------------------------------------------------------------------ + // Boolean Condition Checking (State) + // ------------------------------------------------------------------------ + + /** + * Checks the given boolean condition, and throws an {@code IllegalStateException} if + * the condition is not met (evaluates to {@code false}). + * + * @param condition The condition to check + * + * @throws IllegalStateException Thrown, if the condition is violated. + */ + public static void checkState(boolean condition) { + if (!condition) { + throw new IllegalStateException(); + } + } + + /** + * Checks the given boolean condition, and throws an {@code IllegalStateException} if + * the condition is not met (evaluates to {@code false}). The exception will have the + * given error message. + * + * @param condition The condition to check + * @param errorMessage The message for the {@code IllegalStateException} that is thrown if the check fails. + * + * @throws IllegalStateException Thrown, if the condition is violated. + */ + public static void checkState(boolean condition, @Nullable Object errorMessage) { + if (!condition) { + throw new IllegalStateException(String.valueOf(errorMessage)); + } + } + + /** + * Checks the given boolean condition, and throws an {@code IllegalStateException} if + * the condition is not met (evaluates to {@code false}). + * + * @param condition The condition to check + * @param errorMessageTemplate The message template for the {@code IllegalStateException} + * that is thrown if the check fails. The template substitutes its + * {@code %s} placeholders with the error message arguments. + * @param errorMessageArgs The arguments for the error message, to be inserted into the + * message template for the {@code %s} placeholders. + * + * @throws IllegalStateException Thrown, if the condition is violated. + */ + public static void checkState(boolean condition, + @Nullable String errorMessageTemplate, + @Nullable Object... errorMessageArgs) { + + if (!condition) { + throw new IllegalStateException(format(errorMessageTemplate, errorMessageArgs)); + } + } + + /** + * Ensures that the given index is valid for an array, list or string of the given size. + * + * @param index index to check + * @param size size of the array, list or string + * + * @throws IllegalArgumentException Thrown, if size is negative. + * @throws IndexOutOfBoundsException Thrown, if the index negative or greater than or equal to size + */ + public static void checkElementIndex(int index, int size) { + checkArgument(size >= 0, "Size was negative."); + if (index < 0 || index >= size) { + throw new IndexOutOfBoundsException("Index: " + index + ", Size: " + size); + } + } + + /** + * Ensures that the given index is valid for an array, list or string of the given size. + * + * @param index index to check + * @param size size of the array, list or string + * @param errorMessage The message for the {@code IndexOutOfBoundsException} that is thrown if the check fails. + * + * @throws IllegalArgumentException Thrown, if size is negative. + * @throws IndexOutOfBoundsException Thrown, if the index negative or greater than or equal to size + */ + public static void checkElementIndex(int index, int size, @Nullable String errorMessage) { + checkArgument(size >= 0, "Size was negative."); + if (index < 0 || index >= size) { + throw new IndexOutOfBoundsException(String.valueOf(errorMessage) + " Index: " + index + ", Size: " + size); + } + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + /** + * A simplified formatting method. Similar to {@link String#format(String, Object...)}, but + * with lower overhead (only String parameters, no locale, no format validation). + * + *

This method is taken quasi verbatim from the Guava Preconditions class. + */ + private static String format(@Nullable String template, @Nullable Object... args) { + final int numArgs = args == null ? 0 : args.length; + template = String.valueOf(template); // null -> "null" + + // start substituting the arguments into the '%s' placeholders + StringBuilder builder = new StringBuilder(template.length() + 16 * numArgs); + int templateStart = 0; + int i = 0; + while (i < numArgs) { + int placeholderStart = template.indexOf("%s", templateStart); + if (placeholderStart == -1) { + break; + } + builder.append(template.substring(templateStart, placeholderStart)); + builder.append(args[i++]); + templateStart = placeholderStart + 2; + } + builder.append(template.substring(templateStart)); + + // if we run out of placeholders, append the extra args in square braces + if (i < numArgs) { + builder.append(" ["); + builder.append(args[i++]); + while (i < numArgs) { + builder.append(", "); + builder.append(args[i++]); + } + builder.append(']'); + } + + return builder.toString(); + } + + // ------------------------------------------------------------------------ + + /** Private constructor to prevent instantiation. */ + private Preconditions() {} +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractListener.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractListener.java new file mode 100644 index 0000000000..d84b9f7e11 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractListener.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.zk; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; + +public abstract class AbstractListener implements TreeCacheListener { + + @Override + public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception { + String path = null == event.getData() ? "" : event.getData().getPath(); + if (path.isEmpty()) { + return; + } + dataChanged(client, event, path); + } + + protected abstract void dataChanged(final CuratorFramework client, final TreeCacheEvent event, final String path); +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java index f2861ecc7e..bfcfb53108 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/AbstractZKClient.java @@ -47,98 +47,15 @@ import static org.apache.dolphinscheduler.common.Constants.*; /** * abstract zookeeper client */ -public abstract class AbstractZKClient { +public abstract class AbstractZKClient extends ZookeeperCachedOperator{ private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class); - /** - * load configuration file - */ - protected static Configuration conf; - - protected CuratorFramework zkClient = null; - /** * server stop or not */ protected IStoppable stoppable = null; - - static { - try { - conf = new PropertiesConfiguration(Constants.ZOOKEEPER_PROPERTIES_PATH); - }catch (ConfigurationException e){ - logger.error("load configuration failed : " + e.getMessage(),e); - System.exit(1); - } - } - - - public AbstractZKClient() { - - // retry strategy - RetryPolicy retryPolicy = new ExponentialBackoffRetry( - conf.getInt(Constants.ZOOKEEPER_RETRY_SLEEP), - conf.getInt(Constants.ZOOKEEPER_RETRY_MAXTIME)); - - try{ - // crate zookeeper client - zkClient = CuratorFrameworkFactory.builder() - .connectString(getZookeeperQuorum()) - .retryPolicy(retryPolicy) - .sessionTimeoutMs(1000 * conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT)) - .connectionTimeoutMs(1000 * conf.getInt(Constants.ZOOKEEPER_CONNECTION_TIMEOUT)) - .build(); - - zkClient.start(); - initStateLister(); - - }catch(Exception e){ - logger.error("create zookeeper connect failed : " + e.getMessage(),e); - System.exit(-1); - } - } - - /** - * - * register status monitoring events for zookeeper clients - */ - public void initStateLister(){ - if(zkClient == null) { - return; - } - // add ConnectionStateListener monitoring zookeeper connection state - ConnectionStateListener csLister = new ConnectionStateListener() { - - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) { - logger.info("state changed , current state : " + newState.name()); - /** - * probably session expired - */ - if(newState == ConnectionState.LOST){ - // if lost , then exit - logger.info("current zookeepr connection state : connection lost "); - } - } - }; - - zkClient.getConnectionStateListenable().addListener(csLister); - } - - - public void start() { - zkClient.start(); - logger.info("zookeeper start ..."); - } - - public void close() { - zkClient.getZookeeperClient().close(); - zkClient.close(); - logger.info("zookeeper close ..."); - } - - /** * heartbeat for zookeeper * @param znode zookeeper node @@ -328,18 +245,8 @@ public abstract class AbstractZKClient { * * @return zookeeper quorum */ - public static String getZookeeperQuorum(){ - StringBuilder sb = new StringBuilder(); - String[] zookeeperParamslist = conf.getStringArray(Constants.ZOOKEEPER_QUORUM); - for (String param : zookeeperParamslist) { - sb.append(param).append(Constants.COMMA); - } - - if(sb.length() > 0){ - sb.deleteCharAt(sb.length() - 1); - } - - return sb.toString(); + public String getZookeeperQuorum(){ + return getZookeeperConfig().getServerList(); } /** @@ -420,7 +327,7 @@ public abstract class AbstractZKClient { * @return get worker node parent path */ protected String getWorkerZNodeParentPath(){ - return conf.getString(Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS); + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS; } /** @@ -428,7 +335,7 @@ public abstract class AbstractZKClient { * @return get master node parent path */ protected String getMasterZNodeParentPath(){ - return conf.getString(Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS); + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS; } /** @@ -436,7 +343,15 @@ public abstract class AbstractZKClient { * @return get master lock path */ public String getMasterLockPath(){ - return conf.getString(Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS); + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS; + } + + /** + * + * @return get master lock path + */ + public String getWorkerLockPath(){ + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS; } /** @@ -464,7 +379,7 @@ public abstract class AbstractZKClient { * @return get dead server node parent path */ protected String getDeadZNodeParentPath(){ - return conf.getString(ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS); + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS; } /** @@ -472,7 +387,7 @@ public abstract class AbstractZKClient { * @return get master start up lock path */ public String getMasterStartUpLockPath(){ - return conf.getString(Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS); + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS; } /** @@ -480,7 +395,7 @@ public abstract class AbstractZKClient { * @return get master failover lock path */ public String getMasterFailoverLockPath(){ - return conf.getString(Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS); + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS; } /** @@ -488,7 +403,7 @@ public abstract class AbstractZKClient { * @return get worker failover lock path */ public String getWorkerFailoverLockPath(){ - return conf.getString(Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS); + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS; } /** diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/DefaultEnsembleProvider.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/DefaultEnsembleProvider.java new file mode 100644 index 0000000000..0cf06c0503 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/DefaultEnsembleProvider.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.zk; + +import org.apache.curator.ensemble.EnsembleProvider; + +import java.io.IOException; + +/** + * default conf provider + */ +public class DefaultEnsembleProvider implements EnsembleProvider { + + private final String serverList; + + public DefaultEnsembleProvider(String serverList){ + this.serverList = serverList; + } + + @Override + public void start() throws Exception { + //NOP + } + + @Override + public String getConnectionString() { + return serverList; + } + + @Override + public void close() throws IOException { + //NOP + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java new file mode 100644 index 0000000000..cf4980147e --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperCachedOperator.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.zk; + +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.dolphinscheduler.common.utils.Preconditions.*; +import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull; + +@Component +public class ZookeeperCachedOperator extends ZookeeperOperator { + + private final Logger logger = LoggerFactory.getLogger(ZookeeperCachedOperator.class); + + //kay is zk path, value is TreeCache + private ConcurrentHashMap allCaches = new ConcurrentHashMap<>(); + + /** + * @param cachePath zk path + * @param listener operator + */ + public void registerListener(final String cachePath, final TreeCacheListener listener) { + TreeCache newCache = new TreeCache(zkClient, cachePath); + logger.info("add listener to zk path: {}", cachePath); + try { + newCache.start(); + } catch (Exception e) { + logger.error("add listener to zk path: {} failed", cachePath); + throw new RuntimeException(e); + } + + newCache.getListenable().addListener(listener); + + allCaches.put(cachePath, newCache); + } + + public String getFromCache(final String cachePath, final String key) { + ChildData resultInCache = allCaches.get(checkNotNull(cachePath)).getCurrentData(key); + if (null != resultInCache) { + return null == resultInCache.getData() ? null : new String(resultInCache.getData(), StandardCharsets.UTF_8); + } + return null; + } + + public TreeCache getTreeCache(final String cachePath) { + return allCaches.get(checkNotNull(cachePath)); + } + + public void close() { + + allCaches.forEach((path, cache) -> { + cache.close(); + try { + Thread.sleep(500); + } catch (InterruptedException ignore) { + } + }); + super.close(); + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperConfig.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperConfig.java new file mode 100644 index 0000000000..6e471bbbfb --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperConfig.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.zk; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.configuration.Configuration; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.PropertySource; +import org.springframework.stereotype.Component; + +/** + * zookeeper conf + */ +@Component +@Data +@AllArgsConstructor +@NoArgsConstructor +@Builder +@PropertySource("classpath:zookeeper.properties") +public class ZookeeperConfig { + + //zk connect config + @Value("${zookeeper.quorum}") + private String serverList; + + @Value("${zookeeper.retry.base.sleep:100}") + private int baseSleepTimeMs; + + @Value("${zookeeper.retry.max.sleep:30000}") + private int maxSleepMs; + + @Value("${zookeeper.retry.maxtime:10}") + private int maxRetries; + + @Value("${zookeeper.session.timeout:60000}") + private int sessionTimeoutMs; + + @Value("${zookeeper.connection.timeout:30000}") + private int connectionTimeoutMs; + + @Value("${zookeeper.connection.digest: }") + private String digest; + + //ds scheduler install config + @Value("${zookeeper.dolphinscheduler.root:/dolphinscheduler}") + private String dsRoot; + + public static ZookeeperConfig getFromConf(Configuration conf){ + return ZookeeperConfig.builder().serverList(conf.getString("zookeeper.quorum")).baseSleepTimeMs(conf.getInt("zookeeper.retry.base.sleep")) + .maxSleepMs(conf.getInt("zookeeper.retry.max.sleep")).maxRetries(conf.getInt("zookeeper.retry.maxtime")) + .sessionTimeoutMs(conf.getInt("zookeeper.session.timeout")).connectionTimeoutMs(conf.getInt("zookeeper.connection.timeout")) + .dsRoot(conf.getString("zookeeper.dolphinscheduler.root")).build(); + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java new file mode 100644 index 0000000000..f4d72f436e --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/zk/ZookeeperOperator.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.zk; + +import org.apache.commons.lang.StringUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.retry.ExponentialBackoffRetry; +import org.apache.curator.utils.CloseableUtils; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +import static org.apache.dolphinscheduler.common.utils.Preconditions.*; +import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull; + +/** + * zk base operator + */ +@Component +public class ZookeeperOperator implements InitializingBean { + + private final Logger logger = LoggerFactory.getLogger(ZookeeperOperator.class); + + @Autowired + private ZookeeperConfig zookeeperConfig; + + protected CuratorFramework zkClient; + + @Override + public void afterPropertiesSet() throws Exception { + this.zkClient = buildClient(); + initStateLister(); + //init(); + } + + //for subclass + //protected void init(){} + + public void initStateLister() { + checkNotNull(zkClient); + + zkClient.getConnectionStateListenable().addListener((client, newState) -> { + if(newState == ConnectionState.LOST){ + logger.error("connection lost from zookeeper"); + } else if(newState == ConnectionState.RECONNECTED){ + logger.info("reconnected to zookeeper"); + } else if(newState == ConnectionState.SUSPENDED){ + logger.warn("connection SUSPENDED to zookeeper"); + } + }); + } + + private CuratorFramework buildClient() { + logger.info("zookeeper registry center init, server lists is: {}.", zookeeperConfig.getServerList()); + + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().ensembleProvider(new DefaultEnsembleProvider(checkNotNull(zookeeperConfig.getServerList(),"zookeeper quorum can't be null"))) + .retryPolicy(new ExponentialBackoffRetry(zookeeperConfig.getBaseSleepTimeMs(), zookeeperConfig.getMaxRetries(), zookeeperConfig.getMaxSleepMs())); + + //these has default value + if (0 != zookeeperConfig.getSessionTimeoutMs()) { + builder.sessionTimeoutMs(zookeeperConfig.getSessionTimeoutMs()); + } + if (0 != zookeeperConfig.getConnectionTimeoutMs()) { + builder.connectionTimeoutMs(zookeeperConfig.getConnectionTimeoutMs()); + } + if (StringUtils.isNotBlank(zookeeperConfig.getDigest())) { + builder.authorization("digest", zookeeperConfig.getDigest().getBytes(StandardCharsets.UTF_8)).aclProvider(new ACLProvider() { + + @Override + public List getDefaultAcl() { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + + @Override + public List getAclForPath(final String path) { + return ZooDefs.Ids.CREATOR_ALL_ACL; + } + }); + } + zkClient = builder.build(); + zkClient.start(); + try { + zkClient.blockUntilConnected(); + } catch (final Exception ex) { + throw new RuntimeException(ex); + } + return zkClient; + } + + public String get(final String key) { + try { + return new String(zkClient.getData().forPath(key), StandardCharsets.UTF_8); + } catch (Exception ex) { + logger.error("get key : {}", key, ex); + } + return null; + } + + public List getChildrenKeys(final String key) { + List values; + try { + values = zkClient.getChildren().forPath(key); + if (CollectionUtils.isEmpty(values)) { + logger.warn("getChildrenKeys key : {} is empty", key); + } + return values; + } catch (InterruptedException ex) { + logger.error("getChildrenKeys key : {} InterruptedException", key); + throw new IllegalStateException(ex); + } catch (Exception ex) { + logger.error("getChildrenKeys key : {}", key, ex); + throw new RuntimeException(ex); + } + } + + public boolean isExisted(final String key) { + try { + return zkClient.checkExists().forPath(key) != null; + } catch (Exception ex) { + logger.error("isExisted key : {}", key, ex); + } + return false; + } + + public void persist(final String key, final String value) { + try { + if (!isExisted(key)) { + zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(key, value.getBytes(StandardCharsets.UTF_8)); + } else { + update(key, value); + } + } catch (Exception ex) { + logger.error("persist key : {} , value : {}", key, value, ex); + } + } + + public void update(final String key, final String value) { + try { + zkClient.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(StandardCharsets.UTF_8)).and().commit(); + } catch (Exception ex) { + logger.error("update key : {} , value : {}", key, value, ex); + } + } + + public void persistEphemeral(final String key, final String value) { + try { + if (isExisted(key)) { + try { + zkClient.delete().deletingChildrenIfNeeded().forPath(key); + } catch (KeeperException.NoNodeException ignore) { + //NOP + } + } + zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8)); + } catch (final Exception ex) { + logger.error("persistEphemeral key : {} , value : {}", key, value, ex); + } + } + + public void persistEphemeral(String key, String value, boolean overwrite) { + try { + if (overwrite) { + persistEphemeral(key, value); + } else { + if (!isExisted(key)) { + zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(StandardCharsets.UTF_8)); + } + } + } catch (final Exception ex) { + logger.error("persistEphemeral key : {} , value : {}, overwrite : {}", key, value, overwrite, ex); + } + } + + public void persistEphemeralSequential(final String key, String value) { + try { + zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key, value.getBytes(StandardCharsets.UTF_8)); + } catch (final Exception ex) { + logger.error("persistEphemeralSequential key : {}", key, ex); + } + } + + public void remove(final String key) { + try { + if (isExisted(key)) { + zkClient.delete().deletingChildrenIfNeeded().forPath(key); + } + } catch (KeeperException.NoNodeException ignore) { + //NOP + } catch (final Exception ex) { + logger.error("remove key : {}", key, ex); + } + } + + public CuratorFramework getZkClient() { + return zkClient; + } + + public ZookeeperConfig getZookeeperConfig() { + return zookeeperConfig; + } + + public void close() { + CloseableUtils.closeQuietly(zkClient); + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/main/resources/zookeeper.properties b/dolphinscheduler-common/src/main/resources/zookeeper.properties index 66b533c0c1..a560de4f23 100644 --- a/dolphinscheduler-common/src/main/resources/zookeeper.properties +++ b/dolphinscheduler-common/src/main/resources/zookeeper.properties @@ -22,21 +22,22 @@ zookeeper.quorum=localhost:2181 zookeeper.dolphinscheduler.root=/dolphinscheduler #zookeeper server dirctory -zookeeper.dolphinscheduler.dead.servers=/dolphinscheduler/dead-servers -zookeeper.dolphinscheduler.masters=/dolphinscheduler/masters -zookeeper.dolphinscheduler.workers=/dolphinscheduler/workers +#zookeeper.dolphinscheduler.dead.servers=/dolphinscheduler/dead-servers +#zookeeper.dolphinscheduler.masters=/dolphinscheduler/masters +#zookeeper.dolphinscheduler.workers=/dolphinscheduler/workers #zookeeper lock dirctory -zookeeper.dolphinscheduler.lock.masters=/dolphinscheduler/lock/masters -zookeeper.dolphinscheduler.lock.workers=/dolphinscheduler/lock/workers +#zookeeper.dolphinscheduler.lock.masters=/dolphinscheduler/lock/masters +#zookeeper.dolphinscheduler.lock.workers=/dolphinscheduler/lock/workers #dolphinscheduler failover directory -zookeeper.dolphinscheduler.lock.failover.masters=/dolphinscheduler/lock/failover/masters -zookeeper.dolphinscheduler.lock.failover.workers=/dolphinscheduler/lock/failover/workers -zookeeper.dolphinscheduler.lock.failover.startup.masters=/dolphinscheduler/lock/failover/startup-masters +#zookeeper.dolphinscheduler.lock.failover.masters=/dolphinscheduler/lock/failover/masters +#zookeeper.dolphinscheduler.lock.failover.workers=/dolphinscheduler/lock/failover/workers +#zookeeper.dolphinscheduler.lock.failover.startup.masters=/dolphinscheduler/lock/failover/startup-masters #dolphinscheduler failover directory zookeeper.session.timeout=300 zookeeper.connection.timeout=300 -zookeeper.retry.sleep=1000 +zookeeper.retry.base.sleep=100 +zookeeper.retry.max.sleep=30000 zookeeper.retry.maxtime=5 \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 621b8839b0..8297cd0403 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -58,6 +58,7 @@ public class MasterServer implements IStoppable { /** * zk master client */ + @Autowired private ZKMasterClient zkMasterClient = null; /** @@ -105,11 +106,10 @@ public class MasterServer implements IStoppable { */ @PostConstruct public void run(){ + zkMasterClient.init(); masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread"); - zkMasterClient = ZKMasterClient.getZKMasterClient(processDao); - heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.defaulMasterHeartbeatThreadNum); // heartbeat thread implement diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index be12b2f80e..96f5ba0b5d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -68,6 +68,7 @@ public class WorkerServer implements IStoppable { /** * zk worker client */ + @Autowired private ZKWorkerClient zkWorkerClient = null; @@ -137,8 +138,7 @@ public class WorkerServer implements IStoppable { */ @PostConstruct public void run(){ - - zkWorkerClient = ZKWorkerClient.getZKWorkerClient(); + zkWorkerClient.init(); this.taskQueue = TaskQueueFactory.getTaskQueueInstance(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 253e5502d0..2aec6ecaf6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -16,10 +16,12 @@ */ package org.apache.dolphinscheduler.server.zk; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.common.zk.AbstractListener; import org.apache.dolphinscheduler.common.zk.AbstractZKClient; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.DaoFactory; @@ -36,6 +38,8 @@ import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.utils.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; import java.util.Date; import java.util.List; @@ -47,6 +51,7 @@ import java.util.concurrent.ThreadFactory; * * single instance */ +@Component public class ZKMasterClient extends AbstractZKClient { /** @@ -71,53 +76,14 @@ public class ZKMasterClient extends AbstractZKClient { /** * flow database access */ + @Autowired private ProcessDao processDao; - /** - * zkMasterClient - */ - private static ZKMasterClient zkMasterClient = null; - - /** - * master path children cache - */ - private PathChildrenCache masterPathChildrenCache; - - /** - * worker path children cache - */ - private PathChildrenCache workerPathChildrenCache; - - /** - * constructor - * - * @param processDao process dao - */ - private ZKMasterClient(ProcessDao processDao){ - this.processDao = processDao; - init(); - } - /** * default constructor */ private ZKMasterClient(){} - /** - * get zkMasterClient - * - * @param processDao process dao - * @return ZKMasterClient zookeeper master client - */ - public static synchronized ZKMasterClient getZKMasterClient(ProcessDao processDao){ - if(zkMasterClient == null){ - zkMasterClient = new ZKMasterClient(processDao); - } - zkMasterClient.processDao = processDao; - - return zkMasterClient; - } - /** * init */ @@ -157,22 +123,6 @@ public class ZKMasterClient extends AbstractZKClient { } } - @Override - public void close(){ - try { - if(masterPathChildrenCache != null){ - masterPathChildrenCache.close(); - } - if(workerPathChildrenCache != null){ - workerPathChildrenCache.close(); - } - super.close(); - } catch (Exception ignore) { - } - } - - - /** * init dao @@ -209,41 +159,29 @@ public class ZKMasterClient extends AbstractZKClient { } - /** * monitor master */ public void listenerMaster(){ - masterPathChildrenCache = new PathChildrenCache(zkClient, - getZNodeParentPath(ZKNodeType.MASTER), true ,defaultThreadFactory); - - try { - masterPathChildrenCache.start(); - masterPathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - switch (event.getType()) { - case CHILD_ADDED: - logger.info("master node added : {}",event.getData().getPath()); - break; - case CHILD_REMOVED: - String path = event.getData().getPath(); - String serverHost = getHostByEventDataPath(path); - if(checkServerSelfDead(serverHost, ZKNodeType.MASTER)){ - return; - } - removeZKNodePath(path, ZKNodeType.MASTER, true); - break; - case CHILD_UPDATED: - break; - default: - break; - } + registerListener(getZNodeParentPath(ZKNodeType.MASTER), new AbstractListener() { + @Override + protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { + switch (event.getType()) { + case NODE_ADDED: + logger.info("master node added : {}", path); + break; + case NODE_REMOVED: + String serverHost = getHostByEventDataPath(path); + if (checkServerSelfDead(serverHost, ZKNodeType.MASTER)) { + return; + } + removeZKNodePath(path, ZKNodeType.MASTER, true); + break; + default: + break; } - }); - }catch (Exception e){ - logger.error("monitor master failed : " + e.getMessage(),e); - } + } + }); } /** @@ -338,30 +276,22 @@ public class ZKMasterClient extends AbstractZKClient { * monitor worker */ public void listenerWorker(){ - workerPathChildrenCache = new PathChildrenCache(zkClient, - getZNodeParentPath(ZKNodeType.WORKER),true ,defaultThreadFactory); - try { - workerPathChildrenCache.start(); - workerPathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) { - switch (event.getType()) { - case CHILD_ADDED: - logger.info("node added : {}" ,event.getData().getPath()); - break; - case CHILD_REMOVED: - String path = event.getData().getPath(); - logger.info("node deleted : {}",event.getData().getPath()); - removeZKNodePath(path, ZKNodeType.WORKER, true); - break; - default: - break; - } + registerListener(getZNodeParentPath(ZKNodeType.WORKER), new AbstractListener() { + @Override + protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { + switch (event.getType()) { + case NODE_ADDED: + logger.info("worker node added : {}", path); + break; + case NODE_REMOVED: + logger.info("worker node deleted : {}", path); + removeZKNodePath(path, ZKNodeType.WORKER, true); + break; + default: + break; } - }); - }catch (Exception e){ - logger.error("listener worker failed : " + e.getMessage(),e); - } + } + }); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java index 31dc1dab42..0dd1cf15be 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java @@ -16,8 +16,10 @@ */ package org.apache.dolphinscheduler.server.zk; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ZKNodeType; +import org.apache.dolphinscheduler.common.zk.AbstractListener; import org.apache.dolphinscheduler.common.zk.AbstractZKClient; import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; @@ -27,6 +29,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ThreadUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; import java.util.concurrent.ThreadFactory; @@ -35,6 +38,7 @@ import java.util.concurrent.ThreadFactory; * zookeeper worker client * single instance */ +@Component public class ZKWorkerClient extends AbstractZKClient { /** @@ -42,11 +46,6 @@ public class ZKWorkerClient extends AbstractZKClient { */ private static final Logger logger = LoggerFactory.getLogger(ZKWorkerClient.class); - /** - * thread factory - */ - private static final ThreadFactory defaultThreadFactory = ThreadUtils.newGenericThreadFactory("Worker-Main-Thread"); - /** * worker znode @@ -54,24 +53,10 @@ public class ZKWorkerClient extends AbstractZKClient { private String workerZNode = null; - /** - * zookeeper worker client - */ - private static ZKWorkerClient zkWorkerClient = null; - - /** - * worker path children cache - */ - private PathChildrenCache workerPathChildrenCache; - - private ZKWorkerClient(){ - init(); - } - /** * init */ - private void init(){ + public void init(){ // init system znode this.initSystemZNode(); @@ -83,31 +68,6 @@ public class ZKWorkerClient extends AbstractZKClient { this.registWorker(); } - @Override - public void close(){ - try { - if(workerPathChildrenCache != null){ - workerPathChildrenCache.close(); - } - super.close(); - } catch (Exception ignore) { - } - } - - - /** - * get zookeeper worker client - * - * @return ZKWorkerClient - */ - public static synchronized ZKWorkerClient getZKWorkerClient(){ - if(zkWorkerClient == null){ - zkWorkerClient = new ZKWorkerClient(); - } - return zkWorkerClient; - } - - /** * register worker */ @@ -128,34 +88,25 @@ public class ZKWorkerClient extends AbstractZKClient { * monitor worker */ private void listenerWorker(){ - workerPathChildrenCache = new PathChildrenCache(zkClient, getZNodeParentPath(ZKNodeType.WORKER), true, defaultThreadFactory); - try { - workerPathChildrenCache.start(); - workerPathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { - switch (event.getType()) { - case CHILD_ADDED: - logger.info("node added : {}" ,event.getData().getPath()); - break; - case CHILD_REMOVED: - String path = event.getData().getPath(); - //find myself dead - String serverHost = getHostByEventDataPath(path); - if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){ - return; - } - break; - case CHILD_UPDATED: - break; - default: - break; - } + registerListener(getZNodeParentPath(ZKNodeType.WORKER), new AbstractListener() { + @Override + protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { + switch (event.getType()) { + case NODE_ADDED: + logger.info("worker node added : {}", path); + break; + case NODE_REMOVED: + //find myself dead + String serverHost = getHostByEventDataPath(path); + if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){ + return; + } + break; + default: + break; } - }); - }catch (Exception e){ - logger.error("monitor worker failed : " + e.getMessage(),e); - } + } + }); } @@ -167,13 +118,4 @@ public class ZKWorkerClient extends AbstractZKClient { return workerZNode; } - /** - * get worker lock path - * @return worker lock path - */ - public String getWorkerLockPath(){ - return conf.getString(Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_WORKERS); - } - - }