From 9d36eaf6df627e9fc564a7c57182052ddf754eec Mon Sep 17 00:00:00 2001 From: Tboy Date: Sun, 1 Mar 2020 21:42:23 +0800 Subject: [PATCH] Refactor worker (#2044) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Refactor worker (#10) * Refactor worker (#2000) * Refactor worker (#2) * Refactor worker (#1993) * Refactor worker (#1) * add TaskResponseProcessor (#1983) * 1, master persistent task 2. extract master and worker communication model (#1992) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error Co-authored-by: qiaozhanwei * updates Co-authored-by: qiaozhanwei * TaskExecutionContext create modify (#1994) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify Co-authored-by: qiaozhanwei * updates * add- register processor Co-authored-by: qiaozhanwei * buildAckCommand taskInstanceId not set modify (#2002) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify (#2004) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment (#2006) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type (#2012) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * Refactor worker (#2018) * Refactor worker (#7) * Refactor worker (#2000) * Refactor worker (#2) * Refactor worker (#1993) * Refactor worker (#1) * add TaskResponseProcessor (#1983) * 1, master persistent task 2. extract master and worker communication model (#1992) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error Co-authored-by: qiaozhanwei * updates Co-authored-by: qiaozhanwei * TaskExecutionContext create modify (#1994) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify Co-authored-by: qiaozhanwei * updates * add- register processor Co-authored-by: qiaozhanwei * buildAckCommand taskInstanceId not set modify (#2002) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify (#2004) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment (#2006) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type (#2012) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type Co-authored-by: qiaozhanwei * Refactor worker (#8) * Refactor worker (#2000) * Refactor worker (#2) * Refactor worker (#1993) * Refactor worker (#1) * add TaskResponseProcessor (#1983) * 1, master persistent task 2. extract master and worker communication model (#1992) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error Co-authored-by: qiaozhanwei * updates Co-authored-by: qiaozhanwei * TaskExecutionContext create modify (#1994) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify Co-authored-by: qiaozhanwei * updates * add- register processor Co-authored-by: qiaozhanwei * buildAckCommand taskInstanceId not set modify (#2002) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify (#2004) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment (#2006) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type (#2012) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type Co-authored-by: qiaozhanwei * add kill command Co-authored-by: qiaozhanwei * add TaskInstanceCacheManager receive Worker report result,modify master polling db transfrom to cache (#2021) * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * 1, master persistent task 2. extract master and worker communication model * add license * modify javadoc error * TaskExecutionContext create modify * buildAckCommand taskInstanceId not set modify * java doc error modify * add comment * ExecutorManager interface add generic type * add TaskInstanceCacheManager receive Worker report result * TaskInstance setExecutePath * add TaskInstanceCacheManager to receive Worker Task result report * TaskInstanceCacheManager add remove method * add license * add dispatcht task method * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access * AbstractCommandExecutor remove db access Co-authored-by: qiaozhanwei * refactor heartbeat logic * update registry and add worker group * add worker group * add lowerWeight host manager Co-authored-by: qiaozhanwei --- .../server/master/config/MasterConfig.java | 11 ++ .../master/dispatch/ExecutorDispatcher.java | 14 +- .../dispatch/host/CommonHostManager.java | 88 +++++++++ .../dispatch/host/HostManagerConfig.java | 64 +++++++ .../dispatch/host/LowerWeightHostManager.java | 171 ++++++++++++++++++ .../dispatch/host/RandomHostManager.java | 48 +++++ .../dispatch/host/RoundRobinHostManager.java | 55 +----- .../dispatch/host/assign/HostSelector.java | 39 ++++ .../dispatch/host/assign/HostWeight.java | 73 ++++++++ .../host/assign/LowerWeightRoundRobin.java | 46 +++++ .../server/registry/ZookeeperNodeManager.java | 5 + .../host/LowerWeightRoundRobinTest.java | 43 +++++ 12 files changed, 603 insertions(+), 54 deletions(-) create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightRoundRobinTest.java diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index efb7cff1a7..e8a8ecbe43 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -43,6 +43,17 @@ public class MasterConfig { @Value("${master.reserved.memory}") private double masterReservedMemory; + @Value("${master.host.selector:lowerWeight}") + private String hostSelector; + + public String getHostSelector() { + return hostSelector; + } + + public void setHostSelector(String hostSelector) { + this.hostSelector = hostSelector; + } + public int getMasterExecThreads() { return masterExecThreads; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java index 8a803a2d0f..c597dc196a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java @@ -20,12 +20,13 @@ package org.apache.dolphinscheduler.server.master.dispatch; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.server.master.dispatch.executor.ExecutorManager; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; -import org.apache.dolphinscheduler.server.master.dispatch.host.RoundRobinHostManager; +import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -44,14 +45,23 @@ public class ExecutorDispatcher implements InitializingBean { @Autowired private NettyExecutorManager nettyExecutorManager; + @Autowired + private MasterConfig masterConfig; + /** * round robin host manager */ @Autowired - private RoundRobinHostManager hostManager; + private HostManager hostManager; + /** + * executor manager + */ private final ConcurrentHashMap> executorManagers; + /** + * constructor + */ public ExecutorDispatcher(){ this.executorManagers = new ConcurrentHashMap<>(); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java new file mode 100644 index 0000000000..080ce7a8af --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.dispatch.host; + +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; +import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; +import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + + +/** + * round robin host manager + */ +public abstract class CommonHostManager implements HostManager { + + private final Logger logger = LoggerFactory.getLogger(CommonHostManager.class); + + /** + * zookeeperNodeManager + */ + @Autowired + protected ZookeeperNodeManager zookeeperNodeManager; + + /** + * select host + * @param context context + * @return host + */ + @Override + public Host select(ExecutionContext context){ + Host host = new Host(); + Collection nodes = null; + /** + * executor type + */ + ExecutorType executorType = context.getExecutorType(); + switch (executorType){ + case WORKER: + nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup()); + break; + case CLIENT: + break; + default: + throw new IllegalArgumentException("invalid executorType : " + executorType); + + } + if(CollectionUtils.isEmpty(nodes)){ + return host; + } + List candidateHosts = new ArrayList<>(nodes.size()); + nodes.stream().forEach(node -> candidateHosts.add(Host.of(node))); + + return select(candidateHosts); + } + + public abstract Host select(Collection nodes); + + public void setZookeeperNodeManager(ZookeeperNodeManager zookeeperNodeManager) { + this.zookeeperNodeManager = zookeeperNodeManager; + } + + public ZookeeperNodeManager getZookeeperNodeManager() { + return zookeeperNodeManager; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java new file mode 100644 index 0000000000..458a1ee036 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.dispatch.host; + +import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.AutowireCapableBeanFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * host manager config + */ +@Configuration +public class HostManagerConfig { + + private AutowireCapableBeanFactory beanFactory; + + @Autowired + private MasterConfig masterConfig; + + @Autowired + public HostManagerConfig(AutowireCapableBeanFactory beanFactory) { + this.beanFactory = beanFactory; + } + + @Bean + public HostManager hostManager() { + String hostSelector = masterConfig.getHostSelector(); + HostSelector selector = HostSelector.of(hostSelector); + HostManager hostManager; + switch (selector){ + case RANDOM: + hostManager = new RandomHostManager(); + break; + case ROUNDROBIN: + hostManager = new RoundRobinHostManager(); + break; + case LOWERWEIGHT: + hostManager = new LowerWeightHostManager(); + break; + default: + throw new IllegalArgumentException("unSupport selector " + hostSelector); + } + beanFactory.autowireBean(hostManager); + return hostManager; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java new file mode 100644 index 0000000000..99cae6954c --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.dispatch.host; + +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; +import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; +import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight; +import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.dolphinscheduler.common.Constants.COMMA; + + +/** + * round robin host manager + */ +public class LowerWeightHostManager extends CommonHostManager { + + private final Logger logger = LoggerFactory.getLogger(LowerWeightHostManager.class); + + /** + * zookeeper registry center + */ + @Autowired + private ZookeeperRegistryCenter registryCenter; + + /** + * round robin host manager + */ + private RoundRobinHostManager roundRobinHostManager; + + /** + * selector + */ + private LowerWeightRoundRobin selector; + + /** + * worker host weights + */ + private ConcurrentHashMap> workerHostWeights; + + /** + * worker group host lock + */ + private Lock lock; + + /** + * executor service + */ + private ScheduledExecutorService executorService; + + @PostConstruct + public void init(){ + this.selector = new LowerWeightRoundRobin(); + this.workerHostWeights = new ConcurrentHashMap<>(); + this.lock = new ReentrantLock(); + this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor")); + this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),35, 40, TimeUnit.SECONDS); + this.roundRobinHostManager = new RoundRobinHostManager(); + this.roundRobinHostManager.setZookeeperNodeManager(getZookeeperNodeManager()); + } + + @PreDestroy + public void close(){ + this.executorService.shutdownNow(); + } + + /** + * select host + * @param context context + * @return host + */ + @Override + public Host select(ExecutionContext context){ + Set workerHostWeights = getWorkerHostWeights(context.getWorkerGroup()); + if(CollectionUtils.isNotEmpty(workerHostWeights)){ + return selector.select(workerHostWeights).getHost(); + } else{ + return roundRobinHostManager.select(context); + } + } + + @Override + public Host select(Collection nodes) { + throw new UnsupportedOperationException("not support"); + } + + private void syncWorkerHostWeight(Map> workerHostWeights){ + lock.lock(); + try { + workerHostWeights.clear(); + workerHostWeights.putAll(workerHostWeights); + } finally { + lock.unlock(); + } + } + + private Set getWorkerHostWeights(String workerGroup){ + lock.lock(); + try { + return workerHostWeights.get(workerGroup); + } finally { + lock.unlock(); + } + } + + class RefreshResourceTask implements Runnable{ + + @Override + public void run() { + try { + Map> workerGroupNodes = zookeeperNodeManager.getWorkerGroupNodes(); + Set>> entries = workerGroupNodes.entrySet(); + Map> workerHostWeights = new HashMap<>(); + for(Map.Entry> entry : entries){ + String workerGroup = entry.getKey(); + Set nodes = entry.getValue(); + String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup); + Set hostWeights = new HashSet<>(nodes.size()); + for(String node : nodes){ + String heartbeat = registryCenter.getZookeeperCachedOperator().get(workerGroupPath + "/" + node); + if(StringUtils.isNotEmpty(heartbeat) && heartbeat.contains(COMMA) && heartbeat.split(COMMA).length == 5){ + String[] parts = heartbeat.split(COMMA); + double cpu = Double.parseDouble(parts[0]); + double memory = Double.parseDouble(parts[1]); + double loadAverage = Double.parseDouble(parts[2]); + HostWeight hostWeight = new HostWeight(Host.of(node), cpu, memory, loadAverage); + hostWeights.add(hostWeight); + } + } + workerHostWeights.put(workerGroup, hostWeights); + } + syncWorkerHostWeight(workerHostWeights); + } catch (Throwable ex){ + logger.error("RefreshResourceTask error", ex); + } + } + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java new file mode 100644 index 0000000000..ef2b6fd22f --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.dispatch.host; + +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RandomSelector; +import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector; + +import java.util.Collection; + + +/** + * round robin host manager + */ +public class RandomHostManager extends CommonHostManager { + + /** + * selector + */ + private final Selector selector; + + /** + * set round robin + */ + public RandomHostManager(){ + this.selector = new RandomSelector<>(); + } + + @Override + public Host select(Collection nodes) { + return selector.select(nodes); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java index a57363213e..e9fef49ecf 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java @@ -17,36 +17,17 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.remote.utils.Host; -import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; -import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector; -import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; /** * round robin host manager */ -@Service -public class RoundRobinHostManager implements HostManager { - - private final Logger logger = LoggerFactory.getLogger(RoundRobinHostManager.class); - - /** - * zookeeperNodeManager - */ - @Autowired - private ZookeeperNodeManager zookeeperNodeManager; +public class RoundRobinHostManager extends CommonHostManager { /** * selector @@ -60,39 +41,9 @@ public class RoundRobinHostManager implements HostManager { this.selector = new RoundRobinSelector<>(); } - /** - * select host - * @param context context - * @return host - */ @Override - public Host select(ExecutionContext context){ - Host host = new Host(); - Collection nodes = null; - /** - * executor type - */ - ExecutorType executorType = context.getExecutorType(); - switch (executorType){ - case WORKER: - nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup()); - break; - case CLIENT: - break; - default: - throw new IllegalArgumentException("invalid executorType : " + executorType); - - } - if(CollectionUtils.isEmpty(nodes)){ - return host; - } - List candidateHosts = new ArrayList<>(nodes.size()); - nodes.stream().forEach(node -> candidateHosts.add(Host.of(node))); - - /** - * select - */ - return selector.select(candidateHosts); + public Host select(Collection nodes) { + return selector.select(nodes); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java new file mode 100644 index 0000000000..145393e1f0 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.dispatch.host.assign; + +/** + * host selector + */ +public enum HostSelector { + + RANDOM, + + ROUNDROBIN, + + LOWERWEIGHT; + + public static HostSelector of(String selector){ + for(HostSelector hs : values()){ + if(hs.name().equalsIgnoreCase(selector)){ + return hs; + } + } + throw new IllegalArgumentException("invalid host selector : " + selector); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java new file mode 100644 index 0000000000..ebceea7b13 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.dispatch.host.assign; + +import org.apache.dolphinscheduler.remote.utils.Host; + +/** + * host weight + */ +public class HostWeight { + + private final int CPU_FACTOR = 10; + + private final int MEMORY_FACTOR = 20; + + private final int LOAD_AVERAGE_FACTOR = 70; + + private final Host host; + + private final int weight; + + private int currentWeight; + + public HostWeight(Host host, double cpu, double memory, double loadAverage) { + this.weight = calculateWeight(cpu, memory, loadAverage); + this.host = host ; + this.currentWeight = weight ; + } + + public int getCurrentWeight() { + return currentWeight; + } + + public int getWeight() { + return weight; + } + + public void setCurrentWeight(int currentWeight) { + this.currentWeight = currentWeight; + } + + public Host getHost() { + return host; + } + + @Override + public String toString() { + return "HostWeight{" + + "host=" + host + + ", weight=" + weight + + ", currentWeight=" + currentWeight + + '}'; + } + + private int calculateWeight(double cpu, double memory, double loadAverage){ + return (int)(cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java new file mode 100644 index 0000000000..cadf418f51 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.dispatch.host.assign; + +import java.util.Collection; + +/** + * lower weight round robin + */ +public class LowerWeightRoundRobin implements Selector{ + + public HostWeight select(Collection sources){ + int totalWeight = 0; + int lowWeight = 0; + HostWeight lowerNode = null; + for (HostWeight hostWeight : sources) { + totalWeight += hostWeight.getWeight(); + hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight()); + if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight() ) { + lowerNode = hostWeight; + lowWeight = hostWeight.getCurrentWeight(); + } + } + lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight); + return lowerNode; + + } +} + + + diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java index 590a25f52c..25355e2925 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java @@ -31,6 +31,7 @@ import org.springframework.stereotype.Service; import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; @@ -227,6 +228,10 @@ public class ZookeeperNodeManager implements InitializingBean { } } + public Map> getWorkerGroupNodes(){ + return Collections.unmodifiableMap(workerGroupNodes); + } + /** * get worker group nodes * @param workerGroup diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightRoundRobinTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightRoundRobinTest.java new file mode 100644 index 0000000000..10936a6329 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightRoundRobinTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.master.dispatch.host; + +import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight; +import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; + + +public class LowerWeightRoundRobinTest { + + + @Test + public void testSelect(){ + Collection sources = new ArrayList<>(); + sources.add(new HostWeight(Host.of("192.158.2.1:11"), 0.06, 0.44, 3.84)); + sources.add(new HostWeight(Host.of("192.158.2.1:22"), 0.06, 0.56, 3.24)); + sources.add(new HostWeight(Host.of("192.158.2.1:33"), 0.06, 0.80, 3.15)); + System.out.println(sources); + LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin(); + for(int i = 0; i < 100; i ++){ + System.out.println(roundRobin.select(sources)); + } + } +}