Browse Source

[Improvement-4984][Worker] Refactor and Improve worker load balance (#4996)

* [Improvement][Remote] Improve unit tests

* [Improvement][Worker] Improve worker load balance

* [Improvement][Worker] Fix code smells

* [Improvement][Worker] Rename weight to hostWeight
pull/3/MERGE
Shiwen Cheng 4 years ago committed by GitHub
parent
commit
5856a12855
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
  2. 10
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
  3. 6
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  4. 58
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
  5. 143
      dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java
  6. 13
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java
  7. 12
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/future/ResponseFutureTest.java
  8. 10
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommandTest.java
  9. 9
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommandTest.java
  10. 23
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/HostTest.java
  11. 5
      dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/NettyUtilTest.java
  12. 48
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java
  13. 60
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
  14. 11
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java
  15. 7
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java
  16. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java
  17. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java
  18. 44
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java
  19. 77
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorker.java
  20. 14
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java
  21. 27
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java
  22. 4
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
  23. 30
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
  24. 6
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
  25. 24
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
  26. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java
  27. 43
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorkerTest.java
  28. 22
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java
  29. 10
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java
  30. 36
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java
  31. 3
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
  32. 4
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java
  33. 16
      pom.xml

13
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java

@ -163,18 +163,13 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
if (CollectionUtils.isEmpty(childrenNodes)) {
continue;
}
String timeStamp = childrenNodes.get(0);
for (int i = 0; i < childrenNodes.size(); i++) {
childrenNodes.set(i, Host.of(childrenNodes.get(i)).getAddressAndWeight());
}
WorkerGroup wg = new WorkerGroup();
wg.setName(workerGroup);
if (isPaging) {
wg.setIpList(childrenNodes);
String registeredIpValue = zookeeperCachedOperator.get(workerGroupPath + SLASH + timeStamp);
wg.setCreateTime(DateUtils.stringToDate(registeredIpValue.split(",")[6]));
wg.setUpdateTime(DateUtils.stringToDate(registeredIpValue.split(",")[7]));
wg.setIpList(childrenNodes.stream().map(node -> Host.of(node).getIp()).collect(Collectors.toList()));
String registeredValue = zookeeperCachedOperator.get(workerGroupPath + SLASH + childrenNodes.get(0));
wg.setCreateTime(DateUtils.stringToDate(registeredValue.split(",")[6]));
wg.setUpdateTime(DateUtils.stringToDate(registeredValue.split(",")[7]));
}
workerGroups.add(wg);
}

10
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java

@ -70,13 +70,13 @@ public class WorkerGroupServiceTest {
workerGroupStrList.add("test");
Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath)).thenReturn(workerGroupStrList);
List<String> defaultIpList = new ArrayList<>();
defaultIpList.add("192.168.220.188:1234:100:1234567");
defaultIpList.add("192.168.220.189:1234:100:1234567");
List<String> defaultAddressList = new ArrayList<>();
defaultAddressList.add("192.168.220.188:1234");
defaultAddressList.add("192.168.220.189:1234");
Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath + "/default")).thenReturn(defaultIpList);
Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath + "/default")).thenReturn(defaultAddressList);
Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" + defaultIpList.get(0))).thenReturn("0.01,0.17,0.03,25.83,8.0,1.0,2020-07-21 11:17:59,2020-07-21 14:39:20,0,13238");
Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" + defaultAddressList.get(0))).thenReturn("0.01,0.17,0.03,25.83,8.0,1.0,2020-07-21 11:17:59,2020-07-21 14:39:20,0,13238");
}
/**

6
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -385,6 +385,10 @@ public final class Constants {
*/
public static final double DEFAULT_WORKER_RESERVED_MEMORY = OSUtils.totalMemorySize() / 10;
/**
* worker host weight
*/
public static final int DEFAULT_WORKER_HOST_WEIGHT = 100;
/**
* default log cache rows num,output when reach the number
@ -542,7 +546,7 @@ public final class Constants {
* heartbeat for zk info length
*/
public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 10;
public static final int HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH = 11;
/**
* jar

58
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java

@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.model.Server;
@ -38,10 +40,8 @@ public class ResInfo {
*/
private double loadAverage;
public ResInfo(){}
public ResInfo(double cpuUsage , double memoryUsage){
this.cpuUsage = cpuUsage ;
public ResInfo(double cpuUsage, double memoryUsage) {
this.cpuUsage = cpuUsage;
this.memoryUsage = memoryUsage;
}
@ -81,35 +81,53 @@ public class ResInfo {
* @param loadAverage load average
* @return cpu and memory usage
*/
public static String getResInfoJson(double cpuUsage , double memoryUsage,double loadAverage){
public static String getResInfoJson(double cpuUsage, double memoryUsage, double loadAverage) {
ResInfo resInfo = new ResInfo(cpuUsage,memoryUsage,loadAverage);
return JSONUtils.toJsonString(resInfo);
}
/**
* parse heartbeat info for zk
* @param heartBeatInfo heartbeat info
* @return heartbeat info to Server
*/
public static Server parseHeartbeatForZKInfo(String heartBeatInfo){
if (StringUtils.isEmpty(heartBeatInfo)) {
public static Server parseHeartbeatForZKInfo(String heartBeatInfo) {
if (!isValidHeartbeatForZKInfo(heartBeatInfo)) {
return null;
}
String[] masterArray = heartBeatInfo.split(Constants.COMMA);
if(masterArray.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
return null;
String[] parts = heartBeatInfo.split(Constants.COMMA);
Server server = new Server();
server.setResInfo(getResInfoJson(Double.parseDouble(parts[0]),
Double.parseDouble(parts[1]),
Double.parseDouble(parts[2])));
server.setCreateTime(DateUtils.stringToDate(parts[6]));
server.setLastHeartbeatTime(DateUtils.stringToDate(parts[7]));
//set process id
server.setId(Integer.parseInt(parts[9]));
return server;
}
/**
* is valid heartbeat info for zk
* @param heartBeatInfo heartbeat info
* @return heartbeat info is valid
*/
public static boolean isValidHeartbeatForZKInfo(String heartBeatInfo) {
if (StringUtils.isNotEmpty(heartBeatInfo)) {
String[] parts = heartBeatInfo.split(Constants.COMMA);
return parts.length == Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH
|| parts.length == Constants.HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH;
}
Server masterServer = new Server();
masterServer.setResInfo(getResInfoJson(Double.parseDouble(masterArray[0]),
Double.parseDouble(masterArray[1]),
Double.parseDouble(masterArray[2])));
masterServer.setCreateTime(DateUtils.stringToDate(masterArray[6]));
masterServer.setLastHeartbeatTime(DateUtils.stringToDate(masterArray[7]));
//set process id
masterServer.setId(Integer.parseInt(masterArray[9]));
return masterServer;
return false;
}
/**
* is new heartbeat info for zk with weight
* @param parts heartbeat info parts
* @return heartbeat info is new with weight
*/
public static boolean isNewHeartbeatWithWeight(String[] parts) {
return parts.length == Constants.HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH;
}
}

143
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/utils/Host.java

@ -20,8 +20,6 @@ package org.apache.dolphinscheduler.remote.utils;
import static org.apache.dolphinscheduler.common.Constants.COLON;
import java.io.Serializable;
import java.util.Objects;
import java.util.StringJoiner;
/**
* server address
@ -43,21 +41,6 @@ public class Host implements Serializable {
*/
private int port;
/**
* weight
*/
private int weight;
/**
* startTime
*/
private long startTime;
/**
* workGroup
*/
private String workGroup;
public Host() {
}
@ -67,21 +50,11 @@ public class Host implements Serializable {
this.address = ip + COLON + port;
}
public Host(String ip, int port, int weight, long startTime) {
this.ip = ip;
this.port = port;
this.address = ip + COLON + port;
this.weight = getWarmUpWeight(weight, startTime);
this.startTime = startTime;
}
public Host(String ip, int port, int weight, long startTime, String workGroup) {
this.ip = ip;
this.port = port;
this.address = ip + COLON + port;
this.weight = getWarmUpWeight(weight, startTime);
this.workGroup = workGroup;
this.startTime = startTime;
public Host(String address) {
String[] parts = splitAddress(address);
this.ip = parts[0];
this.port = Integer.parseInt(parts[1]);
this.address = address;
}
public String getAddress() {
@ -89,6 +62,9 @@ public class Host implements Serializable {
}
public void setAddress(String address) {
String[] parts = splitAddress(address);
this.ip = parts[0];
this.port = Integer.parseInt(parts[1]);
this.address = address;
}
@ -101,22 +77,6 @@ public class Host implements Serializable {
this.address = ip + COLON + port;
}
public int getWeight() {
return weight;
}
public void setWeight(int weight) {
this.weight = weight;
}
public long getStartTime() {
return startTime;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
public int getPort() {
return port;
}
@ -126,12 +86,15 @@ public class Host implements Serializable {
this.address = ip + COLON + port;
}
public String getWorkGroup() {
return workGroup;
}
public void setWorkGroup(String workGroup) {
this.workGroup = workGroup;
/**
* address convert host
*
* @param address address
* @return host
*/
public static Host of(String address) {
String[] parts = splitAddress(address);
return new Host(parts[0], Integer.parseInt(parts[1]));
}
/**
@ -140,37 +103,15 @@ public class Host implements Serializable {
* @param address address
* @return host
*/
public static Host of(String address) {
public static String[] splitAddress(String address) {
if (address == null) {
throw new IllegalArgumentException("Host : address is null.");
}
String[] parts = address.split(COLON);
if (parts.length < 2) {
if (parts.length != 2) {
throw new IllegalArgumentException(String.format("Host : %s illegal.", address));
}
Host host = null;
if (parts.length == 2) {
host = new Host(parts[0], Integer.parseInt(parts[1]));
}
if (parts.length == 4) {
host = new Host(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Long.parseLong(parts[3]));
}
return host;
}
/**
* generate host string
* @param address address
* @param weight weight
* @param startTime startTime
* @return address:weight:startTime
*/
public static String generate(String address, int weight, long startTime) {
StringJoiner stringJoiner = new StringJoiner(COLON);
stringJoiner.add(address)
.add(String.valueOf(weight))
.add(String.valueOf(startTime));
return stringJoiner.toString();
return parts;
}
/**
@ -181,54 +122,16 @@ public class Host implements Serializable {
*/
public static Boolean isOldVersion(String address) {
String[] parts = address.split(COLON);
return parts.length != 2 && parts.length != 3;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Host host = (Host) o;
return Objects.equals(getAddress(), host.getAddress());
}
@Override
public int hashCode() {
return Objects.hash(getAddress());
return parts.length != 2;
}
@Override
public String toString() {
return "Host{"
+ "address='" + address + '\''
+ ", weight=" + weight
+ ", startTime=" + startTime
+ ", workGroup='" + workGroup + '\''
+ ", ip='" + ip + '\''
+ ", port=" + port
+ '}';
}
/**
* warm up
*/
private int getWarmUpWeight(int weight, long startTime) {
long uptime = System.currentTimeMillis() - startTime;
//If the warm-up is not over, reduce the weight
if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
return (int) (weight * ((float) uptime / Constants.WARM_UP_TIME));
}
return weight;
}
/**
* get address and weight
*
* @return address:weight
*/
public String getAddressAndWeight() {
return address + COLON + weight;
}
}

13
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyRemotingClientTest.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.remote;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.Ping;
@ -28,23 +27,25 @@ import org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assert;
import org.junit.Test;
import io.netty.channel.Channel;
/**
* netty remote client test
*/
public class NettyRemotingClientTest {
/**
* test sned sync
* test send sync
*/
@Test
public void testSendSync(){
public void testSendSync() {
NettyServerConfig serverConfig = new NettyServerConfig();
NettyRemotingServer server = new NettyRemotingServer(serverConfig);

12
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/ResponseFutureTest.java → dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/future/ResponseFutureTest.java

@ -15,24 +15,24 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote;
package org.apache.dolphinscheduler.remote.command.future;
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
public class ResponseFutureTest {
@Test
public void testScanFutureTable(){
public void testScanFutureTable() {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("executor-service"));
executorService.scheduleAtFixedRate(new Runnable() {
@Override
@ -51,7 +51,7 @@ public class ResponseFutureTest {
ResponseFuture future = new ResponseFuture(1, 2000, invokeCallback, null);
try {
latch.await(5000, TimeUnit.MILLISECONDS);
Assert.assertTrue(ResponseFuture.getFuture(1) == null);
Assert.assertNull(ResponseFuture.getFuture(1));
} catch (InterruptedException e) {
e.printStackTrace();
}

10
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogRequestCommandTest.java → dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogRequestCommandTest.java

@ -15,18 +15,18 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote;
package org.apache.dolphinscheduler.remote.command.log;
import junit.framework.Assert;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand;
import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogResponseCommand;
import org.junit.Test;
import junit.framework.Assert;
public class RemoveTaskLogRequestCommandTest {
@Test
public void testConvert2Command(){
public void testConvert2Command() {
RemoveTaskLogResponseCommand removeTaskLogResponseCommand = new RemoveTaskLogResponseCommand();
removeTaskLogResponseCommand.setStatus(true);
Command command = removeTaskLogResponseCommand.convert2Command(122);

9
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/RemoveTaskLogResponseCommandTest.java → dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/command/log/RemoveTaskLogResponseCommandTest.java

@ -15,17 +15,18 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote;
package org.apache.dolphinscheduler.remote.command.log;
import junit.framework.Assert;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.log.RemoveTaskLogRequestCommand;
import org.junit.Test;
import junit.framework.Assert;
public class RemoveTaskLogResponseCommandTest {
@Test
public void testConvert2Command(){
public void testConvert2Command() {
RemoveTaskLogRequestCommand removeTaskLogRequestCommand = new RemoveTaskLogRequestCommand();
removeTaskLogRequestCommand.setPath("/opt/zhangsan");
Command command = removeTaskLogRequestCommand.convert2Command();

23
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/HostTest.java → dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/HostTest.java

@ -15,9 +15,7 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.utils;
import org.apache.dolphinscheduler.remote.utils.Host;
package org.apache.dolphinscheduler.remote.utils;
import org.junit.Assert;
import org.junit.Test;
@ -27,26 +25,13 @@ import org.junit.Test;
*/
public class HostTest {
@Test
public void testHostWarmUp() {
Host host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)));
Assert.assertEquals(50, host.getWeight());
host = Host.of(("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 10 * 1000)));
Assert.assertEquals(100, host.getWeight());
}
@Test
public void testHost() {
Host host = Host.of("192.158.2.2:22");
Assert.assertEquals(22, host.getPort());
host.setAddress("127.0.0.1:8888");
Assert.assertEquals("127.0.0.1", host.getIp());
Assert.assertEquals(8888, host.getPort());
}
@Test
public void testGenerate() {
String address = "192.158.2.2:22";
int weight = 100;
long startTime = System.currentTimeMillis();
String generateHost = Host.generate(address, weight, startTime);
Assert.assertEquals(address + ":" + weight + ":" + startTime, generateHost);
}
}

5
dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/NettyUtilTest.java → dolphinscheduler-remote/src/test/java/org/apache/dolphinscheduler/remote/utils/NettyUtilTest.java

@ -15,12 +15,10 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote;
package org.apache.dolphinscheduler.remote.utils;
import static org.apache.dolphinscheduler.remote.utils.Constants.OS_NAME;
import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import org.junit.Assert;
import org.junit.Test;
@ -31,7 +29,6 @@ import io.netty.channel.epoll.Epoll;
*/
public class NettyUtilTest {
@Test
public void testUserEpoll() {
if (OS_NAME.toLowerCase().contains("linux") && Epoll.isAvailable()) {

48
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java

@ -17,26 +17,32 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.ResInfo;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
/**
* round robin host manager
*/
public abstract class CommonHostManager implements HostManager {
private final Logger logger = LoggerFactory.getLogger(CommonHostManager.class);
/**
* zookeeper registry center
*/
@Autowired
protected ZookeeperRegistryCenter registryCenter;
/**
* zookeeperNodeManager
@ -50,16 +56,15 @@ public abstract class CommonHostManager implements HostManager {
* @return host
*/
@Override
public Host select(ExecutionContext context){
public Host select(ExecutionContext context) {
Host host = new Host();
Collection<String> nodes = null;
/**
* executor type
*/
String workerGroup = context.getWorkerGroup();
// executor type
ExecutorType executorType = context.getExecutorType();
switch (executorType){
switch (executorType) {
case WORKER:
nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup());
nodes = zookeeperNodeManager.getWorkerGroupNodes(workerGroup);
break;
case CLIENT:
break;
@ -67,21 +72,26 @@ public abstract class CommonHostManager implements HostManager {
throw new IllegalArgumentException("invalid executorType : " + executorType);
}
if(CollectionUtils.isEmpty(nodes)){
if (nodes == null || nodes.isEmpty()) {
return host;
}
List<Host> candidateHosts = new ArrayList<>(nodes.size());
List<HostWorker> candidateHosts = new ArrayList<>();
nodes.forEach(node -> {
Host nodeHost=Host.of(node);
nodeHost.setWorkGroup(context.getWorkerGroup());
candidateHosts.add(nodeHost);
String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup);
String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node);
int hostWeight = Constants.DEFAULT_WORKER_HOST_WEIGHT;
if (StringUtils.isNotEmpty(heartbeat)) {
String[] parts = heartbeat.split(Constants.COMMA);
if (ResInfo.isNewHeartbeatWithWeight(parts)) {
hostWeight = Integer.parseInt(parts[10]);
}
}
candidateHosts.add(HostWorker.of(node, hostWeight, workerGroup));
});
return select(candidateHosts);
}
protected abstract Host select(Collection<Host> nodes);
protected abstract HostWorker select(Collection<HostWorker> nodes);
public void setZookeeperNodeManager(ZookeeperNodeManager zookeeperNodeManager) {
this.zookeeperNodeManager = zookeeperNodeManager;

60
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java

@ -19,20 +19,20 @@ package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.ResInfo;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -40,8 +40,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.dolphinscheduler.common.Constants.COMMA;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* round robin host manager
@ -50,12 +53,6 @@ public class LowerWeightHostManager extends CommonHostManager {
private final Logger logger = LoggerFactory.getLogger(LowerWeightHostManager.class);
/**
* zookeeper registry center
*/
@Autowired
private ZookeeperRegistryCenter registryCenter;
/**
* round robin host manager
*/
@ -82,7 +79,7 @@ public class LowerWeightHostManager extends CommonHostManager {
private ScheduledExecutorService executorService;
@PostConstruct
public void init(){
public void init() {
this.selector = new LowerWeightRoundRobin();
this.workerHostWeightsMap = new ConcurrentHashMap<>();
this.lock = new ReentrantLock();
@ -103,20 +100,20 @@ public class LowerWeightHostManager extends CommonHostManager {
* @return host
*/
@Override
public Host select(ExecutionContext context){
public Host select(ExecutionContext context) {
Set<HostWeight> workerHostWeights = getWorkerHostWeights(context.getWorkerGroup());
if(CollectionUtils.isNotEmpty(workerHostWeights)){
if (CollectionUtils.isNotEmpty(workerHostWeights)) {
return selector.select(workerHostWeights).getHost();
}
return new Host();
}
@Override
public Host select(Collection<Host> nodes) {
public HostWorker select(Collection<HostWorker> nodes) {
throw new UnsupportedOperationException("not support");
}
private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights){
private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights) {
lock.lock();
try {
workerHostWeightsMap.clear();
@ -126,7 +123,7 @@ public class LowerWeightHostManager extends CommonHostManager {
}
}
private Set<HostWeight> getWorkerHostWeights(String workerGroup){
private Set<HostWeight> getWorkerHostWeights(String workerGroup) {
lock.lock();
try {
return workerHostWeightsMap.get(workerGroup);
@ -135,7 +132,7 @@ public class LowerWeightHostManager extends CommonHostManager {
}
}
class RefreshResourceTask implements Runnable{
class RefreshResourceTask implements Runnable {
@Override
public void run() {
@ -143,35 +140,34 @@ public class LowerWeightHostManager extends CommonHostManager {
Map<String, Set<String>> workerGroupNodes = zookeeperNodeManager.getWorkerGroupNodes();
Set<Map.Entry<String, Set<String>>> entries = workerGroupNodes.entrySet();
Map<String, Set<HostWeight>> workerHostWeights = new HashMap<>();
for(Map.Entry<String, Set<String>> entry : entries){
for (Map.Entry<String, Set<String>> entry : entries) {
String workerGroup = entry.getKey();
Set<String> nodes = entry.getValue();
String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup);
Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
for(String node : nodes){
for (String node : nodes) {
String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node);
if(StringUtils.isNotEmpty(heartbeat)
&& heartbeat.split(COMMA).length == Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
String[] parts = heartbeat.split(COMMA);
if (ResInfo.isValidHeartbeatForZKInfo(heartbeat)) {
String[] parts = heartbeat.split(Constants.COMMA);
int status = Integer.parseInt(parts[8]);
if (status == Constants.ABNORMAL_NODE_STATUS){
if (status == Constants.ABNORMAL_NODE_STATUS) {
logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}",
Double.parseDouble(parts[3]) , Double.parseDouble(parts[2]));
continue;
}
double cpu = Double.parseDouble(parts[0]);
double memory = Double.parseDouble(parts[1]);
double loadAverage = Double.parseDouble(parts[2]);
HostWeight hostWeight = new HostWeight(Host.of(node), cpu, memory, loadAverage);
long startTime = DateUtils.stringToDate(parts[6]).getTime();
int weight = ResInfo.isNewHeartbeatWithWeight(parts) ? Integer.parseInt(parts[10]) : Constants.DEFAULT_WORKER_HOST_WEIGHT;
HostWeight hostWeight = new HostWeight(HostWorker.of(node, weight, workerGroup), cpu, memory, loadAverage, startTime);
hostWeights.add(hostWeight);
}
}
workerHostWeights.put(workerGroup, hostWeights);
}
syncWorkerHostWeight(workerHostWeights);
} catch (Throwable ex){
} catch (Throwable ex) {
logger.error("RefreshResourceTask error", ex);
}
}

11
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java

@ -17,13 +17,11 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RandomSelector;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
import java.util.Collection;
/**
* round robin host manager
*/
@ -32,17 +30,18 @@ public class RandomHostManager extends CommonHostManager {
/**
* selector
*/
private final Selector<Host> selector;
private final RandomSelector selector;
/**
* set round robin
*/
public RandomHostManager(){
public RandomHostManager() {
this.selector = new RandomSelector();
}
@Override
public Host select(Collection<Host> nodes) {
public HostWorker select(Collection<HostWorker> nodes) {
return selector.select(nodes);
}
}

7
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java

@ -17,9 +17,8 @@
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.RoundRobinSelector;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector;
import java.util.Collection;
@ -32,7 +31,7 @@ public class RoundRobinHostManager extends CommonHostManager {
/**
* selector
*/
private final Selector<Host> selector;
private final RoundRobinSelector selector;
/**
* set round robin
@ -42,7 +41,7 @@ public class RoundRobinHostManager extends CommonHostManager {
}
@Override
public Host select(Collection<Host> nodes) {
public HostWorker select(Collection<HostWorker> nodes) {
return selector.select(nodes);
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/AbstractSelector.java

@ -23,7 +23,7 @@ import java.util.Collection;
/**
* AbstractSelector
*/
public abstract class AbstractSelector<T> implements Selector<T>{
public abstract class AbstractSelector<T> implements Selector<T> {
@Override
public T select(Collection<T> source) {

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostSelector.java

@ -28,9 +28,9 @@ public enum HostSelector {
LOWERWEIGHT;
public static HostSelector of(String selector){
for(HostSelector hs : values()){
if(hs.name().equalsIgnoreCase(selector)){
public static HostSelector of(String selector) {
for (HostSelector hs : values()) {
if (hs.name().equalsIgnoreCase(selector)) {
return hs;
}
}

44
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWeight.java

@ -31,57 +31,55 @@ public class HostWeight {
private final int LOAD_AVERAGE_FACTOR = 70;
private final Host host;
private final HostWorker hostWorker;
private final double weight;
private double currentWeight;
public HostWeight(Host host, double cpu, double memory, double loadAverage) {
this.weight = getWeight(cpu, memory, loadAverage, host);
this.host = host;
this.currentWeight = weight;
}
public double getCurrentWeight() {
return currentWeight;
public HostWeight(HostWorker hostWorker, double cpu, double memory, double loadAverage, long startTime) {
this.hostWorker = hostWorker;
this.weight = calculateWeight(cpu, memory, loadAverage, startTime);
this.currentWeight = this.weight;
}
public double getWeight() {
return weight;
}
public double getCurrentWeight() {
return currentWeight;
}
public void setCurrentWeight(double currentWeight) {
this.currentWeight = currentWeight;
}
public HostWorker getHostWorker() {
return hostWorker;
}
public Host getHost() {
return host;
return (Host)hostWorker;
}
@Override
public String toString() {
return "HostWeight{"
+ "host=" + host
+ "hostWorker=" + hostWorker
+ ", weight=" + weight
+ ", currentWeight=" + currentWeight
+ '}';
}
private double getWeight(double cpu, double memory, double loadAverage, Host host) {
double calculateWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR;
return getWarmUpWeight(host, calculateWeight);
}
/**
* If the warm-up is not over, add the weight
*/
private double getWarmUpWeight(Host host, double weight) {
long startTime = host.getStartTime();
private double calculateWeight(double cpu, double memory, double loadAverage, long startTime) {
double calculatedWeight = cpu * CPU_FACTOR + memory * MEMORY_FACTOR + loadAverage * LOAD_AVERAGE_FACTOR;
long uptime = System.currentTimeMillis() - startTime;
if (uptime > 0 && uptime < Constants.WARM_UP_TIME) {
return weight * Constants.WARM_UP_TIME / uptime;
// If the warm-up is not over, add the weight
return calculatedWeight * Constants.WARM_UP_TIME / uptime;
}
return weight;
return calculatedWeight;
}
}

77
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorker.java

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.remote.utils.Host;
/**
* host worker
*/
public class HostWorker extends Host {
/**
* host weight
*/
private int hostWeight;
/**
* worker group
*/
private String workerGroup;
public HostWorker(String ip, int port, int hostWeight, String workerGroup) {
super(ip, port);
this.hostWeight = hostWeight;
this.workerGroup = workerGroup;
}
public HostWorker(String address, int hostWeight, String workerGroup) {
super(address);
this.hostWeight = hostWeight;
this.workerGroup = workerGroup;
}
public int getHostWeight() {
return hostWeight;
}
public void setHostWeight(int hostWeight) {
this.hostWeight = hostWeight;
}
public String getWorkerGroup() {
return workerGroup;
}
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
public static HostWorker of(String address, int hostWeight, String workerGroup) {
return new HostWorker(address, hostWeight, workerGroup);
}
@Override
public String toString() {
return "Host{"
+ "hostWeight=" + hostWeight
+ ", workerGroup='" + workerGroup + '\''
+ '}';
}
}

14
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelector.java

@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.remote.utils.Host;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@ -27,20 +25,20 @@ import java.util.concurrent.ThreadLocalRandom;
/**
* random selector
*/
public class RandomSelector extends AbstractSelector<Host> {
public class RandomSelector extends AbstractSelector<HostWorker> {
@Override
public Host doSelect(final Collection<Host> source) {
public HostWorker doSelect(final Collection<HostWorker> source) {
List<Host> hosts = new ArrayList<>(source);
List<HostWorker> hosts = new ArrayList<>(source);
int size = hosts.size();
int[] weights = new int[size];
int totalWeight = 0;
int index = 0;
for (Host host : hosts) {
totalWeight += host.getWeight();
weights[index] = host.getWeight();
for (HostWorker host : hosts) {
totalWeight += host.getHostWeight();
weights[index] = host.getHostWeight();
index++;
}

27
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelector.java

@ -16,20 +16,21 @@
*/
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.stereotype.Service;
/**
* Smooth Weight Round Robin
*/
@Service
public class RoundRobinSelector extends AbstractSelector<Host> {
public class RoundRobinSelector extends AbstractSelector<HostWorker> {
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> workGroupWeightMap = new ConcurrentHashMap<>();
@ -69,12 +70,11 @@ public class RoundRobinSelector extends AbstractSelector<Host> {
}
@Override
public Host doSelect(Collection<Host> source) {
public HostWorker doSelect(Collection<HostWorker> source) {
List<Host> hosts = new ArrayList<>(source);
String key = hosts.get(0).getWorkGroup();
List<HostWorker> hosts = new ArrayList<>(source);
String key = hosts.get(0).getWorkerGroup();
ConcurrentMap<String, WeightedRoundRobin> map = workGroupWeightMap.get(key);
if (map == null) {
workGroupWeightMap.putIfAbsent(key, new ConcurrentHashMap<>());
@ -84,13 +84,13 @@ public class RoundRobinSelector extends AbstractSelector<Host> {
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Host selectedHost = null;
HostWorker selectedHost = null;
WeightedRoundRobin selectWeightRoundRobin = null;
for (Host host : hosts) {
String workGroupHost = host.getWorkGroup() + host.getAddress();
for (HostWorker host : hosts) {
String workGroupHost = host.getWorkerGroup() + host.getAddress();
WeightedRoundRobin weightedRoundRobin = map.get(workGroupHost);
int weight = host.getWeight();
int weight = host.getHostWeight();
if (weight < 0) {
weight = 0;
}
@ -117,7 +117,6 @@ public class RoundRobinSelector extends AbstractSelector<Host> {
totalWeight += weight;
}
if (!updateLock.get() && hosts.size() != map.size() && updateLock.compareAndSet(false, true)) {
try {
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);

4
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java

@ -98,8 +98,8 @@ public class MasterRegistry {
});
int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();
HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
masterConfig.getMasterReservedMemory(),
masterConfig.getMasterMaxCpuloadAvg(),
masterConfig.getMasterReservedMemory(),
Sets.newHashSet(getMasterPath()),
Constants.MASTER_PREFIX,
zookeeperRegistryCenter);
@ -132,9 +132,7 @@ public class MasterRegistry {
* get local address
*/
private String getLocalAddress() {
return NetUtils.getAddr(masterConfig.getListenPort());
}
/**

30
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java

@ -38,8 +38,9 @@ public class HeartBeatTask implements Runnable {
private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
private String startTime;
private double reservedMemory;
private double maxCpuloadAvg;
private double reservedMemory;
private int hostWeight; // worker host weight
private Set<String> heartBeatPaths;
private String serverType;
private ZookeeperRegistryCenter zookeeperRegistryCenter;
@ -48,23 +49,38 @@ public class HeartBeatTask implements Runnable {
protected IStoppable stoppable = null;
public HeartBeatTask(String startTime,
double reservedMemory,
double maxCpuloadAvg,
double reservedMemory,
Set<String> heartBeatPaths,
String serverType,
ZookeeperRegistryCenter zookeeperRegistryCenter) {
this.startTime = startTime;
this.reservedMemory = reservedMemory;
this.maxCpuloadAvg = maxCpuloadAvg;
this.reservedMemory = reservedMemory;
this.heartBeatPaths = heartBeatPaths;
this.serverType = serverType;
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
}
public HeartBeatTask(String startTime,
double maxCpuloadAvg,
double reservedMemory,
int hostWeight,
Set<String> heartBeatPaths,
String serverType,
ZookeeperRegistryCenter zookeeperRegistryCenter) {
this.startTime = startTime;
this.maxCpuloadAvg = maxCpuloadAvg;
this.reservedMemory = reservedMemory;
this.hostWeight = hostWeight;
this.heartBeatPaths = heartBeatPaths;
this.serverType = serverType;
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
}
@Override
public void run() {
try {
// check dead or not in zookeeper
for (String heartBeatPath : heartBeatPaths) {
if (zookeeperRegistryCenter.checkIsDeadServer(heartBeatPath, serverType)) {
@ -94,8 +110,12 @@ public class HeartBeatTask implements Runnable {
builder.append(startTime).append(Constants.COMMA);
builder.append(DateUtils.dateToString(new Date())).append(Constants.COMMA);
builder.append(status).append(COMMA);
//save process id
// save process id
builder.append(OSUtils.getProcessID());
// worker host weight
if (Constants.WORKER_PREFIX.equals(serverType)) {
builder.append(Constants.COMMA).append(hostWeight);
}
for (String heartBeatPath : heartBeatPaths) {
zookeeperRegistryCenter.getRegisterOperator().update(heartBeatPath, builder.toString());

6
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@ -47,7 +47,7 @@ public class WorkerConfig {
@Value("#{'${worker.groups:default}'.split(',')}")
private Set<String> workerGroups;
@Value("${worker.listen.port: 1234}")
@Value("${worker.listen.port:1234}")
private int listenPort;
@Value("${worker.host.weight:100}")
@ -119,8 +119,8 @@ public class WorkerConfig {
return hostWeight;
}
public void setHostWeight(int weight) {
this.hostWeight = weight;
public void setHostWeight(int hostWeight) {
this.hostWeight = hostWeight;
}
public String getAlertListenHost() {

24
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
@ -34,6 +33,7 @@ import org.apache.curator.framework.state.ConnectionState;
import java.util.Date;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ -120,12 +120,13 @@ public class WorkerRegistry {
logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath);
}
HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime,
this.workerConfig.getWorkerReservedMemory(),
this.workerConfig.getWorkerMaxCpuloadAvg(),
HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
workerConfig.getWorkerMaxCpuloadAvg(),
workerConfig.getWorkerReservedMemory(),
workerConfig.getHostWeight(),
workerZkPaths,
Constants.WORKER_PREFIX,
this.zookeeperRegistryCenter);
zookeeperRegistryCenter);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval);
@ -150,22 +151,19 @@ public class WorkerRegistry {
*/
public Set<String> getWorkerZkPaths() {
Set<String> workerZkPaths = Sets.newHashSet();
String address = getLocalAddress();
String workerZkPathPrefix = this.zookeeperRegistryCenter.getWorkerPath();
int weight = workerConfig.getHostWeight();
long workerStartTime = System.currentTimeMillis();
for (String workGroup : this.workerGroups) {
StringBuilder workerZkPathBuilder = new StringBuilder(100);
workerZkPathBuilder.append(workerZkPathPrefix).append(SLASH);
StringJoiner workerZkPathJoiner = new StringJoiner(SLASH);
workerZkPathJoiner.add(workerZkPathPrefix);
if (StringUtils.isEmpty(workGroup)) {
workGroup = DEFAULT_WORKER_GROUP;
}
// trim and lower case is need
workerZkPathBuilder.append(workGroup.trim().toLowerCase()).append(SLASH);
workerZkPathBuilder.append(Host.generate(address, weight, workerStartTime));
workerZkPaths.add(workerZkPathBuilder.toString());
workerZkPathJoiner.add(workGroup.trim().toLowerCase());
workerZkPathJoiner.add(address);
workerZkPaths.add(workerZkPathJoiner.toString());
}
return workerZkPaths;
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java

@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.MasterServer;
@ -308,7 +307,6 @@ public class ZKMasterClient extends AbstractZKClient {
* @throws Exception exception
*/
private void failoverWorker(String workerHost, boolean needCheckWorkerAlive) throws Exception {
workerHost = Host.of(workerHost).getAddress();
logger.info("start worker[{}] failover ...", workerHost);
List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
for (TaskInstance taskInstance : needFailoverTaskInstanceList) {

43
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/HostWorkerTest.java

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.junit.Assert;
import org.junit.Test;
public class HostWorkerTest {
@Test
public void testHostWorker1() {
HostWorker hostWorker = new HostWorker("192.158.2.2", 11, 20, "default");
Assert.assertEquals("192.158.2.2", hostWorker.getIp());
Assert.assertEquals(11, hostWorker.getPort());
Assert.assertEquals(20, hostWorker.getHostWeight());
Assert.assertEquals("default", hostWorker.getWorkerGroup());
}
@Test
public void testHostWorker2() {
HostWorker hostWorker = HostWorker.of("192.158.2.2:22", 80, "default");
Assert.assertEquals("192.158.2.2", hostWorker.getIp());
Assert.assertEquals(22, hostWorker.getPort());
Assert.assertEquals(80, hostWorker.getHostWeight());
Assert.assertEquals("default", hostWorker.getWorkerGroup());
}
}

22
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java

@ -17,24 +17,20 @@
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import org.junit.Assert;
import org.junit.Test;
public class LowerWeightRoundRobinTest {
@Test
public void testSelect() {
Collection<HostWeight> sources = new ArrayList<>();
sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" + (System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84));
sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.56, 3.24));
sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" + (System.currentTimeMillis() - 60 * 2 * 1000)), 0.06, 0.80, 3.15));
sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 8 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.56, 3.24, System.currentTimeMillis() - 60 * 5 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.80, 3.15, System.currentTimeMillis() - 60 * 2 * 1000));
LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
HostWeight result;
@ -55,10 +51,10 @@ public class LowerWeightRoundRobinTest {
@Test
public void testWarmUpSelect() {
Collection<HostWeight> sources = new ArrayList<>();
sources.add(new HostWeight(Host.of("192.158.2.1:11:100:" + (System.currentTimeMillis() - 60 * 8 * 1000)), 0.06, 0.44, 3.84));
sources.add(new HostWeight(Host.of("192.158.2.2:22:100:" + (System.currentTimeMillis() - 60 * 5 * 1000)), 0.06, 0.44, 3.84));
sources.add(new HostWeight(Host.of("192.158.2.3:33:100:" + (System.currentTimeMillis() - 60 * 3 * 1000)), 0.06, 0.44, 3.84));
sources.add(new HostWeight(Host.of("192.158.2.4:33:100:" + (System.currentTimeMillis() - 60 * 11 * 1000)), 0.06, 0.44, 3.84));
sources.add(new HostWeight(HostWorker.of("192.158.2.1:11", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 8 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.2:22", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 5 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.3:33", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 3 * 1000));
sources.add(new HostWeight(HostWorker.of("192.158.2.4:33", 100, "default"), 0.06, 0.44, 3.84, System.currentTimeMillis() - 60 * 11 * 1000));
LowerWeightRoundRobin roundRobin = new LowerWeightRoundRobin();
HostWeight result;

10
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RandomSelectorTest.java

@ -17,14 +17,11 @@
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.remote.utils.Host;
import java.util.Arrays;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
/**
* random selector
*/
@ -39,15 +36,14 @@ public class RandomSelectorTest {
@Test
public void testSelect1() {
RandomSelector selector = new RandomSelector();
Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 100, System.currentTimeMillis()), new Host("192.168.1.2", 80, 20, System.currentTimeMillis())));
HostWorker result = selector.select(Arrays.asList(new HostWorker("192.168.1.1:11", 100, "default"), new HostWorker("192.168.1.2:22", 80, "default")));
Assert.assertNotNull(result);
}
@Test
public void testSelect() {
RandomSelector selector = new RandomSelector();
Host result = selector.select(Arrays.asList(new Host("192.168.1.1", 80, 100, System.currentTimeMillis()), new Host("192.168.1.1", 80, 20, System.currentTimeMillis())));
HostWorker result = selector.select(Arrays.asList(new HostWorker("192.168.1.1", 11, 100, "default"), new HostWorker("192.168.1.2:", 22, 20, "default")));
Assert.assertNotNull(result);
}
}

36
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/assign/RoundRobinSelectorTest.java

@ -17,14 +17,12 @@
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import org.apache.dolphinscheduler.remote.utils.Host;
import java.util.Arrays;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
/**
* round robin selector
*/
@ -39,18 +37,16 @@ public class RoundRobinSelectorTest {
@Test
public void testSelect1() {
RoundRobinSelector selector = new RoundRobinSelector();
// dismiss of server warm-up time
long startTime = System.currentTimeMillis() - 60 * 10 * 1000;
List<Host> hostOneList = Arrays.asList(
new Host("192.168.1.1", 80, 20, startTime, "kris"),
new Host("192.168.1.2", 80, 10, startTime, "kris"));
List<Host> hostTwoList = Arrays.asList(
new Host("192.168.1.1", 80, 20, startTime, "kris"),
new Host("192.168.1.2", 80, 10, startTime, "kris"),
new Host("192.168.1.3", 80, 10, startTime, "kris"));
Host result;
List<HostWorker> hostOneList = Arrays.asList(
new HostWorker("192.168.1.1", 80, 20, "kris"),
new HostWorker("192.168.1.2", 80, 10, "kris"));
List<HostWorker> hostTwoList = Arrays.asList(
new HostWorker("192.168.1.1", 80, 20, "kris"),
new HostWorker("192.168.1.2", 80, 10, "kris"),
new HostWorker("192.168.1.3", 80, 10, "kris"));
HostWorker result;
result = selector.select(hostOneList);
Assert.assertEquals("192.168.1.1", result.getIp());
@ -93,17 +89,15 @@ public class RoundRobinSelectorTest {
result = selector.select(hostOneList);
Assert.assertEquals("192.168.1.1", result.getIp());
}
@Test
public void testWarmUpRoundRobinSelector() {
public void testWeightRoundRobinSelector() {
RoundRobinSelector selector = new RoundRobinSelector();
Host result;
HostWorker result;
result = selector.select(
Arrays.asList(new Host("192.168.1.1", 80, 20, System.currentTimeMillis() - 60 * 1000 * 2, "kris"), new Host("192.168.1.2", 80, 10, System.currentTimeMillis() - 60 * 1000 * 10, "kris")));
Arrays.asList(new HostWorker("192.168.1.1", 11, 20, "kris"), new HostWorker("192.168.1.2", 22, 80, "kris")));
Assert.assertEquals("192.168.1.2", result.getIp());
}
}

3
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
@ -59,7 +60,7 @@ public class MasterRegistryTest {
masterRegistry.registry();
String masterPath = zookeeperRegistryCenter.getMasterPath();
TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node
String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort());
String masterNodePath = masterPath + "/" + (NetUtils.getAddr(Constants.LOCAL_ADDRESS, masterConfig.getListenPort()));
String heartbeat = zookeeperRegistryCenter.getRegisterOperator().get(masterNodePath);
Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length);
masterRegistry.unRegistry();

4
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java

@ -69,10 +69,10 @@ public class RegisterOperator extends ZookeeperCachedOperator {
}
/**
* get host ip, string format: masterParentPath/ip
* get host ip:port, string format: parentPath/ip:port
*
* @param path path
* @return host ip, string format: masterParentPath/ip
* @return host ip:port, string format: parentPath/ip:port
*/
protected String getHostByEventDataPath(String path) {
if (StringUtils.isEmpty(path)) {

16
pom.xml

@ -896,15 +896,15 @@
<include>**/dao/datasource/MySQLDataSourceTest.java</include>
<include>**/dao/entity/TaskInstanceTest.java</include>
<include>**/dao/entity/UdfFuncTest.java</include>
<include>**/remote/JsonSerializerTest.java</include>
<include>**/remote/RemoveTaskLogResponseCommandTest.java</include>
<include>**/rpc/RpcTest.java</include>
<include>**/remote/RemoveTaskLogRequestCommandTest.java</include>
<include>**/remote/NettyRemotingClientTest.java</include>
<include>**/remote/NettyUtilTest.java</include>
<include>**/remote/ResponseFutureTest.java</include>
<include>**/remote/command/alert/AlertSendRequestCommandTest.java</include>
<include>**/remote/command/alert/AlertSendResponseCommandTest.java</include>
<include>**/remote/command/future/ResponseFutureTest.java</include>
<include>**/remote/command/log/RemoveTaskLogRequestCommandTest.java</include>
<include>**/remote/command/log/RemoveTaskLogResponseCommandTest.java</include>
<include>**/remote/utils/HostTest.java</include>
<include>**/remote/utils/NettyUtilTest.java</include>
<include>**/remote/NettyRemotingClientTest.java</include>
<include>**/rpc/RpcTest.java</include>
<include>**/server/log/LoggerServerTest.java</include>
<include>**/server/entity/SQLTaskExecutionContextTest.java</include>
<include>**/server/log/MasterLogFilterTest.java</include>
@ -919,6 +919,7 @@
<include>**/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java</include>
<include>**/server/master/dispatch/host/assign/RandomSelectorTest.java</include>
<include>**/server/master/dispatch/host/assign/RoundRobinSelectorTest.java</include>
<include>**/server/master/dispatch/host/assign/HostWorkerTest.java</include>
<include>**/server/master/register/MasterRegistryTest.java</include>
<include>**/server/master/dispatch/host/assign/RoundRobinHostManagerTest.java</include>
<include>**/server/master/AlertManagerTest.java</include>
@ -935,7 +936,6 @@
<include>**/server/register/ZookeeperRegistryCenterTest.java</include>
<include>**/server/utils/DataxUtilsTest.java</include>
<include>**/server/utils/ExecutionContextTestUtils.java</include>
<include>**/server/utils/HostTest.java</include>
<include>**/server/utils/FlinkArgsUtilsTest.java</include>
<include>**/server/utils/LogUtilsTest.java</include>
<include>**/server/utils/MapReduceArgsUtilsTest.java</include>

Loading…
Cancel
Save