diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 2d695046ba..e662347a90 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -521,8 +521,7 @@ public final class Constants { /** * heartbeat for zk info length */ - public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 10; - public static final int HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH = 11; + public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 13; /** * jar @@ -1029,6 +1028,7 @@ public final class Constants { public static final int NORMAL_NODE_STATUS = 0; public static final int ABNORMAL_NODE_STATUS = 1; + public static final int BUSY_NODE_STATUE = 2; public static final String START_TIME = "start time"; public static final String END_TIME = "end time"; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java new file mode 100644 index 0000000000..bec0f759cd --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HeartBeat.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.utils; + +import org.apache.dolphinscheduler.common.Constants; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HeartBeat { + + private static final Logger logger = LoggerFactory.getLogger(HeartBeat.class); + public static final String COMMA = ","; + + private long startupTime; + private long reportTime; + private double cpuUsage; + private double memoryUsage; + private double loadAverage; + private double availablePhysicalMemorySize; + private double maxCpuloadAvg; + private double reservedMemory; + private int serverStatus; + private int processId; + + private int workerHostWeight; // worker host weight + private int workerWaitingTaskCount; // worker waiting task count + private int workerExecThreadCount; // worker thread pool thread count + + public long getStartupTime() { + return startupTime; + } + + public void setStartupTime(long startupTime) { + this.startupTime = startupTime; + } + + public long getReportTime() { + return reportTime; + } + + public void setReportTime(long reportTime) { + this.reportTime = reportTime; + } + + public double getCpuUsage() { + return cpuUsage; + } + + public void setCpuUsage(double cpuUsage) { + this.cpuUsage = cpuUsage; + } + + public double getMemoryUsage() { + return memoryUsage; + } + + public void setMemoryUsage(double memoryUsage) { + this.memoryUsage = memoryUsage; + } + + public double getLoadAverage() { + return loadAverage; + } + + public void setLoadAverage(double loadAverage) { + this.loadAverage = loadAverage; + } + + public double getAvailablePhysicalMemorySize() { + return availablePhysicalMemorySize; + } + + public void setAvailablePhysicalMemorySize(double availablePhysicalMemorySize) { + this.availablePhysicalMemorySize = availablePhysicalMemorySize; + } + + public double getMaxCpuloadAvg() { + return maxCpuloadAvg; + } + + public void setMaxCpuloadAvg(double maxCpuloadAvg) { + this.maxCpuloadAvg = maxCpuloadAvg; + } + + public double getReservedMemory() { + return reservedMemory; + } + + public void setReservedMemory(double reservedMemory) { + this.reservedMemory = reservedMemory; + } + + public int getServerStatus() { + return serverStatus; + } + + public void setServerStatus(int serverStatus) { + this.serverStatus = serverStatus; + } + + public int getProcessId() { + return processId; + } + + public void setProcessId(int processId) { + this.processId = processId; + } + + public int getWorkerHostWeight() { + return workerHostWeight; + } + + public void setWorkerHostWeight(int workerHostWeight) { + this.workerHostWeight = workerHostWeight; + } + + public int getWorkerWaitingTaskCount() { + return workerWaitingTaskCount; + } + + public void setWorkerWaitingTaskCount(int workerWaitingTaskCount) { + this.workerWaitingTaskCount = workerWaitingTaskCount; + } + + public int getWorkerExecThreadCount() { + return workerExecThreadCount; + } + + public void setWorkerExecThreadCount(int workerExecThreadCount) { + this.workerExecThreadCount = workerExecThreadCount; + } + + public HeartBeat() { + this.reportTime = System.currentTimeMillis(); + this.serverStatus = Constants.NORMAL_NODE_STATUS; + } + + public HeartBeat(long startupTime, double maxCpuloadAvg, double reservedMemory) { + this.reportTime = System.currentTimeMillis(); + this.serverStatus = Constants.NORMAL_NODE_STATUS; + this.startupTime = startupTime; + this.maxCpuloadAvg = maxCpuloadAvg; + this.reservedMemory = reservedMemory; + } + + public HeartBeat(long startupTime, double maxCpuloadAvg, double reservedMemory, int hostWeight, int workerExecThreadCount) { + this.reportTime = System.currentTimeMillis(); + this.serverStatus = Constants.NORMAL_NODE_STATUS; + this.startupTime = startupTime; + this.maxCpuloadAvg = maxCpuloadAvg; + this.reservedMemory = reservedMemory; + this.workerHostWeight = hostWeight; + this.workerExecThreadCount = workerExecThreadCount; + } + + /** + * fill system info + */ + private void fillSystemInfo() { + this.cpuUsage = OSUtils.cpuUsage(); + this.loadAverage = OSUtils.loadAverage(); + this.availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); + this.memoryUsage = OSUtils.memoryUsage(); + this.processId = OSUtils.getProcessID(); + } + + /** + * update server state + */ + public void updateServerState() { + if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) { + logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G", + loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory); + this.serverStatus = Constants.ABNORMAL_NODE_STATUS; + } else if (workerWaitingTaskCount > workerExecThreadCount) { + logger.warn("current waiting task count {} is large than worker thread count {}, worker is busy", workerWaitingTaskCount, workerExecThreadCount); + this.serverStatus = Constants.BUSY_NODE_STATUE; + } else { + this.serverStatus = Constants.NORMAL_NODE_STATUS; + } + } + + /** + * encode heartbeat + */ + public String encodeHeartBeat() { + this.fillSystemInfo(); + this.updateServerState(); + + StringBuilder builder = new StringBuilder(100); + builder.append(cpuUsage).append(COMMA); + builder.append(memoryUsage).append(COMMA); + builder.append(loadAverage).append(COMMA); + builder.append(availablePhysicalMemorySize).append(Constants.COMMA); + builder.append(maxCpuloadAvg).append(Constants.COMMA); + builder.append(reservedMemory).append(Constants.COMMA); + builder.append(startupTime).append(Constants.COMMA); + builder.append(reportTime).append(Constants.COMMA); + builder.append(serverStatus).append(COMMA); + builder.append(processId).append(COMMA); + builder.append(workerHostWeight).append(COMMA); + builder.append(workerExecThreadCount).append(COMMA); + builder.append(workerWaitingTaskCount); + + return builder.toString(); + } + + /** + * decode heartbeat + */ + public static HeartBeat decodeHeartBeat(String heartBeatInfo) { + String[] parts = heartBeatInfo.split(Constants.COMMA); + if (parts.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH) { + return null; + } + HeartBeat heartBeat = new HeartBeat(); + heartBeat.cpuUsage = Double.parseDouble(parts[0]); + heartBeat.memoryUsage = Double.parseDouble(parts[1]); + heartBeat.loadAverage = Double.parseDouble(parts[2]); + heartBeat.availablePhysicalMemorySize = Double.parseDouble(parts[3]); + heartBeat.maxCpuloadAvg = Double.parseDouble(parts[4]); + heartBeat.reservedMemory = Double.parseDouble(parts[5]); + heartBeat.startupTime = Long.parseLong(parts[6]); + heartBeat.reportTime = Long.parseLong(parts[7]); + heartBeat.serverStatus = Integer.parseInt(parts[8]); + heartBeat.processId = Integer.parseInt(parts[9]); + heartBeat.workerHostWeight = Integer.parseInt(parts[10]); + heartBeat.workerExecThreadCount = Integer.parseInt(parts[11]); + heartBeat.workerWaitingTaskCount = Integer.parseInt(parts[12]); + return heartBeat; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java deleted file mode 100644 index f54bd17830..0000000000 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.common.utils; - -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.model.Server; - -import org.apache.commons.lang.StringUtils; - -/** - * heartbeat for ZK reigster res info - */ -public class ResInfo { - - /** - * cpuUsage - */ - private double cpuUsage; - - /** - * memoryUsage - */ - private double memoryUsage; - - /** - * loadAverage - */ - private double loadAverage; - - public ResInfo(double cpuUsage, double memoryUsage) { - this.cpuUsage = cpuUsage; - this.memoryUsage = memoryUsage; - } - - public ResInfo(double cpuUsage, double memoryUsage, double loadAverage) { - this(cpuUsage,memoryUsage); - this.loadAverage = loadAverage; - } - - public double getCpuUsage() { - return cpuUsage; - } - - public void setCpuUsage(double cpuUsage) { - this.cpuUsage = cpuUsage; - } - - public double getMemoryUsage() { - return memoryUsage; - } - - public void setMemoryUsage(double memoryUsage) { - this.memoryUsage = memoryUsage; - } - - public double getLoadAverage() { - return loadAverage; - } - - public void setLoadAverage(double loadAverage) { - this.loadAverage = loadAverage; - } - - /** - * get CPU and memory usage - * @param cpuUsage cpu usage - * @param memoryUsage memory usage - * @param loadAverage load average - * @return cpu and memory usage - */ - public static String getResInfoJson(double cpuUsage, double memoryUsage, double loadAverage) { - ResInfo resInfo = new ResInfo(cpuUsage,memoryUsage,loadAverage); - return JSONUtils.toJsonString(resInfo); - } - - /** - * parse heartbeat info for zk - * @param heartBeatInfo heartbeat info - * @return heartbeat info to Server - */ - public static Server parseHeartbeatForRegistryInfo(String heartBeatInfo) { - if (!isValidHeartbeatForRegistryInfo(heartBeatInfo)) { - return null; - } - String[] parts = heartBeatInfo.split(Constants.COMMA); - Server server = new Server(); - server.setResInfo(getResInfoJson(Double.parseDouble(parts[0]), - Double.parseDouble(parts[1]), - Double.parseDouble(parts[2]))); - server.setCreateTime(DateUtils.stringToDate(parts[6])); - server.setLastHeartbeatTime(DateUtils.stringToDate(parts[7])); - //set process id - server.setId(Integer.parseInt(parts[9])); - return server; - } - - /** - * is valid heartbeat info for zk - * @param heartBeatInfo heartbeat info - * @return heartbeat info is valid - */ - public static boolean isValidHeartbeatForRegistryInfo(String heartBeatInfo) { - if (!StringUtils.isEmpty(heartBeatInfo)) { - String[] parts = heartBeatInfo.split(Constants.COMMA); - return parts.length == Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH - || parts.length == Constants.HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH; - } - return false; - } - - /** - * is new heartbeat info for zk with weight - * @param parts heartbeat info parts - * @return heartbeat info is new with weight - */ - public static boolean isNewHeartbeatWithWeight(String[] parts) { - return parts.length == Constants.HEARTBEAT_WITH_WEIGHT_FOR_ZOOKEEPER_INFO_LENGTH; - } - -} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java new file mode 100644 index 0000000000..c71450fa3d --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HeartBeatTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.utils; + +import static org.junit.Assert.assertEquals; + +import org.apache.dolphinscheduler.common.Constants; + +import org.junit.Test; + +/** + * NetUtilsTest + */ +public class HeartBeatTest { + + @Test + public void testAbnormalState() { + long startupTime = System.currentTimeMillis(); + double loadAverage = 100; + double reservedMemory = 100; + HeartBeat heartBeat = new HeartBeat(startupTime, loadAverage, reservedMemory); + heartBeat.updateServerState(); + assertEquals(Constants.ABNORMAL_NODE_STATUS, heartBeat.getServerStatus()); + } + + @Test + public void testBusyState() { + long startupTime = System.currentTimeMillis(); + double loadAverage = 0; + double reservedMemory = 0; + int hostWeight = 1; + int taskCount = 200; + int workerThreadCount = 199; + HeartBeat heartBeat = new HeartBeat(startupTime, loadAverage, reservedMemory, hostWeight, workerThreadCount); + + heartBeat.setWorkerWaitingTaskCount(taskCount); + heartBeat.updateServerState(); + assertEquals(Constants.BUSY_NODE_STATUE, heartBeat.getServerStatus()); + } + + @Test + public void testDecodeHeartBeat() throws Exception { + String heartBeatInfo = "0.35,0.58,3.09,6.47,5.0,1.0,1634033006749,1634033006857,1,29732,1,199,200"; + HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo); + + double delta = 0.001; + assertEquals(0.35, heartBeat.getCpuUsage(), delta); + assertEquals(0.58, heartBeat.getMemoryUsage(), delta); + assertEquals(3.09, heartBeat.getLoadAverage(), delta); + assertEquals(6.47, heartBeat.getAvailablePhysicalMemorySize(), delta); + assertEquals(5.0, heartBeat.getMaxCpuloadAvg(), delta); + assertEquals(1.0, heartBeat.getReservedMemory(), delta); + assertEquals(1634033006749L, heartBeat.getStartupTime()); + assertEquals(1634033006857L, heartBeat.getReportTime()); + assertEquals(1, heartBeat.getServerStatus()); + assertEquals(29732, heartBeat.getProcessId()); + assertEquals(199, heartBeat.getWorkerExecThreadCount()); + assertEquals(200, heartBeat.getWorkerWaitingTaskCount()); + } + +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java index 0e84db678a..d1448b3e6e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java @@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import org.apache.dolphinscheduler.common.utils.ResInfo; +import org.apache.dolphinscheduler.common.utils.HeartBeat; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; @@ -36,7 +36,7 @@ import java.util.Set; import org.springframework.beans.factory.annotation.Autowired; /** - * common host manager + * common host manager */ public abstract class CommonHostManager implements HostManager { @@ -48,6 +48,7 @@ public abstract class CommonHostManager implements HostManager { /** * select host + * * @param context context * @return host */ @@ -87,12 +88,12 @@ public abstract class CommonHostManager implements HostManager { return hostWorkers; } - protected int getWorkerHostWeightFromHeartbeat(String heartbeat) { + protected int getWorkerHostWeightFromHeartbeat(String heartBeatInfo) { int hostWeight = Constants.DEFAULT_WORKER_HOST_WEIGHT; - if (!StringUtils.isEmpty(heartbeat)) { - String[] parts = heartbeat.split(Constants.COMMA); - if (ResInfo.isNewHeartbeatWithWeight(parts)) { - hostWeight = Integer.parseInt(parts[10]); + if (!StringUtils.isEmpty(heartBeatInfo)) { + HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo); + if (heartBeat != null) { + hostWeight = heartBeat.getWorkerHostWeight(); } } return hostWeight; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java index 86ed6a8310..f78b95736a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java @@ -19,8 +19,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.ResInfo; +import org.apache.dolphinscheduler.common.utils.HeartBeat; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; @@ -47,7 +46,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * lower weight host manager + * lower weight host manager */ public class LowerWeightHostManager extends CommonHostManager { @@ -79,7 +78,7 @@ public class LowerWeightHostManager extends CommonHostManager { this.workerHostWeightsMap = new ConcurrentHashMap<>(); this.lock = new ReentrantLock(); this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor")); - this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),0, 5, TimeUnit.SECONDS); + this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(), 0, 5, TimeUnit.SECONDS); } @PreDestroy @@ -89,6 +88,7 @@ public class LowerWeightHostManager extends CommonHostManager { /** * select host + * * @param context context * @return host */ @@ -153,23 +153,23 @@ public class LowerWeightHostManager extends CommonHostManager { } } - public HostWeight getHostWeight(String addr, String workerGroup, String heartbeat) { - if (ResInfo.isValidHeartbeatForRegistryInfo(heartbeat)) { - String[] parts = heartbeat.split(Constants.COMMA); - int status = Integer.parseInt(parts[8]); - if (status == Constants.ABNORMAL_NODE_STATUS) { - logger.warn("worker {} current cpu load average {} is too high or available memory {}G is too low", - addr, Double.parseDouble(parts[2]), Double.parseDouble(parts[3])); - return null; - } - double cpu = Double.parseDouble(parts[0]); - double memory = Double.parseDouble(parts[1]); - double loadAverage = Double.parseDouble(parts[2]); - long startTime = DateUtils.stringToDate(parts[6]).getTime(); - int weight = getWorkerHostWeightFromHeartbeat(heartbeat); - return new HostWeight(HostWorker.of(addr, weight, workerGroup), cpu, memory, loadAverage, startTime); + public HostWeight getHostWeight(String addr, String workerGroup, String heartBeatInfo) { + HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo); + if (heartBeat == null) { + return null; + } + if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) { + logger.warn("worker {} current cpu load average {} is too high or available memory {}G is too low", + addr, heartBeat.getLoadAverage(), heartBeat.getAvailablePhysicalMemorySize()); + return null; + } + if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) { + logger.warn("worker {} is busy, current waiting task count {} is large than worker thread count {}", + addr, heartBeat.getWorkerWaitingTaskCount(), heartBeat.getWorkerExecThreadCount()); + return null; } - return null; + return new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup), + heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(), heartBeat.getStartupTime()); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java index 22de8e730b..7bae6de162 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistryClient.java @@ -29,7 +29,6 @@ import org.apache.dolphinscheduler.common.enums.StateEvent; import org.apache.dolphinscheduler.common.enums.StateEventType; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.thread.ThreadUtils; -import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -96,14 +95,14 @@ public class MasterRegistryClient { private ConcurrentHashMap processInstanceExecMaps; /** - * master start time + * master startup time, ms */ - private String startTime; + private long startupTime; private String localNodePath; public void init(ConcurrentHashMap processInstanceExecMaps) { - this.startTime = DateUtils.dateToString(new Date()); + this.startupTime = System.currentTimeMillis(); this.registryClient = RegistryClient.getInstance(); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); this.processInstanceExecMaps = processInstanceExecMaps; @@ -364,14 +363,14 @@ public class MasterRegistryClient { String address = NetUtils.getAddr(masterConfig.getListenPort()); localNodePath = getMasterPath(); int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval(); - HeartBeatTask heartBeatTask = new HeartBeatTask(startTime, + HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory(), Sets.newHashSet(getMasterPath()), Constants.MASTER_TYPE, registryClient); - registryClient.persistEphemeral(localNodePath, heartBeatTask.heartBeatInfo()); + registryClient.persistEphemeral(localNodePath, heartBeatTask.getHeartBeatInfo()); registryClient.addConnectionStateListener(new MasterRegistryConnectStateListener()); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); logger.info("master node : {} registry to ZK successfully with heartBeatInterval : {}s", address, masterHeartbeatInterval); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java index c80787709f..61e8c40c76 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java @@ -17,15 +17,12 @@ package org.apache.dolphinscheduler.server.registry; -import static org.apache.dolphinscheduler.remote.utils.Constants.COMMA; +import org.apache.dolphinscheduler.common.utils.HeartBeat; +import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; +import org.apache.dolphinscheduler.service.registry.RegistryClient; -import java.util.Date; import java.util.Set; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,42 +33,43 @@ public class HeartBeatTask implements Runnable { private final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class); - private String startTime; - private double maxCpuloadAvg; - private double reservedMemory; - private int hostWeight; // worker host weight private Set heartBeatPaths; - private String serverType; private RegistryClient registryClient; + private WorkerManagerThread workerManagerThread; + private String serverType; + private HeartBeat heartBeat; - public HeartBeatTask(String startTime, + public HeartBeatTask(long startupTime, double maxCpuloadAvg, double reservedMemory, Set heartBeatPaths, String serverType, RegistryClient registryClient) { - this.startTime = startTime; - this.maxCpuloadAvg = maxCpuloadAvg; - this.reservedMemory = reservedMemory; this.heartBeatPaths = heartBeatPaths; - this.serverType = serverType; this.registryClient = registryClient; + this.serverType = serverType; + this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory); } - public HeartBeatTask(String startTime, + public HeartBeatTask(long startupTime, double maxCpuloadAvg, double reservedMemory, int hostWeight, Set heartBeatPaths, String serverType, - RegistryClient registryClient) { - this.startTime = startTime; - this.maxCpuloadAvg = maxCpuloadAvg; - this.reservedMemory = reservedMemory; - this.hostWeight = hostWeight; + RegistryClient registryClient, + int workerThreadCount, + WorkerManagerThread workerManagerThread + ) { this.heartBeatPaths = heartBeatPaths; - this.serverType = serverType; this.registryClient = registryClient; + this.workerManagerThread = workerManagerThread; + this.serverType = serverType; + this.heartBeat = new HeartBeat(startupTime, maxCpuloadAvg, reservedMemory, hostWeight, workerThreadCount); + } + + public String getHeartBeatInfo() { + return this.heartBeat.encodeHeartBeat(); } @Override @@ -85,41 +83,16 @@ public class HeartBeatTask implements Runnable { } } + if (workerManagerThread != null) { + // update waiting task count + heartBeat.setWorkerWaitingTaskCount(workerManagerThread.getThreadPoolQueueSize()); + } + for (String heartBeatPath : heartBeatPaths) { - registryClient.update(heartBeatPath, heartBeatInfo()); + registryClient.update(heartBeatPath, heartBeat.encodeHeartBeat()); } } catch (Throwable ex) { logger.error("error write heartbeat info", ex); } } - - public String heartBeatInfo() { - double loadAverage = OSUtils.loadAverage(); - double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); - int status = Constants.NORMAL_NODE_STATUS; - if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) { - logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G", - loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory); - status = Constants.ABNORMAL_NODE_STATUS; - } - - StringBuilder builder = new StringBuilder(100); - builder.append(OSUtils.cpuUsage()).append(COMMA); - builder.append(OSUtils.memoryUsage()).append(COMMA); - builder.append(OSUtils.loadAverage()).append(COMMA); - builder.append(OSUtils.availablePhysicalMemorySize()).append(Constants.COMMA); - builder.append(maxCpuloadAvg).append(Constants.COMMA); - builder.append(reservedMemory).append(Constants.COMMA); - builder.append(startTime).append(Constants.COMMA); - builder.append(DateUtils.dateToString(new Date())).append(Constants.COMMA); - builder.append(status).append(COMMA); - // save process id - builder.append(OSUtils.getProcessID()); - // worker host weight - if (Constants.WORKER_TYPE.equals(serverType)) { - builder.append(Constants.COMMA).append(hostWeight); - } - return builder.toString(); - } - } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index 9705b4480f..7c03f22973 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -80,12 +80,6 @@ public class WorkerServer implements IStoppable { */ private NettyRemotingServer nettyRemotingServer; - /** - * worker registry - */ - @Autowired - private WorkerRegistryClient workerRegistryClient; - /** * worker config */ @@ -110,6 +104,12 @@ public class WorkerServer implements IStoppable { @Autowired private WorkerManagerThread workerManagerThread; + /** + * worker registry + */ + @Autowired + private WorkerRegistryClient workerRegistryClient; + private TaskPluginManager taskPluginManager; /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index c3720fe460..76d70a468a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -172,7 +172,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { // submit task to manager if (!workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager))) { - logger.info("submit task to manager error, queue is full, queue size is {}", workerManager.getQueueSize()); + logger.info("submit task to manager error, queue is full, queue size is {}", workerManager.getDelayQueueSize()); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java index 363b497278..e8c6ad012c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java @@ -24,16 +24,15 @@ import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.NodeType; -import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.registry.HeartBeatTask; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.commons.lang.StringUtils; -import java.util.Date; import java.util.Set; import java.util.StringJoiner; import java.util.concurrent.Executors; @@ -63,6 +62,12 @@ public class WorkerRegistryClient { @Autowired private WorkerConfig workerConfig; + /** + * worker manager + */ + @Autowired + private WorkerManagerThread workerManagerThread; + /** * heartbeat executor */ @@ -71,16 +76,16 @@ public class WorkerRegistryClient { private RegistryClient registryClient; /** - * worker start time + * worker startup time, ms */ - private String startTime; + private long startupTime; private Set workerGroups; @PostConstruct public void initWorkRegistry() { this.workerGroups = workerConfig.getWorkerGroups(); - this.startTime = DateUtils.dateToString(new Date()); + this.startupTime = System.currentTimeMillis(); this.registryClient = RegistryClient.getInstance(); this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("HeartBeatExecutor")); } @@ -98,13 +103,16 @@ public class WorkerRegistryClient { logger.info("worker node : {} registry to ZK {} successfully", address, workerZKPath); } - HeartBeatTask heartBeatTask = new HeartBeatTask(startTime, + HeartBeatTask heartBeatTask = new HeartBeatTask(startupTime, workerConfig.getWorkerMaxCpuloadAvg(), workerConfig.getWorkerReservedMemory(), workerConfig.getHostWeight(), workerZkPaths, Constants.WORKER_TYPE, - registryClient); + registryClient, + workerConfig.getWorkerExecThreads(), + workerManagerThread + ); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); logger.info("worker node : {} heartbeat interval {} s", address, workerHeartbeatInterval); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java index 8319e01664..4f68166ebd 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java @@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.spi.task.request.TaskRequest; import java.util.concurrent.DelayQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,14 +74,23 @@ public class WorkerManagerThread implements Runnable { } /** - * get queue size + * get delay queue size * * @return queue size */ - public int getQueueSize() { + public int getDelayQueueSize() { return workerExecuteQueue.size(); } + /** + * get thread pool queue size + * + * @return queue size + */ + public int getThreadPoolQueueSize() { + return ((ThreadPoolExecutor) workerExecService).getQueue().size(); + } + /** * Kill tasks that have not been executed, like delay task * then send Response to Master, update the execution status of task instance diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java index c6b1eb8936..210ce21e8e 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java @@ -31,12 +31,14 @@ import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; -import org.apache.dolphinscheduler.common.utils.ResInfo; +import org.apache.dolphinscheduler.common.utils.HeartBeat; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.commons.lang.StringUtils; import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -93,10 +95,17 @@ public class RegistryClient extends RegistryCenter { List serverList = new ArrayList<>(); for (Map.Entry entry : serverMaps.entrySet()) { - Server server = ResInfo.parseHeartbeatForRegistryInfo(entry.getValue()); - if (server == null) { + HeartBeat heartBeat = HeartBeat.decodeHeartBeat(entry.getValue()); + if (heartBeat == null) { continue; } + + Server server = new Server(); + server.setResInfo(JSONUtils.toJsonString(heartBeat)); + server.setCreateTime(new Date(heartBeat.getStartupTime())); + server.setLastHeartbeatTime(new Date(heartBeat.getReportTime())); + server.setId(heartBeat.getProcessId()); + String key = entry.getKey(); server.setZkDirectory(parentPath + "/" + key); // set host and port