diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java index 5769126659..9ec208e44c 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java @@ -652,10 +652,6 @@ public final class Constants { */ public static final int AUTHORIZE_READABLE_PERM = 4; - public static final int NORMAL_NODE_STATUS = 0; - public static final int ABNORMAL_NODE_STATUS = 1; - public static final int BUSY_NODE_STATUE = 2; - public static final String START_TIME = "start time"; public static final String END_TIME = "end time"; public static final String START_END_DATE = "startDate,endDate"; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ServerStatus.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ServerStatus.java new file mode 100644 index 0000000000..16a0f0e34c --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ServerStatus.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.enums; + +public enum ServerStatus { + + NORMAL, ABNORMAL, BUSY + +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java index 52c96defc6..6386f32ae6 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.common.model; +import org.apache.dolphinscheduler.common.enums.ServerStatus; + import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -36,6 +38,7 @@ public class MasterHeartBeat implements HeartBeat { private double reservedMemory; private double diskAvailable; private int processId; + private ServerStatus serverStatus; private String host; private int port; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java index d3843d2783..396f227f47 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java @@ -17,6 +17,8 @@ package org.apache.dolphinscheduler.common.model; +import org.apache.dolphinscheduler.common.enums.ServerStatus; + import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -36,7 +38,7 @@ public class WorkerHeartBeat implements HeartBeat { private double availablePhysicalMemorySize; private double reservedMemory; private double diskAvailable; - private int 
serverStatus; + private ServerStatus serverStatus; private int processId; private String host; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java index a6cb9352a8..607b78abbc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; -import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.enums.ServerStatus; import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; import org.apache.dolphinscheduler.extract.base.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException; @@ -135,12 +135,12 @@ public class LowerWeightHostManager extends CommonHostManager { log.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup); return Optional.empty(); } - if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) { + if (ServerStatus.ABNORMAL == heartBeat.getServerStatus()) { log.warn("worker {} current cpu load average {} is too high or available memory {}G is too low", addr, heartBeat.getLoadAverage(), heartBeat.getAvailablePhysicalMemorySize()); return Optional.empty(); } - if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) { + if (ServerStatus.BUSY == heartBeat.getServerStatus()) { log.warn("worker {} is busy, current waiting task count {} is large than worker thread count {}", addr, heartBeat.getWorkerWaitingTaskCount(), heartBeat.getWorkerExecThreadCount()); return Optional.empty(); diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterInfoChangeListener.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterInfoChangeListener.java
new file mode 100644
index 0000000000..ffd7628923
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterInfoChangeListener.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.registry;
+
+import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
+
+import java.util.Map;
+
+/**
+ * The listener used in {@link ServerNodeManager} to notify the change of master info.
+ */
+public interface MasterInfoChangeListener {
+
+    /**
+     * Used to notify the change of master info.
+     *
+     * @param masterNodeInfo master node info map, key is master address, value is master info.
+     */
+    void notify(Map<String, MasterHeartBeat> masterNodeInfo);
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManager.java
new file mode 100644
index 0000000000..5fb0c74f5a
--- /dev/null
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManager.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.registry;
+
+import org.apache.dolphinscheduler.common.enums.ServerStatus;
+import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
+import org.apache.dolphinscheduler.common.model.Server;
+import org.apache.dolphinscheduler.server.master.config.MasterConfig;
+import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import javax.annotation.PostConstruct;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Tracks the slot of the current master among the masters that take part in slot assignment.
+ * <p>
+ * The slot information is recomputed each time {@link ServerNodeManager} reports a master info change.
+ */
+@Service
+@Slf4j
+public class MasterSlotManager {
+
+    @Autowired
+    protected ServerNodeManager serverNodeManager;
+
+    @Autowired
+    protected MasterConfig masterConfig;
+
+    private volatile int currentSlot = 0;
+    private volatile int totalSlot = 0;
+
+    /**
+     * Registers the slot listener so that the slot fields follow every master info change.
+     */
+    @PostConstruct
+    public void init() {
+        serverNodeManager.addMasterInfoChangeListener(new MasterSlotManager.SlotChangeListener());
+    }
+
+    /**
+     * Returns the slot index of the current master. It is 0 both for the first slot and when the
+     * current master holds no slot, so check {@link #getMasterSize()} to tell the two cases apart.
+     */
+    public int getSlot() {
+        return currentSlot;
+    }
+
+    /**
+     * Returns the number of masters sharing the slots, or 0 when the current master holds no slot.
+     */
+    public int getMasterSize() {
+        return totalSlot;
+    }
+
+    /**
+     * Recomputes the slot fields of the enclosing manager from the reported master heartbeats.
+     */
+    public class SlotChangeListener implements MasterInfoChangeListener {
+
+        private final Lock slotLock = new ReentrantLock();
+
+        private final MasterPriorityQueue masterPriorityQueue = new MasterPriorityQueue();
+
+        @Override
+        public void notify(Map<String, MasterHeartBeat> masterNodeInfo) {
+            List<Server> serverList = masterNodeInfo.values().stream()
+                    .filter(this::isEligibleForSlot)
+                    .map(this::convertHeartBeatToServer)
+                    .collect(Collectors.toList());
+            syncMasterNodes(serverList);
+        }
+
+        /**
+         * Tells whether a master takes part in slot assignment: only an explicit ABNORMAL status excludes it.
+         * <p>
+         * The status is compared with {@code !=} instead of being dereferenced, so a heartbeat that carries
+         * no status, or a missing heartbeat, cannot abort the notification with a NullPointerException.
+         */
+        private boolean isEligibleForSlot(MasterHeartBeat heartBeat) {
+            return heartBeat != null && ServerStatus.ABNORMAL != heartBeat.getServerStatus();
+        }
+
+        /**
+         * Rebuilds the master queue and updates the slot of the current master.
+         */
+        private void syncMasterNodes(List<Server> masterNodes) {
+            slotLock.lock();
+            try {
+                this.masterPriorityQueue.clear();
+                this.masterPriorityQueue.putAll(masterNodes);
+                int tempCurrentSlot = masterPriorityQueue.getIndex(masterConfig.getMasterAddress());
+                int tempTotalSlot = masterNodes.size();
+                if (tempCurrentSlot < 0) {
+                    // the current master is absent from the eligible list (e.g. it reported ABNORMAL)
+                    totalSlot = 0;
+                    currentSlot = 0;
+                    log.warn("Current master is not in active master list");
+                } else if (tempCurrentSlot != currentSlot || tempTotalSlot != totalSlot) {
+                    totalSlot = tempTotalSlot;
+                    currentSlot = tempCurrentSlot;
+                    log.info("Update master nodes, total master size: {}, current slot: {}", totalSlot, currentSlot);
+                }
+            } finally {
+                slotLock.unlock();
+            }
+        }
+
+        /**
+         * Builds the {@link Server} entry stored in the master queue from a heartbeat.
+         */
+        private Server convertHeartBeatToServer(MasterHeartBeat masterHeartBeat) {
+            Server server = new Server();
+            server.setCreateTime(new Date(masterHeartBeat.getStartupTime()));
+            server.setLastHeartbeatTime(new Date(masterHeartBeat.getReportTime()));
+            server.setId(masterHeartBeat.getProcessId());
+            server.setHost(masterHeartBeat.getHost());
+            server.setPort(masterHeartBeat.getPort());
+
+            return server;
+        }
+
+    }
+}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index 852f73ce73..feee60a171 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.server.master.registry; import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.common.model.MasterHeartBeat; import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.AlertDao; @@ -32,7 +32,6 @@ import
org.apache.dolphinscheduler.registry.api.SubscribeListener; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException; -import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.ArrayUtils; @@ -40,10 +39,8 @@ import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -52,7 +49,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; @@ -69,8 +65,6 @@ import org.springframework.stereotype.Service; @Slf4j public class ServerNodeManager implements InitializingBean { - private final Lock masterLock = new ReentrantLock(); - private final ReentrantReadWriteLock workerGroupLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock.ReadLock workerGroupReadLock = workerGroupLock.readLock(); private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock = workerGroupLock.writeLock(); @@ -79,12 +73,14 @@ public class ServerNodeManager implements InitializingBean { private final ReentrantReadWriteLock.ReadLock workerNodeInfoReadLock = workerNodeInfoLock.readLock(); private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = workerNodeInfoLock.writeLock(); + private final ReentrantLock masterNodeInfoLock = new ReentrantLock(); + /** * worker 
group nodes, workerGroup -> ips, combining registryWorkerGroupNodes and dbWorkerGroupNodes */ private final ConcurrentHashMap> workerGroupNodes = new ConcurrentHashMap<>(); - private final Set masterNodes = new HashSet<>(); + private final Map masterNodeInfo = new HashMap<>(); private final Map workerNodeInfo = new HashMap<>(); @@ -99,8 +95,6 @@ public class ServerNodeManager implements InitializingBean { @Autowired private WorkerGroupMapper workerGroupMapper; - private final MasterPriorityQueue masterPriorityQueue = new MasterPriorityQueue(); - @Autowired private AlertDao alertDao; @@ -109,24 +103,13 @@ public class ServerNodeManager implements InitializingBean { private final List workerInfoChangeListeners = new ArrayList<>(); - private volatile int currentSlot = 0; - - private volatile int totalSlot = 0; - - public int getSlot() { - return currentSlot; - } - - public int getMasterSize() { - return totalSlot; - } + private final List masterInfoChangeListeners = new ArrayList<>(); @Override public void afterPropertiesSet() { // load nodes from zookeeper - updateMasterNodes(); - refreshWorkerNodesAndGroupMappings(); + refreshNodesAndGroupMappings(); // init executor service executorService = @@ -147,7 +130,7 @@ public class ServerNodeManager implements InitializingBean { public void run() { try { // sync worker node info - refreshWorkerNodesAndGroupMappings(); + refreshNodesAndGroupMappings(); } catch (Exception e) { log.error("WorkerNodeInfoAndGroupDbSyncTask error:", e); } @@ -155,12 +138,15 @@ public class ServerNodeManager implements InitializingBean { } /** - * Refresh worker nodes and worker group mapping information + * Refresh master/worker nodes and worker group mapping information */ - private void refreshWorkerNodesAndGroupMappings() { + private void refreshNodesAndGroupMappings() { updateWorkerNodes(); updateWorkerGroupMappings(); notifyWorkerInfoChangeListeners(); + + updateMasterNodes(); + notifyMasterInfoChangeListeners(); } /** @@ -214,11 +200,8 @@ 
public class ServerNodeManager implements InitializingBean { try { if (type.equals(Type.ADD)) { log.info("master node : {} added.", path); - updateMasterNodes(); - } - if (type.equals(Type.REMOVE)) { + } else if (type.equals(Type.REMOVE)) { log.info("master node : {} down.", path); - updateMasterNodes(); alertDao.sendServerStoppedAlert(1, path, "MASTER"); } } catch (Exception ex) { @@ -229,19 +212,17 @@ public class ServerNodeManager implements InitializingBean { } private void updateMasterNodes() { - currentSlot = 0; - totalSlot = 0; - this.masterNodes.clear(); - String nodeLock = RegistryNodeType.MASTER_NODE_LOCK.getRegistryPath(); + masterNodeInfoLock.lock(); try { - registryClient.getLock(nodeLock); - Collection currentNodes = registryClient.getMasterNodesDirectly(); - List masterNodeList = registryClient.getServerList(RegistryNodeType.MASTER); - syncMasterNodes(currentNodes, masterNodeList); + masterNodeInfo.clear(); + Map masterNodeMaps = registryClient.getServerMaps(RegistryNodeType.MASTER); + for (Map.Entry entry : masterNodeMaps.entrySet()) { + masterNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), MasterHeartBeat.class)); + } } catch (Exception e) { log.error("update master nodes error", e); } finally { - registryClient.releaseLock(nodeLock); + masterNodeInfoLock.unlock(); } } @@ -289,30 +270,6 @@ public class ServerNodeManager implements InitializingBean { } } - /** - * sync master nodes - * - * @param nodes master nodes - */ - private void syncMasterNodes(Collection nodes, List masterNodes) { - masterLock.lock(); - try { - this.masterNodes.addAll(nodes); - this.masterPriorityQueue.clear(); - this.masterPriorityQueue.putList(masterNodes); - int index = masterPriorityQueue.getIndex(masterConfig.getMasterAddress()); - if (index >= 0) { - totalSlot = nodes.size(); - currentSlot = index; - } else { - log.warn("Current master is not in active master list"); - } - log.info("Update master nodes, total master size: {}, current slot: {}", 
totalSlot, currentSlot);
-        } finally {
-            masterLock.unlock();
-        }
-    }
-
     public Map> getWorkerGroupNodes() {
         workerGroupReadLock.lock();
         try {
@@ -360,6 +317,21 @@ public class ServerNodeManager implements InitializingBean {
         }
     }
 
+    /**
+     * Returns a read-only snapshot of the latest master heartbeats, keyed by master address.
+     * <p>
+     * The copy is taken under the same lock that updateMasterNodes() holds while it clears and
+     * refills the map, so callers never iterate the map while it is being rebuilt.
+     */
+    public Map<String, MasterHeartBeat> getMasterNodeInfo() {
+        masterNodeInfoLock.lock();
+        try {
+            return Collections.unmodifiableMap(new HashMap<>(masterNodeInfo));
+        } finally {
+            masterNodeInfoLock.unlock();
+        }
+    }
+
     /**
      * Add the resource change listener, when the resource changed, the listener will be notified.
      *
@@ -377,6 +349,21 @@ public class ServerNodeManager implements InitializingBean {
         }
     }
 
+    /**
+     * Registers a listener that receives the master info after each refresh of the master nodes.
+     */
+    public synchronized void addMasterInfoChangeListener(MasterInfoChangeListener listener) {
+        masterInfoChangeListeners.add(listener);
+    }
+
+    private void notifyMasterInfoChangeListeners() {
+        // listeners work on a snapshot, so they are invoked without holding masterNodeInfoLock
+        Map<String, MasterHeartBeat> masterNodeInfoMap = getMasterNodeInfo();
+        for (MasterInfoChangeListener listener : masterInfoChangeListeners) {
+            listener.notify(masterNodeInfoMap);
+        }
+    }
+
     @PreDestroy
     public void destroy() {
         executorService.shutdownNow();
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java index 0313e2b4a5..4d84644cb0 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java @@ -33,7 +33,7 @@ import org.apache.dolphinscheduler.server.master.exception.MasterException; import org.apache.dolphinscheduler.server.master.exception.WorkflowCreateException; import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; -import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; +import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager; import
org.apache.dolphinscheduler.service.command.CommandService; import org.apache.commons.collections4.CollectionUtils; @@ -73,7 +73,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl private WorkflowEventLooper workflowEventLooper; @Autowired - private ServerNodeManager serverNodeManager; + private MasterSlotManager masterSlotManager; @Autowired private MasterTaskExecutorBootstrap masterTaskExecutorBootstrap; @@ -171,8 +171,8 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl private List findCommands() throws MasterException { try { long scheduleStartTime = System.currentTimeMillis(); - int thisMasterSlot = serverNodeManager.getSlot(); - int masterCount = serverNodeManager.getMasterSize(); + int thisMasterSlot = masterSlotManager.getSlot(); + int masterCount = masterSlotManager.getMasterSize(); if (masterCount <= 0) { log.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot); return Collections.emptyList(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java index 0578176e5e..89810aea1f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java @@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph; import org.apache.dolphinscheduler.server.master.graph.WorkflowGraphFactory; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; -import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; +import 
org.apache.dolphinscheduler.server.master.registry.MasterSlotManager; import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -41,7 +41,7 @@ import org.springframework.stereotype.Component; public class WorkflowExecuteContextFactory { @Autowired - private ServerNodeManager serverNodeManager; + private MasterSlotManager masterSlotManager; @Autowired private ProcessService processService; @@ -85,8 +85,8 @@ public class WorkflowExecuteContextFactory { } private SlotCheckState slotCheck(Command command) { - int slot = serverNodeManager.getSlot(); - int masterSize = serverNodeManager.getMasterSize(); + int slot = masterSlotManager.getSlot(); + int masterSize = masterSlotManager.getMasterSize(); SlotCheckState state; if (masterSize <= 0) { state = SlotCheckState.CHANGE; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java index 824d4778ab..f8a9b30e28 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.task; +import org.apache.dolphinscheduler.common.enums.ServerStatus; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask; import org.apache.dolphinscheduler.common.model.MasterHeartBeat; @@ -60,6 +61,7 @@ public class MasterHeartBeatTask extends BaseHeartBeatTask { .memoryUsage(OSUtils.memoryUsagePercentage()) .diskAvailable(OSUtils.diskAvailable()) .processId(processId) + .serverStatus(getServerStatus()) .host(NetUtils.getHost()) .port(masterConfig.getListenPort()) 
.build(); @@ -72,4 +74,10 @@ public class MasterHeartBeatTask extends BaseHeartBeatTask { log.debug("Success write master heartBeatInfo into registry, masterRegistryPath: {}, heartBeatInfo: {}", heartBeatPath, masterHeartBeatJson); } + + private ServerStatus getServerStatus() { + return OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory()) + ? ServerStatus.ABNORMAL + : ServerStatus.NORMAL; + } } diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManagerTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManagerTest.java new file mode 100644 index 0000000000..4ee75a0392 --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManagerTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.dolphinscheduler.server.master.registry; + +import org.apache.dolphinscheduler.common.enums.ServerStatus; +import org.apache.dolphinscheduler.common.model.MasterHeartBeat; +import org.apache.dolphinscheduler.server.master.config.MasterConfig; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class MasterSlotManagerTest { + + @InjectMocks + private MasterSlotManager masterSlotManager = Mockito.spy(new MasterSlotManager()); + + @Mock + private MasterConfig masterConfig; + + @Test + void testNormalMasterSlots() { + // on normal Master side + Mockito.when(masterConfig.getMasterAddress()).thenReturn("127.0.0.1:7777"); + + sendHeartBeat(ServerStatus.ABNORMAL, ServerStatus.NORMAL); + Assertions.assertEquals(1, masterSlotManager.getMasterSize()); + Assertions.assertEquals(0, masterSlotManager.getSlot()); + + sendHeartBeat(ServerStatus.NORMAL, ServerStatus.NORMAL); + Assertions.assertEquals(2, masterSlotManager.getMasterSize()); + Assertions.assertEquals(1, masterSlotManager.getSlot()); + } + + @Test + void testOverloadMasterSlots() { + // on abnormal Master side + Mockito.when(masterConfig.getMasterAddress()).thenReturn("127.0.0.1:6666"); + + sendHeartBeat(ServerStatus.ABNORMAL, ServerStatus.NORMAL); + Assertions.assertEquals(0, masterSlotManager.getMasterSize()); + Assertions.assertEquals(0, masterSlotManager.getSlot()); + + sendHeartBeat(ServerStatus.NORMAL, ServerStatus.NORMAL); + Assertions.assertEquals(2, masterSlotManager.getMasterSize()); + Assertions.assertEquals(0, masterSlotManager.getSlot()); + } + + public void sendHeartBeat(ServerStatus serverStatus1, ServerStatus serverStatus2) { + MasterSlotManager.SlotChangeListener 
slotChangeListener = masterSlotManager.new SlotChangeListener(); + + Map masterNodeInfo = new HashMap<>(); + // generate heartbeat + MasterHeartBeat masterHeartBeat1 = MasterHeartBeat.builder() + .startupTime(System.currentTimeMillis()) + .serverStatus(serverStatus1) + .host("127.0.0.1") + .port(6666) + .build(); + MasterHeartBeat masterHeartBeat2 = MasterHeartBeat.builder() + .startupTime(System.currentTimeMillis()) + .serverStatus(serverStatus2) + .host("127.0.0.1") + .port(7777) + .build(); + masterNodeInfo.put("127.0.0.1:6666", masterHeartBeat1); + masterNodeInfo.put("127.0.0.1:7777", masterHeartBeat2); + + slotChangeListener.notify(masterNodeInfo); + } +} diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java index e5405489e3..d4fe74cd9f 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java @@ -20,10 +20,10 @@ package org.apache.dolphinscheduler.service.queue; import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.NetUtils; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; @@ -62,7 +62,7 @@ public class MasterPriorityQueue implements TaskPriorityQueue { return queue.size(); } - public void putList(List serverList) { + public void putAll(Collection serverList) { for (Server server : serverList) { this.queue.put(server); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java index bb9c77e849..1294163e1e 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java @@ -17,7 +17,7 @@ package org.apache.dolphinscheduler.server.worker.task; -import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.enums.ServerStatus; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask; import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; @@ -59,7 +59,7 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask { double reservedMemory = workerConfig.getReservedMemory(); double memoryUsagePercentage = OSUtils.memoryUsagePercentage(); int execThreads = workerConfig.getExecThreads(); - int serverStatus = + ServerStatus serverStatus = getServerStatus(cpuUsagePercentage, maxCpuUsePercentage, memoryUsagePercentage, reservedMemory, execThreads, this.workerWaitingTaskCount.get()); @@ -91,23 +91,23 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask { workerRegistryPath, workerHeartBeatJson); } - public int getServerStatus(double cpuUsagePercentage, - double maxCpuUsePercentage, - double memoryUsagePercentage, - double reservedMemory, - int workerExecThreadCount, - int workerWaitingTaskCount) { + private ServerStatus getServerStatus(double cpuUsagePercentage, + double maxCpuUsePercentage, + double memoryUsagePercentage, + double reservedMemory, + int workerExecThreadCount, + int workerWaitingTaskCount) { if (cpuUsagePercentage > maxCpuUsePercentage || (1 - memoryUsagePercentage) < reservedMemory) { log.warn( "current cpu load average {} is higher than {} or available memory {} is lower than {}", cpuUsagePercentage, 
maxCpuUsePercentage, 1 - memoryUsagePercentage, reservedMemory); - return Constants.ABNORMAL_NODE_STATUS; + return ServerStatus.ABNORMAL; } else if (workerWaitingTaskCount > workerExecThreadCount) { log.warn("current waiting task count {} is large than worker thread count {}, worker is busy", workerWaitingTaskCount, workerExecThreadCount); - return Constants.BUSY_NODE_STATUE; + return ServerStatus.BUSY; } else { - return Constants.NORMAL_NODE_STATUS; + return ServerStatus.NORMAL; } } }