From 5856a12855328e67aeb6a2005f86b3c1081750a1 Mon Sep 17 00:00:00 2001 From: Shiwen Cheng Date: Thu, 18 Mar 2021 10:03:58 +0800 Subject: [PATCH] [Improvement-4984][Worker] Refactor and Improve worker load balance (#4996) * [Improvement][Remote] Improve unit tests * [Improvement][Worker] Improve worker load balance * [Improvement][Worker] Fix code smells * [Improvement][Worker] Rename weight to hostWeight --- .../service/impl/WorkerGroupServiceImpl.java | 13 +- .../api/service/WorkerGroupServiceTest.java | 10 +- .../dolphinscheduler/common/Constants.java | 6 +- .../common/utils/ResInfo.java | 58 ++++--- .../dolphinscheduler/remote/utils/Host.java | 143 +++--------------- .../remote/NettyRemotingClientTest.java | 13 +- .../future}/ResponseFutureTest.java | 12 +- .../log}/RemoveTaskLogRequestCommandTest.java | 10 +- .../RemoveTaskLogResponseCommandTest.java | 9 +- .../remote}/utils/HostTest.java | 23 +-- .../remote/{ => utils}/NettyUtilTest.java | 5 +- .../dispatch/host/CommonHostManager.java | 48 +++--- .../dispatch/host/LowerWeightHostManager.java | 60 ++++---- .../dispatch/host/RandomHostManager.java | 11 +- .../dispatch/host/RoundRobinHostManager.java | 7 +- .../host/assign/AbstractSelector.java | 4 +- .../dispatch/host/assign/HostSelector.java | 6 +- .../dispatch/host/assign/HostWeight.java | 44 +++--- .../dispatch/host/assign/HostWorker.java | 77 ++++++++++ .../dispatch/host/assign/RandomSelector.java | 14 +- .../host/assign/RoundRobinSelector.java | 27 ++-- .../master/registry/MasterRegistry.java | 4 +- .../server/registry/HeartBeatTask.java | 30 +++- .../server/worker/config/WorkerConfig.java | 6 +- .../worker/registry/WorkerRegistry.java | 24 ++- .../server/zk/ZKMasterClient.java | 2 - .../dispatch/host/assign/HostWorkerTest.java | 43 ++++++ .../assign/LowerWeightRoundRobinTest.java | 22 ++- .../host/assign/RandomSelectorTest.java | 10 +- .../host/assign/RoundRobinSelectorTest.java | 36 ++--- .../master/registry/MasterRegistryTest.java | 3 +- .../service/zk/RegisterOperator.java | 4 +- pom.xml | 16 +- 33 files changed, 412 insertions(+), 388 deletions(-) rename dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/{ => command/future}/ResponseFutureTest.java (93%) rename dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/{ => command/log}/RemoveTaskLogRequestCommandTest.java (83%) rename dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/{ => command/log}/RemoveTaskLogResponseCommandTest.java (88%) rename {dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server => dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote}/utils/HostTest.java (56%) rename dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/{ => utils}/NettyUtilTest.java (92%) create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorker.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorkerTest.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java index 7ce6a74d6b..e0282de221 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -163,18 +163,13 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro if (CollectionUtils.isEmpty(childrenNodes)) { continue; } - String timeStamp = childrenNodes.get(0); - for (int i = 0; i < childrenNodes.size(); i++) { - childrenNodes.set(i, Host.of(childrenNodes.get(i)).getAddressAndWeight()); - } - WorkerGroup wg = new WorkerGroup(); wg.setName(workerGroup); if (isPaging) { - wg.setIpList(childrenNodes); - String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + SLASH + timeStamp); - wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[6])); - wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[7])); + wg.setIpList(childrenNodes.stream().map(node -> Host.of(node).getIp()).collect(Collectors.toList())); + String registeredValue = zookeeperCachedOperator.get(workerGroupPath + SLASH + childrenNodes.get(0)); + wg.setCreateTime(DateUtils.stringToDate(registeredValue.split(",")[6])); + wg.setUpdateTime(DateUtils.stringToDate(registeredValue.split(",")[7])); } workerGroups.add(wg); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java index bc95ad1e31..ce9f239d8a 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java @@ -70,13 +70,13 @@ public class WorkerGroupServiceTest { workerGroupStrList.add("test"); Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath)).thenReturn(workerGroupStrList); - List defaultIpList = new ArrayList<>(); - defaultIpList.add("192.168.220.188:1234:100:1234567"); - defaultIpList.add("192.168.220.189:1234:100:1234567"); + List defaultAddressList = new ArrayList<>(); + defaultAddressList.add("192.168.220.188:1234"); + defaultAddressList.add("192.168.220.189:1234"); - Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath + "/default")).thenReturn(defaultIpList); + Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath + "/default")).thenReturn(defaultAddressList); - Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" + defaultIpList.get(0))).thenReturn("0.01,0.17,0.03,25.83,8.0,1.0,2020-07-21 11:17:59,2020-07-21 14:39:20,0,13238"); + Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" + defaultAddressList.get(0))).thenReturn("0.01,0.17,0.03,25.83,8.0,1.0,2020-07-21 11:17:59,2020-07-21 14:39:20,0,13238"); } /** diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 17412575c5..d6d571f145 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -385,6 +385,10 @@ public final class Constants { */ public static final double DEFAULT_WORKER_RESERVED_MEMORY = OSUtils.totalMemorySize() / 10; + /** + * worker host weight + */ + public static final int DEFAULT_WORKER_HOST_WEIGHT = 100; /** * default log cache rows num,output when reach the number @@ -542,7 +546,7 @@ public final class Constants { * heartbeat for zk info length */ public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 10; - + public static final int HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH = 11; /** * jar diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java index 8a6ef1ea8c..8f533a022f 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java @@ -14,7 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.common.utils; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.model.Server; @@ -38,10 +40,8 @@ public class ResInfo { */ private double loadAverage; - public ResInfo(){} - - public ResInfo(double cpuUsage , double memoryUsage){ - this.cpuUsage = cpuUsage ; + public ResInfo(double cpuUsage, double memoryUsage) { + this.cpuUsage = cpuUsage; this.memoryUsage = memoryUsage; } @@ -81,35 +81,53 @@ public class ResInfo { * @param loadAverage load average * @return cpu and memory usage */ - public static String getResInfoJson(double cpuUsage , double memoryUsage,double loadAverage){ + public static String getResInfoJson(double cpuUsage, double memoryUsage, double loadAverage) { ResInfo resInfo = new ResInfo(cpuUsage,memoryUsage,loadAverage); return JSONUtils.toJsonString(resInfo); } - /** * parse heartbeat info for zk * @param heartBeatInfo heartbeat info * @return heartbeat info to Server */ - public static Server parseHeartbeatForZKInfo(String heartBeatInfo){ - if (StringUtils.isEmpty(heartBeatInfo)) { + public static Server parseHeartbeatForZKInfo(String heartBeatInfo) { + if (!isValidHeartbeatForZKInfo(heartBeatInfo)) { return null; } - String[] masterArray = heartBeatInfo.split(Constants.COMMA); - if(masterArray.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){ - return null; + String[] parts = heartBeatInfo.split(Constants.COMMA); + Server server = new Server(); + server.setResInfo(getResInfoJson(Double.parseDouble(parts[0]), + Double.parseDouble(parts[1]), + Double.parseDouble(parts[2]))); + server.setCreateTime(DateUtils.stringToDate(parts[6])); + server.setLastHeartbeatTime(DateUtils.stringToDate(parts[7])); + //set process id + server.setId(Integer.parseInt(parts[9])); + return server; + } + /** + * is valid heartbeat info for zk + * @param heartBeatInfo heartbeat info + * @return heartbeat info is valid + */ + public static boolean isValidHeartbeatForZKInfo(String heartBeatInfo) { + if (StringUtils.isNotEmpty(heartBeatInfo)) { + String[] parts = heartBeatInfo.split(Constants.COMMA); + return parts.length == Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH + || parts.length == Constants.HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH; } - Server masterServer = new Server(); - masterServer.setResInfo(getResInfoJson(Double.parseDouble(masterArray[0]), - Double.parseDouble(masterArray[1]), - Double.parseDouble(masterArray[2]))); - masterServer.setCreateTime(DateUtils.stringToDate(masterArray[6])); - masterServer.setLastHeartbeatTime(DateUtils.stringToDate(masterArray[7])); - //set process id - masterServer.setId(Integer.parseInt(masterArray[9])); - return masterServer; + return false; + } + + /** + * is new heartbeat info for zk with weight + * @param parts heartbeat info parts + * @return heartbeat info is new with weight + */ + public static boolean isNewHeartbeatWithWeight(String[] parts) { + return parts.length == Constants.HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH; } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java index 7e42984e49..359baefae6 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java @@ -20,8 +20,6 @@ package org.apache.dolphinscheduler.remote.utils; import static org.apache.dolphinscheduler.common.Constants.COLON; import java.io.Serializable; -import java.util.Objects; -import java.util.StringJoiner; /** * server address @@ -43,21 +41,6 @@ public class Host implements Serializable { */ private int port; - /** - * weight - */ - private int weight; - - /** - * startTime - */ - private long startTime; - - /** - * workGroup - */ - private String workGroup; - public Host() { } @@ -67,21 +50,11 @@ public class Host implements Serializable { this.address = ip + COLON + port; } - public Host(String ip, int port, int weight, long startTime) { - this.ip = ip; - this.port = port; - this.address = ip + COLON + port; - this.weight = getWarmUpWeight(weight, startTime); - this.startTime = startTime; - } - - public Host(String ip, int port, int weight, long startTime, String workGroup) { - this.ip = ip; - this.port = port; - this.address = ip + COLON + port; - this.weight = getWarmUpWeight(weight, startTime); - this.workGroup = workGroup; - this.startTime = startTime; + public Host(String address) { + String[] parts = splitAddress(address); + this.ip = parts[0]; + this.port = Integer.parseInt(parts[1]); + this.address = address; } public String getAddress() { @@ -89,6 +62,9 @@ public class Host implements Serializable { } public void setAddress(String address) { + String[] parts = splitAddress(address); + this.ip = parts[0]; + this.port = Integer.parseInt(parts[1]); this.address = address; } @@ -101,22 +77,6 @@ public class Host implements Serializable { this.address = ip + COLON + port; } - public int getWeight() { - return weight; - } - - public void setWeight(int weight) { - this.weight = weight; - } - - public long getStartTime() { - return startTime; - } - - public void setStartTime(long startTime) { - this.startTime = startTime; - } - public int getPort() { return port; } @@ -126,12 +86,15 @@ public class Host implements Serializable { this.address = ip + COLON + port; } - public String getWorkGroup() { - return workGroup; - } - - public void setWorkGroup(String workGroup) { - this.workGroup = workGroup; + /** + * address convert host + * + * @param address address + * @return host + */ + public static Host of(String address) { + String[] parts = splitAddress(address); + return new Host(parts[0], Integer.parseInt(parts[1])); } /** @@ -140,37 +103,15 @@ public class Host implements Serializable { * @param address address * @return host */ - public static Host of(String address) { + public static String[] splitAddress(String address) { if (address == null) { throw new IllegalArgumentException("Host : address is null."); } String[] parts = address.split(COLON); - if (parts.length < 2) { + if (parts.length != 2) { throw new IllegalArgumentException(String.format("Host : %s illegal.", address)); } - Host host = null; - if (parts.length == 2) { - host = new Host(parts[0], Integer.parseInt(parts[1])); - } - if (parts.length == 4) { - host = new Host(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Long.parseLong(parts[3])); - } - return host; - } - - /** - * generate host string - * @param address address - * @param weight weight - * @param startTime startTime - * @return address:weight:startTime - */ - public static String generate(String address, int weight, long startTime) { - StringJoiner stringJoiner = new StringJoiner(COLON); - stringJoiner.add(address) - .add(String.valueOf(weight)) - .add(String.valueOf(startTime)); - return stringJoiner.toString(); + return parts; } /** @@ -181,54 +122,16 @@ public class Host implements Serializable { */ public static Boolean isOldVersion(String address) { String[] parts = address.split(COLON); - return parts.length != 2 && parts.length != 3; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - Host host = (Host) o; - return Objects.equals(getAddress(), host.getAddress()); - } - - @Override - public int hashCode() { - return Objects.hash(getAddress()); + return parts.length != 2; } @Override public String toString() { return "Host{" + "address='" + address + '\'' - + ", weight=" + weight - + ", startTime=" + startTime - + ", workGroup='" + workGroup + '\'' + + ", ip='" + ip + '\'' + + ", port=" + port + '}'; } - /** - * warm up - */ - private int getWarmUpWeight(int weight, long startTime) { - long uptime = System.currentTimeMillis() - startTime; - //If the warm-up is not over, reduce the weight - if (uptime > 0 && uptime < Constants.WARM_UP_TIME) { - return (int) (weight * ((float) uptime / Constants.WARM_UP_TIME)); - } - return weight; - } - - /** - * get address and weight - * - * @return address:weight - */ - public String getAddressAndWeight() { - return address + COLON + weight; - } } diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java index cfc10b2acb..a3f6c7b582 100644 --- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.remote; -import io.netty.channel.Channel; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.Ping; @@ -28,23 +27,25 @@ import org.apache.dolphinscheduler.remote.future.InvokeCallback; import org.apache.dolphinscheduler.remote.future.ResponseFuture; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.utils.Host; -import org.junit.Assert; -import org.junit.Test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; +import org.junit.Assert; +import org.junit.Test; + +import io.netty.channel.Channel; + /** * netty remote client test */ public class NettyRemotingClientTest { - /** - * test sned sync + * test send sync */ @Test - public void testSendSync(){ + public void testSendSync() { NettyServerConfig serverConfig = new NettyServerConfig(); NettyRemotingServer server = new NettyRemotingServer(serverConfig); diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/ResponseFutureTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/future/ResponseFutureTest.java similarity index 93% rename from dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/ResponseFutureTest.java rename to dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/future/ResponseFutureTest.java index 8836043257..0190c9946b 100644 --- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/ResponseFutureTest.java +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/future/ResponseFutureTest.java @@ -15,24 +15,24 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote; - +package org.apache.dolphinscheduler.remote.command.future; import org.apache.dolphinscheduler.remote.future.InvokeCallback; import org.apache.dolphinscheduler.remote.future.ResponseFuture; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; -import org.junit.Assert; -import org.junit.Test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.junit.Assert; +import org.junit.Test; + public class ResponseFutureTest { @Test - public void testScanFutureTable(){ + public void testScanFutureTable() { ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("executor-service")); executorService.scheduleAtFixedRate(new Runnable() { @Override @@ -51,7 +51,7 @@ public class ResponseFutureTest { ResponseFuture future = new ResponseFuture(1, 2000, invokeCallback, null); try { latch.await(5000, TimeUnit.MILLISECONDS); - Assert.assertTrue(ResponseFuture.getFuture(1) == null); + Assert.assertNull(ResponseFuture.getFuture(1)); } catch (InterruptedException e) { e.printStackTrace(); } diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogRequestCommandTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommandTest.java similarity index 83% rename from dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogRequestCommandTest.java rename to dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommandTest.java index 37c21064c4..10646f9f02 100644 --- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogRequestCommandTest.java +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommandTest.java @@ -15,18 +15,18 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote; +package org.apache.dolphinscheduler.remote.command.log; -import junit.framework.Assert; import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand; -import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogResponseCommand; + import org.junit.Test; +import junit.framework.Assert; + public class RemoveTaskLogRequestCommandTest { @Test - public void testConvert2Command(){ + public void testConvert2Command() { RemoveTaskLogResponseCommand removeTaskLogResponseCommand = new RemoveTaskLogResponseCommand(); removeTaskLogResponseCommand.setStatus(true); Command command = removeTaskLogResponseCommand.convert2Command(122); diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogResponseCommandTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommandTest.java similarity index 88% rename from dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogResponseCommandTest.java rename to dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommandTest.java index aab0ad336f..87dd700b4b 100644 --- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogResponseCommandTest.java +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommandTest.java @@ -15,17 +15,18 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote; +package org.apache.dolphinscheduler.remote.command.log; -import junit.framework.Assert; import org.apache.dolphinscheduler.remote.command.Command; -import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand; + import org.junit.Test; +import junit.framework.Assert; + public class RemoveTaskLogResponseCommandTest { @Test - public void testConvert2Command(){ + public void testConvert2Command() { RemoveTaskLogRequestCommand removeTaskLogRequestCommand = new RemoveTaskLogRequestCommand(); removeTaskLogRequestCommand.setPath("/opt/zhangsan"); Command command = removeTaskLogRequestCommand.convert2Command(); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/HostTest.java similarity index 56% rename from dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java rename to dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/HostTest.java index 80ff11e0be..a9da007d5b 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/HostTest.java @@ -15,9 +15,7 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.server.utils; - -import org.apache.dolphinscheduler.remote.utils.Host; +package org.apache.dolphinscheduler.remote.utils; import org.junit.Assert; import org.junit.Test; @@ -27,26 +25,13 @@ import org.junit.Test; */ public class HostTest { - @Test - public void testHostWarmUp() { - Host host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000))); - Assert.assertEquals(50, host.getWeight()); - host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 10 * 1000))); - Assert.assertEquals(100, host.getWeight()); - } - @Test public void testHost() { Host host = Host.of("192.158.2.2:22"); Assert.assertEquals(22, host.getPort()); + host.setAddress("127.0.0.1:8888"); + Assert.assertEquals("127.0.0.1", host.getIp()); + Assert.assertEquals(8888, host.getPort()); } - @Test - public void testGenerate() { - String address = "192.158.2.2:22"; - int weight = 100; - long startTime = System.currentTimeMillis(); - String generateHost = Host.generate(address, weight, startTime); - Assert.assertEquals(address + ":" + weight + ":" + startTime, generateHost); - } } diff --git a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyUtilTest.java b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/NettyUtilTest.java similarity index 92% rename from dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyUtilTest.java rename to dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/NettyUtilTest.java index e95dbddac9..a3e13850ac 100644 --- a/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyUtilTest.java +++ b/dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/NettyUtilTest.java @@ -15,12 +15,10 @@ * limitations under the License. */ -package org.apache.dolphinscheduler.remote; +package org.apache.dolphinscheduler.remote.utils; import static org.apache.dolphinscheduler.remote.utils.Constants.OS_NAME; -import org.apache.dolphinscheduler.remote.utils.NettyUtils; - import org.junit.Assert; import org.junit.Test; @@ -31,7 +29,6 @@ import io.netty.channel.epoll.Epoll; */ public class NettyUtilTest { - @Test public void testUserEpoll() { if (OS_NAME.toLowerCase().contains("linux") && Epoll.isAvailable()) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java index 4a3d4bd9f1..7184b9e328 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java @@ -17,26 +17,32 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.ResInfo; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; +import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker; import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; +import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import org.springframework.beans.factory.annotation.Autowired; /** * round robin host manager */ public abstract class CommonHostManager implements HostManager { - private final Logger logger = LoggerFactory.getLogger(CommonHostManager.class); + /** + * zookeeper registry center + */ + @Autowired + protected ZookeeperRegistryCenter registryCenter; /** * zookeeperNodeManager @@ -50,16 +56,15 @@ public abstract class CommonHostManager implements HostManager { * @return host */ @Override - public Host select(ExecutionContext context){ + public Host select(ExecutionContext context) { Host host = new Host(); Collection nodes = null; - /** - * executor type - */ + String workerGroup = context.getWorkerGroup(); + // executor type ExecutorType executorType = context.getExecutorType(); - switch (executorType){ + switch (executorType) { case WORKER: - nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup()); + nodes = zookeeperNodeManager.getWorkerGroupNodes(workerGroup); break; case CLIENT: break; @@ -67,21 +72,26 @@ public abstract class CommonHostManager implements HostManager { throw new IllegalArgumentException("invalid executorType : " + executorType); } - if(CollectionUtils.isEmpty(nodes)){ + if (nodes == null || nodes.isEmpty()) { return host; } - List candidateHosts = new ArrayList<>(nodes.size()); + List candidateHosts = new ArrayList<>(); nodes.forEach(node -> { - Host nodeHost=Host.of(node); - nodeHost.setWorkGroup(context.getWorkerGroup()); - candidateHosts.add(nodeHost); + String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup); + String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node); + int hostWeight = Constants.DEFAULT_WORKER_HOST_WEIGHT; + if (StringUtils.isNotEmpty(heartbeat)) { + String[] parts = heartbeat.split(Constants.COMMA); + if (ResInfo.isNewHeartbeatWithWeight(parts)) { + hostWeight = Integer.parseInt(parts[10]); + } + } + candidateHosts.add(HostWorker.of(node, hostWeight, workerGroup)); }); - - return select(candidateHosts); } - protected abstract Host select(Collection nodes); + protected abstract HostWorker select(Collection nodes); public void setZookeeperNodeManager(ZookeeperNodeManager zookeeperNodeManager) { this.zookeeperNodeManager = zookeeperNodeManager; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java index ac7d8b0ffc..ef249a08dc 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java @@ -19,20 +19,20 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.ResInfo; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight; +import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin; -import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import java.util.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -40,8 +40,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import static org.apache.dolphinscheduler.common.Constants.COMMA; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * round robin host manager @@ -50,12 +53,6 @@ public class LowerWeightHostManager extends CommonHostManager { private final Logger logger = LoggerFactory.getLogger(LowerWeightHostManager.class); - /** - * zookeeper registry center - */ - @Autowired - private ZookeeperRegistryCenter registryCenter; - /** * round robin host manager */ @@ -82,7 +79,7 @@ public class LowerWeightHostManager extends CommonHostManager { private ScheduledExecutorService executorService; @PostConstruct - public void init(){ + public void init() { this.selector = new LowerWeightRoundRobin(); this.workerHostWeightsMap = new ConcurrentHashMap<>(); this.lock = new ReentrantLock(); @@ -103,20 +100,20 @@ public class LowerWeightHostManager extends CommonHostManager { * @return host */ @Override - public Host select(ExecutionContext context){ + public Host select(ExecutionContext context) { Set workerHostWeights = getWorkerHostWeights(context.getWorkerGroup()); - if(CollectionUtils.isNotEmpty(workerHostWeights)){ + if (CollectionUtils.isNotEmpty(workerHostWeights)) { return selector.select(workerHostWeights).getHost(); } return new Host(); } @Override - public Host select(Collection nodes) { + public HostWorker select(Collection nodes) { throw new UnsupportedOperationException("not support"); } - private void syncWorkerHostWeight(Map> workerHostWeights){ + private void syncWorkerHostWeight(Map> workerHostWeights) { lock.lock(); try { workerHostWeightsMap.clear(); @@ -126,7 +123,7 @@ public class LowerWeightHostManager extends CommonHostManager { } } - private Set getWorkerHostWeights(String workerGroup){ + private Set getWorkerHostWeights(String workerGroup) { lock.lock(); try { return workerHostWeightsMap.get(workerGroup); @@ -135,7 +132,7 @@ public class LowerWeightHostManager extends CommonHostManager { } } - class RefreshResourceTask implements Runnable{ + class RefreshResourceTask implements Runnable { @Override public void run() { @@ -143,35 +140,34 @@ public class LowerWeightHostManager extends CommonHostManager { Map> workerGroupNodes = zookeeperNodeManager.getWorkerGroupNodes(); Set>> entries = workerGroupNodes.entrySet(); Map> workerHostWeights = new HashMap<>(); - for(Map.Entry> entry : entries){ + for (Map.Entry> entry : entries) { String workerGroup = entry.getKey(); Set nodes = entry.getValue(); String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup); Set hostWeights = new HashSet<>(nodes.size()); - for(String node : nodes){ + for (String node : nodes) { String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node); - if(StringUtils.isNotEmpty(heartbeat) - && heartbeat.split(COMMA).length == Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){ - String[] parts = heartbeat.split(COMMA); - + if (ResInfo.isValidHeartbeatForZKInfo(heartbeat)) { + String[] parts = heartbeat.split(Constants.COMMA); int status = Integer.parseInt(parts[8]); - if (status == Constants.ABNORMAL_NODE_STATUS){ + if (status == Constants.ABNORMAL_NODE_STATUS) { logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", Double.parseDouble(parts[3]) , Double.parseDouble(parts[2])); continue; } - double cpu = Double.parseDouble(parts[0]); double memory = Double.parseDouble(parts[1]); double loadAverage = Double.parseDouble(parts[2]); - HostWeight hostWeight = new HostWeight(Host.of(node), cpu, memory, loadAverage); + long startTime = DateUtils.stringToDate(parts[6]).getTime(); + int weight = ResInfo.isNewHeartbeatWithWeight(parts) ? Integer.parseInt(parts[10]) : Constants.DEFAULT_WORKER_HOST_WEIGHT; + HostWeight hostWeight = new HostWeight(HostWorker.of(node, weight, workerGroup), cpu, memory, loadAverage, startTime); hostWeights.add(hostWeight); } } workerHostWeights.put(workerGroup, hostWeights); } syncWorkerHostWeight(workerHostWeights); - } catch (Throwable ex){ + } catch (Throwable ex) { logger.error("RefreshResourceTask error", ex); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java index 241906a7b4..ad4f4aa6fd 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java @@ -17,13 +17,11 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; -import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RandomSelector; -import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector; import java.util.Collection; - /** * round robin host manager */ @@ -32,17 +30,18 @@ public class RandomHostManager extends CommonHostManager { /** * selector */ - private final Selector selector; + private final RandomSelector selector; /** * set round robin */ - public RandomHostManager(){ + public RandomHostManager() { this.selector = new RandomSelector(); } @Override - public Host select(Collection nodes) { + public HostWorker select(Collection nodes) { return selector.select(nodes); } + } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java index ec1945e563..c6b173472f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java @@ -17,9 +17,8 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; -import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector; -import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector; import java.util.Collection; @@ -32,7 +31,7 @@ public class RoundRobinHostManager extends CommonHostManager { /** * selector */ - private final Selector selector; + private final RoundRobinSelector selector; /** * set round robin @@ -42,7 +41,7 @@ public class RoundRobinHostManager extends CommonHostManager { } @Override - public Host select(Collection nodes) { + public HostWorker select(Collection nodes) { return selector.select(nodes); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java index 8560da957e..087a5ff002 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java @@ -23,7 +23,7 @@ import java.util.Collection; /** * AbstractSelector */ -public abstract class AbstractSelector implements Selector{ +public abstract class AbstractSelector implements Selector { @Override public T select(Collection source) { @@ -40,6 +40,6 @@ public abstract class AbstractSelector implements Selector{ return doSelect(source); } - protected abstract T doSelect(Collection source); + protected abstract T doSelect(Collection source); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java index 145393e1f0..0be87e24cd 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java @@ -28,9 +28,9 @@ public enum HostSelector { LOWERWEIGHT; - public static HostSelector of(String selector){ - for(HostSelector hs : values()){ - if(hs.name().equalsIgnoreCase(selector)){ + public static HostSelector of(String selector) { + for (HostSelector hs : values()) { + if (hs.name().equalsIgnoreCase(selector)) { return hs; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java index 839ebc85c7..9d7855f054 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java @@ -31,57 +31,55 @@ public class HostWeight { private final int LOAD_AVERAGE_FACTOR = 70; - private final Host host; + private final HostWorker hostWorker; private final double weight; private double currentWeight; - public HostWeight(Host host, double cpu, double memory, double loadAverage) { - this.weight = getWeight(cpu, memory, loadAverage, host); - this.host = host; - this.currentWeight = weight; - } - - public double getCurrentWeight() { - return currentWeight; + public HostWeight(HostWorker hostWorker, double cpu, double memory, double loadAverage, long startTime) { + this.hostWorker = hostWorker; + this.weight = calculateWeight(cpu, memory, loadAverage, startTime); + this.currentWeight = this.weight; } public double getWeight() { return weight; } + public double getCurrentWeight() { + return currentWeight; + } + public void setCurrentWeight(double currentWeight) { this.currentWeight = currentWeight; } + public HostWorker getHostWorker() { + return hostWorker; + } + public Host getHost() { - return host; + return (Host)hostWorker; } @Override public String toString() { return "HostWeight{" - + "host=" + host + + "hostWorker=" + hostWorker + ", weight=" + weight + ", currentWeight=" + currentWeight + '}'; } - private double getWeight(double cpu, double memory, double loadAverage, Host host) { - double calculateWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR; - return getWarmUpWeight(host, calculateWeight); - } - - /** - * If the warm-up is not over, add the weight - */ - private double getWarmUpWeight(Host host, double weight) { - long startTime = host.getStartTime(); + private double calculateWeight(double cpu, double memory, double loadAverage, long startTime) { + double calculatedWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR; long uptime = System.currentTimeMillis() - startTime; if (uptime > 0 && uptime < Constants.WARM_UP_TIME) { - return weight * Constants.WARM_UP_TIME / uptime; + // If the warm-up is not over, add the weight + return calculatedWeight * Constants.WARM_UP_TIME / uptime; } - return weight; + return calculatedWeight; } + } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorker.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorker.java new file mode 100644 index 0000000000..6515464b89 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorker.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.dispatch.host.assign; + +import org.apache.dolphinscheduler.remote.utils.Host; + +/** + * host worker + */ +public class HostWorker extends Host { + + /** + * host weight + */ + private int hostWeight; + + /** + * worker group + */ + private String workerGroup; + + public HostWorker(String ip, int port, int hostWeight, String workerGroup) { + super(ip, port); + this.hostWeight = hostWeight; + this.workerGroup = workerGroup; + } + + public HostWorker(String address, int hostWeight, String workerGroup) { + super(address); + this.hostWeight = hostWeight; + this.workerGroup = workerGroup; + } + + public int getHostWeight() { + return hostWeight; + } + + public void setHostWeight(int hostWeight) { + this.hostWeight = hostWeight; + } + + public String getWorkerGroup() { + return workerGroup; + } + + public void setWorkerGroup(String workerGroup) { + this.workerGroup = workerGroup; + } + + public static HostWorker of(String address, int hostWeight, String workerGroup) { + return new HostWorker(address, hostWeight, workerGroup); + } + + @Override + public String toString() { + return "Host{" + + "hostWeight=" + hostWeight + + ", workerGroup='" + workerGroup + '\'' + + '}'; + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java index 6975127b9a..2b7488a370 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign; -import org.apache.dolphinscheduler.remote.utils.Host; - import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -27,20 +25,20 @@ import java.util.concurrent.ThreadLocalRandom; /** * random selector */ -public class RandomSelector extends AbstractSelector { +public class RandomSelector extends AbstractSelector { @Override - public Host doSelect(final Collection source) { + public HostWorker doSelect(final Collection source) { - List hosts = new ArrayList<>(source); + List hosts = new ArrayList<>(source); int size = hosts.size(); int[] weights = new int[size]; int totalWeight = 0; int index = 0; - for (Host host : hosts) { - totalWeight += host.getWeight(); - weights[index] = host.getWeight(); + for (HostWorker host : hosts) { + totalWeight += host.getHostWeight(); + weights[index] = host.getHostWeight(); index++; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java index 34a79ac6e8..a5d93a684b 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java @@ -16,20 +16,21 @@ */ package org.apache.dolphinscheduler.server.master.dispatch.host.assign; -import org.apache.dolphinscheduler.remote.utils.Host; -import org.springframework.stereotype.Service; - -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.springframework.stereotype.Service; + /** * Smooth Weight Round Robin */ @Service -public class RoundRobinSelector extends AbstractSelector { +public class RoundRobinSelector extends AbstractSelector { private ConcurrentMap> workGroupWeightMap = new ConcurrentHashMap<>(); @@ -69,12 +70,11 @@ public class RoundRobinSelector extends AbstractSelector { } - @Override - public Host doSelect(Collection source) { + public HostWorker doSelect(Collection source) { - List hosts = new ArrayList<>(source); - String key = hosts.get(0).getWorkGroup(); + List hosts = new ArrayList<>(source); + String key = hosts.get(0).getWorkerGroup(); ConcurrentMap map = workGroupWeightMap.get(key); if (map == null) { workGroupWeightMap.putIfAbsent(key, new ConcurrentHashMap<>()); @@ -84,13 +84,13 @@ public class RoundRobinSelector extends AbstractSelector { int totalWeight = 0; long maxCurrent = Long.MIN_VALUE; long now = System.currentTimeMillis(); - Host selectedHost = null; + HostWorker selectedHost = null; WeightedRoundRobin selectWeightRoundRobin = null; - for (Host host : hosts) { - String workGroupHost = host.getWorkGroup() + host.getAddress(); + for (HostWorker host : hosts) { + String workGroupHost = host.getWorkerGroup() + host.getAddress(); WeightedRoundRobin weightedRoundRobin = map.get(workGroupHost); - int weight = host.getWeight(); + int weight = host.getHostWeight(); if (weight < 0) { weight = 0; } @@ -117,7 +117,6 @@ public class RoundRobinSelector extends AbstractSelector { totalWeight += weight; } - if (!updateLock.get() && hosts.size() != map.size() && updateLock.compareAndSet(false, true)) { try { ConcurrentMap newMap = new ConcurrentHashMap<>(map); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java index e54fc84538..f77f00d53a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java @@ -98,8 +98,8 @@ public class MasterRegistry { }); int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval(); HeartBeatTask heartBeatTask = new HeartBeatTask(startTime, - masterConfig.getMasterReservedMemory(), masterConfig.getMasterMaxCpuloadAvg(), + masterConfig.getMasterReservedMemory(), Sets.newHashSet(getMasterPath()), Constants.MASTER_PREFIX, zookeeperRegistryCenter); @@ -132,9 +132,7 @@ public class MasterRegistry { * get local address */ private String getLocalAddress() { - return NetUtils.getAddr(masterConfig.getListenPort()); - } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java index d5f7a6edd2..e4c3d6ab6d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java @@ -38,8 +38,9 @@ public class HeartBeatTask implements Runnable { private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class); private String startTime; - private double reservedMemory; private double maxCpuloadAvg; + private double reservedMemory; + private int hostWeight; // worker host weight private Set heartBeatPaths; private String serverType; private ZookeeperRegistryCenter zookeeperRegistryCenter; @@ -48,23 +49,38 @@ public class HeartBeatTask implements Runnable { protected IStoppable stoppable = null; public HeartBeatTask(String startTime, - double reservedMemory, double maxCpuloadAvg, + double reservedMemory, Set heartBeatPaths, String serverType, ZookeeperRegistryCenter zookeeperRegistryCenter) { this.startTime = startTime; - this.reservedMemory = reservedMemory; this.maxCpuloadAvg = maxCpuloadAvg; + this.reservedMemory = reservedMemory; this.heartBeatPaths = heartBeatPaths; + this.serverType = serverType; this.zookeeperRegistryCenter = zookeeperRegistryCenter; + } + + public HeartBeatTask(String startTime, + double maxCpuloadAvg, + double reservedMemory, + int hostWeight, + Set heartBeatPaths, + String serverType, + ZookeeperRegistryCenter zookeeperRegistryCenter) { + this.startTime = startTime; + this.maxCpuloadAvg = maxCpuloadAvg; + this.reservedMemory = reservedMemory; + this.hostWeight = hostWeight; + this.heartBeatPaths = heartBeatPaths; this.serverType = serverType; + this.zookeeperRegistryCenter = zookeeperRegistryCenter; } @Override public void run() { try { - // check dead or not in zookeeper for (String heartBeatPath : heartBeatPaths) { if (zookeeperRegistryCenter.checkIsDeadServer(heartBeatPath, serverType)) { @@ -94,8 +110,12 @@ public class HeartBeatTask implements Runnable { builder.append(startTime).append(Constants.COMMA); builder.append(DateUtils.dateToString(new Date())).append(Constants.COMMA); builder.append(status).append(COMMA); - //save process id + // save process id builder.append(OSUtils.getProcessID()); + // worker host weight + if (Constants.WORKER_PREFIX.equals(serverType)) { + builder.append(Constants.COMMA).append(hostWeight); + } for (String heartBeatPath : heartBeatPaths) { zookeeperRegistryCenter.getRegisterOperator().update(heartBeatPath, builder.toString()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index 228f6ab755..0bd84f5e6d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -47,7 +47,7 @@ public class WorkerConfig { @Value("#{'${worker.groups:default}'.split(',')}") private Set workerGroups; - @Value("${worker.listen.port: 1234}") + @Value("${worker.listen.port:1234}") private int listenPort; @Value("${worker.host.weight:100}") @@ -119,8 +119,8 @@ public class WorkerConfig { return hostWeight; } - public void setHostWeight(int weight) { - this.hostWeight = weight; + public void setHostWeight(int hostWeight) { + this.hostWeight = hostWeight; } public String getAlertListenHost() { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index 06b72a5450..994fb58ff9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; -import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; @@ -34,6 +33,7 @@ import org.apache.curator.framework.state.ConnectionState; import java.util.Date; import java.util.Set; +import java.util.StringJoiner; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -120,12 +120,13 @@ public class WorkerRegistry { logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath); } - HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime, - this.workerConfig.getWorkerReservedMemory(), - this.workerConfig.getWorkerMaxCpuloadAvg(), + HeartBeatTask heartBeatTask = new HeartBeatTask(startTime, + workerConfig.getWorkerMaxCpuloadAvg(), + workerConfig.getWorkerReservedMemory(), + workerConfig.getHostWeight(), workerZkPaths, Constants.WORKER_PREFIX, - this.zookeeperRegistryCenter); + zookeeperRegistryCenter); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval); @@ -150,22 +151,19 @@ public class WorkerRegistry { */ public Set getWorkerZkPaths() { Set workerZkPaths = Sets.newHashSet(); - String address = getLocalAddress(); String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath(); - int weight = workerConfig.getHostWeight(); - long workerStartTime = System.currentTimeMillis(); for (String workGroup : this.workerGroups) { - StringBuilder workerZkPathBuilder = new StringBuilder(100); - workerZkPathBuilder.append(workerZkPathPrefix).append(SLASH); + StringJoiner workerZkPathJoiner = new StringJoiner(SLASH); + workerZkPathJoiner.add(workerZkPathPrefix); if (StringUtils.isEmpty(workGroup)) { workGroup = DEFAULT_WORKER_GROUP; } // trim and lower case is need - workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH); - workerZkPathBuilder.append(Host.generate(address, weight, workerStartTime)); - workerZkPaths.add(workerZkPathBuilder.toString()); + workerZkPathJoiner.add(workGroup.trim().toLowerCase()); + workerZkPathJoiner.add(address); + workerZkPaths.add(workerZkPathJoiner.toString()); } return workerZkPaths; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index e813cdf1ae..0f30a5c155 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.MasterServer; @@ -308,7 +307,6 @@ public class ZKMasterClient extends AbstractZKClient { * @throws Exception exception */ private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception { - workerHost = Host.of(workerHost).getAddress(); logger.info("start worker[{}] failover ...", workerHost); List needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost); for (TaskInstance taskInstance : needFailoverTaskInstanceList) { diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorkerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorkerTest.java new file mode 100644 index 0000000000..11f007ba21 --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorkerTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.dispatch.host.assign; + +import org.junit.Assert; +import org.junit.Test; + +public class HostWorkerTest { + + @Test + public void testHostWorker1() { + HostWorker hostWorker = new HostWorker("192.158.2.2", 11, 20, "default"); + Assert.assertEquals("192.158.2.2", hostWorker.getIp()); + Assert.assertEquals(11, hostWorker.getPort()); + Assert.assertEquals(20, hostWorker.getHostWeight()); + Assert.assertEquals("default", hostWorker.getWorkerGroup()); + } + + @Test + public void testHostWorker2() { + HostWorker hostWorker = HostWorker.of("192.158.2.2:22", 80, "default"); + Assert.assertEquals("192.158.2.2", hostWorker.getIp()); + Assert.assertEquals(22, hostWorker.getPort()); + Assert.assertEquals(80, hostWorker.getHostWeight()); + Assert.assertEquals("default", hostWorker.getWorkerGroup()); + } + +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java index fd5dda0873..f822f04d97 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java @@ -17,24 +17,20 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign; -import org.apache.dolphinscheduler.remote.utils.Host; - -import org.junit.Assert; -import org.junit.Test; - import java.util.ArrayList; import java.util.Collection; +import org.junit.Assert; +import org.junit.Test; public class LowerWeightRoundRobinTest { - @Test public void testSelect() { Collection sources = new ArrayList<>(); - sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" + (System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84)); - sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.56, 3.24)); - sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" + (System.currentTimeMillis() - 60 * 2 * 1000)), 0.06, 0.80, 3.15)); + sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 8 * 1000)); + sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.56, 3.24, System.currentTimeMillis() - 60 * 5 * 1000)); + sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.80, 3.15, System.currentTimeMillis() - 60 * 2 * 1000)); LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin(); HostWeight result; @@ -55,10 +51,10 @@ public class LowerWeightRoundRobinTest { @Test public void testWarmUpSelect() { Collection sources = new ArrayList<>(); - sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" + (System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84)); - sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.44, 3.84)); - sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" + (System.currentTimeMillis() - 60 * 3 * 1000)), 0.06, 0.44, 3.84)); - sources.add(new HostWeight(Host.of("192.158.2.4:33:100:" + (System.currentTimeMillis() - 60 * 11 * 1000)), 0.06, 0.44, 3.84)); + sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 8 * 1000)); + sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 5 * 1000)); + sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 3 * 1000)); + sources.add(new HostWeight(HostWorker.of("192.158.2.4:33", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 11 * 1000)); LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin(); HostWeight result; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java index 14aa7b8f1f..2173ec88db 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java @@ -17,14 +17,11 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign; -import org.apache.dolphinscheduler.remote.utils.Host; +import java.util.Arrays; import org.junit.Assert; import org.junit.Test; -import java.util.Arrays; -import java.util.Collections; - /** * random selector */ @@ -39,15 +36,14 @@ public class RandomSelectorTest { @Test public void testSelect1() { RandomSelector selector = new RandomSelector(); - Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 100, System.currentTimeMillis()), new Host("192.168.1.2", 80, 20, System.currentTimeMillis()))); + HostWorker result = selector.select(Arrays.asList(new HostWorker("192.168.1.1:11", 100, "default"), new HostWorker("192.168.1.2:22", 80, "default"))); Assert.assertNotNull(result); } @Test public void testSelect() { RandomSelector selector = new RandomSelector(); - Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 100, System.currentTimeMillis()), new Host("192.168.1.1", 80, 20, System.currentTimeMillis()))); + HostWorker result = selector.select(Arrays.asList(new HostWorker("192.168.1.1", 11, 100, "default"), new HostWorker("192.168.1.2:", 22, 20, "default"))); Assert.assertNotNull(result); - } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java index 1c9f4922a7..08badd5968 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java @@ -17,14 +17,12 @@ package org.apache.dolphinscheduler.server.master.dispatch.host.assign; -import org.apache.dolphinscheduler.remote.utils.Host; +import java.util.Arrays; +import java.util.List; import org.junit.Assert; import org.junit.Test; -import java.util.Arrays; -import java.util.List; - /** * round robin selector */ @@ -39,18 +37,16 @@ public class RoundRobinSelectorTest { @Test public void testSelect1() { RoundRobinSelector selector = new RoundRobinSelector(); - // dismiss of server warm-up time - long startTime = System.currentTimeMillis() - 60 * 10 * 1000; - List hostOneList = Arrays.asList( - new Host("192.168.1.1", 80, 20, startTime, "kris"), - new Host("192.168.1.2", 80, 10, startTime, "kris")); - - List hostTwoList = Arrays.asList( - new Host("192.168.1.1", 80, 20, startTime, "kris"), - new Host("192.168.1.2", 80, 10, startTime, "kris"), - new Host("192.168.1.3", 80, 10, startTime, "kris")); - - Host result; + List hostOneList = Arrays.asList( + new HostWorker("192.168.1.1", 80, 20, "kris"), + new HostWorker("192.168.1.2", 80, 10, "kris")); + + List hostTwoList = Arrays.asList( + new HostWorker("192.168.1.1", 80, 20, "kris"), + new HostWorker("192.168.1.2", 80, 10, "kris"), + new HostWorker("192.168.1.3", 80, 10, "kris")); + + HostWorker result; result = selector.select(hostOneList); Assert.assertEquals("192.168.1.1", result.getIp()); @@ -93,17 +89,15 @@ public class RoundRobinSelectorTest { result = selector.select(hostOneList); Assert.assertEquals("192.168.1.1", result.getIp()); - } @Test - public void testWarmUpRoundRobinSelector() { + public void testWeightRoundRobinSelector() { RoundRobinSelector selector = new RoundRobinSelector(); - Host result; + HostWorker result; result = selector.select( - Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis() - 60 * 1000 * 2, "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis() - 60 * 1000 * 10, "kris"))); + Arrays.asList(new HostWorker("192.168.1.1", 11, 20, "kris"), new HostWorker("192.168.1.2", 22, 80, "kris"))); Assert.assertEquals("192.168.1.2", result.getIp()); - } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java index 9b62473930..8068ebd664 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.registry; import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH; +import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.remote.utils.Constants; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; @@ -59,7 +60,7 @@ public class MasterRegistryTest { masterRegistry.registry(); String masterPath = zookeeperRegistryCenter.getMasterPath(); TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node - String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort()); + String masterNodePath = masterPath + "/" + (NetUtils.getAddr(Constants.LOCAL_ADDRESS, masterConfig.getListenPort())); String heartbeat = zookeeperRegistryCenter.getRegisterOperator().get(masterNodePath); Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length); masterRegistry.unRegistry(); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java index 0fd4a4fa92..0c6ac6d0d4 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java @@ -69,10 +69,10 @@ public class RegisterOperator extends ZookeeperCachedOperator { } /** - * get host ip, string format: masterParentPath/ip + * get host ip:port, string format: parentPath/ip:port * * @param path path - * @return host ip, string format: masterParentPath/ip + * @return host ip:port, string format: parentPath/ip:port */ protected String getHostByEventDataPath(String path) { if (StringUtils.isEmpty(path)) { diff --git a/pom.xml b/pom.xml index 4662fc41ff..2d18af9c61 100644 --- a/pom.xml +++ b/pom.xml @@ -896,15 +896,15 @@ **/dao/datasource/MySQLDataSourceTest.java **/dao/entity/TaskInstanceTest.java **/dao/entity/UdfFuncTest.java - **/remote/JsonSerializerTest.java - **/remote/RemoveTaskLogResponseCommandTest.java - **/rpc/RpcTest.java - **/remote/RemoveTaskLogRequestCommandTest.java - **/remote/NettyRemotingClientTest.java - **/remote/NettyUtilTest.java - **/remote/ResponseFutureTest.java **/remote/command/alert/AlertSendRequestCommandTest.java **/remote/command/alert/AlertSendResponseCommandTest.java + **/remote/command/future/ResponseFutureTest.java + **/remote/command/log/RemoveTaskLogRequestCommandTest.java + **/remote/command/log/RemoveTaskLogResponseCommandTest.java + **/remote/utils/HostTest.java + **/remote/utils/NettyUtilTest.java + **/remote/NettyRemotingClientTest.java + **/rpc/RpcTest.java **/server/log/LoggerServerTest.java **/server/entity/SQLTaskExecutionContextTest.java **/server/log/MasterLogFilterTest.java @@ -919,6 +919,7 @@ **/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java **/server/master/dispatch/host/assign/RandomSelectorTest.java **/server/master/dispatch/host/assign/RoundRobinSelectorTest.java + **/server/master/dispatch/host/assign/HostWorkerTest.java **/server/master/register/MasterRegistryTest.java **/server/master/dispatch/host/assign/RoundRobinHostManagerTest.java **/server/master/AlertManagerTest.java @@ -935,7 +936,6 @@ **/server/register/ZookeeperRegistryCenterTest.java **/server/utils/DataxUtilsTest.java **/server/utils/ExecutionContextTestUtils.java - **/server/utils/HostTest.java **/server/utils/FlinkArgsUtilsTest.java **/server/utils/LogUtilsTest.java **/server/utils/MapReduceArgsUtilsTest.java