From 43ba29a2dd72a480d582258323ec5d062333fe96 Mon Sep 17 00:00:00 2001
From: lgcareer <18610854716@163.com>
Date: Tue, 2 Mar 2021 14:05:00 +0800
Subject: [PATCH] [Improvement-4624] When the server exists in the dead server
list of zk, it needs to stop the service by itself (#4626)
* [Improvement-4624] When the server exists in the dead server list of zk, it needs to stop the service by itself
* [Improvement-4624]fix check style and add MasterRegistryTest
* [Improvement-4624]fix check style and add ZookeeperRegistryCenterTest
* [Improvement-4624]fix check style and add ZookeeperRegistryCenterTest
* [Improvement-4624]add RegisterOperatorTest
* [Improvement-4624]update RegisterOperatorTest
* [Improvement-4624]resolve code smell
* [Improvement-4624]revert LICENSE-@form-create-element-ui
---
.../server/master/MasterServer.java | 69 +++++---
.../dispatch/host/LowerWeightHostManager.java | 2 +-
.../master/registry/MasterRegistry.java | 38 +++--
.../server/registry/HeartBeatTask.java | 33 +++-
.../server/registry/ZookeeperNodeManager.java | 4 +-
.../registry/ZookeeperRegistryCenter.java | 92 ++++++++---
.../server/worker/WorkerServer.java | 74 ++++++---
.../worker/registry/WorkerRegistry.java | 36 ++--
.../server/zk/ZKMasterClient.java | 3 +
.../TaskPriorityQueueConsumerTest.java | 4 +-
.../master/registry/MasterRegistryTest.java | 7 +-
.../registry/ZookeeperRegistryCenterTest.java | 61 +++++++
.../processor/TaskCallbackServiceTest.java | 4 +-
.../worker/registry/WorkerRegistryTest.java | 44 +++--
.../service/zk/AbstractZKClient.java | 83 +---------
.../service/zk/RegisterOperator.java | 155 ++++++++++++++++++
.../service/zk/ZookeeperCachedOperator.java | 3 +
.../service/zk/RegisterOperatorTest.java | 116 +++++++++++++
pom.xml | 2 +
19 files changed, 619 insertions(+), 211 deletions(-)
create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java
create mode 100644 dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
create mode 100644 dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
index 18882a2fb5..c2ea2c4073 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
@@ -14,9 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -25,6 +27,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
+import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.server.worker.WorkerServer;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
@@ -42,13 +45,10 @@ import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
-
-
-
@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {WorkerServer.class})
})
-public class MasterServer {
+public class MasterServer implements IStoppable {
/**
* logger of MasterServer
@@ -62,8 +62,8 @@ public class MasterServer {
private MasterConfig masterConfig;
/**
- * spring application context
- * only use it for initialization
+ * spring application context
+ * only use it for initialization
*/
@Autowired
private SpringApplicationContext springApplicationContext;
@@ -73,6 +73,12 @@ public class MasterServer {
*/
private NettyRemotingServer nettyRemotingServer;
+ /**
+ * master registry
+ */
+ @Autowired
+ private MasterRegistry masterRegistry;
+
/**
* zk master client
*/
@@ -87,8 +93,9 @@ public class MasterServer {
/**
* master server startup
- *
+ *
* master server not use web service
+ *
* @param args arguments
*/
public static void main(String[] args) {
@@ -100,16 +107,23 @@ public class MasterServer {
* run master server
*/
@PostConstruct
- public void run(){
+ public void run() {
+ try {
+ //init remoting server
+ NettyServerConfig serverConfig = new NettyServerConfig();
+ serverConfig.setListenPort(masterConfig.getListenPort());
+ this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
+ this.nettyRemotingServer.start();
- //init remoting server
- NettyServerConfig serverConfig = new NettyServerConfig();
- serverConfig.setListenPort(masterConfig.getListenPort());
- this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
- this.nettyRemotingServer.start();
+ this.masterRegistry.getZookeeperRegistryCenter().setStoppable(this);
+
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
// self tolerant
this.zkMasterClient.start();
@@ -137,7 +151,9 @@ public class MasterServer {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
- close("shutdownHook");
+ if (Stopper.isRunning()) {
+ close("shutdownHook");
+ }
}
}));
@@ -145,13 +161,14 @@ public class MasterServer {
/**
* gracefully close
+ *
* @param cause close cause
*/
public void close(String cause) {
try {
//execute only once
- if(Stopper.isStopped()){
+ if (Stopper.isStopped()) {
return;
}
@@ -163,24 +180,32 @@ public class MasterServer {
try {
//thread sleep 3 seconds for thread quietly stop
Thread.sleep(3000L);
- }catch (Exception e){
+ } catch (Exception e) {
logger.warn("thread sleep exception ", e);
}
//
this.masterSchedulerService.close();
this.nettyRemotingServer.close();
+ this.masterRegistry.unRegistry();
this.zkMasterClient.close();
//close quartz
- try{
+ try {
QuartzExecutors.getInstance().shutdown();
logger.info("Quartz service stopped");
- }catch (Exception e){
- logger.warn("Quartz service stopped exception:{}",e.getMessage());
+ } catch (Exception e) {
+ logger.warn("Quartz service stopped exception:{}", e.getMessage());
}
+
} catch (Exception e) {
logger.error("master server stop exception ", e);
+ } finally {
System.exit(-1);
}
}
+
+ @Override
+ public void stop(String cause) {
+ close(cause);
+ }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
index 1872ae0a6e..ac7d8b0ffc 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
@@ -149,7 +149,7 @@ public class LowerWeightHostManager extends CommonHostManager {
String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup);
Set hostWeights = new HashSet<>(nodes.size());
for(String node : nodes){
- String heartbeat = registryCenter.getZookeeperCachedOperator().get(workerGroupPath + "/" + node);
+ String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node);
if(StringUtils.isNotEmpty(heartbeat)
&& heartbeat.split(COMMA).length == Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
String[] parts = heartbeat.split(COMMA);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
index 37d6e72243..b492395a0c 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.registry;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
@@ -24,9 +25,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
-import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
import java.util.Date;
import java.util.concurrent.Executors;
@@ -84,30 +83,29 @@ public class MasterRegistry {
public void registry() {
String address = NetUtils.getHost();
String localNodePath = getMasterPath();
- zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
- zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState) {
+ zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, "");
+ zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener(
+ (client, newState) -> {
if (newState == ConnectionState.LOST) {
logger.error("master : {} connection lost from zookeeper", address);
} else if (newState == ConnectionState.RECONNECTED) {
logger.info("master : {} reconnected to zookeeper", address);
- zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
+ zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, "");
} else if (newState == ConnectionState.SUSPENDED) {
logger.warn("master : {} connection SUSPENDED ", address);
+ zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, "");
}
- }
- });
+ });
int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
masterConfig.getMasterReservedMemory(),
masterConfig.getMasterMaxCpuloadAvg(),
Sets.newHashSet(getMasterPath()),
+ Constants.MASTER_PREFIX,
zookeeperRegistryCenter);
- this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0, masterHeartbeatInterval, TimeUnit.SECONDS);
- logger.info("master node : {} registry to ZK path {} successfully with heartBeatInterval : {}s"
- , address, localNodePath, masterHeartbeatInterval);
+ this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
+ logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
}
/**
@@ -116,16 +114,14 @@ public class MasterRegistry {
public void unRegistry() {
String address = getLocalAddress();
String localNodePath = getMasterPath();
- heartBeatExecutor.shutdownNow();
- zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
- logger.info("master node : {} unRegistry from ZK path {}."
- , address, localNodePath);
+ zookeeperRegistryCenter.getRegisterOperator().remove(localNodePath);
+ logger.info("master node : {} unRegistry to ZK.", address);
}
/**
* get master path
*/
- private String getMasterPath() {
+ public String getMasterPath() {
String address = getLocalAddress();
return this.zookeeperRegistryCenter.getMasterPath() + "/" + address;
}
@@ -139,4 +135,12 @@ public class MasterRegistry {
}
+ /**
+ * get zookeeper registry center
+ * @return ZookeeperRegistryCenter
+ */
+ public ZookeeperRegistryCenter getZookeeperRegistryCenter() {
+ return zookeeperRegistryCenter;
+ }
+
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
index b89d85126f..a12583b535 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.registry;
import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
@@ -29,7 +30,10 @@ import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class HeartBeatTask extends Thread {
+/**
+ * Heart beat task
+ */
+public class HeartBeatTask implements Runnable {
private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
@@ -37,23 +41,39 @@ public class HeartBeatTask extends Thread {
private double reservedMemory;
private double maxCpuloadAvg;
private Set heartBeatPaths;
+ private String serverType;
private ZookeeperRegistryCenter zookeeperRegistryCenter;
+ /**
+ * server stop or not
+ */
+ protected IStoppable stoppable = null;
public HeartBeatTask(String startTime,
double reservedMemory,
double maxCpuloadAvg,
Set heartBeatPaths,
+ String serverType,
ZookeeperRegistryCenter zookeeperRegistryCenter) {
this.startTime = startTime;
this.reservedMemory = reservedMemory;
this.maxCpuloadAvg = maxCpuloadAvg;
this.heartBeatPaths = heartBeatPaths;
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
+ this.serverType = serverType;
}
@Override
public void run() {
try {
+
+ // check dead or not in zookeeper
+ for (String heartBeatPath : heartBeatPaths) {
+ if (zookeeperRegistryCenter.checkIsDeadServer(heartBeatPath, serverType)) {
+ zookeeperRegistryCenter.getStoppable().stop("i was judged to death, release resources and stop myself");
+ return;
+ }
+ }
+
double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
double loadAverage = OSUtils.loadAverage();
@@ -79,10 +99,19 @@ public class HeartBeatTask extends Thread {
builder.append(OSUtils.getProcessID());
for (String heartBeatPath : heartBeatPaths) {
- zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath, builder.toString());
+ zookeeperRegistryCenter.getRegisterOperator().update(heartBeatPath, builder.toString());
}
} catch (Throwable ex) {
logger.error("error write heartbeat info", ex);
}
}
+
+ /**
+ * for stop server
+ *
+ * @param serverStoppable server stoppable interface
+ */
+ public void setStoppable(IStoppable serverStoppable) {
+ this.stoppable = serverStoppable;
+ }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
index b1a5edee38..4dfdb80e52 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
@@ -93,11 +93,11 @@ public class ZookeeperNodeManager implements InitializingBean {
/**
* init MasterNodeListener listener
*/
- registryCenter.getZookeeperCachedOperator().addListener(new MasterNodeListener());
+ registryCenter.getRegisterOperator().addListener(new MasterNodeListener());
/**
* init WorkerNodeListener listener
*/
- registryCenter.getZookeeperCachedOperator().addListener(new WorkerGroupNodeListener());
+ registryCenter.getRegisterOperator().addListener(new WorkerGroupNodeListener());
}
/**
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
index 3ca62bee6a..9017a13a65 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
@@ -17,19 +17,27 @@
package org.apache.dolphinscheduler.server.registry;
-import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.IStoppable;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
/**
- * zookeeper register center
+ * zookeeper register center
*/
@Service
public class ZookeeperRegistryCenter implements InitializingBean {
@@ -38,10 +46,9 @@ public class ZookeeperRegistryCenter implements InitializingBean {
@Autowired
- protected ZookeeperCachedOperator zookeeperCachedOperator;
-
+ protected RegisterOperator registerOperator;
@Autowired
- private ZookeeperConfig zookeeperConfig;
+ private ZookeeperConfig zookeeperConfig;
/**
* nodes namespace
@@ -60,6 +67,8 @@ public class ZookeeperRegistryCenter implements InitializingBean {
public final String EMPTY = "";
+ private IStoppable stoppable;
+
@Override
public void afterPropertiesSet() throws Exception {
NODES = zookeeperConfig.getDsRoot() + "/nodes";
@@ -82,23 +91,22 @@ public class ZookeeperRegistryCenter implements InitializingBean {
* init nodes
*/
private void initNodes() {
- zookeeperCachedOperator.persist(MASTER_PATH, EMPTY);
- zookeeperCachedOperator.persist(WORKER_PATH, EMPTY);
+ registerOperator.persist(MASTER_PATH, EMPTY);
+ registerOperator.persist(WORKER_PATH, EMPTY);
}
/**
* close
*/
public void close() {
- if (isStarted.compareAndSet(true, false)) {
- if (zookeeperCachedOperator != null) {
- zookeeperCachedOperator.close();
- }
+ if (isStarted.compareAndSet(true, false) && registerOperator != null) {
+ registerOperator.close();
}
}
/**
* get master path
+ *
* @return master path
*/
public String getMasterPath() {
@@ -107,6 +115,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/**
* get worker path
+ *
* @return worker path
*/
public String getWorkerPath() {
@@ -114,7 +123,8 @@ public class ZookeeperRegistryCenter implements InitializingBean {
}
/**
- * get master nodes directly
+ * get master nodes directly
+ *
* @return master nodes
*/
public Set getMasterNodesDirectly() {
@@ -123,7 +133,8 @@ public class ZookeeperRegistryCenter implements InitializingBean {
}
/**
- * get worker nodes directly
+ * get worker nodes directly
+ *
* @return master nodes
*/
public Set getWorkerNodesDirectly() {
@@ -133,6 +144,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/**
* get worker group directly
+ *
* @return worker group nodes
*/
public Set getWorkerGroupDirectly() {
@@ -142,6 +154,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/**
* get worker group nodes
+ *
* @param workerGroup
* @return
*/
@@ -152,6 +165,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/**
* whether worker path
+ *
* @param path path
* @return result
*/
@@ -161,6 +175,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/**
* whether master path
+ *
* @param path path
* @return result
*/
@@ -170,6 +185,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/**
* get worker group path
+ *
* @param workerGroup workerGroup
* @return worker group path
*/
@@ -179,19 +195,53 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/**
* get children nodes
+ *
* @param key key
* @return children nodes
*/
public List getChildrenKeys(final String key) {
- return zookeeperCachedOperator.getChildrenKeys(key);
+ return registerOperator.getChildrenKeys(key);
+ }
+
+ /**
+ * @return the parent path of the dead server nodes
+ */
+ public String getDeadZNodeParentPath() {
+ return registerOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
+ }
+
+ public void setStoppable(IStoppable stoppable) {
+ this.stoppable = stoppable;
+ }
+
+ public IStoppable getStoppable() {
+ return stoppable;
}
/**
- * get zookeeperCachedOperator
- * @return zookeeperCachedOperator
+ * check whether this server is dead; the caller is responsible for stopping it
+ *
+ * @param zNode node path
+ * @param serverType master or worker prefix
+ * @return true if the node no longer exists or a matching dead-server node exists
+ * @throws Exception errors
*/
- public ZookeeperCachedOperator getZookeeperCachedOperator() {
- return zookeeperCachedOperator;
+ protected boolean checkIsDeadServer(String zNode, String serverType) throws Exception {
+ //ip_sequenceno
+ String[] zNodesPath = zNode.split("\\/");
+ String ipSeqNo = zNodesPath[zNodesPath.length - 1];
+
+ String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX;
+ String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
+
+ if (!registerOperator.isExisted(zNode) || registerOperator.isExisted(deadServerPath)) {
+ return true;
+ }
+
+ return false;
}
+ public RegisterOperator getRegisterOperator() {
+ return registerOperator;
+ }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
index a267b5bf32..10880bf94f 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
@@ -18,6 +18,8 @@
package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.IStoppable;
+import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
@@ -33,20 +35,24 @@ import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import java.util.Set;
+
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.springframework.beans.factory.annotation.Autowired;
+
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
/**
- * worker server
+ * worker server
*/
@ComponentScan("org.apache.dolphinscheduler")
-public class WorkerServer {
+public class WorkerServer implements IStoppable {
/**
* logger
@@ -54,31 +60,31 @@ public class WorkerServer {
private static final Logger logger = LoggerFactory.getLogger(WorkerServer.class);
/**
- * netty remote server
+ * netty remote server
*/
private NettyRemotingServer nettyRemotingServer;
/**
- * worker registry
+ * worker registry
*/
@Autowired
private WorkerRegistry workerRegistry;
/**
- * worker config
+ * worker config
*/
@Autowired
private WorkerConfig workerConfig;
/**
- * spring application context
- * only use it for initialization
+ * spring application context
+ * only use it for initialization
*/
@Autowired
private SpringApplicationContext springApplicationContext;
/**
- * alert model netty remote server
+ * alert model netty remote server
*/
private AlertClientService alertClientService;
@@ -105,24 +111,31 @@ public class WorkerServer {
*/
@PostConstruct
public void run() {
- logger.info("start worker server...");
-
- //alert-server client registry
- alertClientService = new AlertClientService(workerConfig.getAlertListenHost(),Constants.ALERT_RPC_PORT);
-
- //init remoting server
- NettyServerConfig serverConfig = new NettyServerConfig();
- serverConfig.setListenPort(workerConfig.getListenPort());
- this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor(alertClientService));
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
- this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
- this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
- this.nettyRemotingServer.start();
-
- // worker registry
- this.workerRegistry.registry();
-
+ try {
+ logger.info("start worker server...");
+
+ //init remoting server
+ NettyServerConfig serverConfig = new NettyServerConfig();
+ serverConfig.setListenPort(workerConfig.getListenPort());
+ this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
+ this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
+ this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
+ this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
+ this.nettyRemotingServer.start();
+
+ this.workerRegistry.getZookeeperRegistryCenter().setStoppable(this);
+ Set workerZkPaths = this.workerRegistry.getWorkerZkPaths();
+ this.workerRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(workerZkPaths, ZKNodeType.WORKER, Constants.DELETE_ZK_OP);
+ // worker registry
+ this.workerRegistry.registry();
+
+ // retry report task status
+ this.retryReportTaskStatusThread.start();
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
// task execute manager
this.workerManagerThread.start();
@@ -135,7 +148,9 @@ public class WorkerServer {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
- close("shutdownHook");
+ if (Stopper.isRunning()) {
+ close("shutdownHook");
+ }
}
}));
}
@@ -167,8 +182,13 @@ public class WorkerServer {
} catch (Exception e) {
logger.error("worker server stop exception ", e);
+ } finally {
System.exit(-1);
}
}
+ @Override
+ public void stop(String cause) {
+ close(cause);
+ }
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
index 3d4d73f51a..b763497a04 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.registry;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.SLASH;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
@@ -29,9 +30,7 @@ import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.framework.state.ConnectionStateListener;
import java.util.Date;
import java.util.Set;
@@ -89,6 +88,14 @@ public class WorkerRegistry {
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
}
+ /**
+ * get zookeeper registry center
+ * @return ZookeeperRegistryCenter
+ */
+ public ZookeeperRegistryCenter getZookeeperRegistryCenter() {
+ return zookeeperRegistryCenter;
+ }
+
/**
* registry
*/
@@ -98,28 +105,27 @@ public class WorkerRegistry {
int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval();
for (String workerZKPath : workerZkPaths) {
- zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath, "");
- zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState) {
+ zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, "");
+ zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener(
+ (client,newState) -> {
if (newState == ConnectionState.LOST) {
logger.error("worker : {} connection lost from zookeeper", address);
} else if (newState == ConnectionState.RECONNECTED) {
logger.info("worker : {} reconnected to zookeeper", address);
- zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath, "");
+ zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, "");
} else if (newState == ConnectionState.SUSPENDED) {
logger.warn("worker : {} connection SUSPENDED ", address);
}
- }
- });
+ });
logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath);
}
HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime,
- this.workerConfig.getWorkerReservedMemory(),
- this.workerConfig.getWorkerMaxCpuloadAvg(),
- workerZkPaths,
- this.zookeeperRegistryCenter);
+ this.workerConfig.getWorkerReservedMemory(),
+ this.workerConfig.getWorkerMaxCpuloadAvg(),
+ workerZkPaths,
+ Constants.WORKER_PREFIX,
+ this.zookeeperRegistryCenter);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval);
@@ -132,7 +138,7 @@ public class WorkerRegistry {
String address = getLocalAddress();
Set workerZkPaths = getWorkerZkPaths();
for (String workerZkPath : workerZkPaths) {
- zookeeperRegistryCenter.getZookeeperCachedOperator().remove(workerZkPath);
+ zookeeperRegistryCenter.getRegisterOperator().remove(workerZkPath);
logger.info("worker node : {} unRegistry from ZK {}.", address, workerZkPath);
}
this.heartBeatExecutor.shutdownNow();
@@ -141,7 +147,7 @@ public class WorkerRegistry {
/**
* get worker path
*/
- private Set getWorkerZkPaths() {
+ public Set getWorkerZkPaths() {
Set workerZkPaths = Sets.newHashSet();
String address = getLocalAddress();
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
index 1f0926ba0c..28b752ad5d 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
@@ -85,6 +85,9 @@ public class ZKMasterClient extends AbstractZKClient {
// Master registry
masterRegistry.registry();
+ String registPath = this.masterRegistry.getMasterPath();
+ masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registPath, ZKNodeType.MASTER, Constants.DELETE_ZK_OP);
+
// init system znode
this.initSystemZNode();
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index 8c2321dd8e..74cd2da6c8 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -45,7 +45,7 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
-import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import java.util.ArrayList;
@@ -67,7 +67,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, CuratorZookeeperClient.class,
NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class,
- ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, MasterConfig.class,
+ ZookeeperNodeManager.class, RegisterOperator.class, ZookeeperConfig.class, MasterConfig.class,
CuratorZookeeperClient.class})
public class TaskPriorityQueueConsumerTest {
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
index a180f51576..9b62473930 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
@@ -60,8 +59,8 @@ public class MasterRegistryTest {
masterRegistry.registry();
String masterPath = zookeeperRegistryCenter.getMasterPath();
TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node
- String masterNodePath = masterPath + "/" + (NetUtils.getAddr(Constants.LOCAL_ADDRESS, masterConfig.getListenPort()));
- String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(masterNodePath);
+ String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort());
+ String heartbeat = zookeeperRegistryCenter.getRegisterOperator().get(masterNodePath);
Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length);
masterRegistry.unRegistry();
}
@@ -73,7 +72,7 @@ public class MasterRegistryTest {
TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node
masterRegistry.unRegistry();
String masterPath = zookeeperRegistryCenter.getMasterPath();
- List childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(masterPath);
+ List childrenKeys = zookeeperRegistryCenter.getRegisterOperator().getChildrenKeys(masterPath);
Assert.assertTrue(childrenKeys.isEmpty());
}
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java
new file mode 100644
index 0000000000..24bb25c97f
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.registry;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
+import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * zookeeper registry center test
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ZookeeperRegistryCenterTest {
+
+    private static final String DS_ROOT = "/dolphinscheduler";
+
+    @InjectMocks
+    private ZookeeperRegistryCenter zookeeperRegistryCenter;
+
+    @Mock
+    protected RegisterOperator registerOperator;
+
+    @Mock
+    private ZookeeperConfig zookeeperConfig;
+
+    /** the dead server parent path is the configured root plus the dead servers suffix */
+    @Test
+    public void testGetDeadZNodeParentPath() {
+        // real config instance, named so that it does not shadow the zookeeperConfig mock field
+        ZookeeperConfig rootConfig = new ZookeeperConfig();
+        rootConfig.setDsRoot(DS_ROOT);
+        Mockito.when(registerOperator.getZookeeperConfig()).thenReturn(rootConfig);
+
+        // JUnit expects (expected, actual); the order decides the failure message
+        String expected = DS_ROOT + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
+        Assert.assertEquals(expected, zookeeperRegistryCenter.getDeadZNodeParentPath());
+    }
+}
\ No newline at end of file
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
index 8938f49773..bf04f1f569 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
@@ -43,7 +43,7 @@ import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient;
-import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import java.util.Date;
@@ -71,7 +71,7 @@ import io.netty.channel.Channel;
ZookeeperRegistryCenter.class,
MasterConfig.class,
WorkerConfig.class,
- ZookeeperCachedOperator.class,
+ RegisterOperator.class,
ZookeeperConfig.class,
ZookeeperNodeManager.class,
TaskCallbackService.class,
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
index a71e48030d..d7066c0d40 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
@@ -19,18 +19,20 @@ package org.apache.dolphinscheduler.server.worker.registry;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Executor;
-
-import org.apache.curator.framework.imps.CuratorFrameworkImpl;
-import org.apache.curator.framework.listen.Listenable;
-import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
+import org.apache.dolphinscheduler.service.zk.RegisterOperator;
+
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.state.ConnectionStateListener;
+
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -61,7 +63,7 @@ public class WorkerRegistryTest {
private ZookeeperRegistryCenter zookeeperRegistryCenter;
@Mock
- private ZookeeperCachedOperator zookeeperCachedOperator;
+ private RegisterOperator registerOperator;
@Mock
private CuratorFrameworkImpl zkClient;
@@ -69,15 +71,21 @@ public class WorkerRegistryTest {
@Mock
private WorkerConfig workerConfig;
+ private static final Set workerGroups;
+
+ static {
+ workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP);
+ }
+
@Before
public void before() {
- Set workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP);
+
Mockito.when(workerConfig.getWorkerGroups()).thenReturn(workerGroups);
Mockito.when(zookeeperRegistryCenter.getWorkerPath()).thenReturn("/dolphinscheduler/nodes/worker");
- Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator()).thenReturn(zookeeperCachedOperator);
- Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient()).thenReturn(zkClient);
- Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable()).thenReturn(
+ Mockito.when(zookeeperRegistryCenter.getRegisterOperator()).thenReturn(registerOperator);
+ Mockito.when(zookeeperRegistryCenter.getRegisterOperator().getZkClient()).thenReturn(zkClient);
+ Mockito.when(zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable()).thenReturn(
new Listenable() {
@Override
public void addListener(ConnectionStateListener connectionStateListener) {
@@ -114,7 +122,7 @@ public class WorkerRegistryTest {
int i = 0;
for (String workerGroup : workerConfig.getWorkerGroups()) {
String workerZkPath = workerPath + "/" + workerGroup.trim() + "/" + (NetUtils.getAddr(workerConfig.getListenPort()));
- String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(workerZkPath);
+ String heartbeat = zookeeperRegistryCenter.getRegisterOperator().get(workerZkPath);
if (0 == i) {
Assert.assertTrue(workerZkPath.startsWith("/dolphinscheduler/nodes/worker/test/"));
} else {
@@ -156,7 +164,7 @@ public class WorkerRegistryTest {
for (String workerGroup : workerConfig.getWorkerGroups()) {
String workerGroupPath = workerPath + "/" + workerGroup.trim();
- List childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(workerGroupPath);
+ List childrenKeys = zookeeperRegistryCenter.getRegisterOperator().getChildrenKeys(workerGroupPath);
Assert.assertTrue(childrenKeys.isEmpty());
}
@@ -168,4 +176,10 @@ public class WorkerRegistryTest {
workerRegistry.unRegistry();
}
+
+    @Test
+    public void testGetWorkerZkPaths() {
+        workerRegistry.init();
+        Assert.assertEquals(workerGroups.size(), workerRegistry.getWorkerZkPaths().size());
+    }
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
index 37d8f10c93..7cdf680090 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
@@ -17,14 +17,8 @@
package org.apache.dolphinscheduler.service.zk;
-import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.COLON;
-import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
-import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
-import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
-import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
-import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
@@ -47,57 +41,10 @@ import org.springframework.stereotype.Component;
* abstract zookeeper client
*/
@Component
-public abstract class AbstractZKClient extends ZookeeperCachedOperator {
+public abstract class AbstractZKClient extends RegisterOperator {
private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class);
- /**
- * remove dead server by host
- *
- * @param host host
- * @param serverType serverType
- */
- public void removeDeadServerByHost(String host, String serverType) {
- List deadServers = super.getChildrenKeys(getDeadZNodeParentPath());
- for (String serverPath : deadServers) {
- if (serverPath.startsWith(serverType + UNDERLINE + host)) {
- String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
- super.remove(server);
- logger.info("{} server {} deleted from zk dead server path success", serverType, host);
- }
- }
- }
-
- /**
- * opType(add): if find dead server , then add to zk deadServerPath
- * opType(delete): delete path from zk
- *
- * @param zNode node path
- * @param zkNodeType master or worker
- * @param opType delete or add
- */
- public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) {
- String host = getHostByEventDataPath(zNode);
- String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
-
- //check server restart, if restart , dead server path in zk should be delete
- if (opType.equals(DELETE_ZK_OP)) {
- removeDeadServerByHost(host, type);
-
- } else if (opType.equals(ADD_ZK_OP)) {
- String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
- if (!super.isExisted(deadServerPath)) {
- //add dead server info to zk dead server path : /dead-servers/
-
- super.persist(deadServerPath, (type + UNDERLINE + host));
-
- logger.info("{} server dead , and {} added to zk dead server path success",
- zkNodeType, zNode);
- }
- }
-
- }
-
/**
* get active master num
*
@@ -187,7 +134,7 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
/**
* check the zookeeper node already exists
*
- * @param host host
+ * @param host host
* @param zkNodeType zookeeper node type
* @return true if exists
*/
@@ -247,12 +194,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
return path;
}
- /**
- * @return get dead server node parent path
- */
- protected String getDeadZNodeParentPath() {
- return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
- }
/**
* @return get master start up lock path
@@ -310,26 +251,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
}
}
- /**
- * get host ip, string format: masterParentPath/ip
- *
- * @param path path
- * @return host ip, string format: masterParentPath/ip
- */
- protected String getHostByEventDataPath(String path) {
- if (StringUtils.isEmpty(path)) {
- logger.error("empty path!");
- return "";
- }
- String[] pathArray = path.split(SINGLE_SLASH);
- if (pathArray.length < 1) {
- logger.error("parse ip error: {}", path);
- return "";
- }
- return pathArray[pathArray.length - 1];
-
- }
-
@Override
public String toString() {
return "AbstractZKClient{"
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
new file mode 100644
index 0000000000..0fd4a4fa92
--- /dev/null
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.zk;
+
+import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
+import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
+import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ZKNodeType;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+
+import java.util.List;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * Register operator: maintains the dead server nodes in zookeeper, i.e. the
+ * children of the dead server parent path that mark a master or worker
+ * server as dead.
+ */
+@Component
+public class RegisterOperator extends ZookeeperCachedOperator {
+
+    private static final Logger logger = LoggerFactory.getLogger(RegisterOperator.class);
+
+    /**
+     * Builds the dead server parent path from the configured root path.
+     *
+     * @return get dead server node parent path
+     */
+    protected String getDeadZNodeParentPath() {
+        return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
+    }
+
+    /**
+     * remove dead server by host: removes every child of the dead server parent
+     * path whose name starts with {@code serverType + UNDERLINE + host}
+     *
+     * @param host host
+     * @param serverType serverType
+     * @throws Exception if a zookeeper operation fails
+     */
+    public void removeDeadServerByHost(String host, String serverType) throws Exception {
+        String deadParentPath = getDeadZNodeParentPath();
+        String nodePrefix = serverType + UNDERLINE + host;
+        List<String> deadServers = super.getChildrenKeys(deadParentPath);
+        for (String serverPath : deadServers) {
+            // prefix match, so nodes carrying a suffix after the host are removed as well
+            if (serverPath.startsWith(nodePrefix)) {
+                super.remove(deadParentPath + SINGLE_SLASH + serverPath);
+                logger.info("{} server {} deleted from zk dead server path success", serverType, host);
+            }
+        }
+    }
+
+    /**
+     * get host ip, string format: masterParentPath/ip
+     *
+     * @param path path
+     * @return the last segment of the path, or an empty string when the path is
+     *         empty or splits into no segment
+     */
+    protected String getHostByEventDataPath(String path) {
+        if (StringUtils.isEmpty(path)) {
+            logger.error("empty path!");
+            return "";
+        }
+        String[] pathArray = path.split(SINGLE_SLASH);
+        if (pathArray.length < 1) {
+            logger.error("parse ip error: {}", path);
+            return "";
+        }
+        return pathArray[pathArray.length - 1];
+    }
+
+    /**
+     * opType(add): if find dead server , then add to zk deadServerPath
+     * opType(delete): delete path from zk
+     *
+     * @param zNode node path
+     * @param zkNodeType master or worker
+     * @param opType delete or add; any other value, including null, is ignored
+     * @throws Exception errors
+     */
+    public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception {
+        updateDeadServer(zNode, zkNodeType, opType);
+    }
+
+    /**
+     * applies {@link #handleDeadServer(String, ZKNodeType, String)} semantics to
+     * every node path of the set
+     *
+     * @param zNodeSet node path set
+     * @param zkNodeType master or worker
+     * @param opType delete or add; any other value, including null, is ignored
+     * @throws Exception errors
+     */
+    public void handleDeadServer(Set<String> zNodeSet, ZKNodeType zkNodeType, String opType) throws Exception {
+        for (String zNode : zNodeSet) {
+            updateDeadServer(zNode, zkNodeType, opType);
+        }
+    }
+
+    /**
+     * Shared single-node implementation of both handleDeadServer overloads. It is
+     * private so that a subclass overriding one overload does not change the
+     * behaviour of the other.
+     *
+     * @param zNode node path
+     * @param zkNodeType master or worker
+     * @param opType delete or add
+     * @throws Exception errors
+     */
+    private void updateDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception {
+        String host = getHostByEventDataPath(zNode);
+        String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
+
+        // constant-first equals: a null opType matches neither operation
+        if (DELETE_ZK_OP.equals(opType)) {
+            //check server restart, if restart , dead server path in zk should be delete
+            removeDeadServerByHost(host, type);
+        } else if (ADD_ZK_OP.equals(opType)) {
+            String deadServerName = type + UNDERLINE + host;
+            String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + deadServerName;
+            if (!super.isExisted(deadServerPath)) {
+                //add dead server info to zk dead server path : /dead-servers/
+                super.persist(deadServerPath, deadServerName);
+                logger.info("{} server dead , and {} added to zk dead server path success",
+                        zkNodeType, zNode);
+            }
+        }
+    }
+}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
index 88c339b045..54913cf915 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperCachedOperator.java
@@ -32,6 +32,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
+/**
+ * zookeeper cache operator
+ */
@Component
public class ZookeeperCachedOperator extends ZookeeperOperator {
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java
new file mode 100644
index 0000000000..f828c0772f
--- /dev/null
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.service.zk;
+
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.ZKNodeType;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * register operator test: exercises the dead server handling of {@link RegisterOperator} against a ZKServer on port 2185
+ */
+@RunWith(MockitoJUnitRunner.Silent.class)
+public class RegisterOperatorTest {
+ // NOTE(review): assigned from the thread spawned in before() but read in after() without synchronization
+ private static ZKServer zkServer;
+
+ @InjectMocks
+ private RegisterOperator registerOperator;
+
+ @Mock
+ private ZookeeperConfig zookeeperConfig;
+
+ private static final String DS_ROOT = "/dolphinscheduler";
+ private static final String MASTER_NODE = "127.0.0.1:5678";
+
+ @Before
+ public void before() {
+ new Thread(() -> {
+ if (zkServer == null) {
+ zkServer = new ZKServer();
+ }
+ zkServer.startLocalZkServer(2185);
+ }).start();
+ }
+ // NOTE(review): the other tests call this method as their setup, so each of them also sleeps 10 s
+ @Test
+ public void testAfterPropertiesSet() throws Exception {
+ TimeUnit.SECONDS.sleep(10);
+ Mockito.when(zookeeperConfig.getServerList()).thenReturn("127.0.0.1:2185");
+ Mockito.when(zookeeperConfig.getBaseSleepTimeMs()).thenReturn(100);
+ Mockito.when(zookeeperConfig.getMaxRetries()).thenReturn(10);
+ Mockito.when(zookeeperConfig.getMaxSleepMs()).thenReturn(30000);
+ Mockito.when(zookeeperConfig.getSessionTimeoutMs()).thenReturn(60000);
+ Mockito.when(zookeeperConfig.getConnectionTimeoutMs()).thenReturn(30000);
+ Mockito.when(zookeeperConfig.getDigest()).thenReturn("");
+ Mockito.when(zookeeperConfig.getDsRoot()).thenReturn(DS_ROOT);
+ Mockito.when(zookeeperConfig.getMaxWaitTime()).thenReturn(30000);
+
+ registerOperator.afterPropertiesSet();
+ Assert.assertNotNull(registerOperator.getZkClient());
+ }
+
+ @After
+ public void after() {
+ if (zkServer != null) {
+ zkServer.stop();
+ }
+ }
+
+ @Test
+ public void testGetDeadZNodeParentPath() throws Exception {
+ // runs testAfterPropertiesSet() first, which stubs the config and calls afterPropertiesSet()
+ testAfterPropertiesSet();
+ String path = registerOperator.getDeadZNodeParentPath();
+
+ Assert.assertEquals(DS_ROOT + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS, path);
+ }
+
+ @Test
+ public void testHandleDeadServer() throws Exception {
+ testAfterPropertiesSet();
+ registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER,Constants.ADD_ZK_OP);
+ String path = registerOperator.getDeadZNodeParentPath();
+ Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE)));
+
+ }
+
+ @Test
+ public void testRemoveDeadServerByHost() throws Exception {
+ testAfterPropertiesSet();
+ String path = registerOperator.getDeadZNodeParentPath();
+ // mark the master node as dead and check that its dead server node exists
+ registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER,Constants.ADD_ZK_OP);
+ Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE)));
+ // remove it by host and server type and check that the node is gone
+ registerOperator.removeDeadServerByHost(MASTER_NODE,Constants.MASTER_PREFIX);
+ Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE)));
+ }
+
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index e69ff3778c..a07fde2fd1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -930,6 +930,7 @@
**/server/master/processor/TaskKillResponseProcessorTest.java
**/server/master/processor/queue/TaskResponseServiceTest.java
**/server/register/ZookeeperNodeManagerTest.java
+ **/server/registry/ZookeeperRegistryCenterTest.java
**/server/utils/DataxUtilsTest.java
**/server/utils/ExecutionContextTestUtils.java
**/server/utils/HostTest.java
@@ -961,6 +962,7 @@
**/service/zk/DefaultEnsembleProviderTest.java
**/service/zk/ZKServerTest.java
**/service/zk/CuratorZookeeperClientTest.java
+ **/service/zk/RegisterOperatorTest.java
**/service/queue/TaskUpdateQueueTest.java
**/service/queue/PeerTaskInstancePriorityQueueTest.java