Browse Source

[Improvement][remote]load balance warm up (#3770)

* [Improvement][remote]load balance  warm up

* reformat code

* reformat code

* code smell

* code smell

* add test

* add test

* add test

* add test

* fix bug

* fix bug

* add docs

* add host test

* add host test

* add host test

* add docs

* code reformat

* code reformat
pull/3/MERGE
CalvinKirs 4 years ago committed by GitHub
parent
commit
db663a13a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java
  2. 46
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
  3. 31
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
  4. 36
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
  5. 47
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
  6. 10
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
  7. 52
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
  8. 43
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java
  9. 1
      pom.xml

5
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Constants.java

@ -53,4 +53,9 @@ public class Constants {
*/ */
public static final String OS_NAME = System.getProperty("os.name"); public static final String OS_NAME = System.getProperty("os.name");
/**
* warm up time
*/
public static final int WARM_UP_TIME = 10 * 60 * 1000;
} }

46
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.remote.utils; package org.apache.dolphinscheduler.remote.utils;
import java.io.Serializable; import java.io.Serializable;
@ -44,6 +45,11 @@ public class Host implements Serializable {
*/ */
private int weight; private int weight;
/**
* startTime
*/
private long startTime;
/** /**
* workGroup * workGroup
*/ */
@ -58,19 +64,21 @@ public class Host implements Serializable {
this.address = ip + ":" + port; this.address = ip + ":" + port;
} }
public Host(String ip, int port, int weight) { public Host(String ip, int port, int weight, long startTime) {
this.ip = ip; this.ip = ip;
this.port = port; this.port = port;
this.address = ip + ":" + port; this.address = ip + ":" + port;
this.weight = weight; this.weight = getWarmUpWeight(weight, startTime);
this.startTime = startTime;
} }
public Host(String ip, int port, int weight,String workGroup) { public Host(String ip, int port, int weight, long startTime, String workGroup) {
this.ip = ip; this.ip = ip;
this.port = port; this.port = port;
this.address = ip + ":" + port; this.address = ip + ":" + port;
this.weight = weight; this.weight = getWarmUpWeight(weight, startTime);
this.workGroup = workGroup; this.workGroup = workGroup;
this.startTime = startTime;
} }
public String getAddress() { public String getAddress() {
@ -98,6 +106,14 @@ public class Host implements Serializable {
this.weight = weight; this.weight = weight;
} }
public long getStartTime() {
return startTime;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
public int getPort() { public int getPort() {
return port; return port;
} }
@ -133,8 +149,8 @@ public class Host implements Serializable {
if (parts.length == 2) { if (parts.length == 2) {
host = new Host(parts[0], Integer.parseInt(parts[1])); host = new Host(parts[0], Integer.parseInt(parts[1]));
} }
if (parts.length == 3) { if (parts.length == 4) {
host = new Host(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2])); host = new Host(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Long.parseLong(parts[3]));
} }
return host; return host;
} }
@ -169,8 +185,20 @@ public class Host implements Serializable {
@Override @Override
public String toString() { public String toString() {
return "Host{" + return "Host{"
"address='" + address + '\'' + + "address='" + address + '\''
'}'; + '}';
}
/**
* warm up
*/
private int getWarmUpWeight(int weight, long startTime) {
long uptime = System.currentTimeMillis() - startTime;
//If the warm-up is not over, reduce the weight
if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
return (int) (weight * ((float) uptime / Constants.WARM_UP_TIME));
}
return weight;
} }
} }

31
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.host.assign; package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
/** /**
@ -37,7 +38,7 @@ public class HostWeight {
private int currentWeight; private int currentWeight;
public HostWeight(Host host, double cpu, double memory, double loadAverage) { public HostWeight(Host host, double cpu, double memory, double loadAverage) {
this.weight = calculateWeight(cpu, memory, loadAverage); this.weight = getWeight(cpu, memory, loadAverage, host);
this.host = host; this.host = host;
this.currentWeight = weight; this.currentWeight = weight;
} }
@ -60,14 +61,28 @@ public class HostWeight {
@Override @Override
public String toString() { public String toString() {
return "HostWeight{" + return "HostWeight{"
"host=" + host + + "host=" + host
", weight=" + weight + + ", weight=" + weight
", currentWeight=" + currentWeight + + ", currentWeight=" + currentWeight
'}'; + '}';
} }
private int calculateWeight(double cpu, double memory, double loadAverage){ private int getWeight(double cpu, double memory, double loadAverage, Host host) {
return (int)(cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR); int calculateWeight = (int) (cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR);
return getWarmUpWeight(host, calculateWeight);
}
/**
* If the warm-up is not over, add the weight
*/
private int getWarmUpWeight(Host host, int weight) {
long startTime = host.getStartTime();
long uptime = System.currentTimeMillis() - startTime;
if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
return (int) ((weight * Constants.WARM_UP_TIME) / uptime);
}
return weight;
} }
} }

36
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -14,19 +14,13 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.worker.registry; package org.apache.dolphinscheduler.server.worker.registry;
import java.util.Date; import static org.apache.dolphinscheduler.common.Constants.COLON;
import java.util.Set; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import java.util.concurrent.Executors; import static org.apache.dolphinscheduler.common.Constants.SLASH;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
@ -34,6 +28,19 @@ import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -41,8 +48,6 @@ import org.springframework.stereotype.Service;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import static org.apache.dolphinscheduler.common.Constants.*;
/** /**
* worker registry * worker registry
@ -142,6 +147,7 @@ public class WorkerRegistry {
String address = getLocalAddress(); String address = getLocalAddress();
String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath(); String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath();
String weight = getWorkerWeight(); String weight = getWorkerWeight();
String workerStartTime = COLON + System.currentTimeMillis();
for (String workGroup : this.workerGroups) { for (String workGroup : this.workerGroups) {
StringBuilder workerZkPathBuilder = new StringBuilder(100); StringBuilder workerZkPathBuilder = new StringBuilder(100);
@ -153,6 +159,7 @@ public class WorkerRegistry {
workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH); workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH);
workerZkPathBuilder.append(address); workerZkPathBuilder.append(address);
workerZkPathBuilder.append(weight); workerZkPathBuilder.append(weight);
workerZkPathBuilder.append(workerStartTime);
workerZkPaths.add(workerZkPathBuilder.toString()); workerZkPaths.add(workerZkPathBuilder.toString());
} }
return workerZkPaths; return workerZkPaths;
@ -162,13 +169,14 @@ public class WorkerRegistry {
* get local address * get local address
*/ */
private String getLocalAddress() { private String getLocalAddress() {
return NetUtils.getHost() + ":" + workerConfig.getListenPort(); return NetUtils.getHost() + COLON + workerConfig.getListenPort();
} }
/** /**
* get Worker Weight * get Worker Weight
*/ */
private String getWorkerWeight() { private String getWorkerWeight() {
return ":" + workerConfig.getWeight(); return COLON + workerConfig.getWeight();
} }
} }

47
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java

@ -14,9 +14,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master.dispatch.host.assign; package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.ArrayList; import java.util.ArrayList;
@ -29,13 +32,45 @@ public class LowerWeightRoundRobinTest {
@Test @Test
public void testSelect() { public void testSelect() {
Collection<HostWeight> sources = new ArrayList<>(); Collection<HostWeight> sources = new ArrayList<>();
sources.add(new HostWeight(Host.of("192.158.2.1:11"), 0.06, 0.44, 3.84)); sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" + (System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84));
sources.add(new HostWeight(Host.of("192.158.2.1:22"), 0.06, 0.56, 3.24)); sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.56, 3.24));
sources.add(new HostWeight(Host.of("192.158.2.1:33"), 0.06, 0.80, 3.15)); sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" + (System.currentTimeMillis() - 60 * 2 * 1000)), 0.06, 0.80, 3.15));
System.out.println(sources);
LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin(); LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
for(int i = 0; i < 100; i ++){ HostWeight result;
System.out.println(roundRobin.select(sources)); result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.2", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.2", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.2", result.getHost().getIp());
} }
@Test
public void testWarmUpSelect() {
Collection<HostWeight> sources = new ArrayList<>();
sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" + (System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84));
sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.44, 3.84));
sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" + (System.currentTimeMillis() - 60 * 3 * 1000)), 0.06, 0.44, 3.84));
sources.add(new HostWeight(Host.of("192.158.2.4:33:100:" + (System.currentTimeMillis() - 60 * 11 * 1000)), 0.06, 0.44, 3.84));
LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
HostWeight result;
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.4", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.2", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.4", result.getHost().getIp());
result = roundRobin.select(sources);
Assert.assertEquals("192.158.2.1", result.getHost().getIp());
} }
} }

10
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java

@ -14,11 +14,11 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master.dispatch.host.assign; package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.commons.lang.ObjectUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -33,20 +33,20 @@ public class RandomSelectorTest {
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void testSelectWithIllegalArgumentException() { public void testSelectWithIllegalArgumentException() {
RandomSelector selector = new RandomSelector(); RandomSelector selector = new RandomSelector();
selector.select(Collections.EMPTY_LIST); selector.select(null);
} }
@Test @Test
public void testSelect1() { public void testSelect1() {
RandomSelector selector = new RandomSelector(); RandomSelector selector = new RandomSelector();
Host result = selector.select(Arrays.asList(new Host("192.168.1.1",80,100),new Host("192.168.1.2",80,20))); Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 100, System.currentTimeMillis()), new Host("192.168.1.2", 80, 20, System.currentTimeMillis())));
Assert.assertNotNull(result); Assert.assertNotNull(result);
} }
@Test @Test
public void testSelect() { public void testSelect() {
RandomSelector selector = new RandomSelector(); RandomSelector selector = new RandomSelector();
Host result = selector.select(Arrays.asList(new Host("192.168.1.1",80,100),new Host("192.168.1.1",80,20))); Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 100, System.currentTimeMillis()), new Host("192.168.1.1", 80, 20, System.currentTimeMillis())));
Assert.assertNotNull(result); Assert.assertNotNull(result);
} }

52
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java

@ -14,16 +14,16 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master.dispatch.host.assign; package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List;
/** /**
* round robin selector * round robin selector
@ -33,44 +33,60 @@ public class RoundRobinSelectorTest {
@Test(expected = IllegalArgumentException.class) @Test(expected = IllegalArgumentException.class)
public void testSelectWithIllegalArgumentException() { public void testSelectWithIllegalArgumentException() {
RoundRobinSelector selector = new RoundRobinSelector(); RoundRobinSelector selector = new RoundRobinSelector();
selector.select(Collections.EMPTY_LIST); selector.select(null);
} }
@Test @Test
public void testSelect1() { public void testSelect1() {
RoundRobinSelector selector = new RoundRobinSelector(); RoundRobinSelector selector = new RoundRobinSelector();
Host result = null; Host result;
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp()); Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.2", result.getIp()); Assert.assertEquals("192.168.1.2", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp()); Assert.assertEquals("192.168.1.1", result.getIp());
// add new host // add new host
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp()); Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.2", result.getIp()); Assert.assertEquals("192.168.1.2", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp()); Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.3", result.getIp()); Assert.assertEquals("192.168.1.3", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp()); Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.2", result.getIp()); Assert.assertEquals("192.168.1.2", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp()); Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"), new Host("192.168.1.3", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris"),
new Host("192.168.1.3", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.3", result.getIp()); Assert.assertEquals("192.168.1.3", result.getIp());
// remove host3 // remove host3
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp()); Assert.assertEquals("192.168.1.1", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.2", result.getIp()); Assert.assertEquals("192.168.1.2", result.getIp());
result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, "kris"), new Host("192.168.1.2", 80, 10, "kris"))); result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis(), "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis(), "kris")));
Assert.assertEquals("192.168.1.1", result.getIp()); Assert.assertEquals("192.168.1.1", result.getIp());
} }
@Test
public void testWarmUpRoundRobinSelector() {
RoundRobinSelector selector = new RoundRobinSelector();
Host result;
result = selector.select(
Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis() - 60 * 1000 * 2, "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis() - 60 * 1000 * 10, "kris")));
Assert.assertEquals("192.168.1.2", result.getIp());
}
} }

43
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.junit.Assert;
import org.junit.Test;
/**
* host test
*/
public class HostTest {
@Test
public void testHostWarmUp() {
Host host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)));
Assert.assertEquals(50, host.getWeight());
host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 10 * 1000)));
Assert.assertEquals(100, host.getWeight());
}
@Test
public void testHost() {
Host host = Host.of("192.158.2.2:22");
Assert.assertEquals(22, host.getPort());
}
}

1
pom.xml

@ -832,6 +832,7 @@
<include>**/server/register/ZookeeperNodeManagerTest.java</include> <include>**/server/register/ZookeeperNodeManagerTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include> <include>**/server/utils/DataxUtilsTest.java</include>
<include>**/server/utils/ExecutionContextTestUtils.java</include> <include>**/server/utils/ExecutionContextTestUtils.java</include>
<include>**/server/utils/HostTest.java</include>
<!--<include>**/server/utils/FlinkArgsUtilsTest.java</include>--> <!--<include>**/server/utils/FlinkArgsUtilsTest.java</include>-->
<include>**/server/utils/LogUtilsTest.java</include> <include>**/server/utils/LogUtilsTest.java</include>
<include>**/server/utils/ParamUtilsTest.java</include> <include>**/server/utils/ParamUtilsTest.java</include>

Loading…
Cancel
Save