Browse Source

[Improvement-14884][Master] Add overload state in master heartbeat to trigger slot change (#14887)

3.2.1-prepare
Aaron Wang 1 year ago committed by GitHub
parent
commit
73e846d03e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
  2. 24
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ServerStatus.java
  3. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java
  4. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java
  5. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java
  6. 35
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterInfoChangeListener.java
  7. 116
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManager.java
  8. 98
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
  9. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java
  10. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java
  11. 8
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java
  12. 94
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManagerTest.java
  13. 4
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java
  14. 12
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java

@ -652,10 +652,6 @@ public final class Constants {
*/ */
public static final int AUTHORIZE_READABLE_PERM = 4; public static final int AUTHORIZE_READABLE_PERM = 4;
public static final int NORMAL_NODE_STATUS = 0;
public static final int ABNORMAL_NODE_STATUS = 1;
public static final int BUSY_NODE_STATUE = 2;
public static final String START_TIME = "start time"; public static final String START_TIME = "start time";
public static final String END_TIME = "end time"; public static final String END_TIME = "end time";
public static final String START_END_DATE = "startDate,endDate"; public static final String START_END_DATE = "startDate,endDate";

24
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ServerStatus.java

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
public enum ServerStatus {
NORMAL, ABNORMAL, BUSY
}

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/MasterHeartBeat.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.common.model; package org.apache.dolphinscheduler.common.model;
import org.apache.dolphinscheduler.common.enums.ServerStatus;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
@ -36,6 +38,7 @@ public class MasterHeartBeat implements HeartBeat {
private double reservedMemory; private double reservedMemory;
private double diskAvailable; private double diskAvailable;
private int processId; private int processId;
private ServerStatus serverStatus;
private String host; private String host;
private int port; private int port;

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/model/WorkerHeartBeat.java

@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.common.model; package org.apache.dolphinscheduler.common.model;
import org.apache.dolphinscheduler.common.enums.ServerStatus;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
@ -36,7 +38,7 @@ public class WorkerHeartBeat implements HeartBeat {
private double availablePhysicalMemorySize; private double availablePhysicalMemorySize;
private double reservedMemory; private double reservedMemory;
private double diskAvailable; private double diskAvailable;
private int serverStatus; private ServerStatus serverStatus;
private int processId; private int processId;
private String host; private String host;

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.master.dispatch.host; package org.apache.dolphinscheduler.server.master.dispatch.host;
import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.ServerStatus;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.extract.base.utils.Host; import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
@ -135,12 +135,12 @@ public class LowerWeightHostManager extends CommonHostManager {
log.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup); log.warn("worker {} in work group {} have not received the heartbeat", addr, workerGroup);
return Optional.empty(); return Optional.empty();
} }
if (Constants.ABNORMAL_NODE_STATUS == heartBeat.getServerStatus()) { if (ServerStatus.ABNORMAL == heartBeat.getServerStatus()) {
log.warn("worker {} current cpu load average {} is too high or available memory {}G is too low", log.warn("worker {} current cpu load average {} is too high or available memory {}G is too low",
addr, heartBeat.getLoadAverage(), heartBeat.getAvailablePhysicalMemorySize()); addr, heartBeat.getLoadAverage(), heartBeat.getAvailablePhysicalMemorySize());
return Optional.empty(); return Optional.empty();
} }
if (Constants.BUSY_NODE_STATUE == heartBeat.getServerStatus()) { if (ServerStatus.BUSY == heartBeat.getServerStatus()) {
log.warn("worker {} is busy, current waiting task count {} is large than worker thread count {}", log.warn("worker {} is busy, current waiting task count {} is large than worker thread count {}",
addr, heartBeat.getWorkerWaitingTaskCount(), heartBeat.getWorkerExecThreadCount()); addr, heartBeat.getWorkerWaitingTaskCount(), heartBeat.getWorkerExecThreadCount());
return Optional.empty(); return Optional.empty();

35
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterInfoChangeListener.java

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import java.util.Map;
/**
* The listener used in {@link ServerNodeManager} to notify the change of master info.
*/
public interface MasterInfoChangeListener {
/**
* Used to notify the change of master info.
*
* @param masterNodeInfo master node info map, key is master address, value is master info.
*/
void notify(Map<String, MasterHeartBeat> masterNodeInfo);
}

116
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManager.java

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.enums.ServerStatus;
import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class MasterSlotManager {
@Autowired
protected ServerNodeManager serverNodeManager;
@Autowired
protected MasterConfig masterConfig;
private volatile int currentSlot = 0;
private volatile int totalSlot = 0;
@PostConstruct
public void init() {
serverNodeManager.addMasterInfoChangeListener(new MasterSlotManager.SlotChangeListener());
}
public int getSlot() {
return currentSlot;
}
public int getMasterSize() {
return totalSlot;
}
public class SlotChangeListener implements MasterInfoChangeListener {
private final Lock slotLock = new ReentrantLock();
private final MasterPriorityQueue masterPriorityQueue = new MasterPriorityQueue();
@Override
public void notify(Map<String, MasterHeartBeat> masterNodeInfo) {
List<Server> serverList = masterNodeInfo.values().stream()
.filter(heartBeat -> !heartBeat.getServerStatus().equals(ServerStatus.ABNORMAL))
.map(this::convertHeartBeatToServer).collect(Collectors.toList());
syncMasterNodes(serverList);
}
/**
* sync master nodes
*/
private void syncMasterNodes(List<Server> masterNodes) {
slotLock.lock();
try {
this.masterPriorityQueue.clear();
this.masterPriorityQueue.putAll(masterNodes);
int tempCurrentSlot = masterPriorityQueue.getIndex(masterConfig.getMasterAddress());
int tempTotalSlot = masterNodes.size();
if (tempCurrentSlot < 0) {
totalSlot = 0;
currentSlot = 0;
log.warn("Current master is not in active master list");
} else if (tempCurrentSlot != currentSlot || tempTotalSlot != totalSlot) {
totalSlot = tempTotalSlot;
currentSlot = tempCurrentSlot;
log.info("Update master nodes, total master size: {}, current slot: {}", totalSlot, currentSlot);
}
} finally {
slotLock.unlock();
}
}
private Server convertHeartBeatToServer(MasterHeartBeat masterHeartBeat) {
Server server = new Server();
server.setCreateTime(new Date(masterHeartBeat.getStartupTime()));
server.setLastHeartbeatTime(new Date(masterHeartBeat.getReportTime()));
server.setId(masterHeartBeat.getProcessId());
server.setHost(masterHeartBeat.getHost());
server.setPort(masterHeartBeat.getPort());
return server;
}
}
}

98
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

@ -18,7 +18,7 @@
package org.apache.dolphinscheduler.server.master.registry; package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.AlertDao;
@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.WorkerGroupNotFoundException;
import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.ArrayUtils;
@ -40,10 +39,8 @@ import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@ -52,7 +49,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -69,8 +65,6 @@ import org.springframework.stereotype.Service;
@Slf4j @Slf4j
public class ServerNodeManager implements InitializingBean { public class ServerNodeManager implements InitializingBean {
private final Lock masterLock = new ReentrantLock();
private final ReentrantReadWriteLock workerGroupLock = new ReentrantReadWriteLock(); private final ReentrantReadWriteLock workerGroupLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock workerGroupReadLock = workerGroupLock.readLock(); private final ReentrantReadWriteLock.ReadLock workerGroupReadLock = workerGroupLock.readLock();
private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock = workerGroupLock.writeLock(); private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock = workerGroupLock.writeLock();
@ -79,12 +73,14 @@ public class ServerNodeManager implements InitializingBean {
private final ReentrantReadWriteLock.ReadLock workerNodeInfoReadLock = workerNodeInfoLock.readLock(); private final ReentrantReadWriteLock.ReadLock workerNodeInfoReadLock = workerNodeInfoLock.readLock();
private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = workerNodeInfoLock.writeLock(); private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = workerNodeInfoLock.writeLock();
private final ReentrantLock masterNodeInfoLock = new ReentrantLock();
/** /**
* worker group nodes, workerGroup -> ips, combining registryWorkerGroupNodes and dbWorkerGroupNodes * worker group nodes, workerGroup -> ips, combining registryWorkerGroupNodes and dbWorkerGroupNodes
*/ */
private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();
private final Set<String> masterNodes = new HashSet<>(); private final Map<String, MasterHeartBeat> masterNodeInfo = new HashMap<>();
private final Map<String, WorkerHeartBeat> workerNodeInfo = new HashMap<>(); private final Map<String, WorkerHeartBeat> workerNodeInfo = new HashMap<>();
@ -99,8 +95,6 @@ public class ServerNodeManager implements InitializingBean {
@Autowired @Autowired
private WorkerGroupMapper workerGroupMapper; private WorkerGroupMapper workerGroupMapper;
private final MasterPriorityQueue masterPriorityQueue = new MasterPriorityQueue();
@Autowired @Autowired
private AlertDao alertDao; private AlertDao alertDao;
@ -109,24 +103,13 @@ public class ServerNodeManager implements InitializingBean {
private final List<WorkerInfoChangeListener> workerInfoChangeListeners = new ArrayList<>(); private final List<WorkerInfoChangeListener> workerInfoChangeListeners = new ArrayList<>();
private volatile int currentSlot = 0; private final List<MasterInfoChangeListener> masterInfoChangeListeners = new ArrayList<>();
private volatile int totalSlot = 0;
public int getSlot() {
return currentSlot;
}
public int getMasterSize() {
return totalSlot;
}
@Override @Override
public void afterPropertiesSet() { public void afterPropertiesSet() {
// load nodes from zookeeper // load nodes from zookeeper
updateMasterNodes(); refreshNodesAndGroupMappings();
refreshWorkerNodesAndGroupMappings();
// init executor service // init executor service
executorService = executorService =
@ -147,7 +130,7 @@ public class ServerNodeManager implements InitializingBean {
public void run() { public void run() {
try { try {
// sync worker node info // sync worker node info
refreshWorkerNodesAndGroupMappings(); refreshNodesAndGroupMappings();
} catch (Exception e) { } catch (Exception e) {
log.error("WorkerNodeInfoAndGroupDbSyncTask error:", e); log.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
} }
@ -155,12 +138,15 @@ public class ServerNodeManager implements InitializingBean {
} }
/** /**
* Refresh worker nodes and worker group mapping information * Refresh master/worker nodes and worker group mapping information
*/ */
private void refreshWorkerNodesAndGroupMappings() { private void refreshNodesAndGroupMappings() {
updateWorkerNodes(); updateWorkerNodes();
updateWorkerGroupMappings(); updateWorkerGroupMappings();
notifyWorkerInfoChangeListeners(); notifyWorkerInfoChangeListeners();
updateMasterNodes();
notifyMasterInfoChangeListeners();
} }
/** /**
@ -214,11 +200,8 @@ public class ServerNodeManager implements InitializingBean {
try { try {
if (type.equals(Type.ADD)) { if (type.equals(Type.ADD)) {
log.info("master node : {} added.", path); log.info("master node : {} added.", path);
updateMasterNodes(); } else if (type.equals(Type.REMOVE)) {
}
if (type.equals(Type.REMOVE)) {
log.info("master node : {} down.", path); log.info("master node : {} down.", path);
updateMasterNodes();
alertDao.sendServerStoppedAlert(1, path, "MASTER"); alertDao.sendServerStoppedAlert(1, path, "MASTER");
} }
} catch (Exception ex) { } catch (Exception ex) {
@ -229,19 +212,17 @@ public class ServerNodeManager implements InitializingBean {
} }
private void updateMasterNodes() { private void updateMasterNodes() {
currentSlot = 0; masterNodeInfoLock.lock();
totalSlot = 0;
this.masterNodes.clear();
String nodeLock = RegistryNodeType.MASTER_NODE_LOCK.getRegistryPath();
try { try {
registryClient.getLock(nodeLock); masterNodeInfo.clear();
Collection<String> currentNodes = registryClient.getMasterNodesDirectly(); Map<String, String> masterNodeMaps = registryClient.getServerMaps(RegistryNodeType.MASTER);
List<Server> masterNodeList = registryClient.getServerList(RegistryNodeType.MASTER); for (Map.Entry<String, String> entry : masterNodeMaps.entrySet()) {
syncMasterNodes(currentNodes, masterNodeList); masterNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), MasterHeartBeat.class));
}
} catch (Exception e) { } catch (Exception e) {
log.error("update master nodes error", e); log.error("update master nodes error", e);
} finally { } finally {
registryClient.releaseLock(nodeLock); masterNodeInfoLock.unlock();
} }
} }
@ -289,30 +270,6 @@ public class ServerNodeManager implements InitializingBean {
} }
} }
/**
* sync master nodes
*
* @param nodes master nodes
*/
private void syncMasterNodes(Collection<String> nodes, List<Server> masterNodes) {
masterLock.lock();
try {
this.masterNodes.addAll(nodes);
this.masterPriorityQueue.clear();
this.masterPriorityQueue.putList(masterNodes);
int index = masterPriorityQueue.getIndex(masterConfig.getMasterAddress());
if (index >= 0) {
totalSlot = nodes.size();
currentSlot = index;
} else {
log.warn("Current master is not in active master list");
}
log.info("Update master nodes, total master size: {}, current slot: {}", totalSlot, currentSlot);
} finally {
masterLock.unlock();
}
}
public Map<String, Set<String>> getWorkerGroupNodes() { public Map<String, Set<String>> getWorkerGroupNodes() {
workerGroupReadLock.lock(); workerGroupReadLock.lock();
try { try {
@ -360,6 +317,10 @@ public class ServerNodeManager implements InitializingBean {
} }
} }
public Map<String, MasterHeartBeat> getMasterNodeInfo() {
return Collections.unmodifiableMap(masterNodeInfo);
}
/** /**
* Add the resource change listener, when the resource changed, the listener will be notified. * Add the resource change listener, when the resource changed, the listener will be notified.
* *
@ -377,6 +338,17 @@ public class ServerNodeManager implements InitializingBean {
} }
} }
public synchronized void addMasterInfoChangeListener(MasterInfoChangeListener listener) {
masterInfoChangeListeners.add(listener);
}
private void notifyMasterInfoChangeListeners() {
Map<String, MasterHeartBeat> masterNodeInfoMap = getMasterNodeInfo();
for (MasterInfoChangeListener listener : masterInfoChangeListeners) {
listener.notify(masterNodeInfoMap);
}
}
@PreDestroy @PreDestroy
public void destroy() { public void destroy() {
executorService.shutdownNow(); executorService.shutdownNow();

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerBootstrap.java

@ -33,7 +33,7 @@ import org.apache.dolphinscheduler.server.master.exception.MasterException;
import org.apache.dolphinscheduler.server.master.exception.WorkflowCreateException; import org.apache.dolphinscheduler.server.master.exception.WorkflowCreateException;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics; import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager;
import org.apache.dolphinscheduler.service.command.CommandService; import org.apache.dolphinscheduler.service.command.CommandService;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
@ -73,7 +73,7 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
private WorkflowEventLooper workflowEventLooper; private WorkflowEventLooper workflowEventLooper;
@Autowired @Autowired
private ServerNodeManager serverNodeManager; private MasterSlotManager masterSlotManager;
@Autowired @Autowired
private MasterTaskExecutorBootstrap masterTaskExecutorBootstrap; private MasterTaskExecutorBootstrap masterTaskExecutorBootstrap;
@ -171,8 +171,8 @@ public class MasterSchedulerBootstrap extends BaseDaemonThread implements AutoCl
private List<Command> findCommands() throws MasterException { private List<Command> findCommands() throws MasterException {
try { try {
long scheduleStartTime = System.currentTimeMillis(); long scheduleStartTime = System.currentTimeMillis();
int thisMasterSlot = serverNodeManager.getSlot(); int thisMasterSlot = masterSlotManager.getSlot();
int masterCount = serverNodeManager.getMasterSize(); int masterCount = masterSlotManager.getMasterSize();
if (masterCount <= 0) { if (masterCount <= 0) {
log.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot); log.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot);
return Collections.emptyList(); return Collections.emptyList();

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContextFactory.java

@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph; import org.apache.dolphinscheduler.server.master.graph.IWorkflowGraph;
import org.apache.dolphinscheduler.server.master.graph.WorkflowGraphFactory; import org.apache.dolphinscheduler.server.master.graph.WorkflowGraphFactory;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics; import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.master.registry.MasterSlotManager;
import org.apache.dolphinscheduler.service.exceptions.CronParseException; import org.apache.dolphinscheduler.service.exceptions.CronParseException;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
@ -41,7 +41,7 @@ import org.springframework.stereotype.Component;
public class WorkflowExecuteContextFactory { public class WorkflowExecuteContextFactory {
@Autowired @Autowired
private ServerNodeManager serverNodeManager; private MasterSlotManager masterSlotManager;
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;
@ -85,8 +85,8 @@ public class WorkflowExecuteContextFactory {
} }
private SlotCheckState slotCheck(Command command) { private SlotCheckState slotCheck(Command command) {
int slot = serverNodeManager.getSlot(); int slot = masterSlotManager.getSlot();
int masterSize = serverNodeManager.getMasterSize(); int masterSize = masterSlotManager.getMasterSize();
SlotCheckState state; SlotCheckState state;
if (masterSize <= 0) { if (masterSize <= 0) {
state = SlotCheckState.CHANGE; state = SlotCheckState.CHANGE;

8
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/task/MasterHeartBeatTask.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.task; package org.apache.dolphinscheduler.server.master.task;
import org.apache.dolphinscheduler.common.enums.ServerStatus;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask; import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
import org.apache.dolphinscheduler.common.model.MasterHeartBeat; import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
@ -60,6 +61,7 @@ public class MasterHeartBeatTask extends BaseHeartBeatTask<MasterHeartBeat> {
.memoryUsage(OSUtils.memoryUsagePercentage()) .memoryUsage(OSUtils.memoryUsagePercentage())
.diskAvailable(OSUtils.diskAvailable()) .diskAvailable(OSUtils.diskAvailable())
.processId(processId) .processId(processId)
.serverStatus(getServerStatus())
.host(NetUtils.getHost()) .host(NetUtils.getHost())
.port(masterConfig.getListenPort()) .port(masterConfig.getListenPort())
.build(); .build();
@ -72,4 +74,10 @@ public class MasterHeartBeatTask extends BaseHeartBeatTask<MasterHeartBeat> {
log.debug("Success write master heartBeatInfo into registry, masterRegistryPath: {}, heartBeatInfo: {}", log.debug("Success write master heartBeatInfo into registry, masterRegistryPath: {}, heartBeatInfo: {}",
heartBeatPath, masterHeartBeatJson); heartBeatPath, masterHeartBeatJson);
} }
private ServerStatus getServerStatus() {
return OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory())
? ServerStatus.ABNORMAL
: ServerStatus.NORMAL;
}
} }

94
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/registry/MasterSlotManagerTest.java

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.dolphinscheduler.common.enums.ServerStatus;
import org.apache.dolphinscheduler.common.model.MasterHeartBeat;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class MasterSlotManagerTest {
@InjectMocks
private MasterSlotManager masterSlotManager = Mockito.spy(new MasterSlotManager());
@Mock
private MasterConfig masterConfig;
@Test
void testNormalMasterSlots() {
// on normal Master side
Mockito.when(masterConfig.getMasterAddress()).thenReturn("127.0.0.1:7777");
sendHeartBeat(ServerStatus.ABNORMAL, ServerStatus.NORMAL);
Assertions.assertEquals(1, masterSlotManager.getMasterSize());
Assertions.assertEquals(0, masterSlotManager.getSlot());
sendHeartBeat(ServerStatus.NORMAL, ServerStatus.NORMAL);
Assertions.assertEquals(2, masterSlotManager.getMasterSize());
Assertions.assertEquals(1, masterSlotManager.getSlot());
}
@Test
void testOverloadMasterSlots() {
// on abnormal Master side
Mockito.when(masterConfig.getMasterAddress()).thenReturn("127.0.0.1:6666");
sendHeartBeat(ServerStatus.ABNORMAL, ServerStatus.NORMAL);
Assertions.assertEquals(0, masterSlotManager.getMasterSize());
Assertions.assertEquals(0, masterSlotManager.getSlot());
sendHeartBeat(ServerStatus.NORMAL, ServerStatus.NORMAL);
Assertions.assertEquals(2, masterSlotManager.getMasterSize());
Assertions.assertEquals(0, masterSlotManager.getSlot());
}
public void sendHeartBeat(ServerStatus serverStatus1, ServerStatus serverStatus2) {
MasterSlotManager.SlotChangeListener slotChangeListener = masterSlotManager.new SlotChangeListener();
Map<String, MasterHeartBeat> masterNodeInfo = new HashMap<>();
// generate heartbeat
MasterHeartBeat masterHeartBeat1 = MasterHeartBeat.builder()
.startupTime(System.currentTimeMillis())
.serverStatus(serverStatus1)
.host("127.0.0.1")
.port(6666)
.build();
MasterHeartBeat masterHeartBeat2 = MasterHeartBeat.builder()
.startupTime(System.currentTimeMillis())
.serverStatus(serverStatus2)
.host("127.0.0.1")
.port(7777)
.build();
masterNodeInfo.put("127.0.0.1:6666", masterHeartBeat1);
masterNodeInfo.put("127.0.0.1:7777", masterHeartBeat2);
slotChangeListener.notify(masterNodeInfo);
}
}

4
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/queue/MasterPriorityQueue.java

@ -20,10 +20,10 @@ package org.apache.dolphinscheduler.service.queue;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.NetUtils;
import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -62,7 +62,7 @@ public class MasterPriorityQueue implements TaskPriorityQueue<Server> {
return queue.size(); return queue.size();
} }
public void putList(List<Server> serverList) { public void putAll(Collection<Server> serverList) {
for (Server server : serverList) { for (Server server : serverList) {
this.queue.put(server); this.queue.put(server);
} }

12
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java

@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.worker.task; package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.ServerStatus;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask; import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
@ -59,7 +59,7 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask<WorkerHeartBeat> {
double reservedMemory = workerConfig.getReservedMemory(); double reservedMemory = workerConfig.getReservedMemory();
double memoryUsagePercentage = OSUtils.memoryUsagePercentage(); double memoryUsagePercentage = OSUtils.memoryUsagePercentage();
int execThreads = workerConfig.getExecThreads(); int execThreads = workerConfig.getExecThreads();
int serverStatus = ServerStatus serverStatus =
getServerStatus(cpuUsagePercentage, maxCpuUsePercentage, memoryUsagePercentage, reservedMemory, getServerStatus(cpuUsagePercentage, maxCpuUsePercentage, memoryUsagePercentage, reservedMemory,
execThreads, this.workerWaitingTaskCount.get()); execThreads, this.workerWaitingTaskCount.get());
@ -91,7 +91,7 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask<WorkerHeartBeat> {
workerRegistryPath, workerHeartBeatJson); workerRegistryPath, workerHeartBeatJson);
} }
public int getServerStatus(double cpuUsagePercentage, private ServerStatus getServerStatus(double cpuUsagePercentage,
double maxCpuUsePercentage, double maxCpuUsePercentage,
double memoryUsagePercentage, double memoryUsagePercentage,
double reservedMemory, double reservedMemory,
@ -101,13 +101,13 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask<WorkerHeartBeat> {
log.warn( log.warn(
"current cpu load average {} is higher than {} or available memory {} is lower than {}", "current cpu load average {} is higher than {} or available memory {} is lower than {}",
cpuUsagePercentage, maxCpuUsePercentage, 1 - memoryUsagePercentage, reservedMemory); cpuUsagePercentage, maxCpuUsePercentage, 1 - memoryUsagePercentage, reservedMemory);
return Constants.ABNORMAL_NODE_STATUS; return ServerStatus.ABNORMAL;
} else if (workerWaitingTaskCount > workerExecThreadCount) { } else if (workerWaitingTaskCount > workerExecThreadCount) {
log.warn("current waiting task count {} is large than worker thread count {}, worker is busy", log.warn("current waiting task count {} is large than worker thread count {}, worker is busy",
workerWaitingTaskCount, workerExecThreadCount); workerWaitingTaskCount, workerExecThreadCount);
return Constants.BUSY_NODE_STATUE; return ServerStatus.BUSY;
} else { } else {
return Constants.NORMAL_NODE_STATUS; return ServerStatus.NORMAL;
} }
} }
} }

Loading…
Cancel
Save