diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 18882a2fb5..f1962c7d8a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -25,6 +26,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; +import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; import org.apache.dolphinscheduler.server.worker.WorkerServer; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; @@ -42,13 +44,10 @@ import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.FilterType; - - - @ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = { @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {WorkerServer.class}) }) -public class MasterServer { +public class MasterServer implements IStoppable { /** * logger of MasterServer @@ -73,6 +72,12 @@ public class MasterServer { */ private NettyRemotingServer nettyRemotingServer; + /** + * master registry + */ + @Autowired + private MasterRegistry masterRegistry; + /** * zk master client */ @@ -100,19 +105,26 @@ public class MasterServer { * run master server */ @PostConstruct - public void run(){ + public void run() { + try { + //init remoting server + NettyServerConfig serverConfig = new NettyServerConfig(); + serverConfig.setListenPort(masterConfig.getListenPort()); + this.nettyRemotingServer = new NettyRemotingServer(serverConfig); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); + this.nettyRemotingServer.start(); - //init remoting server - NettyServerConfig serverConfig = new NettyServerConfig(); - serverConfig.setListenPort(masterConfig.getListenPort()); - this.nettyRemotingServer = new NettyRemotingServer(serverConfig); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor()); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); - this.nettyRemotingServer.start(); + this.masterRegistry.getZookeeperRegistryCenter().setStoppable(this); + + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RuntimeException(e); + } // self tolerant - this.zkMasterClient.start(); + this.zkMasterClient.start(this); // scheduler start this.masterSchedulerService.start(); @@ -137,7 +149,9 @@ public class MasterServer { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { - close("shutdownHook"); + if (Stopper.isRunning()) { + close("shutdownHook"); + } } })); @@ -145,13 +159,14 @@ public class MasterServer { /** * gracefully close + * * @param cause close cause */ public void close(String cause) { try { //execute only once - if(Stopper.isStopped()){ + if (Stopper.isStopped()) { return; } @@ -163,24 +178,32 @@ public class MasterServer { try { //thread sleep 3 seconds for thread quietly stop Thread.sleep(3000L); - }catch (Exception e){ + } catch (Exception e) { logger.warn("thread sleep exception ", e); } // this.masterSchedulerService.close(); this.nettyRemotingServer.close(); + this.masterRegistry.unRegistry(); this.zkMasterClient.close(); //close quartz - try{ + try { QuartzExecutors.getInstance().shutdown(); logger.info("Quartz service stopped"); - }catch (Exception e){ - logger.warn("Quartz service stopped exception:{}",e.getMessage()); + } catch (Exception e) { + logger.warn("Quartz service stopped exception:{}", e.getMessage()); } + } catch (Exception e) { logger.error("master server stop exception ", e); + } finally { System.exit(-1); } } + + @Override + public void stop(String cause) { + close(cause); + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java index 1872ae0a6e..ac7d8b0ffc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java @@ -149,7 +149,7 @@ public class LowerWeightHostManager extends CommonHostManager { String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup); Set hostWeights = new HashSet<>(nodes.size()); for(String node : nodes){ - String heartbeat = registryCenter.getZookeeperCachedOperator().get(workerGroupPath + "/" + node); + String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node); if(StringUtils.isNotEmpty(heartbeat) && heartbeat.split(COMMA).length == Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){ String[] parts = heartbeat.split(COMMA); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java index 0f41ba5246..0624f0c552 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java @@ -14,8 +14,19 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.master.registry; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.registry.HeartBeatTask; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; + +import org.apache.curator.framework.state.ConnectionState; + import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -23,15 +34,6 @@ import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.registry.HeartBeatTask; -import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -40,7 +42,7 @@ import org.springframework.stereotype.Service; import com.google.common.collect.Sets; /** - * master registry + * master registry */ @Service public class MasterRegistry { @@ -48,7 +50,7 @@ public class MasterRegistry { private final Logger logger = LoggerFactory.getLogger(MasterRegistry.class); /** - * zookeeper registry center + * zookeeper registry center */ @Autowired private ZookeeperRegistryCenter zookeeperRegistryCenter; @@ -65,42 +67,41 @@ public class MasterRegistry { private ScheduledExecutorService heartBeatExecutor; /** - * worker start time + * master start time */ private String startTime; - @PostConstruct - public void init(){ + public void init() { this.startTime = DateUtils.dateToString(new Date()); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); } /** - * registry + * registry */ public void registry() { String address = OSUtils.getHost(); String localNodePath = getMasterPath(); - zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, ""); - zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) { - if(newState == ConnectionState.LOST){ + zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, ""); + zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener( + (client, newState) -> { + if (newState == ConnectionState.LOST) { logger.error("master : {} connection lost from zookeeper", address); - } else if(newState == ConnectionState.RECONNECTED){ + } else if (newState == ConnectionState.RECONNECTED) { logger.info("master : {} reconnected to zookeeper", address); - zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, ""); - } else if(newState == ConnectionState.SUSPENDED){ + zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, ""); + } else if (newState == ConnectionState.SUSPENDED) { logger.warn("master : {} connection SUSPENDED ", address); + zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, ""); } - } - }); + }); int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval(); HeartBeatTask heartBeatTask = new HeartBeatTask(startTime, masterConfig.getMasterReservedMemory(), masterConfig.getMasterMaxCpuloadAvg(), Sets.newHashSet(getMasterPath()), + Constants.MASTER_PREFIX, zookeeperRegistryCenter); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); @@ -108,31 +109,37 @@ public class MasterRegistry { } /** - * remove registry info + * remove registry info */ public void unRegistry() { String address = getLocalAddress(); String localNodePath = getMasterPath(); - zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath); + zookeeperRegistryCenter.getRegisterOperator().remove(localNodePath); logger.info("master node : {} unRegistry to ZK.", address); } /** - * get master path - * @return + * get master path */ - private String getMasterPath() { + public String getMasterPath() { String address = getLocalAddress(); - String localNodePath = this.zookeeperRegistryCenter.getMasterPath() + "/" + address; - return localNodePath; + return this.zookeeperRegistryCenter.getMasterPath() + "/" + address; } /** - * get local address + * get local address * @return */ private String getLocalAddress(){ return OSUtils.getAddr(masterConfig.getListenPort()); } + /** + * get zookeeper registry center + * @return ZookeeperRegistryCenter + */ + public ZookeeperRegistryCenter getZookeeperRegistryCenter() { + return zookeeperRegistryCenter; + } + } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java index bd8c79cce9..90d6ea3620 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java @@ -19,16 +19,21 @@ package org.apache.dolphinscheduler.server.registry; import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA; -import java.util.Date; -import java.util.Set; - import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; + +import java.util.Date; +import java.util.Set; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class HeartBeatTask extends Thread { +/** + * Heart beat task + */ +public class HeartBeatTask implements Runnable { private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class); @@ -36,23 +41,39 @@ public class HeartBeatTask extends Thread { private double reservedMemory; private double maxCpuloadAvg; private Set heartBeatPaths; + private String serverType; private ZookeeperRegistryCenter zookeeperRegistryCenter; + /** + * server stop or not + */ + protected IStoppable stoppable = null; public HeartBeatTask(String startTime, double reservedMemory, double maxCpuloadAvg, Set heartBeatPaths, + String serverType, ZookeeperRegistryCenter zookeeperRegistryCenter) { this.startTime = startTime; this.reservedMemory = reservedMemory; this.maxCpuloadAvg = maxCpuloadAvg; this.heartBeatPaths = heartBeatPaths; this.zookeeperRegistryCenter = zookeeperRegistryCenter; + this.serverType = serverType; } @Override public void run() { try { + + // check dead or not in zookeeper + for (String heartBeatPath : heartBeatPaths) { + if (zookeeperRegistryCenter.checkIsDeadServer(heartBeatPath, serverType)) { + zookeeperRegistryCenter.getStoppable().stop("i was judged to death, release resources and stop myself"); + return; + } + } + double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); double loadAverage = OSUtils.loadAverage(); @@ -78,10 +99,19 @@ public class HeartBeatTask extends Thread { builder.append(OSUtils.getProcessID()); for (String heartBeatPath : heartBeatPaths) { - zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath, builder.toString()); + zookeeperRegistryCenter.getRegisterOperator().update(heartBeatPath, builder.toString()); } } catch (Throwable ex) { logger.error("error write heartbeat info", ex); } } + + /** + * for stop server + * + * @param serverStoppable server stoppable interface + */ + public void setStoppable(IStoppable serverStoppable) { + this.stoppable = serverStoppable; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java index 864276ba0e..bae4d141d4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java @@ -93,11 +93,11 @@ public class ZookeeperNodeManager implements InitializingBean { /** * init MasterNodeListener listener */ - registryCenter.getZookeeperCachedOperator().addListener(new MasterNodeListener()); + registryCenter.getRegisterOperator().addListener(new MasterNodeListener()); /** * init WorkerNodeListener listener */ - registryCenter.getZookeeperCachedOperator().addListener(new WorkerGroupNodeListener()); + registryCenter.getRegisterOperator().addListener(new WorkerGroupNodeListener()); } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java index 3ca62bee6a..9017a13a65 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java @@ -17,19 +17,27 @@ package org.apache.dolphinscheduler.server.registry; -import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; +import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX; +import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; +import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; +import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.IStoppable; +import org.apache.dolphinscheduler.service.zk.RegisterOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + /** - * zookeeper register center + * zookeeper register center */ @Service public class ZookeeperRegistryCenter implements InitializingBean { @@ -38,10 +46,9 @@ public class ZookeeperRegistryCenter implements InitializingBean { @Autowired - protected ZookeeperCachedOperator zookeeperCachedOperator; - + protected RegisterOperator registerOperator; @Autowired - private ZookeeperConfig zookeeperConfig; + private ZookeeperConfig zookeeperConfig; /** * nodes namespace @@ -60,6 +67,8 @@ public class ZookeeperRegistryCenter implements InitializingBean { public final String EMPTY = ""; + private IStoppable stoppable; + @Override public void afterPropertiesSet() throws Exception { NODES = zookeeperConfig.getDsRoot() + "/nodes"; @@ -82,23 +91,22 @@ public class ZookeeperRegistryCenter implements InitializingBean { * init nodes */ private void initNodes() { - zookeeperCachedOperator.persist(MASTER_PATH, EMPTY); - zookeeperCachedOperator.persist(WORKER_PATH, EMPTY); + registerOperator.persist(MASTER_PATH, EMPTY); + registerOperator.persist(WORKER_PATH, EMPTY); } /** * close */ public void close() { - if (isStarted.compareAndSet(true, false)) { - if (zookeeperCachedOperator != null) { - zookeeperCachedOperator.close(); - } + if (isStarted.compareAndSet(true, false) && registerOperator != null) { + registerOperator.close(); } } /** * get master path + * * @return master path */ public String getMasterPath() { @@ -107,6 +115,7 @@ public class ZookeeperRegistryCenter implements InitializingBean { /** * get worker path + * * @return worker path */ public String getWorkerPath() { @@ -114,7 +123,8 @@ public class ZookeeperRegistryCenter implements InitializingBean { } /** - * get master nodes directly + * get master nodes directly + * * @return master nodes */ public Set getMasterNodesDirectly() { @@ -123,7 +133,8 @@ public class ZookeeperRegistryCenter implements InitializingBean { } /** - * get worker nodes directly + * get worker nodes directly + * * @return master nodes */ public Set getWorkerNodesDirectly() { @@ -133,6 +144,7 @@ public class ZookeeperRegistryCenter implements InitializingBean { /** * get worker group directly + * * @return worker group nodes */ public Set getWorkerGroupDirectly() { @@ -142,6 +154,7 @@ public class ZookeeperRegistryCenter implements InitializingBean { /** * get worker group nodes + * * @param workerGroup * @return */ @@ -152,6 +165,7 @@ public class ZookeeperRegistryCenter implements InitializingBean { /** * whether worker path + * * @param path path * @return result */ @@ -161,6 +175,7 @@ public class ZookeeperRegistryCenter implements InitializingBean { /** * whether master path + * * @param path path * @return result */ @@ -170,6 +185,7 @@ public class ZookeeperRegistryCenter implements InitializingBean { /** * get worker group path + * * @param workerGroup workerGroup * @return worker group path */ @@ -179,19 +195,53 @@ public class ZookeeperRegistryCenter implements InitializingBean { /** * get children nodes + * * @param key key * @return children nodes */ public List getChildrenKeys(final String key) { - return zookeeperCachedOperator.getChildrenKeys(key); + return registerOperator.getChildrenKeys(key); + } + + /** + * @return get dead server node parent path + */ + public String getDeadZNodeParentPath() { + return registerOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS; + } + + public void setStoppable(IStoppable stoppable) { + this.stoppable = stoppable; + } + + public IStoppable getStoppable() { + return stoppable; } /** - * get zookeeperCachedOperator - * @return zookeeperCachedOperator + * check dead server or not , if dead, stop self + * + * @param zNode node path + * @param serverType master or worker prefix + * @return true if not exists + * @throws Exception errors */ - public ZookeeperCachedOperator getZookeeperCachedOperator() { - return zookeeperCachedOperator; + protected boolean checkIsDeadServer(String zNode, String serverType) throws Exception { + //ip_sequenceno + String[] zNodesPath = zNode.split("\\/"); + String ipSeqNo = zNodesPath[zNodesPath.length - 1]; + + String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX; + String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo; + + if (!registerOperator.isExisted(zNode) || registerOperator.isExisted(deadServerPath)) { + return true; + } + + return false; } + public RegisterOperator getRegisterOperator() { + return registerOperator; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 6895de3d4a..b408f6b60c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.server.worker; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.IStoppable; +import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -29,6 +31,9 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import java.util.Set; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -42,7 +47,7 @@ import javax.annotation.PostConstruct; * worker server */ @ComponentScan("org.apache.dolphinscheduler") -public class WorkerServer { +public class WorkerServer implements IStoppable { /** * logger @@ -106,7 +111,15 @@ public class WorkerServer { this.nettyRemotingServer.start(); // worker registry - this.workerRegistry.registry(); + try { + this.workerRegistry.registry(); + this.workerRegistry.getZookeeperRegistryCenter().setStoppable(this); + Set workerZkPaths = this.workerRegistry.getWorkerZkPaths(); + this.workerRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(workerZkPaths, ZKNodeType.WORKER, Constants.DELETE_ZK_OP); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new RuntimeException(e); + } // retry report task status this.retryReportTaskStatusThread.start(); @@ -117,7 +130,9 @@ public class WorkerServer { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { - close("shutdownHook"); + if (Stopper.isRunning()) { + close("shutdownHook"); + } } })); } @@ -126,7 +141,7 @@ public class WorkerServer { try { //execute only once - if(Stopper.isStopped()){ + if (Stopper.isStopped()) { return; } @@ -138,7 +153,7 @@ public class WorkerServer { try { //thread sleep 3 seconds for thread quitely stop Thread.sleep(3000L); - }catch (Exception e){ + } catch (Exception e) { logger.warn("thread sleep exception", e); } @@ -147,8 +162,13 @@ public class WorkerServer { } catch (Exception e) { logger.error("worker server stop exception ", e); + } finally { System.exit(-1); } } + @Override + public void stop(String cause) { + close(cause); + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index 921d0defbb..01e4554962 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -19,6 +19,17 @@ package org.apache.dolphinscheduler.server.worker.registry; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.common.Constants.SLASH; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; +import org.apache.dolphinscheduler.server.registry.HeartBeatTask; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; +import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; + +import org.apache.curator.framework.state.ConnectionState; + import java.util.Date; import java.util.Set; import java.util.concurrent.Executors; @@ -27,16 +38,6 @@ import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; -import org.apache.dolphinscheduler.server.registry.HeartBeatTask; -import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; -import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -46,7 +47,7 @@ import com.google.common.collect.Sets; /** - * worker registry + * worker registry */ @Service public class WorkerRegistry { @@ -54,13 +55,13 @@ public class WorkerRegistry { private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class); /** - * zookeeper registry center + * zookeeper registry center */ @Autowired private ZookeeperRegistryCenter zookeeperRegistryCenter; /** - * worker config + * worker config */ @Autowired private WorkerConfig workerConfig; @@ -79,14 +80,22 @@ public class WorkerRegistry { private Set workerGroups; @PostConstruct - public void init(){ + public void init() { this.workerGroups = workerConfig.getWorkerGroups(); this.startTime = DateUtils.dateToString(new Date()); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); } /** - * registry + * get zookeeper registry center + * @return ZookeeperRegistryCenter + */ + public ZookeeperRegistryCenter getZookeeperRegistryCenter() { + return zookeeperRegistryCenter; + } + + /** + * registry */ public void registry() { String address = OSUtils.getHost(); @@ -94,20 +103,18 @@ public class WorkerRegistry { int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval(); for (String workerZKPath : workerZkPaths) { - zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath, ""); - zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) { + zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, ""); + zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener( + (client,newState) -> { if (newState == ConnectionState.LOST) { logger.error("worker : {} connection lost from zookeeper", address); } else if (newState == ConnectionState.RECONNECTED) { logger.info("worker : {} reconnected to zookeeper", address); - zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath, ""); + zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, ""); } else if (newState == ConnectionState.SUSPENDED) { logger.warn("worker : {} connection SUSPENDED ", address); } - } - }); + }); logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath); } @@ -115,6 +122,7 @@ public class WorkerRegistry { this.workerConfig.getWorkerReservedMemory(), this.workerConfig.getWorkerMaxCpuloadAvg(), workerZkPaths, + Constants.WORKER_PREFIX, this.zookeeperRegistryCenter); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); @@ -122,22 +130,22 @@ public class WorkerRegistry { } /** - * remove registry info + * remove registry info */ public void unRegistry() { String address = getLocalAddress(); Set workerZkPaths = getWorkerZkPaths(); for (String workerZkPath : workerZkPaths) { - zookeeperRegistryCenter.getZookeeperCachedOperator().remove(workerZkPath); + zookeeperRegistryCenter.getRegisterOperator().remove(workerZkPath); logger.info("worker node : {} unRegistry from ZK {}.", address, workerZkPath); } this.heartBeatExecutor.shutdownNow(); } /** - * get worker path + * get worker path */ - private Set getWorkerZkPaths() { + public Set getWorkerZkPaths() { Set workerZkPaths = Sets.newHashSet(); String address = getLocalAddress(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 7f74c8cef2..2d8eed90b9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.master.MasterServer; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -72,8 +73,7 @@ public class ZKMasterClient extends AbstractZKClient { @Autowired private MasterRegistry masterRegistry; - public void start() { - + public void start(MasterServer masterServer) { InterProcessMutex mutex = null; try { // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master @@ -83,6 +83,9 @@ public class ZKMasterClient extends AbstractZKClient { // Master registry masterRegistry.registry(); + masterRegistry.getZookeeperRegistryCenter().setStoppable(masterServer); + String registPath = this.masterRegistry.getMasterPath(); + masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registPath, ZKNodeType.MASTER, Constants.DELETE_ZK_OP); // init system znode this.initSystemZNode(); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index eb914f4830..2c2a1b5e3a 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -17,11 +17,20 @@ package org.apache.dolphinscheduler.server.master.consumer; -import org.apache.dolphinscheduler.common.enums.*; +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.dao.entity.*; +import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Resource; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.config.MasterConfig; @@ -35,8 +44,17 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriority; import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; -import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; +import org.apache.dolphinscheduler.service.zk.RegisterOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; + +import org.apache.curator.CuratorZookeeperClient; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -47,17 +65,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - - @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class, - ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, MasterConfig.class}) + ZookeeperNodeManager.class, RegisterOperator.class, ZookeeperConfig.class, MasterConfig.class}) public class TaskPriorityQueueConsumerTest { @@ -503,8 +514,6 @@ public class TaskPriorityQueueConsumerTest { TaskExecutionContext taskExecutionContext = taskPriorityQueueConsumer.getTaskExecutionContext(1); - - Assert.assertNotNull(taskExecutionContext); } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java index 81d27bee1d..2488d597fc 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java @@ -17,29 +17,32 @@ package org.apache.dolphinscheduler.server.master.registry; -import org.apache.dolphinscheduler.common.utils.OSUtils; +import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH; + import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; + +import org.apache.curator.CuratorZookeeperClient; + +import java.util.List; +import java.util.concurrent.TimeUnit; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH; /** * master registry test */ @RunWith(SpringRunner.class) -@ContextConfiguration(classes={SpringZKServer.class, MasterRegistry.class,ZookeeperRegistryCenter.class, MasterConfig.class, ZookeeperCachedOperator.class, ZookeeperConfig.class}) +@ContextConfiguration(classes = {SpringZKServer.class, MasterRegistry.class, ZookeeperRegistryCenter.class, + MasterConfig.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, CuratorZookeeperClient.class}) public class MasterRegistryTest { @Autowired @@ -56,18 +59,20 @@ public class MasterRegistryTest { masterRegistry.registry(); String masterPath = zookeeperRegistryCenter.getMasterPath(); TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node - String masterNodePath = masterPath + Constants.SLASH + (OSUtils.getAddr(Constants.LOCAL_ADDRESS, masterConfig.getListenPort())); - String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(masterNodePath); + String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort()); + String heartbeat = zookeeperRegistryCenter.getRegisterOperator().get(masterNodePath); Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length); + masterRegistry.unRegistry(); } @Test public void testUnRegistry() throws InterruptedException { + masterRegistry.init(); masterRegistry.registry(); TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node masterRegistry.unRegistry(); String masterPath = zookeeperRegistryCenter.getMasterPath(); - List childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(masterPath); + List childrenKeys = zookeeperRegistryCenter.getRegisterOperator().getChildrenKeys(masterPath); Assert.assertTrue(childrenKeys.isEmpty()); } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java new file mode 100644 index 0000000000..24bb25c97f --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.registry; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.service.zk.RegisterOperator; +import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +/** + * zookeeper registry center test + */ +@RunWith(MockitoJUnitRunner.class) +public class ZookeeperRegistryCenterTest { + + @InjectMocks + private ZookeeperRegistryCenter zookeeperRegistryCenter; + + @Mock + protected RegisterOperator registerOperator; + + @Mock + private ZookeeperConfig zookeeperConfig; + + private static final String DS_ROOT = "/dolphinscheduler"; + + @Test + public void testGetDeadZNodeParentPath() { + ZookeeperConfig zookeeperConfig = new ZookeeperConfig(); + zookeeperConfig.setDsRoot(DS_ROOT); + Mockito.when(registerOperator.getZookeeperConfig()).thenReturn(zookeeperConfig); + + String deadZNodeParentPath = zookeeperRegistryCenter.getDeadZNodeParentPath(); + + Assert.assertEquals(deadZNodeParentPath, DS_ROOT + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS); + + } + +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index c38ca3ee9c..f228802f8f 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.processor; -import io.netty.channel.Channel; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingServer; @@ -33,12 +33,18 @@ import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseSer import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; +import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; +import org.apache.dolphinscheduler.service.zk.RegisterOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; + +import org.apache.curator.CuratorZookeeperClient; + +import java.util.Date; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -47,17 +53,32 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import java.io.IOException; -import java.util.Date; +import io.netty.channel.Channel; /** * test task call back service + * todo refactor it in the form of mock */ @RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, SpringApplicationContext.class, MasterRegistry.class, WorkerRegistry.class, - ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, - ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class, - TaskResponseService.class, TaskAckProcessor.class,TaskResponseProcessor.class}) +@ContextConfiguration(classes = { + TaskCallbackServiceTestConfig.class, + SpringZKServer.class, + SpringApplicationContext.class, + MasterRegistry.class, + WorkerRegistry.class, + ZookeeperRegistryCenter.class, + MasterConfig.class, + WorkerConfig.class, + RegisterOperator.class, + ZookeeperConfig.class, + ZookeeperNodeManager.class, + TaskCallbackService.class, + TaskResponseService.class, + TaskAckProcessor.class, + TaskResponseProcessor.class, + TaskExecuteProcessor.class, + CuratorZookeeperClient.class, + TaskExecutionContextCacheManagerImpl.class}) public class TaskCallbackServiceTest { @Autowired @@ -74,6 +95,7 @@ public class TaskCallbackServiceTest { /** * send ack test + * * @throws Exception */ @Test @@ -101,6 +123,7 @@ public class TaskCallbackServiceTest { /** * send result test + * * @throws Exception */ @Test @@ -140,7 +163,7 @@ public class TaskCallbackServiceTest { @Test public void testPause(){ - Assert.assertEquals(5000, taskCallbackService.pause(3));; + Assert.assertEquals(5000, taskCallbackService.pause(3)); } @Test @@ -171,41 +194,4 @@ public class TaskCallbackServiceTest { nettyRemotingServer.close(); nettyRemotingClient.close(); } - -// @Test(expected = IllegalStateException.class) -// public void testSendAckWithIllegalStateException2(){ -// masterRegistry.registry(); -// final NettyServerConfig serverConfig = new NettyServerConfig(); -// serverConfig.setListenPort(30000); -// NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); -// nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor); -// nettyRemotingServer.start(); -// -// final NettyClientConfig clientConfig = new NettyClientConfig(); -// NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig); -// Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000")); -// taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1)); -// channel.close(); -// TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); -// ackCommand.setTaskInstanceId(1); -// ackCommand.setStartTime(new Date()); -// -// nettyRemotingServer.close(); -// -// taskCallbackService.sendAck(1, ackCommand.convert2Command()); -// try { -// Thread.sleep(5000); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } -// -// Stopper.stop(); -// -// try { -// Thread.sleep(5000); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } -// } - } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java index 98a78e93c8..0a4307bd30 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java @@ -19,18 +19,20 @@ package org.apache.dolphinscheduler.server.worker.registry; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Executor; - -import org.apache.curator.framework.imps.CuratorFrameworkImpl; -import org.apache.curator.framework.listen.Listenable; -import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; +import org.apache.dolphinscheduler.service.zk.RegisterOperator; + +import org.apache.curator.framework.imps.CuratorFrameworkImpl; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.state.ConnectionStateListener; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executor; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -61,7 +63,7 @@ public class WorkerRegistryTest { private ZookeeperRegistryCenter zookeeperRegistryCenter; @Mock - private ZookeeperCachedOperator zookeeperCachedOperator; + private RegisterOperator registerOperator; @Mock private CuratorFrameworkImpl zkClient; @@ -69,15 +71,21 @@ public class WorkerRegistryTest { @Mock private WorkerConfig workerConfig; + private static final Set workerGroups; + + static { + workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP); + } + @Before public void before() { - Set workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP); + Mockito.when(workerConfig.getWorkerGroups()).thenReturn(workerGroups); Mockito.when(zookeeperRegistryCenter.getWorkerPath()).thenReturn("/dolphinscheduler/nodes/worker"); - Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator()).thenReturn(zookeeperCachedOperator); - Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient()).thenReturn(zkClient); - Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable()).thenReturn( + Mockito.when(zookeeperRegistryCenter.getRegisterOperator()).thenReturn(registerOperator); + Mockito.when(zookeeperRegistryCenter.getRegisterOperator().getZkClient()).thenReturn(zkClient); + Mockito.when(zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable()).thenReturn( new Listenable() { @Override public void addListener(ConnectionStateListener connectionStateListener) { @@ -114,7 +122,7 @@ public class WorkerRegistryTest { int i = 0; for (String workerGroup : workerConfig.getWorkerGroups()) { String workerZkPath = workerPath + "/" + workerGroup.trim() + "/" + (OSUtils.getAddr(workerConfig.getListenPort())); - String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(workerZkPath); + String heartbeat = zookeeperRegistryCenter.getRegisterOperator().get(workerZkPath); if (0 == i) { Assert.assertTrue(workerZkPath.startsWith("/dolphinscheduler/nodes/worker/test/")); } else { @@ -143,6 +151,7 @@ public class WorkerRegistryTest { Assert.assertEquals(0, testWorkerGroupPathZkChildren.size()); Assert.assertEquals(0, defaultWorkerGroupPathZkChildren.size()); + workerRegistry.unRegistry(); } @Test @@ -155,7 +164,7 @@ public class WorkerRegistryTest { for (String workerGroup : workerConfig.getWorkerGroups()) { String workerGroupPath = workerPath + "/" + workerGroup.trim(); - List childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(workerGroupPath); + List childrenKeys = zookeeperRegistryCenter.getRegisterOperator().getChildrenKeys(workerGroupPath); Assert.assertTrue(childrenKeys.isEmpty()); } @@ -167,4 +176,10 @@ public class WorkerRegistryTest { workerRegistry.unRegistry(); } + + @Test + public void testGetWorkerZkPaths() { + workerRegistry.init(); + Assert.assertEquals(workerGroups.size(),workerRegistry.getWorkerZkPaths().size()); + } } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java index 1cc4db6fea..24cdb89a06 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java @@ -14,322 +14,256 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.service.zk; -import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP; +import static org.apache.dolphinscheduler.common.Constants.COLON; +import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP; +import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING; +import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX; +import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; +import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; +import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.ResInfo; import org.apache.dolphinscheduler.common.utils.StringUtils; + +import org.apache.curator.framework.recipes.locks.InterProcessMutex; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; -import java.util.*; - -import static org.apache.dolphinscheduler.common.Constants.*; - /** * abstract zookeeper client */ @Component -public abstract class AbstractZKClient extends ZookeeperCachedOperator { - - private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class); - - - /** - * remove dead server by host - * @param host host - * @param serverType serverType - * @throws Exception - */ - public void removeDeadServerByHost(String host, String serverType) throws Exception { - List deadServers = super.getChildrenKeys(getDeadZNodeParentPath()); - for(String serverPath : deadServers){ - if(serverPath.startsWith(serverType+UNDERLINE+host)){ - String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath; - super.remove(server); - logger.info("{} server {} deleted from zk dead server path success" , serverType , host); - } - } - } - - - /** - * opType(add): if find dead server , then add to zk deadServerPath - * opType(delete): delete path from zk - * - * @param zNode node path - * @param zkNodeType master or worker - * @param opType delete or add - * @throws Exception errors - */ - public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception { - String host = getHostByEventDataPath(zNode); - String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX; - - //check server restart, if restart , dead server path in zk should be delete - if(opType.equals(DELETE_ZK_OP)){ - removeDeadServerByHost(host, type); - - }else if(opType.equals(ADD_ZK_OP)){ - String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host; - if(!super.isExisted(deadServerPath)){ - //add dead server info to zk dead server path : /dead-servers/ - - super.persist(deadServerPath,(type + UNDERLINE + host)); - - logger.info("{} server dead , and {} added to zk dead server path success" , - zkNodeType.toString(), zNode); - } - } - - } - - /** - * get active master num - * @return active master number - */ - public int getActiveMasterNum(){ - List childrenList = new ArrayList<>(); - try { - // read master node parent path from conf - if(super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))){ - childrenList = super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER)); - } - } catch (Exception e) { - logger.error("getActiveMasterNum error",e); - } - return childrenList.size(); - } - - /** - * - * @return zookeeper quorum - */ - public String getZookeeperQuorum(){ - return getZookeeperConfig().getServerList(); - } - - /** - * get server list. - * @param zkNodeType zookeeper node type - * @return server list - */ - public List getServersList(ZKNodeType zkNodeType){ - Map masterMap = getServerMaps(zkNodeType); - String parentPath = getZNodeParentPath(zkNodeType); - - List masterServers = new ArrayList<>(); - for (Map.Entry entry : masterMap.entrySet()) { - Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue()); - if(masterServer == null){ - continue; - } - String key = entry.getKey(); - masterServer.setZkDirectory(parentPath + "/"+ key); - //set host and port - String[] hostAndPort=key.split(COLON); - String[] hosts=hostAndPort[0].split(DIVISION_STRING); - // fetch the last one - masterServer.setHost(hosts[hosts.length-1]); - masterServer.setPort(Integer.parseInt(hostAndPort[1])); - masterServers.add(masterServer); - } - return masterServers; - } - - /** - * get master server list map. - * @param zkNodeType zookeeper node type - * @return result : {host : resource info} - */ - public Map getServerMaps(ZKNodeType zkNodeType){ - - Map masterMap = new HashMap<>(); - try { - String path = getZNodeParentPath(zkNodeType); - List serverList = super.getChildrenKeys(path); - if(zkNodeType == ZKNodeType.WORKER){ - List workerList = new ArrayList<>(); - for(String group : serverList){ - List groupServers = super.getChildrenKeys(path + Constants.SLASH + group); - for(String groupServer : groupServers){ - workerList.add(group + Constants.SLASH + groupServer); - } - } - serverList = workerList; - } - for(String server : serverList){ - masterMap.putIfAbsent(server, super.get(path + Constants.SLASH + server)); - } - } catch (Exception e) { - logger.error("get server list failed", e); - } - - return masterMap; - } - - /** - * check the zookeeper node already exists - * @param host host - * @param zkNodeType zookeeper node type - * @return true if exists - */ - public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) { - String path = getZNodeParentPath(zkNodeType); - if(StringUtils.isEmpty(path)){ - logger.error("check zk node exists error, host:{}, zk node type:{}", - host, zkNodeType.toString()); - return false; - } - Map serverMaps = getServerMaps(zkNodeType); - for(String hostKey : serverMaps.keySet()){ - if(hostKey.contains(host)){ - return true; - } - } - return false; - } - - /** - * - * @return get worker node parent path - */ - protected String getWorkerZNodeParentPath(){ - return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS; - } - - /** - * - * @return get master node parent path - */ - protected String getMasterZNodeParentPath(){ - return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS; - } - - /** - * - * @return get master lock path - */ - public String getMasterLockPath(){ - return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS; - } - - /** - * - * @param zkNodeType zookeeper node type - * @return get zookeeper node parent path - */ - public String getZNodeParentPath(ZKNodeType zkNodeType) { - String path = ""; - switch (zkNodeType){ - case MASTER: - return getMasterZNodeParentPath(); - case WORKER: - return getWorkerZNodeParentPath(); - case DEAD_SERVER: - return getDeadZNodeParentPath(); - default: - break; - } - return path; - } - - /** - * - * @return get dead server node parent path - */ - protected String getDeadZNodeParentPath(){ - return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS; - } - - /** - * - * @return get master start up lock path - */ - public String getMasterStartUpLockPath(){ - return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS; - } - - /** - * - * @return get master failover lock path - */ - public String getMasterFailoverLockPath(){ - return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS; - } - - /** - * - * @return get worker failover lock path - */ - public String getWorkerFailoverLockPath(){ - return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS; - } - - /** - * release mutex - * @param mutex mutex - */ - public void releaseMutex(InterProcessMutex mutex) { - if (mutex != null){ - try { - mutex.release(); - } catch (Exception e) { - if(e.getMessage().equals("instance must be started before calling this method")){ - logger.warn("lock release"); - }else{ - logger.error("lock release failed",e); - } - - } - } - } - - /** - * init system znode - */ - protected void initSystemZNode(){ - try { - persist(getMasterZNodeParentPath(), ""); - persist(getWorkerZNodeParentPath(), ""); - persist(getDeadZNodeParentPath(), ""); - - logger.info("initialize server nodes success."); - } catch (Exception e) { - logger.error("init system znode failed",e); - } - } - - /** - * get host ip, string format: masterParentPath/ip - * @param path path - * @return host ip, string format: masterParentPath/ip - */ - protected String getHostByEventDataPath(String path) { - if(StringUtils.isEmpty(path)){ - logger.error("empty path!"); - return ""; - } - String[] pathArray = path.split(SINGLE_SLASH); - if(pathArray.length < 1){ - logger.error("parse ip error: {}", path); - return ""; - } - return pathArray[pathArray.length - 1]; - - } - - @Override - public String toString() { - return "AbstractZKClient{" + - "zkClient=" + zkClient + - ", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' + - ", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\'' + - ", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\'' + - '}'; - } -} \ No newline at end of file +public abstract class AbstractZKClient extends RegisterOperator { + + private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class); + + /** + * get active master num + * + * @return active master number + */ + public int getActiveMasterNum() { + List childrenList = new ArrayList<>(); + try { + // read master node parent path from conf + if (super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))) { + childrenList = super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER)); + } + } catch (Exception e) { + logger.error("getActiveMasterNum error", e); + } + return childrenList.size(); + } + + /** + * @return zookeeper quorum + */ + public String getZookeeperQuorum() { + return getZookeeperConfig().getServerList(); + } + + /** + * get server list. + * + * @param zkNodeType zookeeper node type + * @return server list + */ + public List getServersList(ZKNodeType zkNodeType) { + Map masterMap = getServerMaps(zkNodeType); + String parentPath = getZNodeParentPath(zkNodeType); + + List masterServers = new ArrayList<>(); + for (Map.Entry entry : masterMap.entrySet()) { + Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue()); + if (masterServer == null) { + continue; + } + String key = entry.getKey(); + masterServer.setZkDirectory(parentPath + "/" + key); + //set host and port + String[] hostAndPort = key.split(COLON); + String[] hosts = hostAndPort[0].split(DIVISION_STRING); + // fetch the last one + masterServer.setHost(hosts[hosts.length - 1]); + masterServer.setPort(Integer.parseInt(hostAndPort[1])); + masterServers.add(masterServer); + } + return masterServers; + } + + /** + * get master server list map. + * + * @param zkNodeType zookeeper node type + * @return result : {host : resource info} + */ + public Map getServerMaps(ZKNodeType zkNodeType) { + + Map masterMap = new HashMap<>(); + try { + String path = getZNodeParentPath(zkNodeType); + List serverList = super.getChildrenKeys(path); + if (zkNodeType == ZKNodeType.WORKER) { + List workerList = new ArrayList<>(); + for (String group : serverList) { + List groupServers = super.getChildrenKeys(path + Constants.SLASH + group); + for (String groupServer : groupServers) { + workerList.add(group + Constants.SLASH + groupServer); + } + } + serverList = workerList; + } + for (String server : serverList) { + masterMap.putIfAbsent(server, super.get(path + Constants.SLASH + server)); + } + } catch (Exception e) { + logger.error("get server list failed", e); + } + + return masterMap; + } + + /** + * check the zookeeper node already exists + * + * @param host host + * @param zkNodeType zookeeper node type + * @return true if exists + */ + public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) { + String path = getZNodeParentPath(zkNodeType); + if (StringUtils.isEmpty(path)) { + logger.error("check zk node exists error, host:{}, zk node type:{}", + host, zkNodeType); + return false; + } + Map serverMaps = getServerMaps(zkNodeType); + for (String hostKey : serverMaps.keySet()) { + if (hostKey.contains(host)) { + return true; + } + } + return false; + } + + /** + * @return get worker node parent path + */ + protected String getWorkerZNodeParentPath() { + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS; + } + + /** + * @return get master node parent path + */ + protected String getMasterZNodeParentPath() { + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS; + } + + /** + * @return get master lock path + */ + public String getMasterLockPath() { + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS; + } + + /** + * @param zkNodeType zookeeper node type + * @return get zookeeper node parent path + */ + public String getZNodeParentPath(ZKNodeType zkNodeType) { + String path = ""; + switch (zkNodeType) { + case MASTER: + return getMasterZNodeParentPath(); + case WORKER: + return getWorkerZNodeParentPath(); + case DEAD_SERVER: + return getDeadZNodeParentPath(); + default: + break; + } + return path; + } + + + /** + * @return get master start up lock path + */ + public String getMasterStartUpLockPath() { + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS; + } + + /** + * @return get master failover lock path + */ + public String getMasterFailoverLockPath() { + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS; + } + + /** + * @return get worker failover lock path + */ + public String getWorkerFailoverLockPath() { + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS; + } + + /** + * release mutex + * + * @param mutex mutex + */ + public void releaseMutex(InterProcessMutex mutex) { + if (mutex != null) { + try { + mutex.release(); + } catch (Exception e) { + if ("instance must be started before calling this method".equals(e.getMessage())) { + logger.warn("lock release"); + } else { + logger.error("lock release failed", e); + } + + } + } + } + + /** + * init system znode + */ + protected void initSystemZNode() { + try { + persist(getMasterZNodeParentPath(), ""); + persist(getWorkerZNodeParentPath(), ""); + persist(getDeadZNodeParentPath(), ""); + + logger.info("initialize server nodes success."); + } catch (Exception e) { + logger.error("init system znode failed", e); + } + } + + @Override + public String toString() { + return "AbstractZKClient{" + + "zkClient=" + getZkClient() + + ", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' + + ", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\'' + + ", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\'' + + '}'; + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java new file mode 100644 index 0000000000..0fd4a4fa92 --- /dev/null +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.zk; + +import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP; +import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP; +import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX; +import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; +import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; +import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ZKNodeType; +import org.apache.dolphinscheduler.common.utils.StringUtils; + +import java.util.List; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * register operator + */ +@Component +public class RegisterOperator extends ZookeeperCachedOperator { + + private final Logger logger = LoggerFactory.getLogger(RegisterOperator.class); + + /** + * @return get dead server node parent path + */ + protected String getDeadZNodeParentPath() { + return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS; + } + + /** + * remove dead server by host + * + * @param host host + * @param serverType serverType + * @throws Exception + */ + public void removeDeadServerByHost(String host, String serverType) throws Exception { + List deadServers = super.getChildrenKeys(getDeadZNodeParentPath()); + for (String serverPath : deadServers) { + if (serverPath.startsWith(serverType + UNDERLINE + host)) { + String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath; + super.remove(server); + logger.info("{} server {} deleted from zk dead server path success", serverType, host); + } + } + } + + /** + * get host ip, string format: masterParentPath/ip + * + * @param path path + * @return host ip, string format: masterParentPath/ip + */ + protected String getHostByEventDataPath(String path) { + if (StringUtils.isEmpty(path)) { + logger.error("empty path!"); + return ""; + } + String[] pathArray = path.split(SINGLE_SLASH); + if (pathArray.length < 1) { + logger.error("parse ip error: {}", path); + return ""; + } + return pathArray[pathArray.length - 1]; + + } + + /** + * opType(add): if find dead server , then add to zk deadServerPath + * opType(delete): delete path from zk + * + * @param zNode node path + * @param zkNodeType master or worker + * @param opType delete or add + * @throws Exception errors + */ + public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception { + String host = getHostByEventDataPath(zNode); + String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX; + + //check server restart, if restart , dead server path in zk should be delete + if (opType.equals(DELETE_ZK_OP)) { + removeDeadServerByHost(host, type); + + } else if (opType.equals(ADD_ZK_OP)) { + String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host; + if (!super.isExisted(deadServerPath)) { + //add dead server info to zk dead server path : /dead-servers/ + + super.persist(deadServerPath, (type + UNDERLINE + host)); + + logger.info("{} server dead , and {} added to zk dead server path success", + zkNodeType, zNode); + } + } + + } + + /** + * opType(add): if find dead server , then add to zk deadServerPath + * opType(delete): delete path from zk + * + * @param zNodeSet node path set + * @param zkNodeType master or worker + * @param opType delete or add + * @throws Exception errors + */ + public void handleDeadServer(Set zNodeSet, ZKNodeType zkNodeType, String opType) throws Exception { + + String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX; + for (String zNode : zNodeSet) { + String host = getHostByEventDataPath(zNode); + //check server restart, if restart , dead server path in zk should be delete + if (opType.equals(DELETE_ZK_OP)) { + removeDeadServerByHost(host, type); + + } else if (opType.equals(ADD_ZK_OP)) { + String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host; + if (!super.isExisted(deadServerPath)) { + //add dead server info to zk dead server path : /dead-servers/ + + super.persist(deadServerPath, (type + UNDERLINE + host)); + + logger.info("{} server dead , and {} added to zk dead server path success", + zkNodeType, zNode); + } + } + + } + + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java index 5bdc6f8cd7..57ac13e3be 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java @@ -52,6 +52,9 @@ public class ZookeeperConfig { @Value("${zookeeper.dolphinscheduler.root:/dolphinscheduler}") private String dsRoot; + @Value("${zookeeper.max.wait.time:10000}") + private int maxWaitTime; + public String getServerList() { return serverList; } @@ -115,4 +118,12 @@ public class ZookeeperConfig { public void setDsRoot(String dsRoot) { this.dsRoot = dsRoot; } + + public int getMaxWaitTime() { + return maxWaitTime; + } + + public void setMaxWaitTime(int maxWaitTime) { + this.maxWaitTime = maxWaitTime; + } } \ No newline at end of file diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java new file mode 100644 index 0000000000..f828c0772f --- /dev/null +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.service.zk; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ZKNodeType; + +import java.util.concurrent.TimeUnit; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +/** + * register operator test + */ +@RunWith(MockitoJUnitRunner.Silent.class) +public class RegisterOperatorTest { + + private static ZKServer zkServer; + + @InjectMocks + private RegisterOperator registerOperator; + + @Mock + private ZookeeperConfig zookeeperConfig; + + private static final String DS_ROOT = "/dolphinscheduler"; + private static final String MASTER_NODE = "127.0.0.1:5678"; + + @Before + public void before() { + new Thread(() -> { + if (zkServer == null) { + zkServer = new ZKServer(); + } + zkServer.startLocalZkServer(2185); + }).start(); + } + + @Test + public void testAfterPropertiesSet() throws Exception { + TimeUnit.SECONDS.sleep(10); + Mockito.when(zookeeperConfig.getServerList()).thenReturn("127.0.0.1:2185"); + Mockito.when(zookeeperConfig.getBaseSleepTimeMs()).thenReturn(100); + Mockito.when(zookeeperConfig.getMaxRetries()).thenReturn(10); + Mockito.when(zookeeperConfig.getMaxSleepMs()).thenReturn(30000); + Mockito.when(zookeeperConfig.getSessionTimeoutMs()).thenReturn(60000); + Mockito.when(zookeeperConfig.getConnectionTimeoutMs()).thenReturn(30000); + Mockito.when(zookeeperConfig.getDigest()).thenReturn(""); + Mockito.when(zookeeperConfig.getDsRoot()).thenReturn(DS_ROOT); + Mockito.when(zookeeperConfig.getMaxWaitTime()).thenReturn(30000); + + registerOperator.afterPropertiesSet(); + Assert.assertNotNull(registerOperator.getZkClient()); + } + + @After + public void after() { + if (zkServer != null) { + zkServer.stop(); + } + } + + @Test + public void testGetDeadZNodeParentPath() throws Exception { + + testAfterPropertiesSet(); + String path = registerOperator.getDeadZNodeParentPath(); + + Assert.assertEquals(DS_ROOT + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS, path); + } + + @Test + public void testHandleDeadServer() throws Exception { + testAfterPropertiesSet(); + registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER,Constants.ADD_ZK_OP); + String path = registerOperator.getDeadZNodeParentPath(); + Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE))); + + } + + @Test + public void testRemoveDeadServerByHost() throws Exception { + testAfterPropertiesSet(); + String path = registerOperator.getDeadZNodeParentPath(); + + registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER,Constants.ADD_ZK_OP); + Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE))); + + registerOperator.removeDeadServerByHost(MASTER_NODE,Constants.MASTER_PREFIX); + Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE))); + } + +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index aff1f98034..fd0ea39b06 100644 --- a/pom.xml +++ b/pom.xml @@ -817,6 +817,7 @@ **/server/master/MasterExecThreadTest.java --> **/server/master/ParamsTest.java **/server/register/ZookeeperNodeManagerTest.java + **/server/register/ZookeeperRegistryCenterTest.java **/server/utils/DataxUtilsTest.java **/server/utils/ExecutionContextTestUtils.java **/server/utils/FlinkArgsUtilsTest.java @@ -838,6 +839,8 @@ **/service/quartz/cron/CronUtilsTest.java **/service/zk/DefaultEnsembleProviderTest.java **/service/zk/ZKServerTest.java + **/service/zk/CuratorZookeeperClientTest.java + **/service/zk/RegisterOperatorTest.java **/service/queue/TaskUpdateQueueTest.java **/service/queue/TaskPriorityTest.java **/dao/mapper/DataSourceUserMapperTest.java