Browse Source

[Fix-#6641]fix npe when the LowerWeightHostManager can not get the workNodeInfo (#6656)

* change the return type of getHostWeight

Co-authored-by: ywang46 <ywang46@paypal.com>
3.0.0/version-upgrade
Yao WANG 3 years ago committed by GitHub
parent
commit
73f20b7553
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 26
      dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
  2. 47
      dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RefreshResourceTaskTest.java
  3. 1
      pom.xml

26
dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java

@ -26,7 +26,9 @@ import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionConte
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker;
import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.util.Optional;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@ -138,9 +140,9 @@ public class LowerWeightHostManager extends CommonHostManager {
Set<HostWeight> hostWeights = new HashSet<>(nodes.size());
for (String node : nodes) {
String heartbeat = serverNodeManager.getWorkerNodeInfo(node);
HostWeight hostWeight = getHostWeight(node, workerGroup, heartbeat);
if (hostWeight != null) {
hostWeights.add(hostWeight);
Optional<HostWeight> hostWeightOpt = getHostWeight(node, workerGroup, heartbeat);
if (hostWeightOpt.isPresent()) {
hostWeights.add(hostWeightOpt.get());
}
}
if (!hostWeights.isEmpty()) {
@ -153,23 +155,29 @@ public class LowerWeightHostManager extends CommonHostManager {
}
}
public HostWeight getHostWeight(String addr, String workerGroup, String heartBeatInfo) {
public Optional<HostWeight> getHostWeight(String addr, String workerGroup, String heartBeatInfo) {
if (StringUtils.isEmpty(heartBeatInfo)) {
logger.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup);
return Optional.empty();
}
HeartBeat heartBeat = HeartBeat.decodeHeartBeat(heartBeatInfo);
if (heartBeat == null) {
return null;
return Optional.empty();
}
if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) {
logger.warn("worker {} current cpu load average {} is too high or available memory {}G is too low",
addr, heartBeat.getLoadAverage(), heartBeat.getAvailablePhysicalMemorySize());
return null;
return Optional.empty();
}
if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) {
logger.warn("worker {} is busy, current waiting task count {} is large than worker thread count {}",
addr, heartBeat.getWorkerWaitingTaskCount(), heartBeat.getWorkerExecThreadCount());
return null;
return Optional.empty();
}
return new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup),
heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(), heartBeat.getStartupTime());
return Optional.of(
new HostWeight(HostWorker.of(addr, heartBeat.getWorkerHostWeight(), workerGroup),
heartBeat.getCpuUsage(), heartBeat.getMemoryUsage(), heartBeat.getLoadAverage(),
heartBeat.getStartupTime()));
}
}

47
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RefreshResourceTaskTest.java

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
/**
* RefreshResourceTask test
*/
@RunWith(MockitoJUnitRunner.class)
public class RefreshResourceTaskTest {
@Mock
private ServerNodeManager serverNodeManager;
@InjectMocks
LowerWeightHostManager lowerWeightHostManager;
@Test
public void testGetHostWeightWithResult() {
Assert.assertTrue(!lowerWeightHostManager.new RefreshResourceTask()
.getHostWeight("192.168.1.1:22", "default", null)
.isPresent());
}
}

1
pom.xml

@ -1018,6 +1018,7 @@
<include>**/server/master/dispatch/host/assign/HostWorkerTest.java</include>
<include>**/server/master/registry/MasterRegistryClientTest.java</include>
<include>**/server/master/registry/ServerNodeManagerTest.java</include>
<include>**/server/master/dispatch/host/RefreshResourceTaskTest.java</include>
<include>**/server/master/dispatch/host/assign/RoundRobinHostManagerTest.java</include>
<include>**/server/master/MasterCommandTest.java</include>
<include>**/server/master/DependentTaskTest.java</include>

Loading…
Cancel
Save