diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index 7829347551..7c33b9026f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -22,14 +22,15 @@ import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.ThreadPoolExecutors; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; +import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -45,8 +46,6 @@ import org.springframework.context.annotation.ComponentScan; import javax.annotation.PostConstruct; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; /** * master server @@ -65,11 +64,6 @@ public class MasterServer implements IStoppable { @Autowired private ZKMasterClient zkMasterClient = null; - /** - * heartbeat thread pool - */ - private ScheduledExecutorService heartbeatMasterService; - /** * process service */ @@ -87,6 +81,11 @@ public class MasterServer implements IStoppable { @Autowired private MasterConfig masterConfig; + /** + * zookeeper registry center + */ + @Autowired + private ZookeeperRegistryCenter zookeeperRegistryCenter; /** * spring application context @@ -95,8 +94,15 @@ public class MasterServer implements IStoppable { @Autowired private SpringApplicationContext springApplicationContext; + /** + * netty remote server + */ private NettyRemotingServer nettyRemotingServer; + /** + * master registry + */ + private MasterRegistry masterRegistry; /** * master server startup @@ -115,7 +121,6 @@ public class MasterServer implements IStoppable { @PostConstruct public void run(){ - // //init remoting server NettyServerConfig serverConfig = new NettyServerConfig(); serverConfig.setListenPort(45678); @@ -124,23 +129,17 @@ public class MasterServer implements IStoppable { this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor()); this.nettyRemotingServer.start(); + // + this.masterRegistry = new MasterRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), masterConfig.getMasterHeartbeatInterval()); + this.masterRegistry.registry(); + // zkMasterClient.init(); masterSchedulerService = ThreadUtils.newDaemonSingleThreadExecutor("Master-Scheduler-Thread"); - heartbeatMasterService = ThreadUtils.newDaemonThreadScheduledExecutor("Master-Main-Thread",Constants.DEFAULT_MASTER_HEARTBEAT_THREAD_NUM); - - // heartbeat thread implement - Runnable heartBeatThread = heartBeatThread(); - zkMasterClient.setStoppable(this); - // regular heartbeat - // delay 5 seconds, send heartbeat every 30 seconds - heartbeatMasterService. - scheduleAtFixedRate(heartBeatThread, 5, masterConfig.getMasterHeartbeatInterval(), TimeUnit.SECONDS); - // master scheduler thread MasterSchedulerThread masterSchedulerThread = new MasterSchedulerThread( zkMasterClient, @@ -206,13 +205,8 @@ public class MasterServer implements IStoppable { }catch (Exception e){ logger.warn("thread sleep exception ", e); } - try { - heartbeatMasterService.shutdownNow(); - }catch (Exception e){ - logger.warn("heartbeat service stopped exception"); - } - - logger.info("heartbeat service stopped"); + this.nettyRemotingServer.close(); + this.masterRegistry.unRegistry(); //close quartz try{ @@ -247,35 +241,10 @@ public class MasterServer implements IStoppable { logger.info("zookeeper service stopped"); - } catch (Exception e) { logger.error("master server stop exception ", e); System.exit(-1); } } - - - /** - * heartbeat thread implement - * @return - */ - private Runnable heartBeatThread(){ - logger.info("start master heart beat thread..."); - Runnable heartBeatThread = new Runnable() { - @Override - public void run() { - if(Stopper.isRunning()) { - // send heartbeat to zk - if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) { - logger.error("master send heartbeat to zk failed: can't find zookeeper path of master server"); - return; - } - - zkMasterClient.heartBeatForZk(zkMasterClient.getMasterZNode(), Constants.MASTER_PREFIX); - } - } - }; - return heartBeatThread; - } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java index a9c111d0c9..ebfb2f4dc0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java @@ -19,11 +19,21 @@ package org.apache.dolphinscheduler.server.master.registry; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.remote.utils.Constants; +import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Date; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA; + /** * master registry */ @@ -41,14 +51,32 @@ public class MasterRegistry { */ private final int port; + /** + * heartbeat interval + */ + private final long heartBeatInterval; + + /** + * heartbeat executor + */ + private final ScheduledExecutorService heartBeatExecutor; + + /** + * worker start time + */ + private final String startTime; + /** * construct * @param zookeeperRegistryCenter zookeeperRegistryCenter * @param port port */ - public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port){ + public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){ this.zookeeperRegistryCenter = zookeeperRegistryCenter; this.port = port; + this.heartBeatInterval = heartBeatInterval; + this.startTime = DateUtils.dateToString(new Date()); + this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); } /** @@ -56,8 +84,8 @@ public class MasterRegistry { */ public void registry() { String address = Constants.LOCAL_ADDRESS; - String localNodePath = getWorkerPath(); - zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, ""); + String localNodePath = getMasterPath(); + zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, ""); zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { @@ -65,13 +93,14 @@ public class MasterRegistry { logger.error("master : {} connection lost from zookeeper", address); } else if(newState == ConnectionState.RECONNECTED){ logger.info("master : {} reconnected to zookeeper", address); - zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, ""); + zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, ""); } else if(newState == ConnectionState.SUSPENDED){ logger.warn("master : {} connection SUSPENDED ", address); } } }); - logger.info("master node : {} registry to ZK successfully.", address); + this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), heartBeatInterval, heartBeatInterval, TimeUnit.SECONDS); + logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, heartBeatInterval); } /** @@ -79,18 +108,18 @@ public class MasterRegistry { */ public void unRegistry() { String address = getLocalAddress(); - String localNodePath = getWorkerPath(); + String localNodePath = getMasterPath(); zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath); - logger.info("worker node : {} unRegistry to ZK.", address); + logger.info("master node : {} unRegistry to ZK.", address); } /** - * get worker path + * get master path * @return */ - private String getWorkerPath() { + private String getMasterPath() { String address = getLocalAddress(); - String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address; + String localNodePath = this.zookeeperRegistryCenter.getMasterPath() + "/" + address; return localNodePath; } @@ -101,4 +130,26 @@ public class MasterRegistry { private String getLocalAddress(){ return Constants.LOCAL_ADDRESS + ":" + port; } + + /** + * hear beat task + */ + class HeartBeatTask implements Runnable{ + + @Override + public void run() { + try { + StringBuilder builder = new StringBuilder(100); + builder.append(OSUtils.cpuUsage()).append(COMMA); + builder.append(OSUtils.memoryUsage()).append(COMMA); + builder.append(OSUtils.loadAverage()).append(COMMA); + builder.append(startTime).append(COMMA); + builder.append(DateUtils.dateToString(new Date())); + String masterPath = getMasterPath(); + zookeeperRegistryCenter.getZookeeperCachedOperator().update(masterPath, builder.toString()); + } catch (Throwable ex){ + logger.error("error write master heartbeat info", ex); + } + } + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index f53f187437..0cb905971d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -150,7 +150,6 @@ public class WorkerServer implements IStoppable { */ public static void main(String[] args) { Thread.currentThread().setName(Constants.THREAD_NAME_WORKER_SERVER); - System.setProperty("spring.profiles.active","worker"); new SpringApplicationBuilder(WorkerServer.class).web(WebApplicationType.NONE).run(args); } @@ -169,7 +168,7 @@ public class WorkerServer implements IStoppable { this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_REQUEST, new TaskKillProcessor()); this.nettyRemotingServer.start(); - this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval()); + this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), workerConfig.getWorkerGroup()); this.workerRegistry.registry(); this.zkWorkerClient.init(); @@ -188,22 +187,12 @@ public class WorkerServer implements IStoppable { // submit kill process thread killExecutorService.execute(killProcessThread); - // new fetch task thread -// FetchTaskThread fetchTaskThread = new FetchTaskThread(zkWorkerClient, processService, taskQueue); -// -// // submit fetch task thread -// fetchTaskExecutorService.execute(fetchTaskThread); - /** * register hooks, which are called before the process exits */ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { - // worker server exit alert - if (zkWorkerClient.getActiveMasterNum() <= 1) { - alertDao.sendServerStopedAlert(1, OSUtils.getHost(), "Worker-Server"); - } stop("shutdownhook"); } })); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index c4d4b61af5..747b34faf9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -34,9 +34,20 @@ public class WorkerConfig { @Value("${worker.max.cpuload.avg}") private int workerMaxCpuloadAvg; - @Value("${master.reserved.memory}") + @Value("${worker.reserved.memory}") private double workerReservedMemory; + @Value("${worker.group: DEFAULT}") + private String workerGroup; + + public String getWorkerGroup() { + return workerGroup; + } + + public void setWorkerGroup(String workerGroup) { + this.workerGroup = workerGroup; + } + public int getWorkerExecThreads() { return workerExecThreads; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index b6f6896d66..6876f05795 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -21,6 +21,7 @@ import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; @@ -33,6 +34,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA; +import static org.apache.dolphinscheduler.remote.utils.Constants.SLASH; /** @@ -42,6 +44,8 @@ public class WorkerRegistry { private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class); + private static final String DEFAULT_GROUP = "DEFAULT"; + /** * zookeeper registry center */ @@ -67,15 +71,30 @@ public class WorkerRegistry { */ private final String startTime; + /** + * worker group + */ + private final String workerGroup; + /** * construct * @param zookeeperRegistryCenter zookeeperRegistryCenter * @param port port */ public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){ + this(zookeeperRegistryCenter, port, heartBeatInterval, DEFAULT_GROUP); + } + + /** + * construct + * @param zookeeperRegistryCenter zookeeperRegistryCenter + * @param port port + */ + public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval, String workerGroup){ this.zookeeperRegistryCenter = zookeeperRegistryCenter; this.port = port; this.heartBeatInterval = heartBeatInterval; + this.workerGroup = workerGroup; this.startTime = DateUtils.dateToString(new Date()); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); } @@ -86,7 +105,7 @@ public class WorkerRegistry { public void registry() { String address = Constants.LOCAL_ADDRESS; String localNodePath = getWorkerPath(); - zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, ""); + zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, ""); zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { @@ -94,7 +113,7 @@ public class WorkerRegistry { logger.error("worker : {} connection lost from zookeeper", address); } else if(newState == ConnectionState.RECONNECTED){ logger.info("worker : {} reconnected to zookeeper", address); - zookeeperRegistryCenter.getZookeeperCachedOperator().persist(localNodePath, ""); + zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, ""); } else if(newState == ConnectionState.SUSPENDED){ logger.warn("worker : {} connection SUSPENDED ", address); } @@ -122,8 +141,14 @@ public class WorkerRegistry { */ private String getWorkerPath() { String address = getLocalAddress(); - String localNodePath = this.zookeeperRegistryCenter.getWorkerPath() + "/" + address; - return localNodePath; + StringBuilder builder = new StringBuilder(100); + String workerPath = this.zookeeperRegistryCenter.getWorkerPath(); + builder.append(workerPath).append(SLASH); + if(StringUtils.isNotEmpty(workerGroup) && !DEFAULT_GROUP.equalsIgnoreCase(workerGroup)){ + builder.append(workerGroup.trim()).append(SLASH); + } + builder.append(address); + return builder.toString(); } /** @@ -149,7 +174,7 @@ public class WorkerRegistry { builder.append(startTime).append(COMMA); builder.append(DateUtils.dateToString(new Date())); String workerPath = getWorkerPath(); - zookeeperRegistryCenter.getZookeeperCachedOperator().persist(workerPath, builder.toString()); + zookeeperRegistryCenter.getZookeeperCachedOperator().update(workerPath, builder.toString()); } catch (Throwable ex){ logger.error("error write worker heartbeat info", ex); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index fe4ec9130a..98e350b59d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -100,9 +100,6 @@ public class ZKMasterClient extends AbstractZKClient { // init system znode this.initSystemZNode(); - // register master - this.registerMaster(); - // check if fault tolerance is required,failure and tolerance if (getActiveMasterNum() == 1) { failoverWorker(null, true); @@ -132,25 +129,6 @@ public class ZKMasterClient extends AbstractZKClient { return alertDao; } - - - - /** - * register master znode - */ - public void registerMaster(){ - try { - String serverPath = registerServer(ZKNodeType.MASTER); - if(StringUtils.isEmpty(serverPath)){ - System.exit(-1); - } - masterZNode = serverPath; - } catch (Exception e) { - logger.error("register master failure ",e); - System.exit(-1); - } - } - /** * handle path events that this class cares about * @param client zkClient diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java index 7ddee3b2a1..33990bc59f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKWorkerClient.java @@ -55,24 +55,6 @@ public class ZKWorkerClient extends AbstractZKClient { // init system znode this.initSystemZNode(); - // register worker - this.registWorker(); - } - - /** - * register worker - */ - private void registWorker(){ - try { - String serverPath = registerServer(ZKNodeType.WORKER); - if(StringUtils.isEmpty(serverPath)){ - System.exit(-1); - } - workerZNode = serverPath; - } catch (Exception e) { - logger.error("register worker failure",e); - System.exit(-1); - } } /** diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java index 135bfdabc6..6e887f80d7 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java @@ -46,40 +46,6 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator { */ protected IStoppable stoppable = null; - /** - * heartbeat for zookeeper - * @param znode zookeeper node - * @param serverType server type - */ - public void heartBeatForZk(String znode, String serverType){ - try { - - //check dead or not in zookeeper - if(zkClient.getState() == CuratorFrameworkState.STOPPED || checkIsDeadServer(znode, serverType)){ - stoppable.stop("i was judged to death, release resources and stop myself"); - return; - } - - String resInfoStr = super.get(znode); - String[] splits = resInfoStr.split(Constants.COMMA); - if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){ - return; - } - String str = splits[0] + Constants.COMMA - + splits[1] + Constants.COMMA - + OSUtils.cpuUsage() + Constants.COMMA - + OSUtils.memoryUsage() + Constants.COMMA - + OSUtils.loadAverage() + Constants.COMMA - + splits[5] + Constants.COMMA - + DateUtils.dateToString(new Date()); - zkClient.setData().forPath(znode,str.getBytes()); - - } catch (Exception e) { - logger.error("heartbeat for zk failed", e); - stoppable.stop("heartbeat for zk exception, release resources and stop myself"); - } - } - /** * check dead server or not , if dead, stop self *