From b4af3fd1764756fde4c5395639d7816d03e77bff Mon Sep 17 00:00:00 2001 From: Yichao Yang <1048262223@qq.com> Date: Mon, 13 Jul 2020 17:25:13 +0800 Subject: [PATCH] [Feature-2815][server] One worker can belong to different workergroups (#2934) * [Feature-2815][server] One worker can belong to different workergroups * Feature: Add test cases * Update MonitorService.java * Update MonitorService.java Co-authored-by: dailidong --- .../api/service/MonitorService.java | 54 +++++-- .../common/model/WorkerServerModel.java | 122 +++++++++++++++ .../master/registry/MasterRegistry.java | 15 +- .../server/registry/HeartBeatTask.java | 33 ++-- .../server/worker/config/WorkerConfig.java | 14 +- .../worker/registry/WorkerRegistry.java | 115 ++++++++------ .../src/main/resources/worker.properties | 2 +- .../worker/registry/WorkerRegistryTest.java | 148 ++++++++++++++---- .../servers/_source/zookeeperDirectories.vue | 112 +++++++++++++ .../pages/monitor/pages/servers/worker.vue | 27 +++- .../src/js/module/i18n/locale/en_US.js | 4 +- .../src/js/module/i18n/locale/zh_CN.js | 4 +- pom.xml | 2 +- 13 files changed, 525 insertions(+), 127 deletions(-) create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerServerModel.java create mode 100644 dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperDirectories.vue diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java index 3370961fd4..55c4fa113b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java @@ -16,29 +16,33 @@ */ package org.apache.dolphinscheduler.api.service; +import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ZKNodeType; -import org.apache.dolphinscheduler.dao.MonitorDBDao; import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.common.model.WorkerServerModel; +import org.apache.dolphinscheduler.dao.MonitorDBDao; import org.apache.dolphinscheduler.dao.entity.MonitorRecord; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.dolphinscheduler.common.utils.Preconditions.*; +import com.google.common.collect.Sets; /** * monitor service */ @Service -public class MonitorService extends BaseService{ +public class MonitorService extends BaseService { @Autowired private ZookeeperMonitor zookeeperMonitor; @@ -108,15 +112,41 @@ public class MonitorService extends BaseService{ public Map queryWorker(User loginUser) { Map result = new HashMap<>(5); - List masterServers = getServerListFromZK(false); - - result.put(Constants.DATA_LIST, masterServers); + List workerServers = getServerListFromZK(false) + .stream() + .map((Server server) -> { + WorkerServerModel model = new WorkerServerModel(); + model.setId(server.getId()); + model.setHost(server.getHost()); + model.setPort(server.getPort()); + model.setZkDirectories(Sets.newHashSet(server.getZkDirectory())); + model.setResInfo(server.getResInfo()); + model.setCreateTime(server.getCreateTime()); + model.setLastHeartbeatTime(server.getLastHeartbeatTime()); + return model; + }) + .collect(Collectors.toList()); + + Map workerHostPortServerMapping = workerServers + .stream() + .collect(Collectors.toMap( + (WorkerServerModel worker) -> { + String[] s = worker.getZkDirectories().iterator().next().split("/"); + return s[s.length - 1]; + } + , Function.identity() + , (WorkerServerModel oldOne, WorkerServerModel newOne) -> { + oldOne.getZkDirectories().addAll(newOne.getZkDirectories()); + return oldOne; + })); + + result.put(Constants.DATA_LIST, workerHostPortServerMapping.values()); putMsg(result,Status.SUCCESS); return result; } - public List getServerListFromZK(boolean isMaster){ + public List getServerListFromZK(boolean isMaster) { checkNotNull(zookeeperMonitor); ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerServerModel.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerServerModel.java new file mode 100644 index 0000000000..984124b872 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerServerModel.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.model; + + +import java.util.Date; +import java.util.Set; + +import com.fasterxml.jackson.annotation.JsonFormat; + +/** + * server + */ +public class WorkerServerModel { + + /** + * id + */ + private int id; + + /** + * host + */ + private String host; + + /** + * port + */ + private int port; + + /** + * worker directories in zookeeper + */ + private Set zkDirectories; + + /** + * resource info about CPU and memory + */ + private String resInfo; + + /** + * create time + */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date createTime; + + /** + * last heart beat time + */ + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") + private Date lastHeartbeatTime; + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public Set getZkDirectories() { + return zkDirectories; + } + + public void setZkDirectories(Set zkDirectories) { + this.zkDirectories = zkDirectories; + } + + public Date getLastHeartbeatTime() { + return lastHeartbeatTime; + } + + public void setLastHeartbeatTime(Date lastHeartbeatTime) { + this.lastHeartbeatTime = lastHeartbeatTime; + } + + public String getResInfo() { + return resInfo; + } + + public void setResInfo(String resInfo) { + this.resInfo = resInfo; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java index 04fb79f807..040ea5a43f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java @@ -16,6 +16,13 @@ */ package org.apache.dolphinscheduler.server.master.registry; +import java.util.Date; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.annotation.PostConstruct; + import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; @@ -30,11 +37,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import javax.annotation.PostConstruct; -import java.util.Date; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import com.google.common.collect.Sets; /** * master registry @@ -97,7 +100,7 @@ public class MasterRegistry { HeartBeatTask heartBeatTask = new HeartBeatTask(startTime, masterConfig.getMasterReservedMemory(), masterConfig.getMasterMaxCpuloadAvg(), - getMasterPath(), + Sets.newHashSet(getMasterPath()), zookeeperRegistryCenter); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java index 2345ce9533..bd8c79cce9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java @@ -17,50 +17,50 @@ package org.apache.dolphinscheduler.server.registry; +import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA; + +import java.util.Date; +import java.util.Set; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Date; - -import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA; - -public class HeartBeatTask extends Thread{ +public class HeartBeatTask extends Thread { private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class); private String startTime; private double reservedMemory; private double maxCpuloadAvg; - private String heartBeatPath; + private Set heartBeatPaths; private ZookeeperRegistryCenter zookeeperRegistryCenter; public HeartBeatTask(String startTime, double reservedMemory, double maxCpuloadAvg, - String heartBeatPath, - ZookeeperRegistryCenter zookeeperRegistryCenter){ + Set heartBeatPaths, + ZookeeperRegistryCenter zookeeperRegistryCenter) { this.startTime = startTime; this.reservedMemory = reservedMemory; this.maxCpuloadAvg = maxCpuloadAvg; - this.heartBeatPath = heartBeatPath; + this.heartBeatPaths = heartBeatPaths; this.zookeeperRegistryCenter = zookeeperRegistryCenter; } @Override public void run() { try { - double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); double loadAverage = OSUtils.loadAverage(); int status = Constants.NORAML_NODE_STATUS; - if(availablePhysicalMemorySize < reservedMemory - || loadAverage > maxCpuloadAvg){ - logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize , loadAverage); + if (availablePhysicalMemorySize < reservedMemory + || loadAverage > maxCpuloadAvg) { + logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize, loadAverage); status = Constants.ABNORMAL_NODE_STATUS; } @@ -76,8 +76,11 @@ public class HeartBeatTask extends Thread{ builder.append(status).append(COMMA); //save process id builder.append(OSUtils.getProcessID()); - zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath, builder.toString()); - } catch (Throwable ex){ + + for (String heartBeatPath : heartBeatPaths) { + zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath, builder.toString()); + } + } catch (Throwable ex) { logger.error("error write heartbeat info", ex); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index 1a31fa09fe..2dedaf8e1b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -17,6 +17,8 @@ */ package org.apache.dolphinscheduler.server.worker.config; +import java.util.Set; + import org.apache.dolphinscheduler.common.Constants; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.PropertySource; @@ -41,8 +43,8 @@ public class WorkerConfig { @Value("${worker.reserved.memory:0.3}") private double workerReservedMemory; - @Value("${worker.group: default}") - private String workerGroup; + @Value("#{'${worker.groups:default}'.split(',')}") + private Set workerGroups; @Value("${worker.listen.port: 1234}") private int listenPort; @@ -55,12 +57,12 @@ public class WorkerConfig { this.listenPort = listenPort; } - public String getWorkerGroup() { - return workerGroup; + public Set getWorkerGroups() { + return workerGroups; } - public void setWorkerGroup(String workerGroup) { - this.workerGroup = workerGroup; + public void setWorkerGroups(Set workerGroups) { + this.workerGroups = workerGroups; } public int getWorkerExecThreads() { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index e1349ea9fe..5e400e1e1f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -16,10 +16,20 @@ */ package org.apache.dolphinscheduler.server.worker.registry; +import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; +import static org.apache.dolphinscheduler.common.Constants.SLASH; + +import java.util.Date; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.annotation.PostConstruct; + import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.framework.state.ConnectionStateListener; -import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -32,15 +42,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import javax.annotation.PostConstruct; -import java.util.Date; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import static org.apache.dolphinscheduler.common.Constants.COMMA; -import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; -import static org.apache.dolphinscheduler.common.Constants.SLASH; +import com.google.common.collect.Sets; /** @@ -74,11 +76,11 @@ public class WorkerRegistry { private String startTime; - private String workerGroup; + private Set workerGroups; @PostConstruct - public void init(){ - this.workerGroup = workerConfig.getWorkerGroup(); + public void init() { + this.workerGroups = workerConfig.getWorkerGroups(); this.startTime = DateUtils.dateToString(new Date()); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); } @@ -88,31 +90,35 @@ public class WorkerRegistry { */ public void registry() { String address = NetUtils.getHost(); - String localNodePath = getWorkerPath(); - zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, ""); - zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) { - if(newState == ConnectionState.LOST){ - logger.error("worker : {} connection lost from zookeeper", address); - } else if(newState == ConnectionState.RECONNECTED){ - logger.info("worker : {} reconnected to zookeeper", address); - zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(localNodePath, ""); - } else if(newState == ConnectionState.SUSPENDED){ - logger.warn("worker : {} connection SUSPENDED ", address); - } - } - }); + Set workerZkPaths = getWorkerZkPaths(); int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval(); - HeartBeatTask heartBeatTask = new HeartBeatTask(startTime, - workerConfig.getWorkerReservedMemory(), - workerConfig.getWorkerMaxCpuloadAvg(), - getWorkerPath(), - zookeeperRegistryCenter); - this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); - logger.info("worker node : {} registry to ZK successfully with heartBeatInterval : {}s", address, workerHeartbeatInterval); + for (String workerZKPath : workerZkPaths) { + zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath, ""); + zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable().addListener(new ConnectionStateListener() { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + if (newState == ConnectionState.LOST) { + logger.error("worker : {} connection lost from zookeeper", address); + } else if (newState == ConnectionState.RECONNECTED) { + logger.info("worker : {} reconnected to zookeeper", address); + zookeeperRegistryCenter.getZookeeperCachedOperator().persistEphemeral(workerZKPath, ""); + } else if (newState == ConnectionState.SUSPENDED) { + logger.warn("worker : {} connection SUSPENDED ", address); + } + } + }); + logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath); + } + HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime, + this.workerConfig.getWorkerReservedMemory(), + this.workerConfig.getWorkerMaxCpuloadAvg(), + workerZkPaths, + this.zookeeperRegistryCenter); + + this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); + logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval); } /** @@ -120,36 +126,41 @@ public class WorkerRegistry { */ public void unRegistry() { String address = getLocalAddress(); - String localNodePath = getWorkerPath(); - zookeeperRegistryCenter.getZookeeperCachedOperator().remove(localNodePath); + Set workerZkPaths = getWorkerZkPaths(); + for (String workerZkPath : workerZkPaths) { + zookeeperRegistryCenter.getZookeeperCachedOperator().remove(workerZkPath); + logger.info("worker node : {} unRegistry from ZK {}.", address, workerZkPath); + } this.heartBeatExecutor.shutdownNow(); - logger.info("worker node : {} unRegistry to ZK.", address); } /** * get worker path - * @return */ - private String getWorkerPath() { + private Set getWorkerZkPaths() { + Set workerZkPaths = Sets.newHashSet(); + String address = getLocalAddress(); - StringBuilder builder = new StringBuilder(100); - String workerPath = this.zookeeperRegistryCenter.getWorkerPath(); - builder.append(workerPath).append(SLASH); - if(StringUtils.isEmpty(workerGroup)){ - workerGroup = DEFAULT_WORKER_GROUP; + String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath(); + + for (String workGroup : this.workerGroups) { + StringBuilder workerZkPathBuilder = new StringBuilder(100); + workerZkPathBuilder.append(workerZkPathPrefix).append(SLASH); + if (StringUtils.isEmpty(workGroup)) { + workGroup = DEFAULT_WORKER_GROUP; + } + // trim and lower case is need + workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH); + workerZkPathBuilder.append(address); + workerZkPaths.add(workerZkPathBuilder.toString()); } - //trim and lower case is need - builder.append(workerGroup.trim().toLowerCase()).append(SLASH); - builder.append(address); - return builder.toString(); + return workerZkPaths; } /** * get local address - * @return */ - private String getLocalAddress(){ + private String getLocalAddress() { return NetUtils.getHost() + ":" + workerConfig.getListenPort(); } - } diff --git a/dolphinscheduler-server/src/main/resources/worker.properties b/dolphinscheduler-server/src/main/resources/worker.properties index eb01bbb3ab..0365c8a9c9 100644 --- a/dolphinscheduler-server/src/main/resources/worker.properties +++ b/dolphinscheduler-server/src/main/resources/worker.properties @@ -31,4 +31,4 @@ #worker.listen.port: 1234 # default worker group -worker.group=default \ No newline at end of file +#worker.groups=default diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java index b34ba8bee9..7fc9d2bf79 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java @@ -17,62 +17,154 @@ package org.apache.dolphinscheduler.server.worker.registry; +import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executor; + +import org.apache.curator.framework.imps.CuratorFrameworkImpl; +import org.apache.curator.framework.listen.Listenable; +import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; -import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringRunner; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.concurrent.TimeUnit; +import com.google.common.collect.Sets; -import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; - - -import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH; /** * worker registry test */ -@RunWith(SpringRunner.class) -@ContextConfiguration(classes={SpringZKServer.class, WorkerRegistry.class,ZookeeperRegistryCenter.class, WorkerConfig.class, ZookeeperCachedOperator.class, ZookeeperConfig.class}) - +@RunWith(MockitoJUnitRunner.Silent.class) public class WorkerRegistryTest { - @Autowired + private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRegistryTest.class); + + private static final String TEST_WORKER_GROUP = "test"; + + @InjectMocks private WorkerRegistry workerRegistry; - @Autowired + @Mock private ZookeeperRegistryCenter zookeeperRegistryCenter; - @Autowired + @Mock + private ZookeeperCachedOperator zookeeperCachedOperator; + + @Mock + private CuratorFrameworkImpl zkClient; + + @Mock private WorkerConfig workerConfig; + @Before + public void before() { + Set workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP); + Mockito.when(workerConfig.getWorkerGroups()).thenReturn(workerGroups); + + Mockito.when(zookeeperRegistryCenter.getWorkerPath()).thenReturn("/dolphinscheduler/nodes/worker"); + Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator()).thenReturn(zookeeperCachedOperator); + Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient()).thenReturn(zkClient); + Mockito.when(zookeeperRegistryCenter.getZookeeperCachedOperator().getZkClient().getConnectionStateListenable()).thenReturn( + new Listenable() { + @Override + public void addListener(ConnectionStateListener connectionStateListener) { + LOGGER.info("add listener"); + } + + @Override + public void addListener(ConnectionStateListener connectionStateListener, Executor executor) { + LOGGER.info("add listener executor"); + } + + @Override + public void removeListener(ConnectionStateListener connectionStateListener) { + LOGGER.info("remove listener"); + } + }); + + Mockito.when(workerConfig.getWorkerHeartbeatInterval()).thenReturn(10); + + Mockito.when(workerConfig.getWorkerReservedMemory()).thenReturn(1.1); + + Mockito.when(workerConfig.getWorkerMaxCpuloadAvg()).thenReturn(1); + } + @Test - public void testRegistry() throws InterruptedException { + public void testRegistry() { + + workerRegistry.init(); + workerRegistry.registry(); + String workerPath = zookeeperRegistryCenter.getWorkerPath(); - Assert.assertEquals(DEFAULT_WORKER_GROUP, workerConfig.getWorkerGroup().trim()); - String instancePath = workerPath + "/" + workerConfig.getWorkerGroup().trim() + "/" + (NetUtils.getHost() + ":" + workerConfig.getListenPort()); - TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); //wait heartbeat info write into zk node - String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(instancePath); - Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length); + + int i = 0; + for (String workerGroup : workerConfig.getWorkerGroups()) { + String workerZkPath = workerPath + "/" + workerGroup.trim() + "/" + (NetUtils.getHost() + ":" + workerConfig.getListenPort()); + String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(workerZkPath); + if (0 == i) { + Assert.assertTrue(workerZkPath.startsWith("/dolphinscheduler/nodes/worker/test/")); + } else { + Assert.assertTrue(workerZkPath.startsWith("/dolphinscheduler/nodes/worker/default/")); + } + i++; + } + + workerRegistry.unRegistry(); + + workerConfig.getWorkerGroups().add(StringUtils.EMPTY); + workerRegistry.init(); + workerRegistry.registry(); + + workerRegistry.unRegistry(); + + // testEmptyWorkerGroupsRegistry + workerConfig.getWorkerGroups().remove(StringUtils.EMPTY); + workerConfig.getWorkerGroups().remove(TEST_WORKER_GROUP); + workerConfig.getWorkerGroups().remove(DEFAULT_WORKER_GROUP); + workerRegistry.init(); + workerRegistry.registry(); + + List testWorkerGroupPathZkChildren = zookeeperRegistryCenter.getChildrenKeys(workerPath + "/" + TEST_WORKER_GROUP); + List defaultWorkerGroupPathZkChildren = zookeeperRegistryCenter.getChildrenKeys(workerPath + "/" + DEFAULT_WORKER_GROUP); + + Assert.assertEquals(0, testWorkerGroupPathZkChildren.size()); + Assert.assertEquals(0, defaultWorkerGroupPathZkChildren.size()); } @Test - public void testUnRegistry() throws InterruptedException { + public void testUnRegistry() { + workerRegistry.init(); workerRegistry.registry(); - TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); //wait heartbeat info write into zk node + workerRegistry.unRegistry(); String workerPath = zookeeperRegistryCenter.getWorkerPath(); - String workerGroupPath = workerPath + "/" + workerConfig.getWorkerGroup().trim(); - List childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(workerGroupPath); - Assert.assertTrue(childrenKeys.isEmpty()); + + for (String workerGroup : workerConfig.getWorkerGroups()) { + String workerGroupPath = workerPath + "/" + workerGroup.trim(); + List childrenKeys = zookeeperRegistryCenter.getZookeeperCachedOperator().getChildrenKeys(workerGroupPath); + Assert.assertTrue(childrenKeys.isEmpty()); + } + + // testEmptyWorkerGroupsUnRegistry + workerConfig.getWorkerGroups().remove(TEST_WORKER_GROUP); + workerConfig.getWorkerGroups().remove(DEFAULT_WORKER_GROUP); + workerRegistry.init(); + workerRegistry.registry(); + + workerRegistry.unRegistry(); } } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperDirectories.vue b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperDirectories.vue new file mode 100644 index 0000000000..1201cb55b4 --- /dev/null +++ b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/_source/zookeeperDirectories.vue @@ -0,0 +1,112 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + + + + + diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue index ceaed89675..c8c0ed194c 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/monitor/pages/servers/worker.vue @@ -22,8 +22,8 @@
IP: {{item.host}} - {{$t('Process Pid')}}: {{item.id}} - {{$t('Zk registration directory')}}: {{item.zkDirectory}} + {{$t('Process Pid')}}: {{item.port}} + {{$t('Zk registration directory')}}: {{$t('Directory detail')}}
{{$t('Create Time')}}: {{item.createTime | formatDate}} @@ -74,6 +74,7 @@ import mNoData from '@/module/components/noData/noData' import themeData from '@/module/echarts/themeData.json' import mListConstruction from '@/module/components/listConstruction/listConstruction' + import zookeeperDirectoriesPopup from './_source/zookeeperDirectories' export default { name: 'servers-worker', @@ -86,7 +87,25 @@ }, props: {}, methods: { - ...mapActions('monitor', ['getWorkerData']) + ...mapActions('monitor', ['getWorkerData']), + _showZkDirectories (item) { + let zkDirectories = [] + item.zkDirectories.forEach(zkDirectory => { + zkDirectories.push({ + zkDirectory: zkDirectory + }) + }) + this.$drawer({ + direction: 'right', + render (h) { + return h(zookeeperDirectoriesPopup, { + props: { + zkDirectories: zkDirectories + } + }) + } + }) + } }, watch: {}, created () { @@ -105,7 +124,7 @@ this.isLoading = true }) }, - components: { mList, mListConstruction, mSpin, mNoData, mGauge } + components: { mList, mListConstruction, mSpin, mNoData, mGauge, zookeeperDirectoriesPopup } }