diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java index 64a15b5b95..6549f9d63c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java @@ -26,7 +26,9 @@ import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionConte import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin; +import org.apache.dolphinscheduler.spi.utils.StringUtils; +import java.util.Optional; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -138,9 +140,9 @@ public class LowerWeightHostManager extends CommonHostManager { Set hostWeights = new HashSet<>(nodes.size()); for (String node : nodes) { String heartbeat = serverNodeManager.getWorkerNodeInfo(node); - HostWeight hostWeight = getHostWeight(node, workerGroup, heartbeat); - if (hostWeight != null) { - hostWeights.add(hostWeight); + Optional hostWeightOpt = getHostWeight(node, workerGroup, heartbeat); + if (hostWeightOpt.isPresent()) { + hostWeights.add(hostWeightOpt.get()); } } if (!hostWeights.isEmpty()) { @@ -153,23 +155,29 @@ public class LowerWeightHostManager extends CommonHostManager { } } - public HostWeight getHostWeight(String addr, String workerGroup, String heartBeatInfo) { + public Optional getHostWeight(String addr, String workerGroup, String heartBeatInfo) { + if (StringUtils.isEmpty(heartBeatInfo)) { + logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup); + return Optional.empty(); + } HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo); if (heartBeat == null) { - return null; + return Optional.empty(); } if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) { logger.warn("worker {} current cpu load average {} is too high or available memory {}G is too low", addr, heartBeat.getLoadAverage(), heartBeat.getAvailablePhysicalMemorySize()); - return null; + return Optional.empty(); } if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) { logger.warn("worker {} is busy, current waiting task count {} is large than worker thread count {}", addr, heartBeat.getWorkerWaitingTaskCount(), heartBeat.getWorkerExecThreadCount()); - return null; + return Optional.empty(); } - return new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup), - heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(), heartBeat.getStartupTime()); + return Optional.of( + new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup), + heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(), + heartBeat.getStartupTime())); } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RefreshResourceTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RefreshResourceTaskTest.java new file mode 100644 index 0000000000..5e35b0b2ec --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RefreshResourceTaskTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.dispatch.host; + +import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +/** + * RefreshResourceTask test + */ +@RunWith(MockitoJUnitRunner.class) +public class RefreshResourceTaskTest { + + @Mock + private ServerNodeManager serverNodeManager; + + @InjectMocks + LowerWeightHostManager lowerWeightHostManager; + + @Test + public void testGetHostWeightWithResult() { + Assert.assertTrue(!lowerWeightHostManager.new RefreshResourceTask() + .getHostWeight("192.168.1.1:22", "default", null) + .isPresent()); + } +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 5f9f8973bf..0c42a7a244 100644 --- a/pom.xml +++ b/pom.xml @@ -1018,6 +1018,7 @@ **/server/master/dispatch/host/assign/HostWorkerTest.java **/server/master/registry/MasterRegistryClientTest.java **/server/master/registry/ServerNodeManagerTest.java + **/server/master/dispatch/host/RefreshResourceTaskTest.java **/server/master/dispatch/host/assign/RoundRobinHostManagerTest.java **/server/master/MasterCommandTest.java **/server/master/DependentTaskTest.java