Browse Source

Refactor worker (#2044)

* Refactor worker (#10)

* Refactor worker (#2000)

* Refactor worker (#2)

* Refactor worker (#1993)

* Refactor worker (#1)

* add TaskResponseProcessor (#1983)

* 1, master persistent task 2. extract  master and worker communication model (#1992)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* TaskExecutionContext create modify (#1994)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

* add- register processor

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* buildAckCommand taskInstanceId not set modify (#2002)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify (#2004)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment (#2006)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type (#2012)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* Refactor worker (#2018)

* Refactor worker (#7)

* Refactor worker (#2000)

* Refactor worker (#2)

* Refactor worker (#1993)

* Refactor worker (#1)

* add TaskResponseProcessor (#1983)

* 1, master persistent task 2. extract  master and worker communication model (#1992)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* TaskExecutionContext create modify (#1994)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

* add- register processor

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* buildAckCommand taskInstanceId not set modify (#2002)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify (#2004)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment (#2006)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type (#2012)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* Refactor worker (#8)

* Refactor worker (#2000)

* Refactor worker (#2)

* Refactor worker (#1993)

* Refactor worker (#1)

* add TaskResponseProcessor (#1983)

* 1, master persistent task 2. extract  master and worker communication model (#1992)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* TaskExecutionContext create modify (#1994)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* updates

* add- register processor

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* buildAckCommand taskInstanceId not set modify (#2002)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify (#2004)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment (#2006)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type (#2012)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* add kill command

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* add TaskInstanceCacheManager receive Worker report result,modify master polling db transfrom to cache (#2021)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatcht task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>

* refactor heartbeat logic

* update registry and add worker group

* add worker group

* add lowerWeight host manager

Co-authored-by: qiaozhanwei <qiaozhanwei@outlook.com>
pull/2/head
Tboy 5 years ago committed by GitHub
parent
commit
9d36eaf6df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
  2. 14
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java
  3. 88
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
  4. 64
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java
  5. 171
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
  6. 48
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java
  7. 55
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
  8. 39
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
  9. 73
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
  10. 46
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java
  11. 5
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java
  12. 43
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightRoundRobinTest.java

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@ -43,6 +43,17 @@ public class MasterConfig {
@Value("${master.reserved.memory}") @Value("${master.reserved.memory}")
private double masterReservedMemory; private double masterReservedMemory;
@Value("${master.host.selector:lowerWeight}")
private String hostSelector;
public String getHostSelector() {
return hostSelector;
}
public void setHostSelector(String hostSelector) {
this.hostSelector = hostSelector;
}
public int getMasterExecThreads() { public int getMasterExecThreads() {
return masterExecThreads; return masterExecThreads;
} }

14
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcher.java

@ -20,12 +20,13 @@ package org.apache.dolphinscheduler.server.master.dispatch;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.dispatch.executor.ExecutorManager; import org.apache.dolphinscheduler.server.master.dispatch.executor.ExecutorManager;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.dispatch.host.RoundRobinHostManager; import org.apache.dolphinscheduler.server.master.dispatch.host.HostManager;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -44,14 +45,23 @@ public class ExecutorDispatcher implements InitializingBean {
@Autowired @Autowired
private NettyExecutorManager nettyExecutorManager; private NettyExecutorManager nettyExecutorManager;
@Autowired
private MasterConfig masterConfig;
/** /**
* round robin host manager * round robin host manager
*/ */
@Autowired @Autowired
private RoundRobinHostManager hostManager; private HostManager hostManager;
/**
* executor manager
*/
private final ConcurrentHashMap<ExecutorType, ExecutorManager<Boolean>> executorManagers; private final ConcurrentHashMap<ExecutorType, ExecutorManager<Boolean>> executorManagers;
/**
* constructor
*/
public ExecutorDispatcher(){ public ExecutorDispatcher(){
this.executorManagers = new ConcurrentHashMap<>(); this.executorManagers = new ConcurrentHashMap<>();
} }

88
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* round robin host manager
*/
public abstract class CommonHostManager implements HostManager {
private final Logger logger = LoggerFactory.getLogger(CommonHostManager.class);
/**
* zookeeperNodeManager
*/
@Autowired
protected ZookeeperNodeManager zookeeperNodeManager;
/**
* select host
* @param context context
* @return host
*/
@Override
public Host select(ExecutionContext context){
Host host = new Host();
Collection<String> nodes = null;
/**
* executor type
*/
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
break;
case CLIENT:
break;
default:
throw new IllegalArgumentException("invalid executorType : " + executorType);
}
if(CollectionUtils.isEmpty(nodes)){
return host;
}
List<Host> candidateHosts = new ArrayList<>(nodes.size());
nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
return select(candidateHosts);
}
public abstract Host select(Collection<Host> nodes);
public void setZookeeperNodeManager(ZookeeperNodeManager zookeeperNodeManager) {
this.zookeeperNodeManager = zookeeperNodeManager;
}
public ZookeeperNodeManager getZookeeperNodeManager() {
return zookeeperNodeManager;
}
}

64
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostSelector;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* host manager config
*/
@Configuration
public class HostManagerConfig {
private AutowireCapableBeanFactory beanFactory;
@Autowired
private MasterConfig masterConfig;
@Autowired
public HostManagerConfig(AutowireCapableBeanFactory beanFactory) {
this.beanFactory = beanFactory;
}
@Bean
public HostManager hostManager() {
String hostSelector = masterConfig.getHostSelector();
HostSelector selector = HostSelector.of(hostSelector);
HostManager hostManager;
switch (selector){
case RANDOM:
hostManager = new RandomHostManager();
break;
case ROUNDROBIN:
hostManager = new RoundRobinHostManager();
break;
case LOWERWEIGHT:
hostManager = new LowerWeightHostManager();
break;
default:
throw new IllegalArgumentException("unSupport selector " + hostSelector);
}
beanFactory.autowireBean(hostManager);
return hostManager;
}
}

171
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java

@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.dolphinscheduler.common.Constants.COMMA;
/**
* round robin host manager
*/
public class LowerWeightHostManager extends CommonHostManager {
private final Logger logger = LoggerFactory.getLogger(LowerWeightHostManager.class);
/**
* zookeeper registry center
*/
@Autowired
private ZookeeperRegistryCenter registryCenter;
/**
* round robin host manager
*/
private RoundRobinHostManager roundRobinHostManager;
/**
* selector
*/
private LowerWeightRoundRobin selector;
/**
* worker host weights
*/
private ConcurrentHashMap<String, Set<HostWeight>> workerHostWeights;
/**
* worker group host lock
*/
private Lock lock;
/**
* executor service
*/
private ScheduledExecutorService executorService;
@PostConstruct
public void init(){
this.selector = new LowerWeightRoundRobin();
this.workerHostWeights = new ConcurrentHashMap<>();
this.lock = new ReentrantLock();
this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor"));
this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),35, 40, TimeUnit.SECONDS);
this.roundRobinHostManager = new RoundRobinHostManager();
this.roundRobinHostManager.setZookeeperNodeManager(getZookeeperNodeManager());
}
@PreDestroy
public void close(){
this.executorService.shutdownNow();
}
/**
* select host
* @param context context
* @return host
*/
@Override
public Host select(ExecutionContext context){
Set<HostWeight> workerHostWeights = getWorkerHostWeights(context.getWorkerGroup());
if(CollectionUtils.isNotEmpty(workerHostWeights)){
return selector.select(workerHostWeights).getHost();
} else{
return roundRobinHostManager.select(context);
}
}
@Override
public Host select(Collection<Host> nodes) {
throw new UnsupportedOperationException("not support");
}
private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights){
lock.lock();
try {
workerHostWeights.clear();
workerHostWeights.putAll(workerHostWeights);
} finally {
lock.unlock();
}
}
private Set<HostWeight> getWorkerHostWeights(String workerGroup){
lock.lock();
try {
return workerHostWeights.get(workerGroup);
} finally {
lock.unlock();
}
}
class RefreshResourceTask implements Runnable{
@Override
public void run() {
try {
Map<String, Set<String>> workerGroupNodes = zookeeperNodeManager.getWorkerGroupNodes();
Set<Map.Entry<String, Set<String>>> entries = workerGroupNodes.entrySet();
Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
for(Map.Entry<String, Set<String>> entry : entries){
String workerGroup = entry.getKey();
Set<String> nodes = entry.getValue();
String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup);
Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
for(String node : nodes){
String heartbeat = registryCenter.getZookeeperCachedOperator().get(workerGroupPath + "/" + node);
if(StringUtils.isNotEmpty(heartbeat) && heartbeat.contains(COMMA) && heartbeat.split(COMMA).length == 5){
String[] parts = heartbeat.split(COMMA);
double cpu = Double.parseDouble(parts[0]);
double memory = Double.parseDouble(parts[1]);
double loadAverage = Double.parseDouble(parts[2]);
HostWeight hostWeight = new HostWeight(Host.of(node), cpu, memory, loadAverage);
hostWeights.add(hostWeight);
}
}
workerHostWeights.put(workerGroup, hostWeights);
}
syncWorkerHostWeight(workerHostWeights);
} catch (Throwable ex){
logger.error("RefreshResourceTask error", ex);
}
}
}
}

48
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RandomSelector;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
import java.util.Collection;
/**
* round robin host manager
*/
public class RandomHostManager extends CommonHostManager {
/**
* selector
*/
private final Selector<Host> selector;
/**
* set round robin
*/
public RandomHostManager(){
this.selector = new RandomSelector<>();
}
@Override
public Host select(Collection<Host> nodes) {
return selector.select(nodes);
}
}

55
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java

@ -17,36 +17,17 @@
package org.apache.dolphinscheduler.server.master.dispatch.host; package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List;
/** /**
* round robin host manager * round robin host manager
*/ */
@Service public class RoundRobinHostManager extends CommonHostManager {
public class RoundRobinHostManager implements HostManager {
private final Logger logger = LoggerFactory.getLogger(RoundRobinHostManager.class);
/**
* zookeeperNodeManager
*/
@Autowired
private ZookeeperNodeManager zookeeperNodeManager;
/** /**
* selector * selector
@ -60,39 +41,9 @@ public class RoundRobinHostManager implements HostManager {
this.selector = new RoundRobinSelector<>(); this.selector = new RoundRobinSelector<>();
} }
/**
* select host
* @param context context
* @return host
*/
@Override @Override
public Host select(ExecutionContext context){ public Host select(Collection<Host> nodes) {
Host host = new Host(); return selector.select(nodes);
Collection<String> nodes = null;
/**
* executor type
*/
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
break;
case CLIENT:
break;
default:
throw new IllegalArgumentException("invalid executorType : " + executorType);
}
if(CollectionUtils.isEmpty(nodes)){
return host;
}
List<Host> candidateHosts = new ArrayList<>(nodes.size());
nodes.stream().forEach(node -> candidateHosts.add(Host.of(node)));
/**
* select
*/
return selector.select(candidateHosts);
} }
} }

39
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
/**
* host selector
*/
public enum HostSelector {
RANDOM,
ROUNDROBIN,
LOWERWEIGHT;
public static HostSelector of(String selector){
for(HostSelector hs : values()){
if(hs.name().equalsIgnoreCase(selector)){
return hs;
}
}
throw new IllegalArgumentException("invalid host selector : " + selector);
}
}

73
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.remote.utils.Host;
/**
* host weight
*/
public class HostWeight {
private final int CPU_FACTOR = 10;
private final int MEMORY_FACTOR = 20;
private final int LOAD_AVERAGE_FACTOR = 70;
private final Host host;
private final int weight;
private int currentWeight;
public HostWeight(Host host, double cpu, double memory, double loadAverage) {
this.weight = calculateWeight(cpu, memory, loadAverage);
this.host = host ;
this.currentWeight = weight ;
}
public int getCurrentWeight() {
return currentWeight;
}
public int getWeight() {
return weight;
}
public void setCurrentWeight(int currentWeight) {
this.currentWeight = currentWeight;
}
public Host getHost() {
return host;
}
@Override
public String toString() {
return "HostWeight{" +
"host=" + host +
", weight=" + weight +
", currentWeight=" + currentWeight +
'}';
}
private int calculateWeight(double cpu, double memory, double loadAverage){
return (int)(cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR);
}
}

46
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobin.java

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import java.util.Collection;
/**
* lower weight round robin
*/
public class LowerWeightRoundRobin implements Selector<HostWeight>{
public HostWeight select(Collection<HostWeight> sources){
int totalWeight = 0;
int lowWeight = 0;
HostWeight lowerNode = null;
for (HostWeight hostWeight : sources) {
totalWeight += hostWeight.getWeight();
hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight() ) {
lowerNode = hostWeight;
lowWeight = hostWeight.getCurrentWeight();
}
}
lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
return lowerNode;
}
}

5
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java

@ -31,6 +31,7 @@ import org.springframework.stereotype.Service;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
@ -227,6 +228,10 @@ public class ZookeeperNodeManager implements InitializingBean {
} }
} }
public Map<String, Set<String>> getWorkerGroupNodes(){
return Collections.unmodifiableMap(workerGroupNodes);
}
/** /**
* get worker group nodes * get worker group nodes
* @param workerGroup * @param workerGroup

43
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightRoundRobinTest.java

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
public class LowerWeightRoundRobinTest {
@Test
public void testSelect(){
Collection<HostWeight> sources = new ArrayList<>();
sources.add(new HostWeight(Host.of("192.158.2.1:11"), 0.06, 0.44, 3.84));
sources.add(new HostWeight(Host.of("192.158.2.1:22"), 0.06, 0.56, 3.24));
sources.add(new HostWeight(Host.of("192.158.2.1:33"), 0.06, 0.80, 3.15));
System.out.println(sources);
LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
for(int i = 0; i < 100; i ++){
System.out.println(roundRobin.select(sources));
}
}
}
Loading…
Cancel
Save