diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java index 040ea5a43f..01218e5d8b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java @@ -14,8 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.master.registry; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.registry.HeartBeatTask; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; + import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -23,15 +35,6 @@ import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.NetUtils; -import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; -import org.apache.dolphinscheduler.server.master.config.MasterConfig; -import org.apache.dolphinscheduler.server.registry.HeartBeatTask; -import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -40,7 +43,7 @@ import org.springframework.stereotype.Service; import com.google.common.collect.Sets; /** - * master registry + * master registry */ @Service public class MasterRegistry { @@ -48,7 +51,7 @@ public class MasterRegistry { private final Logger logger = LoggerFactory.getLogger(MasterRegistry.class); /** - * zookeeper registry center + * zookeeper registry center */ @Autowired private ZookeeperRegistryCenter zookeeperRegistryCenter; @@ -65,19 +68,18 @@ public class MasterRegistry { private ScheduledExecutorService heartBeatExecutor; /** - * worker start time + * master start time */ private String startTime; - @PostConstruct - public void init(){ + public void init() { this.startTime = DateUtils.dateToString(new Date()); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); } /** - * registry + * registry */ public void registry() { String address = NetUtils.getHost(); @@ -86,12 +88,12 @@ public class MasterRegistry { zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { - if(newState == ConnectionState.LOST){ + if (newState == ConnectionState.LOST) { logger.error("master : {} connection lost from zookeeper", address); - } else if(newState == ConnectionState.RECONNECTED){ + } else if (newState == ConnectionState.RECONNECTED) { logger.info("master : {} reconnected to zookeeper", address); zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, ""); - } else if(newState == ConnectionState.SUSPENDED){ + } else if (newState == ConnectionState.SUSPENDED) { logger.warn("master : {} connection SUSPENDED ", address); } } @@ -103,36 +105,35 @@ public class MasterRegistry { Sets.newHashSet(getMasterPath()), zookeeperRegistryCenter); - this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); - logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval); + this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, 0, masterHeartbeatInterval, TimeUnit.SECONDS); + logger.info("master node : {} registry to ZK path {} successfully with heartBeatInterval : {}s" + , address, localNodePath, masterHeartbeatInterval); } /** - * remove registry info + * remove registry info */ public void unRegistry() { String address = getLocalAddress(); String localNodePath = getMasterPath(); heartBeatExecutor.shutdownNow(); zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath); - logger.info("master node : {} unRegistry to ZK.", address); + logger.info("master node : {} unRegistry from ZK path {}." + , address, localNodePath); } /** - * get master path - * @return + * get master path */ private String getMasterPath() { String address = getLocalAddress(); - String localNodePath = this.zookeeperRegistryCenter.getMasterPath() + "/" + address; - return localNodePath; + return this.zookeeperRegistryCenter.getMasterPath() + "/" + address; } /** - * get local address - * @return + * get local address */ - private String getLocalAddress(){ + private String getLocalAddress() { return NetUtils.getHost() + ":" + masterConfig.getListenPort(); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java index ea822cbfb1..7763e07314 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.server.master.registry; +import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH; + import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; @@ -24,6 +26,10 @@ import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; + +import java.util.List; +import java.util.concurrent.TimeUnit; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -31,10 +37,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH; /** * master registry test */ @@ -65,6 +67,7 @@ public class MasterRegistryTest { @Test public void testUnRegistry() throws InterruptedException { + masterRegistry.init(); masterRegistry.registry(); TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node masterRegistry.unRegistry(); diff --git a/pom.xml b/pom.xml index 56ddc3fbc6..0c20bd873f 100644 --- a/pom.xml +++ b/pom.xml @@ -820,7 +820,7 @@ **/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java **/server/master/dispatch/host/assign/RandomSelectorTest.java **/server/master/dispatch/host/assign/RoundRobinSelectorTest.java - + **/server/master/register/MasterRegistryTest.java **/server/master/AlertManagerTest.java **/server/master/MasterCommandTest.java **/server/master/DependentTaskTest.java