Browse Source

[fix#12000]Cannot remove the WorkerGroup from the master service (#12050)

* [Bug] [Master] Cannot remove the WorkerGroup from the master service. #12000

* remove unnecessary locks

* Update dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

Co-authored-by: caishunfeng <caishunfeng2021@gmail.com>

Co-authored-by: caishunfeng <caishunfeng2021@gmail.com>
3.2.0-release
Yann Ann 2 years ago committed by GitHub
parent
commit
ada7cf71d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 141
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

141
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import java.util.Arrays;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.model.Server;
@ -82,10 +83,20 @@ public class ServerNodeManager implements InitializingBean {
private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = workerNodeInfoLock.writeLock(); private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = workerNodeInfoLock.writeLock();
/** /**
* worker group nodes, workerGroup -> ips * worker group nodes, workerGroup -> ips, combining registryWorkerGroupNodes and dbWorkerGroupNodes
*/ */
private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();
/**
* worker group nodes from registry center, workerGroup -> ips
*/
private final ConcurrentHashMap<String, Set<String>> registryWorkerGroupNodes = new ConcurrentHashMap<>();
/**
* worker group nodes from db, workerGroup -> ips
*/
private final ConcurrentHashMap<String, Set<String>> dbWorkerGroupNodes = new ConcurrentHashMap<>();
/** /**
* master nodes * master nodes
*/ */
@ -140,6 +151,7 @@ public class ServerNodeManager implements InitializingBean {
return MASTER_SIZE; return MASTER_SIZE;
} }
/** /**
* init listener * init listener
* *
@ -147,23 +159,19 @@ public class ServerNodeManager implements InitializingBean {
*/ */
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
/**
* load nodes from zookeeper // load nodes from zookeeper
*/
load(); load();
/**
* init executor service // init executor service
*/
executorService = executorService =
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor")); Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor"));
executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS); executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS);
/*
* init MasterNodeListener listener // init MasterNodeListener listener
*/
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener()); registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener());
/*
* init WorkerNodeListener listener // init WorkerNodeListener listener
*/
registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new WorkerDataListener()); registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new WorkerDataListener());
} }
@ -171,17 +179,14 @@ public class ServerNodeManager implements InitializingBean {
* load nodes from zookeeper * load nodes from zookeeper
*/ */
public void load() { public void load() {
/* // master nodes from zookeeper
* master nodes from zookeeper
*/
updateMasterNodes(); updateMasterNodes();
/* // worker group nodes from zookeeper
* worker group nodes from zookeeper
*/
Collection<String> workerGroups = registryClient.getWorkerGroupDirectly(); Collection<String> workerGroups = registryClient.getWorkerGroupDirectly();
for (String workerGroup : workerGroups) { for (String workerGroup : workerGroups) {
syncWorkerGroupNodes(workerGroup, registryClient.getWorkerGroupNodesDirectly(workerGroup)); syncWorkerGroupNodesFromRegistry(workerGroup,
registryClient.getWorkerGroupNodesDirectly(workerGroup), Type.ADD);
} }
} }
@ -193,28 +198,39 @@ public class ServerNodeManager implements InitializingBean {
@Override @Override
public void run() { public void run() {
try { try {
dbWorkerGroupNodes.clear();
// sync worker node info // sync worker node info
Map<String, String> registryWorkerNodeMap = registryClient.getServerMaps(NodeType.WORKER, true); Map<String, String> registryWorkerNodeMap = registryClient
.getServerMaps(NodeType.WORKER, true);
syncAllWorkerNodeInfo(registryWorkerNodeMap); syncAllWorkerNodeInfo(registryWorkerNodeMap);
// sync worker group nodes from database // sync worker group nodes from database
List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup(); List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup();
if (CollectionUtils.isNotEmpty(workerGroupList)) { if (CollectionUtils.isNotEmpty(workerGroupList)) {
for (WorkerGroup wg : workerGroupList) { for (WorkerGroup wg : workerGroupList) {
String workerGroupName = wg.getName(); String workerGroupName = wg.getName();
Set<String> workerAddress = getWorkerAddressByWorkerGroup(registryWorkerNodeMap, wg); Set<String> workerAddress = getWorkerAddressByWorkerGroup(
registryWorkerNodeMap, wg);
if (!workerAddress.isEmpty()) { if (!workerAddress.isEmpty()) {
syncWorkerGroupNodes(workerGroupName, workerAddress); Set<String> workerNodes = dbWorkerGroupNodes
.getOrDefault(workerGroupName, new HashSet<>());
workerNodes.clear();
workerNodes.addAll(workerAddress);
dbWorkerGroupNodes.put(workerGroupName, workerNodes);
} }
} }
} }
notifyWorkerInfoChangeListeners();
} catch (Exception e) { } catch (Exception e) {
logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e); logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
} finally {
refreshWorkerGroupNodes();
} }
} }
} }
protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo, WorkerGroup wg) {
protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo,
WorkerGroup wg) {
Set<String> nodes = new HashSet<>(); Set<String> nodes = new HashSet<>();
String[] addrs = wg.getAddrList().split(Constants.COMMA); String[] addrs = wg.getAddrList().split(Constants.COMMA);
for (String addr : addrs) { for (String addr : addrs) {
@ -240,29 +256,26 @@ public class ServerNodeManager implements InitializingBean {
String[] parts = path.split("/"); String[] parts = path.split("/");
if (parts.length < WORKER_LISTENER_CHECK_LENGTH) { if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
String.format("worker group path : %s is not valid, ignore", path)); String.format("worker group path : %s is not valid, ignore", path));
} }
final String workerGroupName = parts[parts.length - 2]; final String workerGroupName = parts[parts.length - 2];
final String workerAddress = parts[parts.length - 1]; final String workerAddress = parts[parts.length - 1];
logger.debug("received subscribe event : {}", event);
Collection<String> currentNodes = registryClient
.getWorkerGroupNodesDirectly(workerGroupName);
syncWorkerGroupNodesFromRegistry(workerGroupName, currentNodes, type);
if (type == Type.ADD) { if (type == Type.ADD) {
logger.info("worker group node : {} added.", path); logger.info("worker group node : {} added, currentNodes : {}", path,
Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName); currentNodes);
logger.info("currentNodes : {}", currentNodes);
syncWorkerGroupNodes(workerGroupName, currentNodes);
} else if (type == Type.REMOVE) { } else if (type == Type.REMOVE) {
logger.info("worker group node : {} down.", path); logger.info("worker group node : {} down.", path);
Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName);
syncWorkerGroupNodes(workerGroupName, currentNodes);
alertDao.sendServerStoppedAlert(1, path, "WORKER"); alertDao.sendServerStoppedAlert(1, path, "WORKER");
} else if (type == Type.UPDATE) { } else if (type == Type.UPDATE) {
logger.debug("worker group node : {} update, data: {}", path, data); syncSingleWorkerNodeInfo(workerAddress,
Collection<String> currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName); JSONUtils.parseObject(data, WorkerHeartBeat.class));
syncWorkerGroupNodes(workerGroupName, currentNodes);
syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class));
} }
notifyWorkerInfoChangeListeners();
} catch (IllegalArgumentException ex) { } catch (IllegalArgumentException ex) {
logger.warn(ex.getMessage()); logger.warn(ex.getMessage());
} catch (Exception ex) { } catch (Exception ex) {
@ -274,7 +287,6 @@ public class ServerNodeManager implements InitializingBean {
} }
class MasterDataListener implements SubscribeListener { class MasterDataListener implements SubscribeListener {
@Override @Override
public void notify(Event event) { public void notify(Event event) {
final String path = event.path(); final String path = event.path();
@ -331,29 +343,57 @@ public class ServerNodeManager implements InitializingBean {
MASTER_SIZE = nodes.size(); MASTER_SIZE = nodes.size();
MASTER_SLOT = index; MASTER_SLOT = index;
} else { } else {
logger.warn("current addr:{} is not in active master list", masterConfig.getMasterAddress()); logger.warn("current addr:{} is not in active master list",
}
logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT,
masterConfig.getMasterAddress()); masterConfig.getMasterAddress());
}
logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE,
MASTER_SLOT, masterConfig.getMasterAddress());
} finally { } finally {
masterLock.unlock(); masterLock.unlock();
} }
} }
/** /**
* sync worker group nodes * sync worker group nodes from registry center
* *
* @param workerGroup worker group * @param workerGroup worker group
* @param nodes worker nodes * @param nodes worker nodes
* @param type event type
*/
private void syncWorkerGroupNodesFromRegistry(String workerGroup, Collection<String> nodes,
Type type) {
try {
if (type == Type.REMOVE) {
if (!registryWorkerGroupNodes.containsKey(workerGroup)) {
logger.warn("cannot remove worker group {}, not in active list", workerGroup);
return;
}
registryWorkerGroupNodes.remove(workerGroup);
} else {
Set<String> workerNodes = registryWorkerGroupNodes
.getOrDefault(workerGroup, new HashSet<>());
workerNodes.clear();
workerNodes.addAll(nodes);
registryWorkerGroupNodes.put(workerGroup, workerNodes);
}
} finally {
refreshWorkerGroupNodes();
}
}
/**
* refresh worker group nodes
*/ */
private void syncWorkerGroupNodes(String workerGroup, Collection<String> nodes) { private void refreshWorkerGroupNodes() {
workerGroupWriteLock.lock(); workerGroupWriteLock.lock();
try { try {
Set<String> workerNodes = workerGroupNodes.getOrDefault(workerGroup, new HashSet<>()); workerGroupNodes.clear();
workerNodes.clear(); workerGroupNodes.putAll(registryWorkerGroupNodes);
workerNodes.addAll(nodes); workerGroupNodes.putAll(dbWorkerGroupNodes);
workerGroupNodes.put(workerGroup, workerNodes); logger.debug("refresh worker group nodes, current list: {}",
Arrays.toString(workerGroupNodes.keySet().toArray()));
} finally { } finally {
notifyWorkerInfoChangeListeners();
workerGroupWriteLock.unlock(); workerGroupWriteLock.unlock();
} }
} }
@ -418,7 +458,8 @@ public class ServerNodeManager implements InitializingBean {
try { try {
workerNodeInfo.clear(); workerNodeInfo.clear();
for (Map.Entry<String, String> entry : newWorkerNodeInfo.entrySet()) { for (Map.Entry<String, String> entry : newWorkerNodeInfo.entrySet()) {
workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class)); workerNodeInfo.put(entry.getKey(),
JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
} }
} finally { } finally {
workerNodeInfoWriteLock.unlock(); workerNodeInfoWriteLock.unlock();

Loading…
Cancel
Save