From d6a32ac65225fa4f8d9a80c09c5ea8a29e392dac Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Mon, 10 Aug 2020 19:05:21 +0800 Subject: [PATCH] [Feature#3234][cluster]enhanced load balancing (#3235) * [Future#3234][cluster]enhanced load balancing weight-based load balancing algorithm this close # 3234 * remove useless parameter * code smell * load balancing according to work group * add smooth weight round robin * remove unused constants * perfect unit test * code smell * code smell * add work weight config * fix config error * add weight docs to readme.md --- docker/build/README.md | 4 + docker/build/README_zh_CN.md | 4 + .../dolphinscheduler/worker.properties.tpl | 5 +- docker/build/startup-init-conf.sh | 1 + docker/docker-swarm/docker-compose.yml | 1 + docker/docker-swarm/docker-stack.yml | 1 + .../configmap-dolphinscheduler-worker.yaml | 1 + .../statefulset-dolphinscheduler-worker.yaml | 5 + .../kubernetes/dolphinscheduler/values.yaml | 1 + .../dolphinscheduler/remote/utils/Host.java | 63 ++++++++- .../dispatch/host/CommonHostManager.java | 7 +- .../dispatch/host/RandomHostManager.java | 2 +- .../dispatch/host/RoundRobinHostManager.java | 2 +- .../dispatch/host/assign/RandomSelector.java | 45 +++++-- .../host/assign/RoundRobinSelector.java | 120 ++++++++++++++++-- .../server/worker/config/WorkerConfig.java | 12 ++ .../worker/registry/WorkerRegistry.java | 28 ++-- .../src/main/resources/worker.properties | 3 + .../host/assign/RandomSelectorTest.java | 16 ++- .../host/assign/RoundRobinSelectorTest.java | 51 +++++--- 20 files changed, 303 insertions(+), 69 deletions(-) diff --git a/docker/build/README.md b/docker/build/README.md index bc516bc214..951f2d6b51 100644 --- a/docker/build/README.md +++ b/docker/build/README.md @@ -238,6 +238,10 @@ This environment variable sets max cpu load avg for `worker-server`. The default This environment variable sets reserved memory for `worker-server`. The default value is `0.1`. +**`WORKER_WEIGHT`** + +This environment variable sets port for `worker-server`. The default value is `100`. + **`WORKER_LISTEN_PORT`** This environment variable sets port for `worker-server`. The default value is `1234`. diff --git a/docker/build/README_zh_CN.md b/docker/build/README_zh_CN.md index c2affc0691..c4339a945c 100644 --- a/docker/build/README_zh_CN.md +++ b/docker/build/README_zh_CN.md @@ -238,6 +238,10 @@ Dolphin Scheduler映像使用了几个容易遗漏的环境变量。虽然这些 配置`worker-server`的保留内存,默认值 `0.1`。 +**`WORKER_WEIGHT`** + +配置`worker-server`的权重,默认之`100`。 + **`WORKER_LISTEN_PORT`** 配置`worker-server`的端口,默认值 `1234`。 diff --git a/docker/build/conf/dolphinscheduler/worker.properties.tpl b/docker/build/conf/dolphinscheduler/worker.properties.tpl index d596be94bc..83097dd9a4 100644 --- a/docker/build/conf/dolphinscheduler/worker.properties.tpl +++ b/docker/build/conf/dolphinscheduler/worker.properties.tpl @@ -34,4 +34,7 @@ worker.reserved.memory=${WORKER_RESERVED_MEMORY} #worker.listen.port=${WORKER_LISTEN_PORT} # default worker group -#worker.group=${WORKER_GROUP} \ No newline at end of file +#worker.groups=${WORKER_GROUP} + +# default worker weight +#worker.weight=${WORKER_WEIGHT} \ No newline at end of file diff --git a/docker/build/startup-init-conf.sh b/docker/build/startup-init-conf.sh index 73fdad6798..d5cd86f1a4 100644 --- a/docker/build/startup-init-conf.sh +++ b/docker/build/startup-init-conf.sh @@ -74,6 +74,7 @@ export WORKER_MAX_CPULOAD_AVG=${WORKER_MAX_CPULOAD_AVG:-"100"} export WORKER_RESERVED_MEMORY=${WORKER_RESERVED_MEMORY:-"0.1"} export WORKER_LISTEN_PORT=${WORKER_LISTEN_PORT:-"1234"} export WORKER_GROUP=${WORKER_GROUP:-"default"} +export WORKER_WEIGHT=${WORKER_WEIGHT:-"100"} #============================================================================ # Alert Server diff --git a/docker/docker-swarm/docker-compose.yml b/docker/docker-swarm/docker-compose.yml index 51eb0aeaa5..349b3ad790 100644 --- a/docker/docker-swarm/docker-compose.yml +++ b/docker/docker-swarm/docker-compose.yml @@ -187,6 +187,7 @@ services: WORKER_MAX_CPULOAD_AVG: "100" WORKER_RESERVED_MEMORY: "0.1" WORKER_GROUP: "default" + WORKER_WEIGHT: "100" DOLPHINSCHEDULER_DATA_BASEDIR_PATH: "/tmp/dolphinscheduler" DATABASE_HOST: dolphinscheduler-postgresql DATABASE_PORT: 5432 diff --git a/docker/docker-swarm/docker-stack.yml b/docker/docker-swarm/docker-stack.yml index ca9f7c88c7..dff4a47b2c 100644 --- a/docker/docker-swarm/docker-stack.yml +++ b/docker/docker-swarm/docker-stack.yml @@ -187,6 +187,7 @@ services: WORKER_MAX_CPULOAD_AVG: "100" WORKER_RESERVED_MEMORY: "0.1" WORKER_GROUP: "default" + WORKER_WEIGHT: "100" DOLPHINSCHEDULER_DATA_BASEDIR_PATH: "/tmp/dolphinscheduler" DATABASE_HOST: dolphinscheduler-postgresql DATABASE_PORT: 5432 diff --git a/docker/kubernetes/dolphinscheduler/templates/configmap-dolphinscheduler-worker.yaml b/docker/kubernetes/dolphinscheduler/templates/configmap-dolphinscheduler-worker.yaml index 1e08b67b53..569341c225 100644 --- a/docker/kubernetes/dolphinscheduler/templates/configmap-dolphinscheduler-worker.yaml +++ b/docker/kubernetes/dolphinscheduler/templates/configmap-dolphinscheduler-worker.yaml @@ -31,6 +31,7 @@ data: WORKER_RESERVED_MEMORY: {{ .Values.worker.configmap.WORKER_RESERVED_MEMORY | quote }} WORKER_LISTEN_PORT: {{ .Values.worker.configmap.WORKER_LISTEN_PORT | quote }} WORKER_GROUP: {{ .Values.worker.configmap.WORKER_GROUP | quote }} + WORKER_WEIGHT: {{ .Values.worker.configmap.WORKER_WEIGHT | quote }} DOLPHINSCHEDULER_DATA_BASEDIR_PATH: {{ include "dolphinscheduler.worker.base.dir" . | quote }} dolphinscheduler_env.sh: |- {{- range .Values.worker.configmap.DOLPHINSCHEDULER_ENV }} diff --git a/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml b/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml index fd32a955a2..ae562cc62b 100644 --- a/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml +++ b/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml @@ -162,6 +162,11 @@ spec: configMapKeyRef: name: {{ include "dolphinscheduler.fullname" . }}-worker key: WORKER_GROUP + - name: WORKER_WEUGHT + valueFrom: + configMapKeyRef: + name: {{ include "dolphinscheduler.fullname" . }}-worker + key: WORKER_WEIGHT - name: DOLPHINSCHEDULER_DATA_BASEDIR_PATH valueFrom: configMapKeyRef: diff --git a/docker/kubernetes/dolphinscheduler/values.yaml b/docker/kubernetes/dolphinscheduler/values.yaml index 8acb1d326a..3261b08401 100644 --- a/docker/kubernetes/dolphinscheduler/values.yaml +++ b/docker/kubernetes/dolphinscheduler/values.yaml @@ -201,6 +201,7 @@ worker: WORKER_RESERVED_MEMORY: "0.1" WORKER_LISTEN_PORT: "1234" WORKER_GROUP: "default" + WORKER_WEIGHT: "100" DOLPHINSCHEDULER_DATA_BASEDIR_PATH: "/tmp/dolphinscheduler" DOLPHINSCHEDULER_ENV: - "export HADOOP_HOME=/opt/soft/hadoop" diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java index e9eaabcad6..b905a9fea8 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java @@ -20,7 +20,7 @@ import java.io.Serializable; import java.util.Objects; /** - * server address + * server address */ public class Host implements Serializable { @@ -39,6 +39,16 @@ public class Host implements Serializable { */ private int port; + /** + * weight + */ + private int weight; + + /** + * workGroup + */ + private String workGroup; + public Host() { } @@ -48,6 +58,21 @@ public class Host implements Serializable { this.address = ip + ":" + port; } + public Host(String ip, int port, int weight) { + this.ip = ip; + this.port = port; + this.address = ip + ":" + port; + this.weight = weight; + } + + public Host(String ip, int port, int weight,String workGroup) { + this.ip = ip; + this.port = port; + this.address = ip + ":" + port; + this.weight = weight; + this.workGroup=workGroup; + } + public String getAddress() { return address; } @@ -65,6 +90,14 @@ public class Host implements Serializable { this.address = ip + ":" + port; } + public int getWeight() { + return weight; + } + + public void setWeight(int weight) { + this.weight = weight; + } + public int getPort() { return port; } @@ -74,31 +107,47 @@ public class Host implements Serializable { this.address = ip + ":" + port; } + public String getWorkGroup() { + return workGroup; + } + + public void setWorkGroup(String workGroup) { + this.workGroup = workGroup; + } + /** * address convert host + * * @param address address * @return host */ - public static Host of(String address){ - if(address == null) { + public static Host of(String address) { + if (address == null) { throw new IllegalArgumentException("Host : address is null."); } String[] parts = address.split(":"); - if (parts.length != 2) { + if (parts.length < 2) { throw new IllegalArgumentException(String.format("Host : %s illegal.", address)); } - Host host = new Host(parts[0], Integer.parseInt(parts[1])); + Host host = null; + if (parts.length == 2) { + host = new Host(parts[0], Integer.parseInt(parts[1])); + } + if (parts.length == 3) { + host = new Host(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2])); + } return host; } /** * whether old version + * * @param address address * @return old version is true , otherwise is false */ - public static Boolean isOldVersion(String address){ + public static Boolean isOldVersion(String address) { String[] parts = address.split(":"); - return parts.length != 2 ? true : false; + return parts.length != 2 && parts.length != 3; } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java index 58006bf7f7..4a3d4bd9f1 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java @@ -71,7 +71,12 @@ public abstract class CommonHostManager implements HostManager { return host; } List candidateHosts = new ArrayList<>(nodes.size()); - nodes.stream().forEach(node -> candidateHosts.add(Host.of(node))); + nodes.forEach(node -> { + Host nodeHost=Host.of(node); + nodeHost.setWorkGroup(context.getWorkerGroup()); + candidateHosts.add(nodeHost); + }); + return select(candidateHosts); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java index ef2b6fd22f..241906a7b4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java @@ -38,7 +38,7 @@ public class RandomHostManager extends CommonHostManager { * set round robin */ public RandomHostManager(){ - this.selector = new RandomSelector<>(); + this.selector = new RandomSelector(); } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java index e9fef49ecf..ec1945e563 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java @@ -38,7 +38,7 @@ public class RoundRobinHostManager extends CommonHostManager { * set round robin */ public RoundRobinHostManager(){ - this.selector = new RoundRobinSelector<>(); + this.selector = new RoundRobinSelector(); } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java index e00d6f7a65..6975127b9a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java @@ -17,27 +17,44 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign; +import org.apache.dolphinscheduler.remote.utils.Host; + +import java.util.ArrayList; import java.util.Collection; -import java.util.Random; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; /** * random selector - * @param T */ -public class RandomSelector extends AbstractSelector { - - private final Random random = new Random(); +public class RandomSelector extends AbstractSelector { @Override - public T doSelect(final Collection source) { - - int size = source.size(); - /** - * random select - */ - int randomIndex = random.nextInt(size); - - return (T) source.toArray()[randomIndex]; + public Host doSelect(final Collection source) { + + List hosts = new ArrayList<>(source); + int size = hosts.size(); + int[] weights = new int[size]; + int totalWeight = 0; + int index = 0; + + for (Host host : hosts) { + totalWeight += host.getWeight(); + weights[index] = host.getWeight(); + index++; + } + + if (totalWeight > 0) { + int offset = ThreadLocalRandom.current().nextInt(totalWeight); + + for (int i = 0; i < size; i++) { + offset -= weights[i]; + if (offset < 0) { + return hosts.get(i); + } + } + } + return hosts.get(ThreadLocalRandom.current().nextInt(size)); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java index 06e469fe6b..34a79ac6e8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java @@ -16,27 +16,123 @@ */ package org.apache.dolphinscheduler.server.master.dispatch.host.assign; +import org.apache.dolphinscheduler.remote.utils.Host; import org.springframework.stereotype.Service; -import java.util.Collection; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; /** - * round robin selector - * @param T + * Smooth Weight Round Robin */ @Service -public class RoundRobinSelector extends AbstractSelector { +public class RoundRobinSelector extends AbstractSelector { + + private ConcurrentMap> workGroupWeightMap = new ConcurrentHashMap<>(); + + private static final int RECYCLE_PERIOD = 100000; + + private AtomicBoolean updateLock = new AtomicBoolean(); + + protected static class WeightedRoundRobin { + private int weight; + private AtomicLong current = new AtomicLong(0); + private long lastUpdate; + + int getWeight() { + return weight; + } + + void setWeight(int weight) { + this.weight = weight; + current.set(0); + } + + long increaseCurrent() { + return current.addAndGet(weight); + } + + void sel(int total) { + current.addAndGet(-1L * total); + } + + long getLastUpdate() { + return lastUpdate; + } + + void setLastUpdate(long lastUpdate) { + this.lastUpdate = lastUpdate; + } + + } - private final AtomicInteger index = new AtomicInteger(0); @Override - public T doSelect(Collection source) { + public Host doSelect(Collection source) { + + List hosts = new ArrayList<>(source); + String key = hosts.get(0).getWorkGroup(); + ConcurrentMap map = workGroupWeightMap.get(key); + if (map == null) { + workGroupWeightMap.putIfAbsent(key, new ConcurrentHashMap<>()); + map = workGroupWeightMap.get(key); + } + + int totalWeight = 0; + long maxCurrent = Long.MIN_VALUE; + long now = System.currentTimeMillis(); + Host selectedHost = null; + WeightedRoundRobin selectWeightRoundRobin = null; + + for (Host host : hosts) { + String workGroupHost = host.getWorkGroup() + host.getAddress(); + WeightedRoundRobin weightedRoundRobin = map.get(workGroupHost); + int weight = host.getWeight(); + if (weight < 0) { + weight = 0; + } + + if (weightedRoundRobin == null) { + weightedRoundRobin = new WeightedRoundRobin(); + // set weight + weightedRoundRobin.setWeight(weight); + map.putIfAbsent(workGroupHost, weightedRoundRobin); + weightedRoundRobin = map.get(workGroupHost); + } + if (weight != weightedRoundRobin.getWeight()) { + weightedRoundRobin.setWeight(weight); + } + + long cur = weightedRoundRobin.increaseCurrent(); + weightedRoundRobin.setLastUpdate(now); + if (cur > maxCurrent) { + maxCurrent = cur; + selectedHost = host; + selectWeightRoundRobin = weightedRoundRobin; + } + + totalWeight += weight; + } + + + if (!updateLock.get() && hosts.size() != map.size() && updateLock.compareAndSet(false, true)) { + try { + ConcurrentMap newMap = new ConcurrentHashMap<>(map); + newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD); + workGroupWeightMap.put(key, newMap); + } finally { + updateLock.set(false); + } + } + + if (selectedHost != null) { + selectWeightRoundRobin.sel(totalWeight); + return selectedHost; + } - int size = source.size(); - /** - * round robin - */ - return (T) source.toArray()[index.getAndIncrement() % size]; + return hosts.get(0); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index 2dedaf8e1b..fa97403527 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -49,6 +49,9 @@ public class WorkerConfig { @Value("${worker.listen.port: 1234}") private int listenPort; + @Value("${worker.weight:100}") + private int weight; + public int getListenPort() { return listenPort; } @@ -107,4 +110,13 @@ public class WorkerConfig { public void setWorkerMaxCpuloadAvg(int workerMaxCpuloadAvg) { this.workerMaxCpuloadAvg = workerMaxCpuloadAvg; } + + + public int getWeight() { + return weight; + } + + public void setWeight(int weight) { + this.weight = weight; + } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index 5e400e1e1f..6204edcba4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -16,9 +16,6 @@ */ package org.apache.dolphinscheduler.server.worker.registry; -import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; -import static org.apache.dolphinscheduler.common.Constants.SLASH; - import java.util.Date; import java.util.Set; import java.util.concurrent.Executors; @@ -44,9 +41,11 @@ import org.springframework.stereotype.Service; import com.google.common.collect.Sets; +import static org.apache.dolphinscheduler.common.Constants.*; + /** - * worker registry + * worker registry */ @Service public class WorkerRegistry { @@ -54,13 +53,13 @@ public class WorkerRegistry { private final Logger logger = LoggerFactory.getLogger(WorkerRegistry.class); /** - * zookeeper registry center + * zookeeper registry center */ @Autowired private ZookeeperRegistryCenter zookeeperRegistryCenter; /** - * worker config + * worker config */ @Autowired private WorkerConfig workerConfig; @@ -86,7 +85,7 @@ public class WorkerRegistry { } /** - * registry + * registry */ public void registry() { String address = NetUtils.getHost(); @@ -122,7 +121,7 @@ public class WorkerRegistry { } /** - * remove registry info + * remove registry info */ public void unRegistry() { String address = getLocalAddress(); @@ -135,13 +134,14 @@ public class WorkerRegistry { } /** - * get worker path + * get worker path */ private Set getWorkerZkPaths() { Set workerZkPaths = Sets.newHashSet(); String address = getLocalAddress(); String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath(); + String weight = getWorkerWeight(); for (String workGroup : this.workerGroups) { StringBuilder workerZkPathBuilder = new StringBuilder(100); @@ -152,15 +152,23 @@ public class WorkerRegistry { // trim and lower case is need workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH); workerZkPathBuilder.append(address); + workerZkPathBuilder.append(weight).append(SLASH); workerZkPaths.add(workerZkPathBuilder.toString()); } return workerZkPaths; } /** - * get local address + * get local address */ private String getLocalAddress() { return NetUtils.getHost() + ":" + workerConfig.getListenPort(); } + + /** + * get Worker Weight + */ + private String getWorkerWeight() { + return ":" + workerConfig.getWeight(); + } } diff --git a/dolphinscheduler-server/src/main/resources/worker.properties b/dolphinscheduler-server/src/main/resources/worker.properties index 0365c8a9c9..9fba30c147 100644 --- a/dolphinscheduler-server/src/main/resources/worker.properties +++ b/dolphinscheduler-server/src/main/resources/worker.properties @@ -32,3 +32,6 @@ # default worker group #worker.groups=default + +# default worker weight +#work.weight=100 diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java index a14ea32e4e..f25a227947 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java @@ -16,7 +16,9 @@ */ package org.apache.dolphinscheduler.server.master.dispatch.host.assign; +import org.apache.commons.lang.ObjectUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.remote.utils.Host; import org.junit.Assert; import org.junit.Test; @@ -36,16 +38,16 @@ public class RandomSelectorTest { @Test public void testSelect1(){ - RandomSelector selector = new RandomSelector(); - String result = selector.select(Arrays.asList("1")); - Assert.assertTrue(StringUtils.isNotEmpty(result)); - Assert.assertTrue(result.equalsIgnoreCase("1")); + RandomSelector selector = new RandomSelector(); + Host result = selector.select(Arrays.asList(new Host("192.168.1.1",80,100),new Host("192.168.1.2",80,20))); + Assert.assertNotNull(result); } @Test public void testSelect(){ - RandomSelector selector = new RandomSelector(); - int result = selector.select(Arrays.asList(1,2,3,4,5,6,7)); - Assert.assertTrue(result >= 1 && result <= 7); + RandomSelector selector = new RandomSelector(); + Host result = selector.select(Arrays.asList(new Host("192.168.1.1",80,100),new Host("192.168.1.1",80,20))); + Assert.assertNotNull(result); + } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java index adc55a4774..ed62caaa2c 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.remote.utils.Host; import org.junit.Assert; import org.junit.Test; @@ -30,26 +31,46 @@ import java.util.List; public class RoundRobinSelectorTest { @Test(expected = IllegalArgumentException.class) - public void testSelectWithIllegalArgumentException(){ + public void testSelectWithIllegalArgumentException() { RoundRobinSelector selector = new RoundRobinSelector(); selector.select(Collections.EMPTY_LIST); } @Test - public void testSelect1(){ - RoundRobinSelector selector = new RoundRobinSelector(); - String result = selector.select(Arrays.asList("1")); - Assert.assertTrue(StringUtils.isNotEmpty(result)); - Assert.assertTrue(result.equalsIgnoreCase("1")); - } + public void testSelect1() { + RoundRobinSelector selector = new RoundRobinSelector(); + Host result = null; + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); + Assert.assertEquals("192.168.1.1", result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); + Assert.assertEquals("192.168.1.2", result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); + Assert.assertEquals("192.168.1.1", result.getIp()); + // add new host + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); + Assert.assertEquals("192.168.1.1", result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); + Assert.assertEquals("192.168.1.2", result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); + Assert.assertEquals("192.168.1.1",result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); + Assert.assertEquals("192.168.1.3",result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); + Assert.assertEquals("192.168.1.1",result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); + Assert.assertEquals("192.168.1.2",result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); + Assert.assertEquals("192.168.1.1",result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); + Assert.assertEquals("192.168.1.3",result.getIp()); + // remove host3 + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); + Assert.assertEquals("192.168.1.1",result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); + Assert.assertEquals("192.168.1.2",result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); + Assert.assertEquals("192.168.1.1",result.getIp()); - @Test - public void testSelect(){ - RoundRobinSelector selector = new RoundRobinSelector(); - List sources = Arrays.asList(1, 2, 3, 4, 5, 6, 7); - int result = selector.select(sources); - Assert.assertTrue(result == 1); - int result2 = selector.select(Arrays.asList(1,2,3,4,5,6,7)); - Assert.assertTrue(result2 == 2); } + }