From db663a13a37c6bc4e47286ebbc8efb2f20efa57b Mon Sep 17 00:00:00 2001 From: CalvinKirs Date: Tue, 22 Sep 2020 17:39:39 +0800 Subject: [PATCH] [Improvement][remote]load balance warm up (#3770) * [Improvement][remote]load balance warm up * reformat code * reformat code * code smell * code smell * add test * add test * add test * add test * fix bug * fix bug * add docs * add host test * add host test * add host test * add docs * code reformat * code reformat --- .../remote/utils/Constants.java | 5 ++ .../dolphinscheduler/remote/utils/Host.java | 48 ++++++++++--- .../dispatch/host/assign/HostWeight.java | 35 +++++++--- .../worker/registry/WorkerRegistry.java | 44 +++++++----- .../assign/LowerWeightRoundRobinTest.java | 51 +++++++++++--- .../host/assign/RandomSelectorTest.java | 16 ++--- .../host/assign/RoundRobinSelectorTest.java | 70 ++++++++++++------- .../server/utils/HostTest.java | 43 ++++++++++++ pom.xml | 1 + 9 files changed, 232 insertions(+), 81 deletions(-) create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java index 370467f6ca..91d4ac245e 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java @@ -53,4 +53,9 @@ public class Constants { */ public static final String OS_NAME = System.getProperty("os.name"); + /** + * warm up time + */ + public static final int WARM_UP_TIME = 10 * 60 * 1000; + } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java index b905a9fea8..c18d02f09a 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.remote.utils; import java.io.Serializable; @@ -44,6 +45,11 @@ public class Host implements Serializable { */ private int weight; + /** + * startTime + */ + private long startTime; + /** * workGroup */ @@ -58,19 +64,21 @@ public class Host implements Serializable { this.address = ip + ":" + port; } - public Host(String ip, int port, int weight) { + public Host(String ip, int port, int weight, long startTime) { this.ip = ip; this.port = port; this.address = ip + ":" + port; - this.weight = weight; + this.weight = getWarmUpWeight(weight, startTime); + this.startTime = startTime; } - public Host(String ip, int port, int weight,String workGroup) { + public Host(String ip, int port, int weight, long startTime, String workGroup) { this.ip = ip; this.port = port; this.address = ip + ":" + port; - this.weight = weight; - this.workGroup=workGroup; + this.weight = getWarmUpWeight(weight, startTime); + this.workGroup = workGroup; + this.startTime = startTime; } public String getAddress() { @@ -98,6 +106,14 @@ public class Host implements Serializable { this.weight = weight; } + public long getStartTime() { + return startTime; + } + + public void setStartTime(long startTime) { + this.startTime = startTime; + } + public int getPort() { return port; } @@ -133,8 +149,8 @@ public class Host implements Serializable { if (parts.length == 2) { host = new Host(parts[0], Integer.parseInt(parts[1])); } - if (parts.length == 3) { - host = new Host(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2])); + if (parts.length == 4) { + host = new Host(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Long.parseLong(parts[3])); } return host; } @@ -169,8 +185,20 @@ public class Host implements Serializable { @Override public String toString() { - return "Host{" + - "address='" + address + '\'' + - '}'; + return "Host{" + + "address='" + address + '\'' + + '}'; + } + + /** + * warm up + */ + private int getWarmUpWeight(int weight, long startTime) { + long uptime = System.currentTimeMillis() - startTime; + //If the warm-up is not over, reduce the weight + if (uptime > 0 && uptime < Constants.WARM_UP_TIME) { + return (int) (weight * ((float) uptime / Constants.WARM_UP_TIME)); + } + return weight; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java index ebceea7b13..298a62a6d9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign; +import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.remote.utils.Host; /** @@ -37,9 +38,9 @@ public class HostWeight { private int currentWeight; public HostWeight(Host host, double cpu, double memory, double loadAverage) { - this.weight = calculateWeight(cpu, memory, loadAverage); - this.host = host ; - this.currentWeight = weight ; + this.weight = getWeight(cpu, memory, loadAverage, host); + this.host = host; + this.currentWeight = weight; } public int getCurrentWeight() { @@ -60,14 +61,28 @@ public class HostWeight { @Override public String toString() { - return "HostWeight{" + - "host=" + host + - ", weight=" + weight + - ", currentWeight=" + currentWeight + - '}'; + return "HostWeight{" + + "host=" + host + + ", weight=" + weight + + ", currentWeight=" + currentWeight + + '}'; } - private int calculateWeight(double cpu, double memory, double loadAverage){ - return (int)(cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR); + private int getWeight(double cpu, double memory, double loadAverage, Host host) { + int calculateWeight = (int) (cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR); + return getWarmUpWeight(host, calculateWeight); + + } + + /** + * If the warm-up is not over, add the weight + */ + private int getWarmUpWeight(Host host, int weight) { + long startTime = host.getStartTime(); + long uptime = System.currentTimeMillis() - startTime; + if (uptime > 0 && uptime < Constants.WARM_UP_TIME) { + return (int) ((weight * Constants.WARM_UP_TIME) / uptime); + } + return weight; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index 36998fad63..904ea3a807 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -14,19 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.worker.registry; -import java.util.Date; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import static org.apache.dolphinscheduler.common.Constants.COLON; +import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; +import static org.apache.dolphinscheduler.common.Constants.SLASH; -import javax.annotation.PostConstruct; - -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -34,6 +28,19 @@ import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; + +import java.util.Date; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.annotation.PostConstruct; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -41,8 +48,6 @@ import org.springframework.stereotype.Service; import com.google.common.collect.Sets; -import static org.apache.dolphinscheduler.common.Constants.*; - /** * worker registry @@ -111,10 +116,10 @@ public class WorkerRegistry { } HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime, - this.workerConfig.getWorkerReservedMemory(), - this.workerConfig.getWorkerMaxCpuloadAvg(), - workerZkPaths, - this.zookeeperRegistryCenter); + this.workerConfig.getWorkerReservedMemory(), + this.workerConfig.getWorkerMaxCpuloadAvg(), + workerZkPaths, + this.zookeeperRegistryCenter); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval); @@ -142,6 +147,7 @@ public class WorkerRegistry { String address = getLocalAddress(); String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath(); String weight = getWorkerWeight(); + String workerStartTime = COLON + System.currentTimeMillis(); for (String workGroup : this.workerGroups) { StringBuilder workerZkPathBuilder = new StringBuilder(100); @@ -153,6 +159,7 @@ public class WorkerRegistry { workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH); workerZkPathBuilder.append(address); workerZkPathBuilder.append(weight); + workerZkPathBuilder.append(workerStartTime); workerZkPaths.add(workerZkPathBuilder.toString()); } return workerZkPaths; @@ -162,13 +169,14 @@ public class WorkerRegistry { * get local address */ private String getLocalAddress() { - return NetUtils.getHost() + ":" + workerConfig.getListenPort(); + return NetUtils.getHost() + COLON + workerConfig.getListenPort(); } /** * get Worker Weight */ private String getWorkerWeight() { - return ":" + workerConfig.getWeight(); + return COLON + workerConfig.getWeight(); } + } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java index fadaa84a69..fd5dda0873 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java @@ -14,9 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.master.dispatch.host.assign; import org.apache.dolphinscheduler.remote.utils.Host; + +import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; @@ -27,15 +30,47 @@ public class LowerWeightRoundRobinTest { @Test - public void testSelect(){ + public void testSelect() { Collection sources = new ArrayList<>(); - sources.add(new HostWeight(Host.of("192.158.2.1:11"), 0.06, 0.44, 3.84)); - sources.add(new HostWeight(Host.of("192.158.2.1:22"), 0.06, 0.56, 3.24)); - sources.add(new HostWeight(Host.of("192.158.2.1:33"), 0.06, 0.80, 3.15)); - System.out.println(sources); + sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" + (System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84)); + sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.56, 3.24)); + sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" + (System.currentTimeMillis() - 60 * 2 * 1000)), 0.06, 0.80, 3.15)); + + LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin(); + HostWeight result; + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.1", result.getHost().getIp()); + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.2", result.getHost().getIp()); + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.1", result.getHost().getIp()); + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.2", result.getHost().getIp()); + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.1", result.getHost().getIp()); + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.2", result.getHost().getIp()); + } + + @Test + public void testWarmUpSelect() { + Collection sources = new ArrayList<>(); + sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" + (System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84)); + sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.44, 3.84)); + sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" + (System.currentTimeMillis() - 60 * 3 * 1000)), 0.06, 0.44, 3.84)); + sources.add(new HostWeight(Host.of("192.158.2.4:33:100:" + (System.currentTimeMillis() - 60 * 11 * 1000)), 0.06, 0.44, 3.84)); + LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin(); - for(int i = 0; i < 100; i ++){ - System.out.println(roundRobin.select(sources)); - } + HostWeight result; + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.4", result.getHost().getIp()); + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.1", result.getHost().getIp()); + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.2", result.getHost().getIp()); + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.4", result.getHost().getIp()); + result = roundRobin.select(sources); + Assert.assertEquals("192.158.2.1", result.getHost().getIp()); } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java index f25a227947..14aa7b8f1f 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java @@ -14,11 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.master.dispatch.host.assign; -import org.apache.commons.lang.ObjectUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.utils.Host; + import org.junit.Assert; import org.junit.Test; @@ -31,22 +31,22 @@ import java.util.Collections; public class RandomSelectorTest { @Test(expected = IllegalArgumentException.class) - public void testSelectWithIllegalArgumentException(){ + public void testSelectWithIllegalArgumentException() { RandomSelector selector = new RandomSelector(); - selector.select(Collections.EMPTY_LIST); + selector.select(null); } @Test - public void testSelect1(){ + public void testSelect1() { RandomSelector selector = new RandomSelector(); - Host result = selector.select(Arrays.asList(new Host("192.168.1.1",80,100),new Host("192.168.1.2",80,20))); + Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 100, System.currentTimeMillis()), new Host("192.168.1.2", 80, 20, System.currentTimeMillis()))); Assert.assertNotNull(result); } @Test - public void testSelect(){ + public void testSelect() { RandomSelector selector = new RandomSelector(); - Host result = selector.select(Arrays.asList(new Host("192.168.1.1",80,100),new Host("192.168.1.1",80,20))); + Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 100, System.currentTimeMillis()), new Host("192.168.1.1", 80, 20, System.currentTimeMillis()))); Assert.assertNotNull(result); } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java index ed62caaa2c..9e41cd68bf 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java @@ -14,16 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.master.dispatch.host.assign; -import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.utils.Host; + import org.junit.Assert; import org.junit.Test; import java.util.Arrays; import java.util.Collections; -import java.util.List; /** * round robin selector @@ -33,43 +33,59 @@ public class RoundRobinSelectorTest { @Test(expected = IllegalArgumentException.class) public void testSelectWithIllegalArgumentException() { RoundRobinSelector selector = new RoundRobinSelector(); - selector.select(Collections.EMPTY_LIST); + selector.select(null); } @Test public void testSelect1() { RoundRobinSelector selector = new RoundRobinSelector(); - Host result = null; - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); + Host result; + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); Assert.assertEquals("192.168.1.1", result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); Assert.assertEquals("192.168.1.2", result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); Assert.assertEquals("192.168.1.1", result.getIp()); // add new host - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); + Assert.assertEquals("192.168.1.1", result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); + Assert.assertEquals("192.168.1.2", result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"), + new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris"))); Assert.assertEquals("192.168.1.1", result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"), + new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris"))); + Assert.assertEquals("192.168.1.3", result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"), + new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris"))); + Assert.assertEquals("192.168.1.1", result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"), + new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris"))); Assert.assertEquals("192.168.1.2", result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); - Assert.assertEquals("192.168.1.1",result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); - Assert.assertEquals("192.168.1.3",result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); - Assert.assertEquals("192.168.1.1",result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); - Assert.assertEquals("192.168.1.2",result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); - Assert.assertEquals("192.168.1.1",result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); - Assert.assertEquals("192.168.1.3",result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"), + new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris"))); + Assert.assertEquals("192.168.1.1", result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"), + new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris"))); + Assert.assertEquals("192.168.1.3", result.getIp()); // remove host3 - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); - Assert.assertEquals("192.168.1.1",result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); - Assert.assertEquals("192.168.1.2",result.getIp()); - result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); - Assert.assertEquals("192.168.1.1",result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); + Assert.assertEquals("192.168.1.1", result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); + Assert.assertEquals("192.168.1.2", result.getIp()); + result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"))); + Assert.assertEquals("192.168.1.1", result.getIp()); + + } + + @Test + public void testWarmUpRoundRobinSelector() { + RoundRobinSelector selector = new RoundRobinSelector(); + Host result; + result = selector.select( + Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis() - 60 * 1000 * 2, "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis() - 60 * 1000 * 10, "kris"))); + Assert.assertEquals("192.168.1.2", result.getIp()); } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java new file mode 100644 index 0000000000..6273569485 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.utils; + +import org.apache.dolphinscheduler.remote.utils.Host; + +import org.junit.Assert; +import org.junit.Test; + +/** + * host test + */ +public class HostTest { + + @Test + public void testHostWarmUp() { + Host host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000))); + Assert.assertEquals(50, host.getWeight()); + host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 10 * 1000))); + Assert.assertEquals(100, host.getWeight()); + } + + @Test + public void testHost() { + Host host = Host.of("192.158.2.2:22"); + Assert.assertEquals(22, host.getPort()); + } +} diff --git a/pom.xml b/pom.xml index 207518c936..c8cb5a9dea 100644 --- a/pom.xml +++ b/pom.xml @@ -832,6 +832,7 @@ **/server/register/ZookeeperNodeManagerTest.java **/server/utils/DataxUtilsTest.java **/server/utils/ExecutionContextTestUtils.java + **/server/utils/HostTest.java **/server/utils/LogUtilsTest.java **/server/utils/ParamUtilsTest.java