Browse Source

master select worker filter high load worker #2704 (#2733)

* add LoggerServerTest UT

* add LoggerServerTest UT

* add LoggerServerTest UT
add RemoveTaskLogRequestCommandTest UT
add RemoveTaskLogResponseCommandTest

* master select worker filter high load worker #2704

* master select worker filter high load worker #2704

* master select worker filter high load worker #2704

* master select worker filter high load worker #2704

* master select worker filter high load worker #2704

* master select worker filter high load worker #2704

Co-authored-by: qiaozhanwei <qiaozhanwei@analysys.com.cn>
pull/3/MERGE
qiaozhanwei 5 years ago committed by GitHub
parent
commit
2330cc8872
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
  2. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java
  3. 25
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
  4. 31
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java
  5. 82
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java
  6. 2
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
  7. 31
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java
  8. 4
      dolphinscheduler-server/src/main/resources/master.properties
  9. 4
      dolphinscheduler-server/src/main/resources/worker.properties
  10. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java
  11. 4
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java

6
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@ -507,7 +507,7 @@ public final class Constants {
/** /**
* heartbeat for zk info length * heartbeat for zk info length
*/ */
public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 5; public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 9;
/** /**
@ -961,4 +961,8 @@ public final class Constants {
*/ */
public static final String PLUGIN_JAR_SUFFIX = ".jar"; public static final String PLUGIN_JAR_SUFFIX = ".jar";
public static final int NORAML_NODE_STATUS = 0;
public static final int ABNORMAL_NODE_STATUS = 1;
} }

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java

@ -107,8 +107,8 @@ public class ResInfo {
masterServer.setResInfo(getResInfoJson(Double.parseDouble(masterArray[0]), masterServer.setResInfo(getResInfoJson(Double.parseDouble(masterArray[0]),
Double.parseDouble(masterArray[1]), Double.parseDouble(masterArray[1]),
Double.parseDouble(masterArray[2]))); Double.parseDouble(masterArray[2])));
masterServer.setCreateTime(DateUtils.stringToDate(masterArray[3])); masterServer.setCreateTime(DateUtils.stringToDate(masterArray[6]));
masterServer.setLastHeartbeatTime(DateUtils.stringToDate(masterArray[4])); masterServer.setLastHeartbeatTime(DateUtils.stringToDate(masterArray[7]));
return masterServer; return masterServer;
} }

25
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.host; package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
@ -68,7 +69,7 @@ public class LowerWeightHostManager extends CommonHostManager {
/** /**
* worker host weights * worker host weights
*/ */
private ConcurrentHashMap<String, Set<HostWeight>> workerHostWeights; private ConcurrentHashMap<String, Set<HostWeight>> workerHostWeightsMap;
/** /**
* worker group host lock * worker group host lock
@ -83,7 +84,7 @@ public class LowerWeightHostManager extends CommonHostManager {
@PostConstruct @PostConstruct
public void init(){ public void init(){
this.selector = new LowerWeightRoundRobin(); this.selector = new LowerWeightRoundRobin();
this.workerHostWeights = new ConcurrentHashMap<>(); this.workerHostWeightsMap = new ConcurrentHashMap<>();
this.lock = new ReentrantLock(); this.lock = new ReentrantLock();
this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor")); this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor"));
this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),35, 40, TimeUnit.SECONDS); this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),35, 40, TimeUnit.SECONDS);
@ -106,9 +107,8 @@ public class LowerWeightHostManager extends CommonHostManager {
Set<HostWeight> workerHostWeights = getWorkerHostWeights(context.getWorkerGroup()); Set<HostWeight> workerHostWeights = getWorkerHostWeights(context.getWorkerGroup());
if(CollectionUtils.isNotEmpty(workerHostWeights)){ if(CollectionUtils.isNotEmpty(workerHostWeights)){
return selector.select(workerHostWeights).getHost(); return selector.select(workerHostWeights).getHost();
} else{
return roundRobinHostManager.select(context);
} }
return new Host();
} }
@Override @Override
@ -119,8 +119,8 @@ public class LowerWeightHostManager extends CommonHostManager {
private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights){ private void syncWorkerHostWeight(Map<String, Set<HostWeight>> workerHostWeights){
lock.lock(); lock.lock();
try { try {
workerHostWeights.clear(); workerHostWeightsMap.clear();
workerHostWeights.putAll(workerHostWeights); workerHostWeightsMap.putAll(workerHostWeights);
} finally { } finally {
lock.unlock(); lock.unlock();
} }
@ -129,7 +129,7 @@ public class LowerWeightHostManager extends CommonHostManager {
private Set<HostWeight> getWorkerHostWeights(String workerGroup){ private Set<HostWeight> getWorkerHostWeights(String workerGroup){
lock.lock(); lock.lock();
try { try {
return workerHostWeights.get(workerGroup); return workerHostWeightsMap.get(workerGroup);
} finally { } finally {
lock.unlock(); lock.unlock();
} }
@ -150,8 +150,17 @@ public class LowerWeightHostManager extends CommonHostManager {
Set<HostWeight> hostWeights = new HashSet<>(nodes.size()); Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
for(String node : nodes){ for(String node : nodes){
String heartbeat = registryCenter.getZookeeperCachedOperator().get(workerGroupPath + "/" + node); String heartbeat = registryCenter.getZookeeperCachedOperator().get(workerGroupPath + "/" + node);
if(StringUtils.isNotEmpty(heartbeat) && heartbeat.contains(COMMA) && heartbeat.split(COMMA).length == 5){ if(StringUtils.isNotEmpty(heartbeat)
&& heartbeat.split(COMMA).length == Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
String[] parts = heartbeat.split(COMMA); String[] parts = heartbeat.split(COMMA);
int status = Integer.parseInt(parts[8]);
if (status == Constants.ABNORMAL_NODE_STATUS){
logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}",
Double.parseDouble(parts[3]) , Double.parseDouble(parts[2]));
continue;
}
double cpu = Double.parseDouble(parts[0]); double cpu = Double.parseDouble(parts[0]);
double memory = Double.parseDouble(parts[1]); double memory = Double.parseDouble(parts[1]);
double loadAverage = Double.parseDouble(parts[2]); double loadAverage = Double.parseDouble(parts[2]);

31
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -95,7 +96,13 @@ public class MasterRegistry {
} }
}); });
int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval(); int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval();
this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
masterConfig.getMasterReservedMemory(),
masterConfig.getMasterMaxCpuloadAvg(),
getMasterPath(),
zookeeperRegistryCenter);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS);
logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval); logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval);
} }
@ -126,26 +133,4 @@ public class MasterRegistry {
private String getLocalAddress(){ private String getLocalAddress(){
return OSUtils.getHost() + ":" + masterConfig.getListenPort(); return OSUtils.getHost() + ":" + masterConfig.getListenPort();
} }
/**
* hear beat task
*/
class HeartBeatTask implements Runnable{
@Override
public void run() {
try {
StringBuilder builder = new StringBuilder(100);
builder.append(OSUtils.cpuUsage()).append(COMMA);
builder.append(OSUtils.memoryUsage()).append(COMMA);
builder.append(OSUtils.loadAverage()).append(COMMA);
builder.append(startTime).append(COMMA);
builder.append(DateUtils.dateToString(new Date()));
String masterPath = getMasterPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().update(masterPath, builder.toString());
} catch (Throwable ex){
logger.error("error write master heartbeat info", ex);
}
}
}
} }

82
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.registry;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA;
public class HeartBeatTask extends Thread{
private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
private String startTime;
private double reservedMemory;
private double maxCpuloadAvg;
private String heartBeatPath;
private ZookeeperRegistryCenter zookeeperRegistryCenter;
public HeartBeatTask(String startTime,
double reservedMemory,
double maxCpuloadAvg,
String heartBeatPath,
ZookeeperRegistryCenter zookeeperRegistryCenter){
this.startTime = startTime;
this.reservedMemory = reservedMemory;
this.maxCpuloadAvg = maxCpuloadAvg;
this.heartBeatPath = heartBeatPath;
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
}
@Override
public void run() {
try {
double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
double loadAverage = OSUtils.loadAverage();
int status = Constants.NORAML_NODE_STATUS;
if(availablePhysicalMemorySize < reservedMemory
|| loadAverage > maxCpuloadAvg){
logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize , loadAverage);
status = Constants.ABNORMAL_NODE_STATUS;
}
StringBuilder builder = new StringBuilder(100);
builder.append(OSUtils.cpuUsage()).append(COMMA);
builder.append(OSUtils.memoryUsage()).append(COMMA);
builder.append(OSUtils.loadAverage()).append(COMMA);
builder.append(OSUtils.availablePhysicalMemorySize()).append(Constants.COMMA);
builder.append(maxCpuloadAvg).append(Constants.COMMA);
builder.append(reservedMemory).append(Constants.COMMA);
builder.append(startTime).append(Constants.COMMA);
builder.append(DateUtils.dateToString(new Date())).append(Constants.COMMA);
builder.append(status);
zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath, builder.toString());
} catch (Throwable ex){
logger.error("error write heartbeat info", ex);
}
}
}

2
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java

@ -38,7 +38,7 @@ public class WorkerConfig {
@Value("${worker.max.cpuload.avg:-1}") @Value("${worker.max.cpuload.avg:-1}")
private int workerMaxCpuloadAvg; private int workerMaxCpuloadAvg;
@Value("${worker.reserved.memory:0.5}") @Value("${worker.reserved.memory:0.3}")
private double workerReservedMemory; private double workerReservedMemory;
@Value("${worker.group: default}") @Value("${worker.group: default}")

31
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java

@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.registry.HeartBeatTask;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -102,7 +103,13 @@ public class WorkerRegistry {
} }
}); });
int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval(); int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval();
this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
HeartBeatTask heartBeatTask = new HeartBeatTask(startTime,
workerConfig.getWorkerReservedMemory(),
workerConfig.getWorkerMaxCpuloadAvg(),
getWorkerPath(),
zookeeperRegistryCenter);
this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS);
logger.info("worker node : {} registry to ZK successfully with heartBeatInterval : {}s", address, workerHeartbeatInterval); logger.info("worker node : {} registry to ZK successfully with heartBeatInterval : {}s", address, workerHeartbeatInterval);
} }
@ -143,26 +150,4 @@ public class WorkerRegistry {
private String getLocalAddress(){ private String getLocalAddress(){
return OSUtils.getHost() + ":" + workerConfig.getListenPort(); return OSUtils.getHost() + ":" + workerConfig.getListenPort();
} }
/**
* hear beat task
*/
class HeartBeatTask implements Runnable{
@Override
public void run() {
try {
StringBuilder builder = new StringBuilder(100);
builder.append(OSUtils.cpuUsage()).append(COMMA);
builder.append(OSUtils.memoryUsage()).append(COMMA);
builder.append(OSUtils.loadAverage()).append(COMMA);
builder.append(startTime).append(COMMA);
builder.append(DateUtils.dateToString(new Date()));
String workerPath = getWorkerPath();
zookeeperRegistryCenter.getZookeeperCachedOperator().update(workerPath, builder.toString());
} catch (Throwable ex){
logger.error("error write worker heartbeat info", ex);
}
}
}
} }

4
dolphinscheduler-server/src/main/resources/master.properties

@ -31,8 +31,8 @@
#master.task.commit.interval=1000 #master.task.commit.interval=1000
# only less than cpu avg load, master server can work. default value : the number of cpu cores * 2 # only less than cpu avg load, master server can work. default value -1 : the number of cpu cores * 2
#master.max.cpuload.avg=100 #master.max.cpuload.avg=-1
# only larger than reserved memory, master server can work. default value : physical memory * 1/10, unit is G. # only larger than reserved memory, master server can work. default value : physical memory * 1/10, unit is G.
#master.reserved.memory=0.3 #master.reserved.memory=0.3

4
dolphinscheduler-server/src/main/resources/worker.properties

@ -24,8 +24,8 @@
# submit the number of tasks at a time TODO # submit the number of tasks at a time TODO
#worker.fetch.task.num = 3 #worker.fetch.task.num = 3
# only less than cpu avg load, worker server can work. default value : the number of cpu cores * 2 # only less than cpu avg load, worker server can work. default value -1: the number of cpu cores * 2
#worker.max.cpuload.avg=100 #worker.max.cpuload.avg= -1
# only larger than reserved memory, worker server can work. default value : physical memory * 1/6, unit is G. # only larger than reserved memory, worker server can work. default value : physical memory * 1/6, unit is G.
#worker.reserved.memory=0.3 #worker.reserved.memory=0.3

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java

@ -33,7 +33,7 @@ import org.springframework.test.context.junit4.SpringRunner;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
/** /**
* master registry test * master registry test
*/ */
@ -57,7 +57,7 @@ public class MasterRegistryTest {
TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node
String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort()); String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort());
String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(masterNodePath); String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(masterNodePath);
Assert.assertEquals(5, heartbeat.split(",").length); Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length);
} }
@Test @Test

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java

@ -36,6 +36,8 @@ import java.util.concurrent.TimeUnit;
import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP;
import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH;
/** /**
* worker registry test * worker registry test
*/ */
@ -61,7 +63,7 @@ public class WorkerRegistryTest {
String instancePath = workerPath + "/" + workerConfig.getWorkerGroup().trim() + "/" + (OSUtils.getHost() + ":" + workerConfig.getListenPort()); String instancePath = workerPath + "/" + workerConfig.getWorkerGroup().trim() + "/" + (OSUtils.getHost() + ":" + workerConfig.getListenPort());
TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); //wait heartbeat info write into zk node TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); //wait heartbeat info write into zk node
String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(instancePath); String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(instancePath);
Assert.assertEquals(5, heartbeat.split(",").length); Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length);
} }
@Test @Test

Loading…
Cancel
Save