Browse Source

[1.3.6-Prepare][Improvement-4624][cherry pick] When the server exist in the dead server list of zk,need stop service byself (#5013)

lgcareer 4 years ago committed by GitHub
parent
commit
710024719e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 63
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java
  2. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
  3. 73
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
  4. 40
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
  5. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
  6. 92
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java
  7. 30
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java
  8. 60
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
  9. 7
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  10. 35
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
  11. 25
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
  12. 61
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java
  13. 78
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java
  14. 45
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java
  15. 542
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java
  16. 155
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
  17. 11
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java
  18. 116
      dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java
  19. 3
      pom.xml

63
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
@ -25,6 +26,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService;
import org.apache.dolphinscheduler.server.worker.WorkerServer;
import org.apache.dolphinscheduler.server.zk.ZKMasterClient;
@ -42,13 +44,10 @@ import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.FilterType;
@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = {
@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {WorkerServer.class})
})
public class MasterServer {
public class MasterServer implements IStoppable {
/**
* logger of MasterServer
@ -73,6 +72,12 @@ public class MasterServer {
*/
private NettyRemotingServer nettyRemotingServer;
/**
* master registry
*/
@Autowired
private MasterRegistry masterRegistry;
/**
* zk master client
*/
@ -100,19 +105,26 @@ public class MasterServer {
* run master server
*/
@PostConstruct
public void run(){
public void run() {
try {
//init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.start();
//init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.start();
this.masterRegistry.getZookeeperRegistryCenter().setStoppable(this);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
}
// self tolerant
this.zkMasterClient.start();
this.zkMasterClient.start(this);
// scheduler start
this.masterSchedulerService.start();
@ -137,7 +149,9 @@ public class MasterServer {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
close("shutdownHook");
if (Stopper.isRunning()) {
close("shutdownHook");
}
}
}));
@ -145,13 +159,14 @@ public class MasterServer {
/**
* gracefully close
*
* @param cause close cause
*/
public void close(String cause) {
try {
//execute only once
if(Stopper.isStopped()){
if (Stopper.isStopped()) {
return;
}
@ -163,24 +178,32 @@ public class MasterServer {
try {
//thread sleep 3 seconds for thread quietly stop
Thread.sleep(3000L);
}catch (Exception e){
} catch (Exception e) {
logger.warn("thread sleep exception ", e);
}
//
this.masterSchedulerService.close();
this.nettyRemotingServer.close();
this.masterRegistry.unRegistry();
this.zkMasterClient.close();
//close quartz
try{
try {
QuartzExecutors.getInstance().shutdown();
logger.info("Quartz service stopped");
}catch (Exception e){
logger.warn("Quartz service stopped exception:{}",e.getMessage());
} catch (Exception e) {
logger.warn("Quartz service stopped exception:{}", e.getMessage());
}
} catch (Exception e) {
logger.error("master server stop exception ", e);
} finally {
System.exit(-1);
}
}
@Override
public void stop(String cause) {
close(cause);
}
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java

@ -149,7 +149,7 @@ public class LowerWeightHostManager extends CommonHostManager {
String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup);
Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
for(String node : nodes){
String heartbeat = registryCenter.getZookeeperCachedOperator().get(workerGroupPath + "/" + node);
String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node);
if(StringUtils.isNotEmpty(heartbeat)
&& heartbeat.split(COMMA).length == Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
String[] parts = heartbeat.split(COMMA);

73
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java

@ -14,8 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.curator.framework.state.ConnectionState;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -23,15 +34,6 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -40,7 +42,7 @@ import org.springframework.stereotype.Service;
import com.google.common.collect.Sets;
/**
* master registry
* master registry
*/
@Service
public class MasterRegistry {
@ -48,7 +50,7 @@ public class MasterRegistry {
private final Logger logger = LoggerFactory.getLogger(MasterRegistry.class);
/**
* zookeeper registry center
* zookeeper registry center
*/
@Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter;
@ -65,42 +67,41 @@ public class MasterRegistry {
private ScheduledExecutorService heartBeatExecutor;
/**
* worker start time
* master start time
*/
private String startTime;
@PostConstruct
public void init(){
public void init() {
this.startTime = DateUtils.dateToString(new Date());
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
}
/**
* registry
* registry
*/
public void registry() {
String address = OSUtils.getHost();
String localNodePath = getMasterPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if(newState == ConnectionState.LOST){
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, "");
zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener(
(client, newState) -> {
if (newState == ConnectionState.LOST) {
logger.error("master : {} connection lost from zookeeper", address);
} else if(newState == ConnectionState.RECONNECTED){
} else if (newState == ConnectionState.RECONNECTED) {
logger.info("master : {} reconnected to zookeeper", address);
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, "");
} else if(newState == ConnectionState.SUSPENDED){
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, "");
} else if (newState == ConnectionState.SUSPENDED) {
logger.warn("master : {} connection SUSPENDED ", address);
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, "");
}
}
});
});
int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
masterConfig.getMasterReservedMemory(),
masterConfig.getMasterMaxCpuloadAvg(),
Sets.newHashSet(getMasterPath()),
Constants.MASTER_PREFIX,
zookeeperRegistryCenter);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
@ -108,31 +109,37 @@ public class MasterRegistry {
}
/**
* remove registry info
* remove registry info
*/
public void unRegistry() {
String address = getLocalAddress();
String localNodePath = getMasterPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath);
zookeeperRegistryCenter.getRegisterOperator().remove(localNodePath);
logger.info("master node : {} unRegistry to ZK.", address);
}
/**
* get master path
* @return
* get master path
*/
private String getMasterPath() {
public String getMasterPath() {
String address = getLocalAddress();
String localNodePath = this.zookeeperRegistryCenter.getMasterPath() + "/" + address;
return localNodePath;
return this.zookeeperRegistryCenter.getMasterPath() + "/" + address;
}
/**
* get local address
* get local address
* @return
*/
private String getLocalAddress(){
return OSUtils.getAddr(masterConfig.getListenPort());
}
/**
* get zookeeper registry center
* @return ZookeeperRegistryCenter
*/
public ZookeeperRegistryCenter getZookeeperRegistryCenter() {
return zookeeperRegistryCenter;
}
}

40
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java

@ -19,16 +19,21 @@ package org.apache.dolphinscheduler.server.registry;
import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
import java.util.Date;
import java.util.Set;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import java.util.Date;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HeartBeatTask extends Thread {
/**
* Heart beat task
*/
public class HeartBeatTask implements Runnable {
private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
@ -36,23 +41,39 @@ public class HeartBeatTask extends Thread {
private double reservedMemory;
private double maxCpuloadAvg;
private Set<String> heartBeatPaths;
private String serverType;
private ZookeeperRegistryCenter zookeeperRegistryCenter;
/**
* server stop or not
*/
protected IStoppable stoppable = null;
public HeartBeatTask(String startTime,
double reservedMemory,
double maxCpuloadAvg,
Set<String> heartBeatPaths,
String serverType,
ZookeeperRegistryCenter zookeeperRegistryCenter) {
this.startTime = startTime;
this.reservedMemory = reservedMemory;
this.maxCpuloadAvg = maxCpuloadAvg;
this.heartBeatPaths = heartBeatPaths;
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
this.serverType = serverType;
}
@Override
public void run() {
try {
// check dead or not in zookeeper
for (String heartBeatPath : heartBeatPaths) {
if (zookeeperRegistryCenter.checkIsDeadServer(heartBeatPath, serverType)) {
zookeeperRegistryCenter.getStoppable().stop("i was judged to death, release resources and stop myself");
return;
}
}
double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
double loadAverage = OSUtils.loadAverage();
@ -78,10 +99,19 @@ public class HeartBeatTask extends Thread {
builder.append(OSUtils.getProcessID());
for (String heartBeatPath : heartBeatPaths) {
zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath, builder.toString());
zookeeperRegistryCenter.getRegisterOperator().update(heartBeatPath, builder.toString());
}
} catch (Throwable ex) {
logger.error("error write heartbeat info", ex);
}
}
/**
* for stop server
*
* @param serverStoppable server stoppable interface
*/
public void setStoppable(IStoppable serverStoppable) {
this.stoppable = serverStoppable;
}
}

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java

@ -93,11 +93,11 @@ public class ZookeeperNodeManager implements InitializingBean {
/**
* init MasterNodeListener listener
*/
registryCenter.getZookeeperCachedOperator().addListener(new MasterNodeListener());
registryCenter.getRegisterOperator().addListener(new MasterNodeListener());
/**
* init WorkerNodeListener listener
*/
registryCenter.getZookeeperCachedOperator().addListener(new WorkerGroupNodeListener());
registryCenter.getRegisterOperator().addListener(new WorkerGroupNodeListener());
}
/**

92
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java

@ -17,19 +17,27 @@
package org.apache.dolphinscheduler.server.registry;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.service.zk.RegisterOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* zookeeper register center
* zookeeper register center
*/
@Service
public class ZookeeperRegistryCenter implements InitializingBean {
@ -38,10 +46,9 @@ public class ZookeeperRegistryCenter implements InitializingBean {
@Autowired
protected ZookeeperCachedOperator zookeeperCachedOperator;
protected RegisterOperator registerOperator;
@Autowired
private ZookeeperConfig zookeeperConfig;
private ZookeeperConfig zookeeperConfig;
/**
* nodes namespace
@ -60,6 +67,8 @@ public class ZookeeperRegistryCenter implements InitializingBean {
public final String EMPTY = "";
private IStoppable stoppable;
@Override
public void afterPropertiesSet() throws Exception {
NODES = zookeeperConfig.getDsRoot() + "/nodes";
@ -82,23 +91,22 @@ public class ZookeeperRegistryCenter implements InitializingBean {
* init nodes
*/
private void initNodes() {
zookeeperCachedOperator.persist(MASTER_PATH, EMPTY);
zookeeperCachedOperator.persist(WORKER_PATH, EMPTY);
registerOperator.persist(MASTER_PATH, EMPTY);
registerOperator.persist(WORKER_PATH, EMPTY);
}
/**
* close
*/
public void close() {
if (isStarted.compareAndSet(true, false)) {
if (zookeeperCachedOperator != null) {
zookeeperCachedOperator.close();
}
if (isStarted.compareAndSet(true, false) && registerOperator != null) {
registerOperator.close();
}
}
/**
* get master path
*
* @return master path
*/
public String getMasterPath() {
@ -107,6 +115,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/**
* get worker path
*
* @return worker path
*/
public String getWorkerPath() {
@ -114,7 +123,8 @@ public class ZookeeperRegistryCenter implements InitializingBean {
}
/**
* get master nodes directly
* get master nodes directly
*
* @return master nodes
*/
public Set<String> getMasterNodesDirectly() {
@ -123,7 +133,8 @@ public class ZookeeperRegistryCenter implements InitializingBean {
}
/**
* get worker nodes directly
* get worker nodes directly
*
* @return master nodes
*/
public Set<String> getWorkerNodesDirectly() {
@ -133,6 +144,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/**
* get worker group directly
*
* @return worker group nodes
*/
public Set<String> getWorkerGroupDirectly() {
@ -142,6 +154,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/**
* get worker group nodes
*
* @param workerGroup
* @return
*/
@ -152,6 +165,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/**
* whether worker path
*
* @param path path
* @return result
*/
@ -161,6 +175,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/**
* whether master path
*
* @param path path
* @return result
*/
@ -170,6 +185,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/**
* get worker group path
*
* @param workerGroup workerGroup
* @return worker group path
*/
@ -179,19 +195,53 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/**
* get children nodes
*
* @param key key
* @return children nodes
*/
public List<String> getChildrenKeys(final String key) {
return zookeeperCachedOperator.getChildrenKeys(key);
return registerOperator.getChildrenKeys(key);
}
/**
* @return get dead server node parent path
*/
public String getDeadZNodeParentPath() {
return registerOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
}
public void setStoppable(IStoppable stoppable) {
this.stoppable = stoppable;
}
public IStoppable getStoppable() {
return stoppable;
}
/**
* get zookeeperCachedOperator
* @return zookeeperCachedOperator
* check dead server or not , if dead, stop self
*
* @param zNode node path
* @param serverType master or worker prefix
* @return true if not exists
* @throws Exception errors
*/
public ZookeeperCachedOperator getZookeeperCachedOperator() {
return zookeeperCachedOperator;
protected boolean checkIsDeadServer(String zNode, String serverType) throws Exception {
//ip_sequenceno
String[] zNodesPath = zNode.split("\\/");
String ipSeqNo = zNodesPath[zNodesPath.length - 1];
String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX;
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo;
if (!registerOperator.isExisted(zNode) || registerOperator.isExisted(deadServerPath)) {
return true;
}
return false;
}
public RegisterOperator getRegisterOperator() {
return registerOperator;
}
}

30
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.worker;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
@ -29,6 +31,9 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -42,7 +47,7 @@ import javax.annotation.PostConstruct;
* worker server
*/
@ComponentScan("org.apache.dolphinscheduler")
public class WorkerServer {
public class WorkerServer implements IStoppable {
/**
* logger
@ -106,7 +111,15 @@ public class WorkerServer {
this.nettyRemotingServer.start();
// worker registry
this.workerRegistry.registry();
try {
this.workerRegistry.registry();
this.workerRegistry.getZookeeperRegistryCenter().setStoppable(this);
Set<String> workerZkPaths = this.workerRegistry.getWorkerZkPaths();
this.workerRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(workerZkPaths, ZKNodeType.WORKER, Constants.DELETE_ZK_OP);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
}
// retry report task status
this.retryReportTaskStatusThread.start();
@ -117,7 +130,9 @@ public class WorkerServer {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
close("shutdownHook");
if (Stopper.isRunning()) {
close("shutdownHook");
}
}
}));
}
@ -126,7 +141,7 @@ public class WorkerServer {
try {
//execute only once
if(Stopper.isStopped()){
if (Stopper.isStopped()) {
return;
}
@ -138,7 +153,7 @@ public class WorkerServer {
try {
//thread sleep 3 seconds for thread quitely stop
Thread.sleep(3000L);
}catch (Exception e){
} catch (Exception e) {
logger.warn("thread sleep exception", e);
}
@ -147,8 +162,13 @@ public class WorkerServer {
} catch (Exception e) {
logger.error("worker server stop exception ", e);
} finally {
System.exit(-1);
}
}
@Override
public void stop(String cause) {
close(cause);
}
}

60
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -19,6 +19,17 @@ package org.apache.dolphinscheduler.server.worker.registry;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.SLASH;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.curator.framework.state.ConnectionState;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.Executors;
@ -27,16 +38,6 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -46,7 +47,7 @@ import com.google.common.collect.Sets;
/**
* worker registry
* worker registry
*/
@Service
public class WorkerRegistry {
@ -54,13 +55,13 @@ public class WorkerRegistry {
private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class);
/**
* zookeeper registry center
* zookeeper registry center
*/
@Autowired
private ZookeeperRegistryCenter zookeeperRegistryCenter;
/**
* worker config
* worker config
*/
@Autowired
private WorkerConfig workerConfig;
@ -79,14 +80,22 @@ public class WorkerRegistry {
private Set<String> workerGroups;
@PostConstruct
public void init(){
public void init() {
this.workerGroups = workerConfig.getWorkerGroups();
this.startTime = DateUtils.dateToString(new Date());
this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor"));
}
/**
* registry
* get zookeeper registry center
* @return ZookeeperRegistryCenter
*/
public ZookeeperRegistryCenter getZookeeperRegistryCenter() {
return zookeeperRegistryCenter;
}
/**
* registry
*/
public void registry() {
String address = OSUtils.getHost();
@ -94,20 +103,18 @@ public class WorkerRegistry {
int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval();
for (String workerZKPath : workerZkPaths) {
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath, "");
zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, "");
zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener(
(client,newState) -> {
if (newState == ConnectionState.LOST) {
logger.error("worker : {} connection lost from zookeeper", address);
} else if (newState == ConnectionState.RECONNECTED) {
logger.info("worker : {} reconnected to zookeeper", address);
zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath, "");
zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(workerZKPath, "");
} else if (newState == ConnectionState.SUSPENDED) {
logger.warn("worker : {} connection SUSPENDED ", address);
}
}
});
});
logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath);
}
@ -115,6 +122,7 @@ public class WorkerRegistry {
this.workerConfig.getWorkerReservedMemory(),
this.workerConfig.getWorkerMaxCpuloadAvg(),
workerZkPaths,
Constants.WORKER_PREFIX,
this.zookeeperRegistryCenter);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
@ -122,22 +130,22 @@ public class WorkerRegistry {
}
/**
* remove registry info
* remove registry info
*/
public void unRegistry() {
String address = getLocalAddress();
Set<String> workerZkPaths = getWorkerZkPaths();
for (String workerZkPath : workerZkPaths) {
zookeeperRegistryCenter.getZookeeperCachedOperator().remove(workerZkPath);
zookeeperRegistryCenter.getRegisterOperator().remove(workerZkPath);
logger.info("worker node : {} unRegistry from ZK {}.", address, workerZkPath);
}
this.heartBeatExecutor.shutdownNow();
}
/**
* get worker path
* get worker path
*/
private Set<String> getWorkerZkPaths() {
public Set<String> getWorkerZkPaths() {
Set<String> workerZkPaths = Sets.newHashSet();
String address = getLocalAddress();

7
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.MasterServer;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService;
@ -72,8 +73,7 @@ public class ZKMasterClient extends AbstractZKClient {
@Autowired
private MasterRegistry masterRegistry;
public void start() {
public void start(MasterServer masterServer) {
InterProcessMutex mutex = null;
try {
// create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master
@ -83,6 +83,9 @@ public class ZKMasterClient extends AbstractZKClient {
// Master registry
masterRegistry.registry();
masterRegistry.getZookeeperRegistryCenter().setStoppable(masterServer);
String registPath = this.masterRegistry.getMasterPath();
masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registPath, ZKNodeType.MASTER, Constants.DELETE_ZK_OP);
// init system znode
this.initSystemZNode();

35
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@ -17,11 +17,20 @@
package org.apache.dolphinscheduler.server.master.consumer;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
@ -35,8 +44,17 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.RegisterOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.apache.curator.CuratorZookeeperClient;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -47,17 +65,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class,
NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class,
ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, MasterConfig.class})
ZookeeperNodeManager.class, RegisterOperator.class, ZookeeperConfig.class, MasterConfig.class})
public class TaskPriorityQueueConsumerTest {
@ -503,8 +514,6 @@ public class TaskPriorityQueueConsumerTest {
TaskExecutionContext taskExecutionContext = taskPriorityQueueConsumer.getTaskExecutionContext(1);
Assert.assertNotNull(taskExecutionContext);
}

25
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java

@ -17,29 +17,32 @@
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.apache.curator.CuratorZookeeperClient;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
/**
* master registry test
*/
@RunWith(SpringRunner.class)
@ContextConfiguration(classes={SpringZKServer.class, MasterRegistry.class,ZookeeperRegistryCenter.class, MasterConfig.class, ZookeeperCachedOperator.class, ZookeeperConfig.class})
@ContextConfiguration(classes = {SpringZKServer.class, MasterRegistry.class, ZookeeperRegistryCenter.class,
MasterConfig.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, CuratorZookeeperClient.class})
public class MasterRegistryTest {
@Autowired
@ -56,18 +59,20 @@ public class MasterRegistryTest {
masterRegistry.registry();
String masterPath = zookeeperRegistryCenter.getMasterPath();
TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node
String masterNodePath = masterPath + Constants.SLASH + (OSUtils.getAddr(Constants.LOCAL_ADDRESS, masterConfig.getListenPort()));
String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(masterNodePath);
String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort());
String heartbeat = zookeeperRegistryCenter.getRegisterOperator().get(masterNodePath);
Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length);
masterRegistry.unRegistry();
}
@Test
public void testUnRegistry() throws InterruptedException {
masterRegistry.init();
masterRegistry.registry();
TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node
masterRegistry.unRegistry();
String masterPath = zookeeperRegistryCenter.getMasterPath();
List<String> childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(masterPath);
List<String> childrenKeys = zookeeperRegistryCenter.getRegisterOperator().getChildrenKeys(masterPath);
Assert.assertTrue(childrenKeys.isEmpty());
}
}

61
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenterTest.java

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.registry;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.service.zk.RegisterOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
/**
* zookeeper registry center test
*/
@RunWith(MockitoJUnitRunner.class)
public class ZookeeperRegistryCenterTest {
@InjectMocks
private ZookeeperRegistryCenter zookeeperRegistryCenter;
@Mock
protected RegisterOperator registerOperator;
@Mock
private ZookeeperConfig zookeeperConfig;
private static final String DS_ROOT = "/dolphinscheduler";
@Test
public void testGetDeadZNodeParentPath() {
ZookeeperConfig zookeeperConfig = new ZookeeperConfig();
zookeeperConfig.setDsRoot(DS_ROOT);
Mockito.when(registerOperator.getZookeeperConfig()).thenReturn(zookeeperConfig);
String deadZNodeParentPath = zookeeperRegistryCenter.getDeadZNodeParentPath();
Assert.assertEquals(deadZNodeParentPath, DS_ROOT + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS);
}
}

78
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java

@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
@ -33,12 +33,18 @@ import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseSer
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.RegisterOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.apache.curator.CuratorZookeeperClient;
import java.util.Date;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -47,17 +53,32 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.io.IOException;
import java.util.Date;
import io.netty.channel.Channel;
/**
* test task call back service
* todo refactor it in the form of mock
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes={TaskCallbackServiceTestConfig.class, SpringZKServer.class, SpringApplicationContext.class, MasterRegistry.class, WorkerRegistry.class,
ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class,
ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskCallbackService.class,
TaskResponseService.class, TaskAckProcessor.class,TaskResponseProcessor.class})
@ContextConfiguration(classes = {
TaskCallbackServiceTestConfig.class,
SpringZKServer.class,
SpringApplicationContext.class,
MasterRegistry.class,
WorkerRegistry.class,
ZookeeperRegistryCenter.class,
MasterConfig.class,
WorkerConfig.class,
RegisterOperator.class,
ZookeeperConfig.class,
ZookeeperNodeManager.class,
TaskCallbackService.class,
TaskResponseService.class,
TaskAckProcessor.class,
TaskResponseProcessor.class,
TaskExecuteProcessor.class,
CuratorZookeeperClient.class,
TaskExecutionContextCacheManagerImpl.class})
public class TaskCallbackServiceTest {
@Autowired
@ -74,6 +95,7 @@ public class TaskCallbackServiceTest {
/**
* send ack test
*
* @throws Exception
*/
@Test
@ -101,6 +123,7 @@ public class TaskCallbackServiceTest {
/**
* send result test
*
* @throws Exception
*/
@Test
@ -140,7 +163,7 @@ public class TaskCallbackServiceTest {
@Test
public void testPause(){
Assert.assertEquals(5000, taskCallbackService.pause(3));;
Assert.assertEquals(5000, taskCallbackService.pause(3));
}
@Test
@ -171,41 +194,4 @@ public class TaskCallbackServiceTest {
nettyRemotingServer.close();
nettyRemotingClient.close();
}
// @Test(expected = IllegalStateException.class)
// public void testSendAckWithIllegalStateException2(){
// masterRegistry.registry();
// final NettyServerConfig serverConfig = new NettyServerConfig();
// serverConfig.setListenPort(30000);
// NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig);
// nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
// nettyRemotingServer.start();
//
// final NettyClientConfig clientConfig = new NettyClientConfig();
// NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig);
// Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000"));
// taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1));
// channel.close();
// TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
// ackCommand.setTaskInstanceId(1);
// ackCommand.setStartTime(new Date());
//
// nettyRemotingServer.close();
//
// taskCallbackService.sendAck(1, ackCommand.convert2Command());
// try {
// Thread.sleep(5000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
//
// Stopper.stop();
//
// try {
// Thread.sleep(5000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
}

45
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java

@ -19,18 +19,20 @@ package org.apache.dolphinscheduler.server.worker.registry;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.RegisterOperator;
import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.state.ConnectionStateListener;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -61,7 +63,7 @@ public class WorkerRegistryTest {
private ZookeeperRegistryCenter zookeeperRegistryCenter;
@Mock
private ZookeeperCachedOperator zookeeperCachedOperator;
private RegisterOperator registerOperator;
@Mock
private CuratorFrameworkImpl zkClient;
@ -69,15 +71,21 @@ public class WorkerRegistryTest {
@Mock
private WorkerConfig workerConfig;
private static final Set<String> workerGroups;
static {
workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP);
}
@Before
public void before() {
Set<String> workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP);
Mockito.when(workerConfig.getWorkerGroups()).thenReturn(workerGroups);
Mockito.when(zookeeperRegistryCenter.getWorkerPath()).thenReturn("/dolphinscheduler/nodes/worker");
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator()).thenReturn(zookeeperCachedOperator);
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient()).thenReturn(zkClient);
Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable()).thenReturn(
Mockito.when(zookeeperRegistryCenter.getRegisterOperator()).thenReturn(registerOperator);
Mockito.when(zookeeperRegistryCenter.getRegisterOperator().getZkClient()).thenReturn(zkClient);
Mockito.when(zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable()).thenReturn(
new Listenable<ConnectionStateListener>() {
@Override
public void addListener(ConnectionStateListener connectionStateListener) {
@ -114,7 +122,7 @@ public class WorkerRegistryTest {
int i = 0;
for (String workerGroup : workerConfig.getWorkerGroups()) {
String workerZkPath = workerPath + "/" + workerGroup.trim() + "/" + (OSUtils.getAddr(workerConfig.getListenPort()));
String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(workerZkPath);
String heartbeat = zookeeperRegistryCenter.getRegisterOperator().get(workerZkPath);
if (0 == i) {
Assert.assertTrue(workerZkPath.startsWith("/dolphinscheduler/nodes/worker/test/"));
} else {
@ -143,6 +151,7 @@ public class WorkerRegistryTest {
Assert.assertEquals(0, testWorkerGroupPathZkChildren.size());
Assert.assertEquals(0, defaultWorkerGroupPathZkChildren.size());
workerRegistry.unRegistry();
}
@Test
@ -155,7 +164,7 @@ public class WorkerRegistryTest {
for (String workerGroup : workerConfig.getWorkerGroups()) {
String workerGroupPath = workerPath + "/" + workerGroup.trim();
List<String> childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(workerGroupPath);
List<String> childrenKeys = zookeeperRegistryCenter.getRegisterOperator().getChildrenKeys(workerGroupPath);
Assert.assertTrue(childrenKeys.isEmpty());
}
@ -167,4 +176,10 @@ public class WorkerRegistryTest {
workerRegistry.unRegistry();
}
@Test
public void testGetWorkerZkPaths() {
workerRegistry.init();
Assert.assertEquals(workerGroups.size(),workerRegistry.getWorkerZkPaths().size());
}
}

542
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java

@ -14,322 +14,256 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.zk;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.COLON;
import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.ResInfo;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.*;
import static org.apache.dolphinscheduler.common.Constants.*;
/**
* abstract zookeeper client
*/
@Component
public abstract class AbstractZKClient extends ZookeeperCachedOperator {
private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class);
/**
* remove dead server by host
* @param host host
* @param serverType serverType
* @throws Exception
*/
public void removeDeadServerByHost(String host, String serverType) throws Exception {
List<String> deadServers = super.getChildrenKeys(getDeadZNodeParentPath());
for(String serverPath : deadServers){
if(serverPath.startsWith(serverType+UNDERLINE+host)){
String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
super.remove(server);
logger.info("{} server {} deleted from zk dead server path success" , serverType , host);
}
}
}
/**
* opType(add): if find dead server , then add to zk deadServerPath
* opType(delete): delete path from zk
*
* @param zNode node path
* @param zkNodeType master or worker
* @param opType delete or add
* @throws Exception errors
*/
public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception {
String host = getHostByEventDataPath(zNode);
String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
//check server restart, if restart , dead server path in zk should be delete
if(opType.equals(DELETE_ZK_OP)){
removeDeadServerByHost(host, type);
}else if(opType.equals(ADD_ZK_OP)){
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
if(!super.isExisted(deadServerPath)){
//add dead server info to zk dead server path : /dead-servers/
super.persist(deadServerPath,(type + UNDERLINE + host));
logger.info("{} server dead , and {} added to zk dead server path success" ,
zkNodeType.toString(), zNode);
}
}
}
/**
* get active master num
* @return active master number
*/
public int getActiveMasterNum(){
List<String> childrenList = new ArrayList<>();
try {
// read master node parent path from conf
if(super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))){
childrenList = super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
}
} catch (Exception e) {
logger.error("getActiveMasterNum error",e);
}
return childrenList.size();
}
/**
*
* @return zookeeper quorum
*/
public String getZookeeperQuorum(){
return getZookeeperConfig().getServerList();
}
/**
* get server list.
* @param zkNodeType zookeeper node type
* @return server list
*/
public List<Server> getServersList(ZKNodeType zkNodeType){
Map<String, String> masterMap = getServerMaps(zkNodeType);
String parentPath = getZNodeParentPath(zkNodeType);
List<Server> masterServers = new ArrayList<>();
for (Map.Entry<String, String> entry : masterMap.entrySet()) {
Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue());
if(masterServer == null){
continue;
}
String key = entry.getKey();
masterServer.setZkDirectory(parentPath + "/"+ key);
//set host and port
String[] hostAndPort=key.split(COLON);
String[] hosts=hostAndPort[0].split(DIVISION_STRING);
// fetch the last one
masterServer.setHost(hosts[hosts.length-1]);
masterServer.setPort(Integer.parseInt(hostAndPort[1]));
masterServers.add(masterServer);
}
return masterServers;
}
/**
* get master server list map.
* @param zkNodeType zookeeper node type
* @return result : {host : resource info}
*/
public Map<String, String> getServerMaps(ZKNodeType zkNodeType){
Map<String, String> masterMap = new HashMap<>();
try {
String path = getZNodeParentPath(zkNodeType);
List<String> serverList = super.getChildrenKeys(path);
if(zkNodeType == ZKNodeType.WORKER){
List<String> workerList = new ArrayList<>();
for(String group : serverList){
List<String> groupServers = super.getChildrenKeys(path + Constants.SLASH + group);
for(String groupServer : groupServers){
workerList.add(group + Constants.SLASH + groupServer);
}
}
serverList = workerList;
}
for(String server : serverList){
masterMap.putIfAbsent(server, super.get(path + Constants.SLASH + server));
}
} catch (Exception e) {
logger.error("get server list failed", e);
}
return masterMap;
}
/**
* check the zookeeper node already exists
* @param host host
* @param zkNodeType zookeeper node type
* @return true if exists
*/
public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) {
String path = getZNodeParentPath(zkNodeType);
if(StringUtils.isEmpty(path)){
logger.error("check zk node exists error, host:{}, zk node type:{}",
host, zkNodeType.toString());
return false;
}
Map<String, String> serverMaps = getServerMaps(zkNodeType);
for(String hostKey : serverMaps.keySet()){
if(hostKey.contains(host)){
return true;
}
}
return false;
}
/**
*
* @return get worker node parent path
*/
protected String getWorkerZNodeParentPath(){
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
}
/**
*
* @return get master node parent path
*/
protected String getMasterZNodeParentPath(){
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS;
}
/**
*
* @return get master lock path
*/
public String getMasterLockPath(){
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS;
}
/**
*
* @param zkNodeType zookeeper node type
* @return get zookeeper node parent path
*/
public String getZNodeParentPath(ZKNodeType zkNodeType) {
String path = "";
switch (zkNodeType){
case MASTER:
return getMasterZNodeParentPath();
case WORKER:
return getWorkerZNodeParentPath();
case DEAD_SERVER:
return getDeadZNodeParentPath();
default:
break;
}
return path;
}
/**
*
* @return get dead server node parent path
*/
protected String getDeadZNodeParentPath(){
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
}
/**
*
* @return get master start up lock path
*/
public String getMasterStartUpLockPath(){
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
}
/**
*
* @return get master failover lock path
*/
public String getMasterFailoverLockPath(){
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
}
/**
*
* @return get worker failover lock path
*/
public String getWorkerFailoverLockPath(){
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
}
/**
* release mutex
* @param mutex mutex
*/
public void releaseMutex(InterProcessMutex mutex) {
if (mutex != null){
try {
mutex.release();
} catch (Exception e) {
if(e.getMessage().equals("instance must be started before calling this method")){
logger.warn("lock release");
}else{
logger.error("lock release failed",e);
}
}
}
}
/**
* init system znode
*/
protected void initSystemZNode(){
try {
persist(getMasterZNodeParentPath(), "");
persist(getWorkerZNodeParentPath(), "");
persist(getDeadZNodeParentPath(), "");
logger.info("initialize server nodes success.");
} catch (Exception e) {
logger.error("init system znode failed",e);
}
}
/**
* get host ip, string format: masterParentPath/ip
* @param path path
* @return host ip, string format: masterParentPath/ip
*/
protected String getHostByEventDataPath(String path) {
if(StringUtils.isEmpty(path)){
logger.error("empty path!");
return "";
}
String[] pathArray = path.split(SINGLE_SLASH);
if(pathArray.length < 1){
logger.error("parse ip error: {}", path);
return "";
}
return pathArray[pathArray.length - 1];
}
@Override
public String toString() {
return "AbstractZKClient{" +
"zkClient=" + zkClient +
", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\'' +
", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\'' +
", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\'' +
'}';
}
}
public abstract class AbstractZKClient extends RegisterOperator {
private static final Logger logger = LoggerFactory.getLogger(AbstractZKClient.class);
/**
* get active master num
*
* @return active master number
*/
public int getActiveMasterNum() {
List<String> childrenList = new ArrayList<>();
try {
// read master node parent path from conf
if (super.isExisted(getZNodeParentPath(ZKNodeType.MASTER))) {
childrenList = super.getChildrenKeys(getZNodeParentPath(ZKNodeType.MASTER));
}
} catch (Exception e) {
logger.error("getActiveMasterNum error", e);
}
return childrenList.size();
}
/**
* @return zookeeper quorum
*/
public String getZookeeperQuorum() {
return getZookeeperConfig().getServerList();
}
/**
* get server list.
*
* @param zkNodeType zookeeper node type
* @return server list
*/
public List<Server> getServersList(ZKNodeType zkNodeType) {
Map<String, String> masterMap = getServerMaps(zkNodeType);
String parentPath = getZNodeParentPath(zkNodeType);
List<Server> masterServers = new ArrayList<>();
for (Map.Entry<String, String> entry : masterMap.entrySet()) {
Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue());
if (masterServer == null) {
continue;
}
String key = entry.getKey();
masterServer.setZkDirectory(parentPath + "/" + key);
//set host and port
String[] hostAndPort = key.split(COLON);
String[] hosts = hostAndPort[0].split(DIVISION_STRING);
// fetch the last one
masterServer.setHost(hosts[hosts.length - 1]);
masterServer.setPort(Integer.parseInt(hostAndPort[1]));
masterServers.add(masterServer);
}
return masterServers;
}
/**
* get master server list map.
*
* @param zkNodeType zookeeper node type
* @return result : {host : resource info}
*/
public Map<String, String> getServerMaps(ZKNodeType zkNodeType) {
Map<String, String> masterMap = new HashMap<>();
try {
String path = getZNodeParentPath(zkNodeType);
List<String> serverList = super.getChildrenKeys(path);
if (zkNodeType == ZKNodeType.WORKER) {
List<String> workerList = new ArrayList<>();
for (String group : serverList) {
List<String> groupServers = super.getChildrenKeys(path + Constants.SLASH + group);
for (String groupServer : groupServers) {
workerList.add(group + Constants.SLASH + groupServer);
}
}
serverList = workerList;
}
for (String server : serverList) {
masterMap.putIfAbsent(server, super.get(path + Constants.SLASH + server));
}
} catch (Exception e) {
logger.error("get server list failed", e);
}
return masterMap;
}
/**
* check the zookeeper node already exists
*
* @param host host
* @param zkNodeType zookeeper node type
* @return true if exists
*/
public boolean checkZKNodeExists(String host, ZKNodeType zkNodeType) {
String path = getZNodeParentPath(zkNodeType);
if (StringUtils.isEmpty(path)) {
logger.error("check zk node exists error, host:{}, zk node type:{}",
host, zkNodeType);
return false;
}
Map<String, String> serverMaps = getServerMaps(zkNodeType);
for (String hostKey : serverMaps.keySet()) {
if (hostKey.contains(host)) {
return true;
}
}
return false;
}
/**
* @return get worker node parent path
*/
protected String getWorkerZNodeParentPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS;
}
/**
* @return get master node parent path
*/
protected String getMasterZNodeParentPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_MASTERS;
}
/**
* @return get master lock path
*/
public String getMasterLockPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_MASTERS;
}
/**
* @param zkNodeType zookeeper node type
* @return get zookeeper node parent path
*/
public String getZNodeParentPath(ZKNodeType zkNodeType) {
String path = "";
switch (zkNodeType) {
case MASTER:
return getMasterZNodeParentPath();
case WORKER:
return getWorkerZNodeParentPath();
case DEAD_SERVER:
return getDeadZNodeParentPath();
default:
break;
}
return path;
}
/**
* @return get master start up lock path
*/
public String getMasterStartUpLockPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS;
}
/**
* @return get master failover lock path
*/
public String getMasterFailoverLockPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS;
}
/**
* @return get worker failover lock path
*/
public String getWorkerFailoverLockPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS;
}
/**
* release mutex
*
* @param mutex mutex
*/
public void releaseMutex(InterProcessMutex mutex) {
if (mutex != null) {
try {
mutex.release();
} catch (Exception e) {
if ("instance must be started before calling this method".equals(e.getMessage())) {
logger.warn("lock release");
} else {
logger.error("lock release failed", e);
}
}
}
}
/**
* init system znode
*/
protected void initSystemZNode() {
try {
persist(getMasterZNodeParentPath(), "");
persist(getWorkerZNodeParentPath(), "");
persist(getDeadZNodeParentPath(), "");
logger.info("initialize server nodes success.");
} catch (Exception e) {
logger.error("init system znode failed", e);
}
}
@Override
public String toString() {
return "AbstractZKClient{"
+ "zkClient=" + getZkClient()
+ ", deadServerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.DEAD_SERVER) + '\''
+ ", masterZNodeParentPath='" + getZNodeParentPath(ZKNodeType.MASTER) + '\''
+ ", workerZNodeParentPath='" + getZNodeParentPath(ZKNodeType.WORKER) + '\''
+ '}';
}
}

155
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.zk;
import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP;
import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import static org.apache.dolphinscheduler.common.Constants.UNDERLINE;
import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* register operator
*/
@Component
public class RegisterOperator extends ZookeeperCachedOperator {
private final Logger logger = LoggerFactory.getLogger(RegisterOperator.class);
/**
* @return get dead server node parent path
*/
protected String getDeadZNodeParentPath() {
return getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS;
}
/**
* remove dead server by host
*
* @param host host
* @param serverType serverType
* @throws Exception
*/
public void removeDeadServerByHost(String host, String serverType) throws Exception {
List<String> deadServers = super.getChildrenKeys(getDeadZNodeParentPath());
for (String serverPath : deadServers) {
if (serverPath.startsWith(serverType + UNDERLINE + host)) {
String server = getDeadZNodeParentPath() + SINGLE_SLASH + serverPath;
super.remove(server);
logger.info("{} server {} deleted from zk dead server path success", serverType, host);
}
}
}
/**
* get host ip, string format: masterParentPath/ip
*
* @param path path
* @return host ip, string format: masterParentPath/ip
*/
protected String getHostByEventDataPath(String path) {
if (StringUtils.isEmpty(path)) {
logger.error("empty path!");
return "";
}
String[] pathArray = path.split(SINGLE_SLASH);
if (pathArray.length < 1) {
logger.error("parse ip error: {}", path);
return "";
}
return pathArray[pathArray.length - 1];
}
/**
* opType(add): if find dead server , then add to zk deadServerPath
* opType(delete): delete path from zk
*
* @param zNode node path
* @param zkNodeType master or worker
* @param opType delete or add
* @throws Exception errors
*/
public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception {
String host = getHostByEventDataPath(zNode);
String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
//check server restart, if restart , dead server path in zk should be delete
if (opType.equals(DELETE_ZK_OP)) {
removeDeadServerByHost(host, type);
} else if (opType.equals(ADD_ZK_OP)) {
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
if (!super.isExisted(deadServerPath)) {
//add dead server info to zk dead server path : /dead-servers/
super.persist(deadServerPath, (type + UNDERLINE + host));
logger.info("{} server dead , and {} added to zk dead server path success",
zkNodeType, zNode);
}
}
}
/**
* opType(add): if find dead server , then add to zk deadServerPath
* opType(delete): delete path from zk
*
* @param zNodeSet node path set
* @param zkNodeType master or worker
* @param opType delete or add
* @throws Exception errors
*/
public void handleDeadServer(Set<String> zNodeSet, ZKNodeType zkNodeType, String opType) throws Exception {
String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX;
for (String zNode : zNodeSet) {
String host = getHostByEventDataPath(zNode);
//check server restart, if restart , dead server path in zk should be delete
if (opType.equals(DELETE_ZK_OP)) {
removeDeadServerByHost(host, type);
} else if (opType.equals(ADD_ZK_OP)) {
String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + host;
if (!super.isExisted(deadServerPath)) {
//add dead server info to zk dead server path : /dead-servers/
super.persist(deadServerPath, (type + UNDERLINE + host));
logger.info("{} server dead , and {} added to zk dead server path success",
zkNodeType, zNode);
}
}
}
}
}

11
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java

@ -52,6 +52,9 @@ public class ZookeeperConfig {
@Value("${zookeeper.dolphinscheduler.root:/dolphinscheduler}")
private String dsRoot;
@Value("${zookeeper.max.wait.time:10000}")
private int maxWaitTime;
public String getServerList() {
return serverList;
}
@ -115,4 +118,12 @@ public class ZookeeperConfig {
public void setDsRoot(String dsRoot) {
this.dsRoot = dsRoot;
}
public int getMaxWaitTime() {
return maxWaitTime;
}
public void setMaxWaitTime(int maxWaitTime) {
this.maxWaitTime = maxWaitTime;
}
}

116
dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.service.zk;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ZKNodeType;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
/**
* register operator test
*/
@RunWith(MockitoJUnitRunner.Silent.class)
public class RegisterOperatorTest {
private static ZKServer zkServer;
@InjectMocks
private RegisterOperator registerOperator;
@Mock
private ZookeeperConfig zookeeperConfig;
private static final String DS_ROOT = "/dolphinscheduler";
private static final String MASTER_NODE = "127.0.0.1:5678";
@Before
public void before() {
new Thread(() -> {
if (zkServer == null) {
zkServer = new ZKServer();
}
zkServer.startLocalZkServer(2185);
}).start();
}
@Test
public void testAfterPropertiesSet() throws Exception {
TimeUnit.SECONDS.sleep(10);
Mockito.when(zookeeperConfig.getServerList()).thenReturn("127.0.0.1:2185");
Mockito.when(zookeeperConfig.getBaseSleepTimeMs()).thenReturn(100);
Mockito.when(zookeeperConfig.getMaxRetries()).thenReturn(10);
Mockito.when(zookeeperConfig.getMaxSleepMs()).thenReturn(30000);
Mockito.when(zookeeperConfig.getSessionTimeoutMs()).thenReturn(60000);
Mockito.when(zookeeperConfig.getConnectionTimeoutMs()).thenReturn(30000);
Mockito.when(zookeeperConfig.getDigest()).thenReturn("");
Mockito.when(zookeeperConfig.getDsRoot()).thenReturn(DS_ROOT);
Mockito.when(zookeeperConfig.getMaxWaitTime()).thenReturn(30000);
registerOperator.afterPropertiesSet();
Assert.assertNotNull(registerOperator.getZkClient());
}
@After
public void after() {
if (zkServer != null) {
zkServer.stop();
}
}
@Test
public void testGetDeadZNodeParentPath() throws Exception {
testAfterPropertiesSet();
String path = registerOperator.getDeadZNodeParentPath();
Assert.assertEquals(DS_ROOT + Constants.ZOOKEEPER_DOLPHINSCHEDULER_DEAD_SERVERS, path);
}
@Test
public void testHandleDeadServer() throws Exception {
testAfterPropertiesSet();
registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER,Constants.ADD_ZK_OP);
String path = registerOperator.getDeadZNodeParentPath();
Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE)));
}
@Test
public void testRemoveDeadServerByHost() throws Exception {
testAfterPropertiesSet();
String path = registerOperator.getDeadZNodeParentPath();
registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER,Constants.ADD_ZK_OP);
Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE)));
registerOperator.removeDeadServerByHost(MASTER_NODE,Constants.MASTER_PREFIX);
Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE)));
}
}

3
pom.xml

@ -817,6 +817,7 @@
<include>**/server/master/MasterExecThreadTest.java</include> -->
<include>**/server/master/ParamsTest.java</include>
<include>**/server/register/ZookeeperNodeManagerTest.java</include>
<include>**/server/register/ZookeeperRegistryCenterTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include>
<include>**/server/utils/ExecutionContextTestUtils.java</include>
<include>**/server/utils/FlinkArgsUtilsTest.java</include>
@ -838,6 +839,8 @@
<include>**/service/quartz/cron/CronUtilsTest.java</include>
<include>**/service/zk/DefaultEnsembleProviderTest.java</include>
<include>**/service/zk/ZKServerTest.java</include>
<include>**/service/zk/CuratorZookeeperClientTest.java</include>
<include>**/service/zk/RegisterOperatorTest.java</include>
<include>**/service/queue/TaskUpdateQueueTest.java</include>
<include>**/service/queue/TaskPriorityTest.java</include>
<include>**/dao/mapper/DataSourceUserMapperTest.java</include>

Loading…
Cancel
Save