diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java index 3f53c535d2..6afb9d54c2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/ApiApplicationServer.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.api; import org.springframework.boot.SpringApplication; @@ -25,15 +26,12 @@ import org.springframework.context.annotation.FilterType; @SpringBootApplication @ServletComponentScan -@ComponentScan(basePackages = {"org.apache.dolphinscheduler"}, - excludeFilters = @ComponentScan.Filter(type = FilterType.REGEX, - pattern = "org.apache.dolphinscheduler.server.*")) - +@ComponentScan(value = "org.apache.dolphinscheduler", + excludeFilters = @ComponentScan.Filter(type = FilterType.REGEX, pattern = "org.apache.dolphinscheduler.server.*")) public class ApiApplicationServer extends SpringBootServletInitializer { - public static void main(String[] args) { - SpringApplication.run(ApiApplicationServer.class, args); - } - + public static void main(String[] args) { + SpringApplication.run(ApiApplicationServer.class, args); + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java index c2c74ce3da..6c5b24c5a5 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.api.controller; import static org.apache.dolphinscheduler.api.enums.Status.DELETE_WORKER_GROUP_FAIL; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKER_ADDRESS_LIST_FAIL; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKER_GROUP_FAIL; import static org.apache.dolphinscheduler.api.enums.Status.SAVE_ERROR; @@ -159,4 +160,20 @@ public class WorkerGroupController extends BaseController { return returnDataList(result); } + /** + * query worker address list + * + * @param loginUser login user + * @return all worker address list + */ + @ApiOperation(value = "queryWorkerAddressList", notes = "QUERY_WORKER_ADDRESS_LIST_NOTES") + @GetMapping(value = "/worker-address-list") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_WORKER_ADDRESS_LIST_FAIL) + public Result queryWorkerAddressList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser) { + logger.info("query worker address list: login user {}", RegexUtils.escapeNRT(loginUser.getUserName())); + Map result = workerGroupService.getWorkerAddressList(); + return returnDataList(result); + } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 298dbe6770..461a2d8f11 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -209,6 +209,7 @@ public enum Status { CREATE_WORKER_GROUP_FORBIDDEN_IN_DOCKER(10175, "create worker group forbidden in docker ", "创建worker分组在docker中禁止"), DELETE_WORKER_GROUP_FORBIDDEN_IN_DOCKER(10176, "delete worker group forbidden in docker ", "删除worker分组在docker中禁止"), WORKER_ADDRESS_INVALID(10177, "worker address {0} invalid", "worker地址[{0}]无效"), + QUERY_WORKER_ADDRESS_LIST_FAIL(10178, "query worker address list fail ", "查询worker地址列表失败"), UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java index a8b11f45e8..49ba33dac0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java @@ -62,4 +62,11 @@ public interface WorkerGroupService { */ Map deleteWorkerGroupById(User loginUser, Integer id); + /** + * query all worker address list + * + * @return all worker address list + */ + Map getWorkerAddressList(); + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java index 381d1e853b..5d65c38785 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -81,10 +81,6 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro if (isNotAdmin(loginUser, result)) { return result; } - if (Constants.DOCKER_MODE && !Constants.KUBERNETES_MODE) { - putMsg(result, Status.CREATE_WORKER_GROUP_FORBIDDEN_IN_DOCKER); - return result; - } if (StringUtils.isEmpty(name)) { putMsg(result, Status.NAME_NULL); return result; @@ -303,10 +299,6 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro if (isNotAdmin(loginUser, result)) { return result; } - if (Constants.DOCKER_MODE && !Constants.KUBERNETES_MODE) { - putMsg(result, Status.DELETE_WORKER_GROUP_FORBIDDEN_IN_DOCKER); - return result; - } WorkerGroup workerGroup = workerGroupMapper.selectById(id); if (workerGroup == null) { putMsg(result, Status.DELETE_WORKER_GROUP_NOT_EXIST); @@ -323,4 +315,18 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro return result; } + /** + * query all worker address list + * + * @return all worker address list + */ + @Override + public Map getWorkerAddressList() { + Map result = new HashMap<>(); + List serverNodeList = zookeeperMonitor.getServerNodeList(ZKNodeType.WORKER, true); + result.put(Constants.DATA_LIST, serverNodeList); + putMsg(result, Status.SUCCESS); + return result; + } + } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java index 8a283c7a74..4314089ef4 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/NetUtils.java @@ -86,6 +86,9 @@ public class NetUtils { if (STS_PATTERN.matcher(host).find()) { return String.format("%s.%s", host, host.replaceFirst("\\d+$", "headless")); } + } else if (canonicalHost.contains(".")) { + String[] items = canonicalHost.split("\\."); + return String.format("%s.%s", items[0], items[1]); } return canonicalHost; } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java index 1202b58e20..f912933d84 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/NetUtilsTest.java @@ -50,7 +50,7 @@ public class NetUtilsTest { when(address.getHostAddress()).thenReturn("172.17.0.15"); assertEquals("172.17.0.15", NetUtils.getHost(address)); CommonTest.setFinalStatic(Constants.class.getDeclaredField("KUBERNETES_MODE"), true); - assertEquals("dolphinscheduler-worker-0.dolphinscheduler-worker-headless.default.svc.cluster.local", NetUtils.getHost(address)); + assertEquals("dolphinscheduler-worker-0.dolphinscheduler-worker-headless", NetUtils.getHost(address)); address = mock(InetAddress.class); when(address.getCanonicalHostName()).thenReturn("dolphinscheduler-worker-0"); when(address.getHostName()).thenReturn("dolphinscheduler-worker-0"); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index c44b8a63a1..6c05554d8b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; -import org.apache.dolphinscheduler.server.worker.WorkerServer; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; @@ -45,8 +44,15 @@ import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.FilterType; import org.springframework.transaction.annotation.EnableTransactionManagement; +/** + * master server + */ @ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = { - @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {WorkerServer.class}) + @ComponentScan.Filter(type = FilterType.REGEX, pattern = { + "org.apache.dolphinscheduler.server.worker.*", + "org.apache.dolphinscheduler.server.monitor.*", + "org.apache.dolphinscheduler.server.log.*" + }) }) @EnableTransactionManagement public class MasterServer implements IStoppable { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index 18a7d4691d..5a0647fd67 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor; -import org.apache.commons.collections.CollectionUtils; - import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; @@ -31,17 +29,22 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; -import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.apache.dolphinscheduler.server.registry.ServerNodeManager; + +import org.apache.commons.collections.CollectionUtils; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import javax.annotation.PostConstruct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import javax.annotation.PostConstruct; - -import java.util.*; - /** * netty executor manager */ @@ -51,10 +54,10 @@ public class NettyExecutorManager extends AbstractExecutorManager{ private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class); /** - * zookeeper node manager + * server node manager */ @Autowired - private ZookeeperNodeManager zookeeperNodeManager; + private ServerNodeManager serverNodeManager; /** * netty remote client @@ -182,7 +185,7 @@ public class NettyExecutorManager extends AbstractExecutorManager{ ExecutorType executorType = context.getExecutorType(); switch (executorType){ case WORKER: - nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup()); + nodes = serverNodeManager.getWorkerGroupNodes(context.getWorkerGroup()); break; case CLIENT: break; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java index 8e1ead38fd..95f97deb5b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java @@ -18,55 +18,32 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.ResInfo; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.dao.entity.WorkerGroup; -import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker; -import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; -import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; -import org.apache.dolphinscheduler.server.zk.ZKMasterClient; +import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Map; +import java.util.Set; import org.springframework.beans.factory.annotation.Autowired; /** - * round robin host manager + * common host manager */ public abstract class CommonHostManager implements HostManager { /** - * zookeeper registry center + * server node manager */ @Autowired - protected ZookeeperRegistryCenter registryCenter; - - /** - * zookeeper node manager - */ - @Autowired - protected ZookeeperNodeManager zookeeperNodeManager; - - /** - * zk master client - */ - @Autowired - protected ZKMasterClient zkMasterClient; - - /** - * worker group mapper - */ - @Autowired - protected WorkerGroupMapper workerGroupMapper; + protected ServerNodeManager serverNodeManager; /** * select host @@ -80,10 +57,7 @@ public abstract class CommonHostManager implements HostManager { ExecutorType executorType = context.getExecutorType(); switch (executorType) { case WORKER: - candidates = getHostWorkersFromDatabase(workerGroup); - if (candidates.isEmpty()) { - candidates = getHostWorkersFromZookeeper(workerGroup); - } + candidates = getWorkerCandidates(workerGroup); break; case CLIENT: break; @@ -99,31 +73,12 @@ public abstract class CommonHostManager implements HostManager { protected abstract HostWorker select(Collection nodes); - protected List getHostWorkersFromDatabase(String workerGroup) { + protected List getWorkerCandidates(String workerGroup) { List hostWorkers = new ArrayList<>(); - List workerGroups = workerGroupMapper.queryWorkerGroupByName(workerGroup); - if (CollectionUtils.isNotEmpty(workerGroups)) { - Map serverMaps = zkMasterClient.getServerMaps(ZKNodeType.WORKER, true); - for (WorkerGroup wg : workerGroups) { - for (String addr : wg.getAddrList().split(Constants.COMMA)) { - if (serverMaps.containsKey(addr)) { - String heartbeat = serverMaps.get(addr); - int hostWeight = getWorkerHostWeightFromHeartbeat(heartbeat); - hostWorkers.add(HostWorker.of(addr, hostWeight, workerGroup)); - } - } - } - } - return hostWorkers; - } - - protected List getHostWorkersFromZookeeper(String workerGroup) { - List hostWorkers = new ArrayList<>(); - Collection nodes = zookeeperNodeManager.getWorkerGroupNodes(workerGroup); + Set nodes = serverNodeManager.getWorkerGroupNodes(workerGroup); if (CollectionUtils.isNotEmpty(nodes)) { for (String node : nodes) { - String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup); - String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node); + String heartbeat = serverNodeManager.getWorkerNodeInfo(node); int hostWeight = getWorkerHostWeightFromHeartbeat(heartbeat); hostWorkers.add(HostWorker.of(node, hostWeight, workerGroup)); } @@ -142,12 +97,4 @@ public abstract class CommonHostManager implements HostManager { return hostWeight; } - public void setZookeeperNodeManager(ZookeeperNodeManager zookeeperNodeManager) { - this.zookeeperNodeManager = zookeeperNodeManager; - } - - public ZookeeperNodeManager getZookeeperNodeManager() { - return zookeeperNodeManager; - } - } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java index ec65cabb0b..fccc31b648 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManager.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; - import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java index e0b585a50e..8b7d37c4e3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java @@ -18,11 +18,9 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.ResInfo; -import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; @@ -30,11 +28,9 @@ import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin; -import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -51,17 +47,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * round robin host manager + * lower weight host manager */ public class LowerWeightHostManager extends CommonHostManager { private final Logger logger = LoggerFactory.getLogger(LowerWeightHostManager.class); - /** - * round robin host manager - */ - private RoundRobinHostManager roundRobinHostManager; - /** * selector */ @@ -89,8 +80,6 @@ public class LowerWeightHostManager extends CommonHostManager { this.lock = new ReentrantLock(); this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor")); this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),0, 5, TimeUnit.SECONDS); - this.roundRobinHostManager = new RoundRobinHostManager(); - this.roundRobinHostManager.setZookeeperNodeManager(getZookeeperNodeManager()); } @PreDestroy @@ -142,42 +131,21 @@ public class LowerWeightHostManager extends CommonHostManager { public void run() { try { Map> workerHostWeights = new HashMap<>(); - // from database - List workerGroups = workerGroupMapper.queryAllWorkerGroup(); - if (CollectionUtils.isNotEmpty(workerGroups)) { - Map serverMaps = zkMasterClient.getServerMaps(ZKNodeType.WORKER, true); - for (WorkerGroup wg : workerGroups) { - String workerGroup = wg.getName(); - List addrs = Arrays.asList(wg.getAddrList().split(Constants.COMMA)); - Set hostWeights = new HashSet<>(addrs.size()); - for (String addr : addrs) { - if (serverMaps.containsKey(addr)) { - String heartbeat = serverMaps.get(addr); - HostWeight hostWeight = getHostWeight(addr, workerGroup, heartbeat); - if (hostWeight != null) { - hostWeights.add(hostWeight); - } - } - } - workerHostWeights.put(workerGroup, hostWeights); - } - } - // from zookeeper - Map> workerGroupNodes = zookeeperNodeManager.getWorkerGroupNodes(); - Set>> entries = workerGroupNodes.entrySet(); - for (Map.Entry> entry : entries) { + Map> workerGroupNodes = serverNodeManager.getWorkerGroupNodes(); + for (Map.Entry> entry : workerGroupNodes.entrySet()) { String workerGroup = entry.getKey(); Set nodes = entry.getValue(); - String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup); Set hostWeights = new HashSet<>(nodes.size()); for (String node : nodes) { - String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node); + String heartbeat = serverNodeManager.getWorkerNodeInfo(node); HostWeight hostWeight = getHostWeight(node, workerGroup, heartbeat); if (hostWeight != null) { hostWeights.add(hostWeight); } } - workerHostWeights.put(workerGroup, hostWeights); + if (!hostWeights.isEmpty()) { + workerHostWeights.put(workerGroup, hostWeights); + } } syncWorkerHostWeight(workerHostWeights); } catch (Throwable ex) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java index ad4f4aa6fd..b4288dc666 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java @@ -23,7 +23,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RandomSele import java.util.Collection; /** - * round robin host manager + * random host manager */ public class RandomHostManager extends CommonHostManager { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java index c6b173472f..28c6f0d17d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java @@ -22,7 +22,6 @@ import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobin import java.util.Collection; - /** * round robin host manager */ @@ -36,7 +35,7 @@ public class RoundRobinHostManager extends CommonHostManager { /** * set round robin */ - public RoundRobinHostManager(){ + public RoundRobinHostManager() { this.selector = new RoundRobinSelector(); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ServerNodeManager.java similarity index 60% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ServerNodeManager.java index 4dfdb80e52..a487c21218 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ServerNodeManager.java @@ -17,63 +17,103 @@ package org.apache.dolphinscheduler.server.registry; -import org.apache.commons.collections.CollectionUtils; -import org.apache.curator.framework.CuratorFramework; - -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.AlertDao; +import org.apache.dolphinscheduler.dao.entity.WorkerGroup; +import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; +import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.service.zk.AbstractListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; +import org.apache.dolphinscheduler.service.zk.AbstractZKClient; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; +import javax.annotation.PreDestroy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; /** - * zookeeper node manager + * server node manager */ @Service -public class ZookeeperNodeManager implements InitializingBean { +public class ServerNodeManager implements InitializingBean { - private final Logger logger = LoggerFactory.getLogger(ZookeeperNodeManager.class); + private final Logger logger = LoggerFactory.getLogger(ServerNodeManager.class); /** - * master lock + * master lock */ private final Lock masterLock = new ReentrantLock(); /** - * worker group lock + * worker group lock */ private final Lock workerGroupLock = new ReentrantLock(); /** - * worker group nodes + * worker node info lock + */ + private final Lock workerNodeInfoLock = new ReentrantLock(); + + /** + * worker group nodes */ private final ConcurrentHashMap> workerGroupNodes = new ConcurrentHashMap<>(); /** - * master nodes + * master nodes */ private final Set masterNodes = new HashSet<>(); + /** + * worker node info + */ + private final Map workerNodeInfo = new HashMap<>(); + + /** + * executor service + */ + private ScheduledExecutorService executorService; + + /** + * zk client + */ + @Autowired + private ZKClient zkClient; + /** * zookeeper registry center */ @Autowired private ZookeeperRegistryCenter registryCenter; + /** + * worker group mapper + */ + @Autowired + private WorkerGroupMapper workerGroupMapper; + /** * alert dao */ @@ -87,9 +127,14 @@ public class ZookeeperNodeManager implements InitializingBean { @Override public void afterPropertiesSet() throws Exception { /** - * load nodes from zookeeper + * load nodes from zookeeper */ load(); + /** + * init executor service + */ + executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor")); + executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS); /** * init MasterNodeListener listener */ @@ -103,22 +148,59 @@ public class ZookeeperNodeManager implements InitializingBean { /** * load nodes from zookeeper */ - private void load(){ + private void load() { /** * master nodes from zookeeper */ - Set masterNodes = registryCenter.getMasterNodesDirectly(); - syncMasterNodes(masterNodes); + Set initMasterNodes = registryCenter.getMasterNodesDirectly(); + syncMasterNodes(initMasterNodes); /** * worker group nodes from zookeeper */ Set workerGroups = registryCenter.getWorkerGroupDirectly(); - for(String workerGroup : workerGroups){ + for (String workerGroup : workerGroups) { syncWorkerGroupNodes(workerGroup, registryCenter.getWorkerGroupNodesDirectly(workerGroup)); } } + /** + * zookeeper client + */ + @Component + static class ZKClient extends AbstractZKClient {} + + /** + * worker node info and worker group db sync task + */ + class WorkerNodeInfoAndGroupDbSyncTask implements Runnable { + + @Override + public void run() { + // sync worker node info + Map newWorkerNodeInfo = zkClient.getServerMaps(ZKNodeType.WORKER, true); + syncWorkerNodeInfo(newWorkerNodeInfo); + + // sync worker group nodes from database + List workerGroupList = workerGroupMapper.queryAllWorkerGroup(); + if (CollectionUtils.isNotEmpty(workerGroupList)) { + for (WorkerGroup wg : workerGroupList) { + String workerGroup = wg.getName(); + Set nodes = new HashSet<>(); + String[] addrs = wg.getAddrList().split(Constants.COMMA); + for (String addr : addrs) { + if (newWorkerNodeInfo.containsKey(addr)) { + nodes.add(addr); + } + } + if (!nodes.isEmpty()) { + syncWorkerGroupNodes(workerGroup, nodes); + } + } + } + } + } + /** * worker group node listener */ @@ -126,7 +208,7 @@ public class ZookeeperNodeManager implements InitializingBean { @Override protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { - if(registryCenter.isWorkerPath(path)){ + if (registryCenter.isWorkerPath(path)) { try { if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) { logger.info("worker group node : {} added.", path); @@ -141,25 +223,23 @@ public class ZookeeperNodeManager implements InitializingBean { syncWorkerGroupNodes(group, currentNodes); alertDao.sendServerStopedAlert(1, path, "WORKER"); } - } catch (IllegalArgumentException ignore) { - logger.warn(ignore.getMessage()); + } catch (IllegalArgumentException ex) { + logger.warn(ex.getMessage()); } catch (Exception ex) { logger.error("WorkerGroupListener capture data change and get data failed", ex); } } } - private String parseGroup(String path){ - String[] parts = path.split("\\/"); + private String parseGroup(String path) { + String[] parts = path.split("/"); if (parts.length < 6) { throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path)); } - String group = parts[parts.length - 2]; - return group; + return parts[parts.length - 2]; } } - /** * master node listener */ @@ -203,7 +283,7 @@ public class ZookeeperNodeManager implements InitializingBean { * sync master nodes * @param nodes master nodes */ - private void syncMasterNodes(Set nodes){ + private void syncMasterNodes(Set nodes) { masterLock.lock(); try { masterNodes.clear(); @@ -218,7 +298,7 @@ public class ZookeeperNodeManager implements InitializingBean { * @param workerGroup worker group * @param nodes worker nodes */ - private void syncWorkerGroupNodes(String workerGroup, Set nodes){ + private void syncWorkerGroupNodes(String workerGroup, Set nodes) { workerGroupLock.lock(); try { workerGroup = workerGroup.toLowerCase(); @@ -231,7 +311,7 @@ public class ZookeeperNodeManager implements InitializingBean { } } - public Map> getWorkerGroupNodes(){ + public Map> getWorkerGroupNodes() { return Collections.unmodifiableMap(workerGroupNodes); } @@ -240,15 +320,15 @@ public class ZookeeperNodeManager implements InitializingBean { * @param workerGroup workerGroup * @return worker nodes */ - public Set getWorkerGroupNodes(String workerGroup){ + public Set getWorkerGroupNodes(String workerGroup) { workerGroupLock.lock(); try { - if(StringUtils.isEmpty(workerGroup)){ - workerGroup = DEFAULT_WORKER_GROUP; + if (StringUtils.isEmpty(workerGroup)) { + workerGroup = Constants.DEFAULT_WORKER_GROUP; } workerGroup = workerGroup.toLowerCase(); Set nodes = workerGroupNodes.get(workerGroup); - if(CollectionUtils.isNotEmpty(nodes)){ + if (CollectionUtils.isNotEmpty(nodes)) { return Collections.unmodifiableSet(nodes); } return nodes; @@ -258,9 +338,48 @@ public class ZookeeperNodeManager implements InitializingBean { } /** - * close + * get worker node info + * @return worker node info + */ + public Map getWorkerNodeInfo() { + return Collections.unmodifiableMap(workerNodeInfo); + } + + /** + * get worker node info + * @param workerNode worker node + * @return worker node info + */ + public String getWorkerNodeInfo(String workerNode) { + workerNodeInfoLock.lock(); + try { + return workerNodeInfo.getOrDefault(workerNode, null); + } finally { + workerNodeInfoLock.unlock(); + } + } + + /** + * sync worker node info + * @param newWorkerNodeInfo new worker node info */ - public void close(){ + private void syncWorkerNodeInfo(Map newWorkerNodeInfo) { + workerNodeInfoLock.lock(); + try { + workerNodeInfo.clear(); + workerNodeInfo.putAll(newWorkerNodeInfo); + } finally { + workerNodeInfoLock.unlock(); + } + } + + /** + * destroy + */ + @PreDestroy + public void destroy() { + executorService.shutdownNow(); registryCenter.close(); } + } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index dedfb46008..67e1ee7b0a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -45,12 +45,21 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.WebApplicationType; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.FilterType; import org.springframework.transaction.annotation.EnableTransactionManagement; /** * worker server */ -@ComponentScan("org.apache.dolphinscheduler") +@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = { + @ComponentScan.Filter(type = FilterType.REGEX, pattern = { + "org.apache.dolphinscheduler.server.master.*", + "org.apache.dolphinscheduler.server.monitor.*", + "org.apache.dolphinscheduler.server.log.*", + "org.apache.dolphinscheduler.server.zk.ZKMasterClient", + "org.apache.dolphinscheduler.server.registry.ServerNodeManager" + }) +}) @EnableTransactionManagement public class WorkerServer implements IStoppable { diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index 14b1c61ce3..60c9053b09 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -39,7 +39,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.registry.DependencyConfig; -import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; @@ -70,7 +70,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = {DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, CuratorZookeeperClient.class, NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, ZKMasterClient.class, TaskPriorityQueueConsumer.class, - ZookeeperNodeManager.class, RegisterOperator.class, ZookeeperConfig.class, MasterConfig.class, MasterRegistry.class, + ServerNodeManager.class, RegisterOperator.class, ZookeeperConfig.class, MasterConfig.class, MasterRegistry.class, CuratorZookeeperClient.class, SpringConnectionFactory.class}) public class TaskPriorityQueueConsumerTest { diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java index de1f0517bd..ecbfed21b1 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java @@ -14,16 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.master.dispatch; +package org.apache.dolphinscheduler.server.master.dispatch; +import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.registry.DependencyConfig; -import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -34,6 +35,7 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; + import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; @@ -47,7 +49,8 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, WorkerRegistry.class, NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, WorkerConfig.class, - ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, CuratorZookeeperClient.class}) + ServerNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, CuratorZookeeperClient.class, + SpringConnectionFactory.class}) public class ExecutorDispatcherTest { @Autowired diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java index f8e6e65521..8c53560804 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -30,7 +31,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionConte import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.server.registry.DependencyConfig; -import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; @@ -40,6 +41,7 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -53,8 +55,9 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, WorkerRegistry.class, - ZookeeperNodeManager.class, ZookeeperRegistryCenter.class, WorkerConfig.class, CuratorZookeeperClient.class, - ZookeeperCachedOperator.class, ZookeeperConfig.class, SpringApplicationContext.class, NettyExecutorManager.class}) + ServerNodeManager.class, ZookeeperRegistryCenter.class, WorkerConfig.class, CuratorZookeeperClient.class, + ZookeeperCachedOperator.class, ZookeeperConfig.class, SpringApplicationContext.class, NettyExecutorManager.class, + SpringConnectionFactory.class}) public class NettyExecutorManagerTest { @Autowired diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java index ab2cc12e20..661e329778 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java @@ -17,12 +17,10 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; -import com.google.common.collect.Sets; - import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; -import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils; import org.junit.Assert; @@ -33,6 +31,8 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; +import com.google.common.collect.Sets; + /** * round robin host manager test @@ -40,16 +40,15 @@ import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class RoundRobinHostManagerTest { - @Mock - private ZookeeperNodeManager zookeeperNodeManager; + private ServerNodeManager serverNodeManager; @InjectMocks RoundRobinHostManager roundRobinHostManager; @Test public void testSelectWithEmptyResult() { - Mockito.when(zookeeperNodeManager.getWorkerGroupNodes("default")).thenReturn(null); + Mockito.when(serverNodeManager.getWorkerGroupNodes("default")).thenReturn(null); ExecutionContext context = ExecutionContextTestUtils.getExecutionContext(10000); Host emptyHost = roundRobinHostManager.select(context); Assert.assertTrue(StringUtils.isEmpty(emptyHost.getAddress())); @@ -57,7 +56,7 @@ public class RoundRobinHostManagerTest { @Test public void testSelectWithResult() { - Mockito.when(zookeeperNodeManager.getWorkerGroupNodes("default")).thenReturn(Sets.newHashSet("192.168.1.1:22:100")); + Mockito.when(serverNodeManager.getWorkerGroupNodes("default")).thenReturn(Sets.newHashSet("192.168.1.1:22")); ExecutionContext context = ExecutionContextTestUtils.getExecutionContext(10000); Host host = roundRobinHostManager.select(context); Assert.assertTrue(StringUtils.isNotEmpty(host.getAddress())); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ServerNodeManagerTest.java similarity index 82% rename from dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManagerTest.java rename to dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ServerNodeManagerTest.java index c7fa8f3b65..4c8e4cffea 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ServerNodeManagerTest.java @@ -14,14 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.registry; - -import java.util.Map; -import java.util.Set; +package org.apache.dolphinscheduler.server.registry; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -30,6 +28,10 @@ import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.zk.CuratorZookeeperClient; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; + +import java.util.Map; +import java.util.Set; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -38,16 +40,16 @@ import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /** - * zookeeper node manager test + * server node manager test */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, MasterRegistry.class,WorkerRegistry.class, - ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, - ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, CuratorZookeeperClient.class}) -public class ZookeeperNodeManagerTest { + ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, SpringConnectionFactory.class, + ZookeeperCachedOperator.class, ZookeeperConfig.class, ServerNodeManager.class, CuratorZookeeperClient.class}) +public class ServerNodeManagerTest { @Autowired - private ZookeeperNodeManager zookeeperNodeManager; + private ServerNodeManager serverNodeManager; @Autowired private MasterRegistry masterRegistry; @@ -68,11 +70,11 @@ public class ZookeeperNodeManagerTest { public void testGetMasterNodes(){ masterRegistry.registry(); try { - //let the zookeeperNodeManager catch the registry event + //let the serverNodeManager catch the registry event Thread.sleep(2000); } catch (InterruptedException ignore) { } - Set masterNodes = zookeeperNodeManager.getMasterNodes(); + Set masterNodes = serverNodeManager.getMasterNodes(); Assert.assertTrue(CollectionUtils.isNotEmpty(masterNodes)); Assert.assertEquals(1, masterNodes.size()); Assert.assertEquals(NetUtils.getAddr(masterConfig.getListenPort()), masterNodes.iterator().next()); @@ -83,11 +85,11 @@ public class ZookeeperNodeManagerTest { public void testGetWorkerGroupNodes(){ workerRegistry.registry(); try { - //let the zookeeperNodeManager catch the registry event + //let the serverNodeManager catch the registry event Thread.sleep(2000); } catch (InterruptedException ignore) { } - Map> workerGroupNodes = zookeeperNodeManager.getWorkerGroupNodes(); + Map> workerGroupNodes = serverNodeManager.getWorkerGroupNodes(); Assert.assertEquals(1, workerGroupNodes.size()); Assert.assertEquals("default".trim(), workerGroupNodes.keySet().iterator().next()); workerRegistry.unRegistry(); @@ -97,12 +99,11 @@ public class ZookeeperNodeManagerTest { public void testGetWorkerGroupNodesWithParam(){ workerRegistry.registry(); try { - //let the zookeeperNodeManager catch the registry event + //let the serverNodeManager catch the registry event Thread.sleep(3000); } catch (InterruptedException ignore) { } - Map> workerGroupNodes = zookeeperNodeManager.getWorkerGroupNodes(); - Set workerNodes = zookeeperNodeManager.getWorkerGroupNodes("default"); + Set workerNodes = serverNodeManager.getWorkerGroupNodes("default"); Assert.assertTrue(CollectionUtils.isNotEmpty(workerNodes)); Assert.assertEquals(1, workerNodes.size()); Assert.assertEquals(NetUtils.getAddr(workerConfig.getListenPort()), workerNodes.iterator().next()); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index bf04f1f569..ac340e92ed 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.processor; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -34,7 +35,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; -import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -63,25 +64,12 @@ import io.netty.channel.Channel; */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = { - TaskCallbackServiceTestConfig.class, - SpringZKServer.class, - SpringApplicationContext.class, - MasterRegistry.class, - WorkerRegistry.class, - ZookeeperRegistryCenter.class, - MasterConfig.class, - WorkerConfig.class, - RegisterOperator.class, - ZookeeperConfig.class, - ZookeeperNodeManager.class, - TaskCallbackService.class, - TaskResponseService.class, - TaskAckProcessor.class, - TaskResponseProcessor.class, - TaskExecuteProcessor.class, - CuratorZookeeperClient.class, - TaskExecutionContextCacheManagerImpl.class, - WorkerManagerThread.class}) + TaskCallbackServiceTestConfig.class, SpringZKServer.class, SpringApplicationContext.class, + SpringConnectionFactory.class, MasterRegistry.class, WorkerRegistry.class, ZookeeperRegistryCenter.class, + MasterConfig.class, WorkerConfig.class, RegisterOperator.class, ZookeeperConfig.class, ServerNodeManager.class, + TaskCallbackService.class, TaskResponseService.class, TaskAckProcessor.class, TaskResponseProcessor.class, + TaskExecuteProcessor.class, CuratorZookeeperClient.class, TaskExecutionContextCacheManagerImpl.class, + WorkerManagerThread.class}) public class TaskCallbackServiceTest { @Autowired diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java index 029dfac002..f4501bb00e 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java @@ -29,9 +29,12 @@ import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,6 +102,28 @@ public abstract class AbstractZKClient extends RegisterOperator { return serverList; } + /** + * get server zk nodes. + * + * @param zkNodeType zookeeper node type + * @return result : list + */ + public List getServerZkNodes(ZKNodeType zkNodeType) { + String path = getZNodeParentPath(zkNodeType); + List serverList = super.getChildrenKeys(path); + if (zkNodeType == ZKNodeType.WORKER) { + List workerList = new ArrayList<>(); + for (String group : serverList) { + List groupServers = super.getChildrenKeys(path + Constants.SLASH + group); + for (String groupServer : groupServers) { + workerList.add(group + Constants.SLASH + groupServer); + } + } + serverList = workerList; + } + return serverList; + } + /** * get server list map. * @@ -110,17 +135,7 @@ public abstract class AbstractZKClient extends RegisterOperator { Map serverMap = new HashMap<>(); try { String path = getZNodeParentPath(zkNodeType); - List serverList = super.getChildrenKeys(path); - if (zkNodeType == ZKNodeType.WORKER) { - List workerList = new ArrayList<>(); - for (String group : serverList) { - List groupServers = super.getChildrenKeys(path + Constants.SLASH + group); - for (String groupServer : groupServers) { - workerList.add(group + Constants.SLASH + groupServer); - } - } - serverList = workerList; - } + List serverList = getServerZkNodes(zkNodeType); for (String server : serverList) { String host = server; if (zkNodeType == ZKNodeType.WORKER && hostOnly) { @@ -145,10 +160,48 @@ public abstract class AbstractZKClient extends RegisterOperator { return getServerMaps(zkNodeType, false); } + /** + * get server node set. + * + * @param zkNodeType zookeeper node type + * @param hostOnly host only + * @return result : set + */ + public Set getServerNodeSet(ZKNodeType zkNodeType, boolean hostOnly) { + Set serverSet = new HashSet<>(); + try { + List serverList = getServerZkNodes(zkNodeType); + for (String server : serverList) { + String host = server; + if (zkNodeType == ZKNodeType.WORKER && hostOnly) { + host = server.split(Constants.SLASH)[1]; + } + serverSet.add(host); + } + } catch (Exception e) { + logger.error("get server node set failed", e); + } + return serverSet; + } + + /** + * get server node list. + * + * @param zkNodeType zookeeper node type + * @param hostOnly host only + * @return result : list + */ + public List getServerNodeList(ZKNodeType zkNodeType, boolean hostOnly) { + Set serverSet = getServerNodeSet(zkNodeType, hostOnly); + List serverList = new ArrayList<>(serverSet); + Collections.sort(serverList); + return serverList; + } + /** * check the zookeeper node already exists * - * @param host host + * @param host host * @param zkNodeType zookeeper node type * @return true if exists */ diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue index bcc52313aa..716d466643 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/instance/pages/list/_source/list.vue @@ -64,7 +64,7 @@ - + - + diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/_source/createWorker.vue b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/_source/createWorker.vue index b799c1afd5..37f9bd1895 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/_source/createWorker.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/_source/createWorker.vue @@ -19,7 +19,8 @@ ref="popover" :ok-text="item ? $t('Edit') : $t('Submit')" @ok="_ok" - @close="close"> + @close="close" + style="width: 700px;"> @@ -77,6 +77,7 @@ total: null, isLoading: false, workerGroupList: [], + workerAddressList: [], searchParams: { pageSize: 10, pageNo: 1, @@ -90,7 +91,7 @@ mixins: [listUrlParamHandle], props: {}, methods: { - ...mapActions('security', ['getWorkerGroups']), + ...mapActions('security', ['getWorkerGroups', 'getWorkerAddresses']), /** * Inquire */ @@ -135,6 +136,11 @@ }).catch(e => { this.isLoading = false }) + }, + _getWorkerAddressList () { + this.getWorkerAddresses().then(res => { + this.workerAddressList = res.data.map(x => ({ id: x, label: x })) + }) } }, watch: { @@ -144,7 +150,9 @@ this.searchParams.pageNo = _.isEmpty(a.query) ? 1 : a.query.pageNo } }, - created () {}, + created () { + this._getWorkerAddressList() + }, mounted () { }, components: { mList, mListConstruction, mConditions, mSpin, mNoData, mCreateWorker } diff --git a/dolphinscheduler-ui/src/js/conf/home/store/security/actions.js b/dolphinscheduler-ui/src/js/conf/home/store/security/actions.js index 72dcd07516..87a96dde49 100644 --- a/dolphinscheduler-ui/src/js/conf/home/store/security/actions.js +++ b/dolphinscheduler-ui/src/js/conf/home/store/security/actions.js @@ -630,5 +630,14 @@ export default { reject(e) }) }) + }, + getWorkerAddresses ({ state }, payload) { + return new Promise((resolve, reject) => { + io.get('worker-group/worker-address-list', payload, res => { + resolve(res) + }).catch(e => { + reject(e) + }) + }) } } diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js index a504112628..8d8ccbeaea 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js @@ -504,8 +504,7 @@ export default { 'Edit token': 'Edit token', Addresses: 'Addresses', 'Worker Addresses': 'Worker Addresses', - 'Please enter the worker addresses separated by commas': 'Please enter the worker addresses separated by commas', - 'Note: Multiple worker addresses have been comma separated': 'Note: Multiple worker addresses have been comma separated', + 'Please select the worker addresses': 'Please select the worker addresses', 'Failure time': 'Failure time', 'Expiration time': 'Expiration time', User: 'User', @@ -571,7 +570,6 @@ export default { 'Please Enter Http Condition': 'Please Enter Http Condition', 'There is no data for this period of time': 'There is no data for this period of time', 'Worker addresses cannot be empty': 'Worker addresses cannot be empty', - 'Please enter the correct worker addresses': 'Please enter the correct worker addresses', 'Please generate token': 'Please generate token', 'Spark Version': 'Spark Version', TargetDataBase: 'target database', diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js index e9a3603e2a..4ebf4c2eca 100755 --- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js +++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js @@ -504,8 +504,7 @@ export default { 'Edit token': '编辑令牌', Addresses: '地址', 'Worker Addresses': 'Worker地址', - 'Please enter the worker addresses separated by commas': '请输入Worker地址,多个用英文逗号隔开', - 'Note: Multiple worker addresses have been comma separated': '注意:多个Worker地址以英文逗号分割', + 'Please select the worker addresses': '请选择Worker地址', 'Failure time': '失效时间', 'Expiration time': '失效时间', User: '用户', @@ -571,7 +570,6 @@ export default { 'Please Enter Http Condition': '请填写校验内容', 'There is no data for this period of time': '该时间段无数据', 'Worker addresses cannot be empty': 'Worker地址不能为空', - 'Please enter the correct worker addresses': '请输入正确的Worker地址', 'Please generate token': '请生成Token', 'Spark Version': 'Spark版本', TargetDataBase: '目标库', diff --git a/pom.xml b/pom.xml index 3d892ddc0c..90051ca53c 100644 --- a/pom.xml +++ b/pom.xml @@ -934,7 +934,7 @@ **/server/master/processor/TaskAckProcessorTest.java **/server/master/processor/TaskKillResponseProcessorTest.java **/server/master/processor/queue/TaskResponseServiceTest.java - **/server/register/ZookeeperNodeManagerTest.java + **/server/register/ServerNodeManagerTest.java **/server/register/ZookeeperRegistryCenterTest.java **/server/utils/DataxUtilsTest.java **/server/utils/ExecutionContextTestUtils.java diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql index 5e71b4a48f..3218223ccc 100644 --- a/sql/dolphinscheduler_mysql.sql +++ b/sql/dolphinscheduler_mysql.sql @@ -795,11 +795,12 @@ CREATE TABLE `t_ds_user` ( DROP TABLE IF EXISTS `t_ds_worker_group`; CREATE TABLE `t_ds_worker_group` ( `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'id', - `name` varchar(256) NULL DEFAULT NULL COMMENT 'worker group name', + `name` varchar(256) NOT NULL COMMENT 'worker group name', `addr_list` text NULL DEFAULT NULL COMMENT 'worker addr list. split by [,]', `create_time` datetime NULL DEFAULT NULL COMMENT 'create time', `update_time` datetime NULL DEFAULT NULL COMMENT 'update time', - PRIMARY KEY (`id`) + PRIMARY KEY (`id`), + UNIQUE KEY `name_unique` (`name`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; -- ---------------------------- diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql index 17bfc7072b..0193e577b3 100644 --- a/sql/dolphinscheduler_postgre.sql +++ b/sql/dolphinscheduler_postgre.sql @@ -658,11 +658,12 @@ create index version_index on t_ds_version(version); DROP TABLE IF EXISTS t_ds_worker_group; CREATE TABLE t_ds_worker_group ( id bigint NOT NULL , - name varchar(256) DEFAULT NULL , + name varchar(256) NOT NULL , addr_list text DEFAULT NULL , create_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL , - PRIMARY KEY (id) + PRIMARY KEY (id) , + CONSTRAINT name_unique UNIQUE (name) ) ; -- diff --git a/sql/upgrade/1.3.6_schema/mysql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.6_schema/mysql/dolphinscheduler_ddl.sql index 1e492a6a87..25b7f5e764 100644 --- a/sql/upgrade/1.3.6_schema/mysql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.3.6_schema/mysql/dolphinscheduler_ddl.sql @@ -28,6 +28,8 @@ BEGIN AND COLUMN_NAME ='ip_list') THEN ALTER TABLE t_ds_worker_group CHANGE COLUMN `ip_list` `addr_list` text; + ALTER TABLE t_ds_worker_group MODIFY COLUMN `name` varchar(256) NOT NULL; + ALTER TABLE t_ds_worker_group ADD UNIQUE KEY `name_unique` (`name`); END IF; END; diff --git a/sql/upgrade/1.3.6_schema/postgresql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.6_schema/postgresql/dolphinscheduler_ddl.sql index d39a0321b6..17a8807054 100644 --- a/sql/upgrade/1.3.6_schema/postgresql/dolphinscheduler_ddl.sql +++ b/sql/upgrade/1.3.6_schema/postgresql/dolphinscheduler_ddl.sql @@ -15,7 +15,6 @@ * limitations under the License. */ - -- uc_dolphin_T_t_ds_worker_group_A_ip_list delimiter d// CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_worker_group_A_ip_list() RETURNS void AS $$ @@ -24,8 +23,10 @@ BEGIN WHERE TABLE_NAME='t_ds_worker_group' AND COLUMN_NAME ='ip_list') THEN - ALTER TABLE t_ds_worker_group rename ip_list TO addr_list; - ALTER TABLE t_ds_worker_group ALTER column addr_list type text; + ALTER TABLE t_ds_worker_group RENAME ip_list TO addr_list; + ALTER TABLE t_ds_worker_group ALTER COLUMN addr_list type text; + ALTER TABLE t_ds_worker_group ALTER COLUMN name type varchar(256), ALTER COLUMN name SET NOT NULL; + ALTER TABLE t_ds_worker_group ADD CONSTRAINT name_unique UNIQUE (name); END IF; END; $$ LANGUAGE plpgsql; @@ -36,5 +37,5 @@ SELECT uc_dolphin_T_t_ds_worker_group_A_ip_list(); DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_worker_group_A_ip_list(); -- Add foreign key constraints for t_ds_task_instance -- +delimiter ; ALTER TABLE t_ds_task_instance ADD CONSTRAINT foreign_key_instance_id FOREIGN KEY(process_instance_id) REFERENCES t_ds_process_instance(id) ON DELETE CASCADE; -