Browse Source

[fix#12000]Cannot remove the WorkerGroup from the master service (#12050)

* [Bug] [Master] Cannot remove the WorkerGroup from the master service. #12000

* remove unnecessary locks

* Update dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

Co-authored-by: caishunfeng <caishunfeng2021@gmail.com>

Co-authored-by: caishunfeng <caishunfeng2021@gmail.com>
3.1.0-release
Yann Ann 2 years ago committed by caishunfeng
parent
commit
3e899bee06
  1. 167
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

167
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

@ -17,8 +17,9 @@
package org.apache.dolphinscheduler.server.master.registry;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
@ -34,14 +35,12 @@ import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PreDestroy;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@ -57,8 +56,13 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* server node manager
@ -74,16 +78,25 @@ public class ServerNodeManager implements InitializingBean {
private final ReentrantReadWriteLock.ReadLock workerGroupReadLock = workerGroupLock.readLock();
private final ReentrantReadWriteLock.WriteLock workerGroupWriteLock = workerGroupLock.writeLock();
private final ReentrantReadWriteLock workerNodeInfoLock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.ReadLock workerNodeInfoReadLock = workerNodeInfoLock.readLock();
private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = workerNodeInfoLock.writeLock();
/**
* worker group nodes, workerGroup -> ips
* worker group nodes, workerGroup -> ips, combining registryWorkerGroupNodes and dbWorkerGroupNodes
*/
private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();
/**
* worker group nodes from registry center, workerGroup -> ips
*/
private final ConcurrentHashMap<String, Set<String>> registryWorkerGroupNodes = new ConcurrentHashMap<>();
/**
* worker group nodes from db, workerGroup -> ips
*/
private final ConcurrentHashMap<String, Set<String>> dbWorkerGroupNodes = new ConcurrentHashMap<>();
/**
* master nodes
*/
@ -138,7 +151,6 @@ public class ServerNodeManager implements InitializingBean {
return MASTER_SIZE;
}
/**
* init listener
*
@ -146,22 +158,20 @@ public class ServerNodeManager implements InitializingBean {
*/
@Override
public void afterPropertiesSet() throws Exception {
/**
* load nodes from zookeeper
*/
// load nodes from zookeeper
load();
/**
* init executor service
*/
executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
// init executor service
executorService =
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS);
/*
* init MasterNodeListener listener
*/
// init MasterNodeListener listener
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener());
/*
* init WorkerNodeListener listener
*/
// init WorkerNodeListener listener
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new WorkerDataListener());
}
@ -169,17 +179,14 @@ public class ServerNodeManager implements InitializingBean {
* load nodes from zookeeper
*/
public void load() {
/*
* master nodes from zookeeper
*/
// master nodes from zookeeper
updateMasterNodes();
/*
* worker group nodes from zookeeper
*/
// worker group nodes from zookeeper
Collection<String> workerGroups = registryClient.getWorkerGroupDirectly();
for (String workerGroup : workerGroups) {
syncWorkerGroupNodes(workerGroup, registryClient.getWorkerGroupNodesDirectly(workerGroup));
syncWorkerGroupNodesFromRegistry(workerGroup,
registryClient.getWorkerGroupNodesDirectly(workerGroup), Type.ADD);
}
}
@ -191,29 +198,38 @@ public class ServerNodeManager implements InitializingBean {
@Override
public void run() {
try {
dbWorkerGroupNodes.clear();
// sync worker node info
Map<String, String> registryWorkerNodeMap = registryClient.getServerMaps(NodeType.WORKER, true);
Map<String, String> registryWorkerNodeMap = registryClient
.getServerMaps(NodeType.WORKER, true);
syncAllWorkerNodeInfo(registryWorkerNodeMap);
// sync worker group nodes from database
List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup();
if (CollectionUtils.isNotEmpty(workerGroupList)) {
for (WorkerGroup wg : workerGroupList) {
String workerGroupName = wg.getName();
Set<String> workerAddress = getWorkerAddressByWorkerGroup(registryWorkerNodeMap, wg);
Set<String> workerAddress = getWorkerAddressByWorkerGroup(
registryWorkerNodeMap, wg);
if (!workerAddress.isEmpty()) {
syncWorkerGroupNodes(workerGroupName, workerAddress);
Set<String> workerNodes = dbWorkerGroupNodes
.getOrDefault(workerGroupName, new HashSet<>());
workerNodes.clear();
workerNodes.addAll(workerAddress);
dbWorkerGroupNodes.put(workerGroupName, workerNodes);
}
}
}
notifyWorkerInfoChangeListeners();
} catch (Exception e) {
logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
} finally {
refreshWorkerGroupNodes();
}
}
}
protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo, WorkerGroup wg) {
protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo,
WorkerGroup wg) {
Set<String> nodes = new HashSet<>();
String[] addrs = wg.getAddrList().split(Constants.COMMA);
for (String addr : addrs) {
@ -238,29 +254,27 @@ public class ServerNodeManager implements InitializingBean {
try {
String[] parts = path.split("/");
if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path));
throw new IllegalArgumentException(
String.format("worker group path : %s is not valid, ignore", path));
}
final String workerGroupName = parts[parts.length - 2];
final String workerAddress = parts[parts.length - 1];
logger.debug("received subscribe event : {}", event);
Collection<String> currentNodes = registryClient
.getWorkerGroupNodesDirectly(workerGroupName);
syncWorkerGroupNodesFromRegistry(workerGroupName, currentNodes, type);
if (type == Type.ADD) {
logger.info("worker group node : {} added.", path);
Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
logger.info("currentNodes : {}", currentNodes);
syncWorkerGroupNodes(workerGroupName, currentNodes);
logger.info("worker group node : {} added, currentNodes : {}", path,
currentNodes);
} else if (type == Type.REMOVE) {
logger.info("worker group node : {} down.", path);
Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
syncWorkerGroupNodes(workerGroupName, currentNodes);
alertDao.sendServerStoppedAlert(1, path, "WORKER");
} else if (type == Type.UPDATE) {
logger.debug("worker group node : {} update, data: {}", path, data);
Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
syncWorkerGroupNodes(workerGroupName, currentNodes);
syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class));
syncSingleWorkerNodeInfo(workerAddress,
JSONUtils.parseObject(data, WorkerHeartBeat.class));
}
notifyWorkerInfoChangeListeners();
} catch (IllegalArgumentException ex) {
logger.warn(ex.getMessage());
} catch (Exception ex) {
@ -272,6 +286,7 @@ public class ServerNodeManager implements InitializingBean {
}
class MasterDataListener implements SubscribeListener {
@Override
public void notify(Event event) {
final String path = event.path();
@ -328,28 +343,57 @@ public class ServerNodeManager implements InitializingBean {
MASTER_SIZE = nodes.size();
MASTER_SLOT = index;
} else {
logger.warn("current addr:{} is not in active master list", masterConfig.getMasterAddress());
logger.warn("current addr:{} is not in active master list",
masterConfig.getMasterAddress());
}
logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, masterConfig.getMasterAddress());
logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE,
MASTER_SLOT, masterConfig.getMasterAddress());
} finally {
masterLock.unlock();
}
}
/**
* sync worker group nodes
* sync worker group nodes from registry center
*
* @param workerGroup worker group
* @param nodes worker nodes
* @param type event type
*/
private void syncWorkerGroupNodesFromRegistry(String workerGroup, Collection<String> nodes,
Type type) {
try {
if (type == Type.REMOVE) {
if (!registryWorkerGroupNodes.containsKey(workerGroup)) {
logger.warn("cannot remove worker group {}, not in active list", workerGroup);
return;
}
registryWorkerGroupNodes.remove(workerGroup);
} else {
Set<String> workerNodes = registryWorkerGroupNodes
.getOrDefault(workerGroup, new HashSet<>());
workerNodes.clear();
workerNodes.addAll(nodes);
registryWorkerGroupNodes.put(workerGroup, workerNodes);
}
} finally {
refreshWorkerGroupNodes();
}
}
/**
* refresh worker group nodes
*/
private void syncWorkerGroupNodes(String workerGroup, Collection<String> nodes) {
private void refreshWorkerGroupNodes() {
workerGroupWriteLock.lock();
try {
Set<String> workerNodes = workerGroupNodes.getOrDefault(workerGroup, new HashSet<>());
workerNodes.clear();
workerNodes.addAll(nodes);
workerGroupNodes.put(workerGroup, workerNodes);
workerGroupNodes.clear();
workerGroupNodes.putAll(registryWorkerGroupNodes);
workerGroupNodes.putAll(dbWorkerGroupNodes);
logger.debug("refresh worker group nodes, current list: {}",
Arrays.toString(workerGroupNodes.keySet().toArray()));
} finally {
notifyWorkerInfoChangeListeners();
workerGroupWriteLock.unlock();
}
}
@ -414,7 +458,8 @@ public class ServerNodeManager implements InitializingBean {
try {
workerNodeInfo.clear();
for (Map.Entry<String, String> entry : newWorkerNodeInfo.entrySet()) {
workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
workerNodeInfo.put(entry.getKey(),
JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
}
} finally {
workerNodeInfoWriteLock.unlock();

Loading…
Cancel
Save