From ada7cf71d5c70174c7d7c6608f4f60896db5d6b9 Mon Sep 17 00:00:00 2001 From: Yann Ann Date: Sat, 24 Sep 2022 18:57:20 +0800 Subject: [PATCH] [fix#12000]Cannot remove the WorkerGroup from the master service (#12050) * [Bug] [Master] Cannot remove the WorkerGroup from the master service. #12000 * remove unnecessary locks * Update dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java Co-authored-by: caishunfeng Co-authored-by: caishunfeng --- .../master/registry/ServerNodeManager.java | 141 +++++++++++------- 1 file changed, 91 insertions(+), 50 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index 3ec10b3ed7..19c42c0711 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.master.registry; import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; +import java.util.Arrays; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; @@ -82,10 +83,20 @@ public class ServerNodeManager implements InitializingBean { private final ReentrantReadWriteLock.WriteLock workerNodeInfoWriteLock = workerNodeInfoLock.writeLock(); /** - * worker group nodes, workerGroup -> ips + * worker group nodes, workerGroup -> ips, combining registryWorkerGroupNodes and dbWorkerGroupNodes */ private final ConcurrentHashMap> workerGroupNodes = new ConcurrentHashMap<>(); + /** + * worker group nodes from registry center, workerGroup -> ips + */ + private final ConcurrentHashMap> registryWorkerGroupNodes = new ConcurrentHashMap<>(); + + /** + * worker group nodes from db, workerGroup -> ips + */ + private final ConcurrentHashMap> dbWorkerGroupNodes = new ConcurrentHashMap<>(); + /** * master nodes */ @@ -140,6 +151,7 @@ public class ServerNodeManager implements InitializingBean { return MASTER_SIZE; } + /** * init listener * @@ -147,23 +159,19 @@ public class ServerNodeManager implements InitializingBean { */ @Override public void afterPropertiesSet() throws Exception { - /** - * load nodes from zookeeper - */ + + // load nodes from zookeeper load(); - /** - * init executor service - */ + + // init executor service executorService = - Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor")); + Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor")); executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS); - /* - * init MasterNodeListener listener - */ + + // init MasterNodeListener listener registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_MASTERS, new MasterDataListener()); - /* - * init WorkerNodeListener listener - */ + + // init WorkerNodeListener listener registryClient.subscribe(REGISTRY_DOLPHINSCHEDULER_WORKERS, new WorkerDataListener()); } @@ -171,17 +179,14 @@ public class ServerNodeManager implements InitializingBean { * load nodes from zookeeper */ public void load() { - /* - * master nodes from zookeeper - */ + // master nodes from zookeeper updateMasterNodes(); - /* - * worker group nodes from zookeeper - */ + // worker group nodes from zookeeper Collection workerGroups = registryClient.getWorkerGroupDirectly(); for (String workerGroup : workerGroups) { - syncWorkerGroupNodes(workerGroup, registryClient.getWorkerGroupNodesDirectly(workerGroup)); + syncWorkerGroupNodesFromRegistry(workerGroup, + registryClient.getWorkerGroupNodesDirectly(workerGroup), Type.ADD); } } @@ -193,28 +198,39 @@ public class ServerNodeManager implements InitializingBean { @Override public void run() { try { + dbWorkerGroupNodes.clear(); + // sync worker node info - Map registryWorkerNodeMap = registryClient.getServerMaps(NodeType.WORKER, true); + Map registryWorkerNodeMap = registryClient + .getServerMaps(NodeType.WORKER, true); syncAllWorkerNodeInfo(registryWorkerNodeMap); // sync worker group nodes from database List workerGroupList = workerGroupMapper.queryAllWorkerGroup(); if (CollectionUtils.isNotEmpty(workerGroupList)) { for (WorkerGroup wg : workerGroupList) { String workerGroupName = wg.getName(); - Set workerAddress = getWorkerAddressByWorkerGroup(registryWorkerNodeMap, wg); + Set workerAddress = getWorkerAddressByWorkerGroup( + registryWorkerNodeMap, wg); if (!workerAddress.isEmpty()) { - syncWorkerGroupNodes(workerGroupName, workerAddress); + Set workerNodes = dbWorkerGroupNodes + .getOrDefault(workerGroupName, new HashSet<>()); + workerNodes.clear(); + workerNodes.addAll(workerAddress); + dbWorkerGroupNodes.put(workerGroupName, workerNodes); } } } - notifyWorkerInfoChangeListeners(); } catch (Exception e) { logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e); + } finally { + refreshWorkerGroupNodes(); } } } - protected Set getWorkerAddressByWorkerGroup(Map newWorkerNodeInfo, WorkerGroup wg) { + + protected Set getWorkerAddressByWorkerGroup(Map newWorkerNodeInfo, + WorkerGroup wg) { Set nodes = new HashSet<>(); String[] addrs = wg.getAddrList().split(Constants.COMMA); for (String addr : addrs) { @@ -240,29 +256,26 @@ public class ServerNodeManager implements InitializingBean { String[] parts = path.split("/"); if (parts.length < WORKER_LISTENER_CHECK_LENGTH) { throw new IllegalArgumentException( - String.format("worker group path : %s is not valid, ignore", path)); + String.format("worker group path : %s is not valid, ignore", path)); } final String workerGroupName = parts[parts.length - 2]; final String workerAddress = parts[parts.length - 1]; + logger.debug("received subscribe event : {}", event); + Collection currentNodes = registryClient + .getWorkerGroupNodesDirectly(workerGroupName); + syncWorkerGroupNodesFromRegistry(workerGroupName, currentNodes, type); + if (type == Type.ADD) { - logger.info("worker group node : {} added.", path); - Collection currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName); - logger.info("currentNodes : {}", currentNodes); - syncWorkerGroupNodes(workerGroupName, currentNodes); + logger.info("worker group node : {} added, currentNodes : {}", path, + currentNodes); } else if (type == Type.REMOVE) { logger.info("worker group node : {} down.", path); - Collection currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName); - syncWorkerGroupNodes(workerGroupName, currentNodes); alertDao.sendServerStoppedAlert(1, path, "WORKER"); } else if (type == Type.UPDATE) { - logger.debug("worker group node : {} update, data: {}", path, data); - Collection currentNodes = registryClient.getWorkerGroupNodesDirectly(workerGroupName); - syncWorkerGroupNodes(workerGroupName, currentNodes); - - syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class)); + syncSingleWorkerNodeInfo(workerAddress, + JSONUtils.parseObject(data, WorkerHeartBeat.class)); } - notifyWorkerInfoChangeListeners(); } catch (IllegalArgumentException ex) { logger.warn(ex.getMessage()); } catch (Exception ex) { @@ -274,7 +287,6 @@ public class ServerNodeManager implements InitializingBean { } class MasterDataListener implements SubscribeListener { - @Override public void notify(Event event) { final String path = event.path(); @@ -331,29 +343,57 @@ public class ServerNodeManager implements InitializingBean { MASTER_SIZE = nodes.size(); MASTER_SLOT = index; } else { - logger.warn("current addr:{} is not in active master list", masterConfig.getMasterAddress()); - } - logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT, + logger.warn("current addr:{} is not in active master list", masterConfig.getMasterAddress()); + } + logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, + MASTER_SLOT, masterConfig.getMasterAddress()); } finally { masterLock.unlock(); } } /** - * sync worker group nodes + * sync worker group nodes from registry center * * @param workerGroup worker group * @param nodes worker nodes + * @param type event type + */ + private void syncWorkerGroupNodesFromRegistry(String workerGroup, Collection nodes, + Type type) { + try { + if (type == Type.REMOVE) { + if (!registryWorkerGroupNodes.containsKey(workerGroup)) { + logger.warn("cannot remove worker group {}, not in active list", workerGroup); + return; + } + registryWorkerGroupNodes.remove(workerGroup); + } else { + Set workerNodes = registryWorkerGroupNodes + .getOrDefault(workerGroup, new HashSet<>()); + workerNodes.clear(); + workerNodes.addAll(nodes); + registryWorkerGroupNodes.put(workerGroup, workerNodes); + } + } finally { + refreshWorkerGroupNodes(); + } + } + + /** + * refresh worker group nodes */ - private void syncWorkerGroupNodes(String workerGroup, Collection nodes) { + private void refreshWorkerGroupNodes() { workerGroupWriteLock.lock(); try { - Set workerNodes = workerGroupNodes.getOrDefault(workerGroup, new HashSet<>()); - workerNodes.clear(); - workerNodes.addAll(nodes); - workerGroupNodes.put(workerGroup, workerNodes); + workerGroupNodes.clear(); + workerGroupNodes.putAll(registryWorkerGroupNodes); + workerGroupNodes.putAll(dbWorkerGroupNodes); + logger.debug("refresh worker group nodes, current list: {}", + Arrays.toString(workerGroupNodes.keySet().toArray())); } finally { + notifyWorkerInfoChangeListeners(); workerGroupWriteLock.unlock(); } } @@ -418,7 +458,8 @@ public class ServerNodeManager implements InitializingBean { try { workerNodeInfo.clear(); for (Map.Entry entry : newWorkerNodeInfo.entrySet()) { - workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class)); + workerNodeInfo.put(entry.getKey(), + JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class)); } } finally { workerNodeInfoWriteLock.unlock();