From 2330cc8872ab7852cd91f0f64815c73679d59b9e Mon Sep 17 00:00:00 2001 From: qiaozhanwei Date: Tue, 19 May 2020 11:38:41 +0800 Subject: [PATCH] master select worker filter high load worker #2704 (#2733) * add LoggerServerTest UT * add LoggerServerTest UT * add LoggerServerTest UT add RemoveTaskLogRequestCommandTest UT add RemoveTaskLogResponseCommandTest * master select worker filter high load worker #2704 * master select worker filter high load worker #2704 * master select worker filter high load worker #2704 * master select worker filter high load worker #2704 * master select worker filter high load worker #2704 * master select worker filter high load worker #2704 Co-authored-by: qiaozhanwei --- .../dolphinscheduler/common/Constants.java | 6 +- .../common/utils/ResInfo.java | 4 +- .../dispatch/host/LowerWeightHostManager.java | 25 ++++-- .../master/registry/MasterRegistry.java | 31 ++----- .../server/registry/HeartBeatTask.java | 82 +++++++++++++++++++ .../server/worker/config/WorkerConfig.java | 2 +- .../worker/registry/WorkerRegistry.java | 31 ++----- .../src/main/resources/master.properties | 4 +- .../src/main/resources/worker.properties | 4 +- .../master/registry/MasterRegistryTest.java | 4 +- .../worker/registry/WorkerRegistryTest.java | 4 +- 11 files changed, 132 insertions(+), 65 deletions(-) create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index effa4f0f8e..d9589320ca 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -507,7 +507,7 @@ public final class Constants { /** * heartbeat for zk info length */ - public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 5; + public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 9; /** @@ -961,4 +961,8 @@ public final class Constants { */ public static final String PLUGIN_JAR_SUFFIX = ".jar"; + public static final int NORAML_NODE_STATUS = 0; + public static final int ABNORMAL_NODE_STATUS = 1; + + } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java index feadb68ee6..9c1d8806c4 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java @@ -107,8 +107,8 @@ public class ResInfo { masterServer.setResInfo(getResInfoJson(Double.parseDouble(masterArray[0]), Double.parseDouble(masterArray[1]), Double.parseDouble(masterArray[2]))); - masterServer.setCreateTime(DateUtils.stringToDate(masterArray[3])); - masterServer.setLastHeartbeatTime(DateUtils.stringToDate(masterArray[4])); + masterServer.setCreateTime(DateUtils.stringToDate(masterArray[6])); + masterServer.setLastHeartbeatTime(DateUtils.stringToDate(masterArray[7])); return masterServer; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java index 99cae6954c..5989519c4c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.utils.Host; @@ -68,7 +69,7 @@ public class LowerWeightHostManager extends CommonHostManager { /** * worker host weights */ - private ConcurrentHashMap> workerHostWeights; + private ConcurrentHashMap> workerHostWeightsMap; /** * worker group host lock @@ -83,7 +84,7 @@ public class LowerWeightHostManager extends CommonHostManager { @PostConstruct public void init(){ this.selector = new LowerWeightRoundRobin(); - this.workerHostWeights = new ConcurrentHashMap<>(); + this.workerHostWeightsMap = new ConcurrentHashMap<>(); this.lock = new ReentrantLock(); this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor")); this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),35, 40, TimeUnit.SECONDS); @@ -106,9 +107,8 @@ public class LowerWeightHostManager extends CommonHostManager { Set workerHostWeights = getWorkerHostWeights(context.getWorkerGroup()); if(CollectionUtils.isNotEmpty(workerHostWeights)){ return selector.select(workerHostWeights).getHost(); - } else{ - return roundRobinHostManager.select(context); } + return new Host(); } @Override @@ -119,8 +119,8 @@ public class LowerWeightHostManager extends CommonHostManager { private void syncWorkerHostWeight(Map> workerHostWeights){ lock.lock(); try { - workerHostWeights.clear(); - workerHostWeights.putAll(workerHostWeights); + workerHostWeightsMap.clear(); + workerHostWeightsMap.putAll(workerHostWeights); } finally { lock.unlock(); } @@ -129,7 +129,7 @@ public class LowerWeightHostManager extends CommonHostManager { private Set getWorkerHostWeights(String workerGroup){ lock.lock(); try { - return workerHostWeights.get(workerGroup); + return workerHostWeightsMap.get(workerGroup); } finally { lock.unlock(); } @@ -150,8 +150,17 @@ public class LowerWeightHostManager extends CommonHostManager { Set hostWeights = new HashSet<>(nodes.size()); for(String node : nodes){ String heartbeat = registryCenter.getZookeeperCachedOperator().get(workerGroupPath + "/" + node); - if(StringUtils.isNotEmpty(heartbeat) && heartbeat.contains(COMMA) && heartbeat.split(COMMA).length == 5){ + if(StringUtils.isNotEmpty(heartbeat) + && heartbeat.split(COMMA).length == Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){ String[] parts = heartbeat.split(COMMA); + + int status = Integer.parseInt(parts[8]); + if (status == Constants.ABNORMAL_NODE_STATUS){ + logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", + Double.parseDouble(parts[3]) , Double.parseDouble(parts[2])); + continue; + } + double cpu = Double.parseDouble(parts[0]); double memory = Double.parseDouble(parts[1]); double loadAverage = Double.parseDouble(parts[2]); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java index b6582981f2..de6d3bc2cf 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.master.config.MasterConfig; +import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,7 +96,13 @@ public class MasterRegistry { } }); int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval(); - this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); + HeartBeatTask heartBeatTask = new HeartBeatTask(startTime, + masterConfig.getMasterReservedMemory(), + masterConfig.getMasterMaxCpuloadAvg(), + getMasterPath(), + zookeeperRegistryCenter); + + this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval); } @@ -126,26 +133,4 @@ public class MasterRegistry { private String getLocalAddress(){ return OSUtils.getHost() + ":" + masterConfig.getListenPort(); } - - /** - * hear beat task - */ - class HeartBeatTask implements Runnable{ - - @Override - public void run() { - try { - StringBuilder builder = new StringBuilder(100); - builder.append(OSUtils.cpuUsage()).append(COMMA); - builder.append(OSUtils.memoryUsage()).append(COMMA); - builder.append(OSUtils.loadAverage()).append(COMMA); - builder.append(startTime).append(COMMA); - builder.append(DateUtils.dateToString(new Date())); - String masterPath = getMasterPath(); - zookeeperRegistryCenter.getZookeeperCachedOperator().update(masterPath, builder.toString()); - } catch (Throwable ex){ - logger.error("error write master heartbeat info", ex); - } - } - } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java new file mode 100644 index 0000000000..6d0eae9316 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.registry; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.OSUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Date; + +import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA; + +public class HeartBeatTask extends Thread{ + + private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class); + + private String startTime; + private double reservedMemory; + private double maxCpuloadAvg; + private String heartBeatPath; + private ZookeeperRegistryCenter zookeeperRegistryCenter; + + public HeartBeatTask(String startTime, + double reservedMemory, + double maxCpuloadAvg, + String heartBeatPath, + ZookeeperRegistryCenter zookeeperRegistryCenter){ + this.startTime = startTime; + this.reservedMemory = reservedMemory; + this.maxCpuloadAvg = maxCpuloadAvg; + this.heartBeatPath = heartBeatPath; + this.zookeeperRegistryCenter = zookeeperRegistryCenter; + } + + @Override + public void run() { + try { + + double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); + double loadAverage = OSUtils.loadAverage(); + + int status = Constants.NORAML_NODE_STATUS; + + if(availablePhysicalMemorySize < reservedMemory + || loadAverage > maxCpuloadAvg){ + logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize , loadAverage); + status = Constants.ABNORMAL_NODE_STATUS; + } + + StringBuilder builder = new StringBuilder(100); + builder.append(OSUtils.cpuUsage()).append(COMMA); + builder.append(OSUtils.memoryUsage()).append(COMMA); + builder.append(OSUtils.loadAverage()).append(COMMA); + builder.append(OSUtils.availablePhysicalMemorySize()).append(Constants.COMMA); + builder.append(maxCpuloadAvg).append(Constants.COMMA); + builder.append(reservedMemory).append(Constants.COMMA); + builder.append(startTime).append(Constants.COMMA); + builder.append(DateUtils.dateToString(new Date())).append(Constants.COMMA); + builder.append(status); + zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath, builder.toString()); + } catch (Throwable ex){ + logger.error("error write heartbeat info", ex); + } + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index 7f4d93fdf8..1a31fa09fe 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -38,7 +38,7 @@ public class WorkerConfig { @Value("${worker.max.cpuload.avg:-1}") private int workerMaxCpuloadAvg; - @Value("${worker.reserved.memory:0.5}") + @Value("${worker.reserved.memory:0.3}") private double workerReservedMemory; @Value("${worker.group: default}") diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index 4d723404a5..f7093a1ec7 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; +import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.slf4j.Logger; @@ -102,7 +103,13 @@ public class WorkerRegistry { } }); int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval(); - this.heartBeatExecutor.scheduleAtFixedRate(new HeartBeatTask(), workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); + + HeartBeatTask heartBeatTask = new HeartBeatTask(startTime, + workerConfig.getWorkerReservedMemory(), + workerConfig.getWorkerMaxCpuloadAvg(), + getWorkerPath(), + zookeeperRegistryCenter); + this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); logger.info("worker node : {} registry to ZK successfully with heartBeatInterval : {}s", address, workerHeartbeatInterval); } @@ -143,26 +150,4 @@ public class WorkerRegistry { private String getLocalAddress(){ return OSUtils.getHost() + ":" + workerConfig.getListenPort(); } - - /** - * hear beat task - */ - class HeartBeatTask implements Runnable{ - - @Override - public void run() { - try { - StringBuilder builder = new StringBuilder(100); - builder.append(OSUtils.cpuUsage()).append(COMMA); - builder.append(OSUtils.memoryUsage()).append(COMMA); - builder.append(OSUtils.loadAverage()).append(COMMA); - builder.append(startTime).append(COMMA); - builder.append(DateUtils.dateToString(new Date())); - String workerPath = getWorkerPath(); - zookeeperRegistryCenter.getZookeeperCachedOperator().update(workerPath, builder.toString()); - } catch (Throwable ex){ - logger.error("error write worker heartbeat info", ex); - } - } - } } diff --git a/dolphinscheduler-server/src/main/resources/master.properties b/dolphinscheduler-server/src/main/resources/master.properties index 2f75aa50ad..f09f4693fc 100644 --- a/dolphinscheduler-server/src/main/resources/master.properties +++ b/dolphinscheduler-server/src/main/resources/master.properties @@ -31,8 +31,8 @@ #master.task.commit.interval=1000 -# only less than cpu avg load, master server can work. default value : the number of cpu cores * 2 -#master.max.cpuload.avg=100 +# only less than cpu avg load, master server can work. default value -1 : the number of cpu cores * 2 +#master.max.cpuload.avg=-1 # only larger than reserved memory, master server can work. default value : physical memory * 1/10, unit is G. #master.reserved.memory=0.3 diff --git a/dolphinscheduler-server/src/main/resources/worker.properties b/dolphinscheduler-server/src/main/resources/worker.properties index 36bc132743..9bbf90102c 100644 --- a/dolphinscheduler-server/src/main/resources/worker.properties +++ b/dolphinscheduler-server/src/main/resources/worker.properties @@ -24,8 +24,8 @@ # submit the number of tasks at a time TODO #worker.fetch.task.num = 3 -# only less than cpu avg load, worker server can work. default value : the number of cpu cores * 2 -#worker.max.cpuload.avg=100 +# only less than cpu avg load, worker server can work. default value -1: the number of cpu cores * 2 +#worker.max.cpuload.avg= -1 # only larger than reserved memory, worker server can work. default value : physical memory * 1/6, unit is G. #worker.reserved.memory=0.3 diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java index a482029a1e..9d90f20706 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryTest.java @@ -33,7 +33,7 @@ import org.springframework.test.context.junit4.SpringRunner; import java.util.List; import java.util.concurrent.TimeUnit; - +import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH; /** * master registry test */ @@ -57,7 +57,7 @@ public class MasterRegistryTest { TimeUnit.SECONDS.sleep(masterConfig.getMasterHeartbeatInterval() + 2); //wait heartbeat info write into zk node String masterNodePath = masterPath + "/" + (Constants.LOCAL_ADDRESS + ":" + masterConfig.getListenPort()); String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(masterNodePath); - Assert.assertEquals(5, heartbeat.split(",").length); + Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length); } @Test diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java index d5f836e403..6ecff51f70 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryTest.java @@ -36,6 +36,8 @@ import java.util.concurrent.TimeUnit; import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; + +import static org.apache.dolphinscheduler.common.Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH; /** * worker registry test */ @@ -61,7 +63,7 @@ public class WorkerRegistryTest { String instancePath = workerPath + "/" + workerConfig.getWorkerGroup().trim() + "/" + (OSUtils.getHost() + ":" + workerConfig.getListenPort()); TimeUnit.SECONDS.sleep(workerConfig.getWorkerHeartbeatInterval() + 2); //wait heartbeat info write into zk node String heartbeat = zookeeperRegistryCenter.getZookeeperCachedOperator().get(instancePath); - Assert.assertEquals(5, heartbeat.split(",").length); + Assert.assertEquals(HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH, heartbeat.split(",").length); } @Test