diff --git a/docs/docs/en/architecture/configuration.md b/docs/docs/en/architecture/configuration.md
index 45d7341277..1a257dc92e 100644
--- a/docs/docs/en/architecture/configuration.md
+++ b/docs/docs/en/architecture/configuration.md
@@ -291,7 +291,6 @@ Location: `worker-server/conf/application.yaml`
 |worker.tenant-auto-create|true|tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true.|
 |worker.max-cpu-load-avg|-1|worker max CPU load avg, only higher than the system CPU load average, worker server can be dispatched tasks. default value -1: the number of CPU cores * 2|
 |worker.reserved-memory|0.3|worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G|
-|worker.groups|default|worker groups separated by comma, e.g., 'worker.groups=default,test' worker will join corresponding group according to this config when startup|
 |worker.alert-listen-host|localhost|the alert listen host of worker|
 |worker.alert-listen-port|50052|the alert listen port of worker|
 |worker.registry-disconnect-strategy.strategy|stop|Used when the worker disconnect from registry, default value: stop. Optional values include stop, waiting|
diff --git a/docs/docs/zh/architecture/configuration.md b/docs/docs/zh/architecture/configuration.md
index 500f08c9c8..691a253a50 100644
--- a/docs/docs/zh/architecture/configuration.md
+++ b/docs/docs/zh/architecture/configuration.md
@@ -285,7 +285,6 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn相关的配置
 |worker.tenant-auto-create|true|租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。|
 |worker.max-cpu-load-avg|-1|worker最大cpuload均值,只有高于系统cpuload均值时,worker服务才能被派发任务. 默认值为-1: cpu cores * 2|
 |worker.reserved-memory|0.3|worker预留内存,只有低于系统可用内存时,worker服务才能被派发任务,单位为G|
-|worker.groups|default|worker分组配置,逗号分隔,例如'worker.groups=default,test'
worker启动时会根据该配置自动加入对应的分组| |worker.alert-listen-host|localhost|alert监听host| |worker.alert-listen-port|50052|alert监听端口| |worker.registry-disconnect-strategy.strategy|stop|当Worker与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting| diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java index b00efe69b3..a6d46efd39 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -28,8 +28,6 @@ import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.UserType; -import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; -import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; @@ -41,13 +39,13 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -178,10 +176,10 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro * @return boolean */ private String checkWorkerGroupAddrList(WorkerGroup workerGroup) { - Map serverMaps = registryClient.getServerMaps(NodeType.WORKER, true); if (Strings.isNullOrEmpty(workerGroup.getAddrList())) { return null; } + Map serverMaps = registryClient.getServerMaps(NodeType.WORKER); for (String addr : workerGroup.getAddrList().split(Constants.COMMA)) { if (!serverMaps.containsKey(addr)) { return addr; @@ -209,11 +207,11 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro Result result = new Result(); List workerGroups; if (loginUser.getUserType().equals(UserType.ADMIN_USER)) { - workerGroups = getWorkerGroups(true, null); + workerGroups = getWorkerGroups(null); } else { Set ids = resourcePermissionCheckService .userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP, loginUser.getId(), logger); - workerGroups = getWorkerGroups(true, ids.isEmpty() ? Collections.emptyList() : new ArrayList<>(ids)); + workerGroups = getWorkerGroups(ids.isEmpty() ? Collections.emptyList() : new ArrayList<>(ids)); } List resultDataList = new ArrayList<>(); int total = 0; @@ -259,20 +257,15 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro Map result = new HashMap<>(); List workerGroups; if (loginUser.getUserType().equals(UserType.ADMIN_USER)) { - workerGroups = getWorkerGroups(false, null); + workerGroups = getWorkerGroups(null); } else { Set ids = resourcePermissionCheckService .userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP, loginUser.getId(), logger); - workerGroups = getWorkerGroups(false, ids.isEmpty() ? Collections.emptyList() : new ArrayList<>(ids)); + workerGroups = getWorkerGroups(ids.isEmpty() ? 
Collections.emptyList() : new ArrayList<>(ids)); } List availableWorkerGroupList = workerGroups.stream() .map(WorkerGroup::getName) .collect(Collectors.toList()); - int index = availableWorkerGroupList.indexOf(Constants.DEFAULT_WORKER_GROUP); - if (index > -1) { - availableWorkerGroupList.remove(index); - availableWorkerGroupList.add(0, Constants.DEFAULT_WORKER_GROUP); - } result.put(Constants.DATA_LIST, availableWorkerGroupList); putMsg(result, Status.SUCCESS); return result; @@ -281,10 +274,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro /** * get worker groups * - * @param isPaging whether paging * @return WorkerGroup list */ - private List getWorkerGroups(boolean isPaging, List ids) { + private List getWorkerGroups(List ids) { // worker groups from database List workerGroups; if (ids != null) { @@ -292,64 +284,23 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro } else { workerGroups = workerGroupMapper.queryAllWorkerGroup(); } - - // worker groups from zookeeper - String workerPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; - Collection workerGroupList = null; - try { - workerGroupList = registryClient.getChildrenKeys(workerPath); - } catch (Exception e) { - logger.error("Get worker groups exception, workerPath:{}, isPaging:{}", workerPath, isPaging, e); + Optional containDefaultWorkerGroups = workerGroups.stream() + .map(workerGroup -> Constants.DEFAULT_WORKER_GROUP.equals(workerGroup.getName())).findAny(); + if (!containDefaultWorkerGroups.isPresent() || !containDefaultWorkerGroups.get()) { + // there doesn't exist a default WorkerGroup, we will add all worker to the default worker group. + Set activeWorkerNodes = registryClient.getServerNodeSet(NodeType.WORKER); + WorkerGroup defaultWorkerGroup = new WorkerGroup(); + defaultWorkerGroup.setName(Constants.DEFAULT_WORKER_GROUP); + defaultWorkerGroup.setAddrList(String.join(Constants.COMMA, activeWorkerNodes)); + defaultWorkerGroup.setCreateTime(new Date()); + defaultWorkerGroup.setUpdateTime(new Date()); + defaultWorkerGroup.setSystemDefault(true); + workerGroups.add(defaultWorkerGroup); } - if (CollectionUtils.isEmpty(workerGroupList)) { - if (CollectionUtils.isEmpty(workerGroups) && !isPaging) { - WorkerGroup wg = new WorkerGroup(); - wg.setName(Constants.DEFAULT_WORKER_GROUP); - workerGroups.add(wg); - } - return workerGroups; - } - Map workerGroupsMap = null; - if (workerGroups.size() != 0) { - workerGroupsMap = workerGroups.stream().collect(Collectors.toMap(WorkerGroup::getName, - workerGroupItem -> workerGroupItem, (oldWorkerGroup, newWorkerGroup) -> oldWorkerGroup)); - } - for (String workerGroup : workerGroupList) { - String workerGroupPath = workerPath + Constants.SINGLE_SLASH + workerGroup; - Collection childrenNodes = null; - try { - childrenNodes = registryClient.getChildrenKeys(workerGroupPath); - } catch (Exception e) { - logger.error("Get children nodes exception, workerGroupPath:{}.", workerGroupPath, e); - } - if (childrenNodes == null || childrenNodes.isEmpty()) { - continue; - } - WorkerGroup wg = new WorkerGroup(); - handleAddrList(wg, workerGroup, childrenNodes); - wg.setName(workerGroup); - if (isPaging) { - String registeredValue = - registryClient.get(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.iterator().next()); - WorkerHeartBeat workerHeartBeat = JSONUtils.parseObject(registeredValue, WorkerHeartBeat.class); - wg.setCreateTime(new Date(workerHeartBeat.getStartupTime())); - wg.setUpdateTime(new 
Date(workerHeartBeat.getReportTime())); - wg.setSystemDefault(true); - if (workerGroupsMap != null && workerGroupsMap.containsKey(workerGroup)) { - wg.setDescription(workerGroupsMap.get(workerGroup).getDescription()); - workerGroups.remove(workerGroupsMap.get(workerGroup)); - } - } - workerGroups.add(wg); - } return workerGroups; } - protected void handleAddrList(WorkerGroup wg, String workerGroup, Collection childrenNodes) { - wg.setAddrList(String.join(Constants.COMMA, childrenNodes)); - } - /** * delete worker group by id * @@ -373,8 +324,10 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro List processInstances = processInstanceMapper .queryByWorkerGroupNameAndStatus(workerGroup.getName(), Constants.NOT_TERMINATED_STATES); if (CollectionUtils.isNotEmpty(processInstances)) { - List processInstanceIds = processInstances.stream().map(ProcessInstance::getId).collect(Collectors.toList()); - logger.warn("Delete worker group failed because there are {} processInstances are using it, processInstanceIds:{}.", + List processInstanceIds = + processInstances.stream().map(ProcessInstance::getId).collect(Collectors.toList()); + logger.warn( + "Delete worker group failed because there are {} processInstances are using it, processInstanceIds:{}.", processInstances.size(), processInstanceIds); putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL, processInstances.size()); return result; @@ -394,7 +347,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro @Override public Map getWorkerAddressList() { Map result = new HashMap<>(); - Set serverNodeList = registryClient.getServerNodeSet(NodeType.WORKER, true); + Set serverNodeList = registryClient.getServerNodeSet(NodeType.WORKER); result.put(Constants.DATA_LIST, serverNodeList); putMsg(result, Status.SUCCESS); return result; diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java index 118a82eb6d..7e15784e9c 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java @@ -67,7 +67,7 @@ public class WorkerGroupControllerTest extends AbstractControllerTest { Map serverMaps = new HashMap<>(); serverMaps.put("192.168.0.1", "192.168.0.1"); serverMaps.put("192.168.0.2", "192.168.0.2"); - Mockito.when(registryClient.getServerMaps(NodeType.WORKER, true)).thenReturn(serverMaps); + Mockito.when(registryClient.getServerMaps(NodeType.WORKER)).thenReturn(serverMaps); MultiValueMap paramsMap = new LinkedMultiValueMap<>(); paramsMap.add("name", "cxc_work_group"); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java index 19c42c0711..2a1ca37ce9 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java @@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.master.registry; import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; import static 
org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; -import java.util.Arrays; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.Server; @@ -38,9 +37,11 @@ import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue; import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -55,6 +56,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import javax.annotation.PreDestroy; @@ -87,16 +89,6 @@ public class ServerNodeManager implements InitializingBean { */ private final ConcurrentHashMap> workerGroupNodes = new ConcurrentHashMap<>(); - /** - * worker group nodes from registry center, workerGroup -> ips - */ - private final ConcurrentHashMap> registryWorkerGroupNodes = new ConcurrentHashMap<>(); - - /** - * worker group nodes from db, workerGroup -> ips - */ - private final ConcurrentHashMap> dbWorkerGroupNodes = new ConcurrentHashMap<>(); - /** * master nodes */ @@ -112,28 +104,14 @@ public class ServerNodeManager implements InitializingBean { @Autowired private RegistryClient registryClient; - /** - * eg : /dolphinscheduler/node/worker/group/127.0.0.1:xxx - */ - private static final int WORKER_LISTENER_CHECK_LENGTH = 5; - - /** - * worker group mapper - */ @Autowired private WorkerGroupMapper workerGroupMapper; private final MasterPriorityQueue masterPriorityQueue = new MasterPriorityQueue(); - /** - * alert dao - */ @Autowired private AlertDao alertDao; - /** - * master config - */ @Autowired private MasterConfig masterConfig; @@ -151,7 +129,6 @@ public class ServerNodeManager implements InitializingBean { return MASTER_SIZE; } - /** * init listener * @@ -165,7 +142,7 @@ public class ServerNodeManager implements InitializingBean { // init executor service executorService = - Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor")); + Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor")); executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS); // init MasterNodeListener listener @@ -181,13 +158,8 @@ public class ServerNodeManager implements InitializingBean { public void load() { // master nodes from zookeeper updateMasterNodes(); - - // worker group nodes from zookeeper - Collection workerGroups = registryClient.getWorkerGroupDirectly(); - for (String workerGroup : workerGroups) { - syncWorkerGroupNodesFromRegistry(workerGroup, - registryClient.getWorkerGroupNodesDirectly(workerGroup), Type.ADD); - } + updateWorkerNodes(); + updateWorkerGroupMappings(); } /** @@ -198,39 +170,19 @@ public class ServerNodeManager implements InitializingBean { @Override public void run() { try { - dbWorkerGroupNodes.clear(); // sync worker node info - Map registryWorkerNodeMap = registryClient - .getServerMaps(NodeType.WORKER, true); - syncAllWorkerNodeInfo(registryWorkerNodeMap); - // sync worker group nodes from database - List workerGroupList = 
workerGroupMapper.queryAllWorkerGroup(); - if (CollectionUtils.isNotEmpty(workerGroupList)) { - for (WorkerGroup wg : workerGroupList) { - String workerGroupName = wg.getName(); - Set workerAddress = getWorkerAddressByWorkerGroup( - registryWorkerNodeMap, wg); - if (!workerAddress.isEmpty()) { - Set workerNodes = dbWorkerGroupNodes - .getOrDefault(workerGroupName, new HashSet<>()); - workerNodes.clear(); - workerNodes.addAll(workerAddress); - dbWorkerGroupNodes.put(workerGroupName, workerNodes); - } - } - } + updateWorkerNodes(); + updateWorkerGroupMappings(); + notifyWorkerInfoChangeListeners(); } catch (Exception e) { logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e); - } finally { - refreshWorkerGroupNodes(); } } } - protected Set getWorkerAddressByWorkerGroup(Map newWorkerNodeInfo, - WorkerGroup wg) { + WorkerGroup wg) { Set nodes = new HashSet<>(); String[] addrs = wg.getAddrList().split(Constants.COMMA); for (String addr : addrs) { @@ -254,30 +206,18 @@ public class ServerNodeManager implements InitializingBean { if (registryClient.isWorkerPath(path)) { try { String[] parts = path.split("/"); - if (parts.length < WORKER_LISTENER_CHECK_LENGTH) { - throw new IllegalArgumentException( - String.format("worker group path : %s is not valid, ignore", path)); - } - final String workerGroupName = parts[parts.length - 2]; final String workerAddress = parts[parts.length - 1]; + // todo: update workerNodeInfo logger.debug("received subscribe event : {}", event); - Collection currentNodes = registryClient - .getWorkerGroupNodesDirectly(workerGroupName); - syncWorkerGroupNodesFromRegistry(workerGroupName, currentNodes, type); - if (type == Type.ADD) { - logger.info("worker group node : {} added, currentNodes : {}", path, - currentNodes); + logger.info("Worker: {} added, currentNode : {}", path, workerAddress); } else if (type == Type.REMOVE) { - logger.info("worker group node : {} down.", path); + logger.info("Worker node : {} down.", path); alertDao.sendServerStoppedAlert(1, path, "WORKER"); } else if (type == Type.UPDATE) { - syncSingleWorkerNodeInfo(workerAddress, - JSONUtils.parseObject(data, WorkerHeartBeat.class)); + syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class)); } - } catch (IllegalArgumentException ex) { - logger.warn(ex.getMessage()); } catch (Exception ex) { logger.error("WorkerGroupListener capture data change and get data failed", ex); } @@ -287,6 +227,7 @@ public class ServerNodeManager implements InitializingBean { } class MasterDataListener implements SubscribeListener { + @Override public void notify(Event event) { final String path = event.path(); @@ -327,6 +268,50 @@ public class ServerNodeManager implements InitializingBean { } + private void updateWorkerNodes() { + workerGroupWriteLock.lock(); + try { + Map workerNodeMaps = registryClient.getServerMaps(NodeType.WORKER); + for (Map.Entry entry : workerNodeMaps.entrySet()) { + workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class)); + } + } finally { + workerGroupWriteLock.unlock(); + } + } + + private void updateWorkerGroupMappings() { + List workerGroups = workerGroupMapper.queryAllWorkerGroup(); + Map> tmpWorkerGroupMappings = new HashMap<>(); + try { + workerNodeInfoReadLock.lock(); + for (WorkerGroup workerGroup : workerGroups) { + String workerGroupName = workerGroup.getName(); + String[] workerAddresses = workerGroup.getAddrList().split(Constants.COMMA); + if (ArrayUtils.isEmpty(workerAddresses)) { + continue; + } + Set 
activeWorkerNodes = Arrays.stream(workerAddresses) + .filter(workerNodeInfo::containsKey).collect(Collectors.toSet()); + tmpWorkerGroupMappings.put(workerGroupName, activeWorkerNodes); + } + if (!tmpWorkerGroupMappings.containsKey(Constants.DEFAULT_WORKER_GROUP)) { + tmpWorkerGroupMappings.put(Constants.DEFAULT_WORKER_GROUP, workerNodeInfo.keySet()); + } + } finally { + workerNodeInfoReadLock.unlock(); + } + + workerGroupWriteLock.lock(); + try { + workerGroupNodes.clear(); + workerGroupNodes.putAll(tmpWorkerGroupMappings); + notifyWorkerInfoChangeListeners(); + } finally { + workerGroupWriteLock.unlock(); + } + } + /** * sync master nodes * @@ -344,60 +329,15 @@ public class ServerNodeManager implements InitializingBean { MASTER_SLOT = index; } else { logger.warn("current addr:{} is not in active master list", - masterConfig.getMasterAddress()); + masterConfig.getMasterAddress()); } logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, - MASTER_SLOT, masterConfig.getMasterAddress()); + MASTER_SLOT, masterConfig.getMasterAddress()); } finally { masterLock.unlock(); } } - /** - * sync worker group nodes from registry center - * - * @param workerGroup worker group - * @param nodes worker nodes - * @param type event type - */ - private void syncWorkerGroupNodesFromRegistry(String workerGroup, Collection nodes, - Type type) { - try { - if (type == Type.REMOVE) { - if (!registryWorkerGroupNodes.containsKey(workerGroup)) { - logger.warn("cannot remove worker group {}, not in active list", workerGroup); - return; - } - registryWorkerGroupNodes.remove(workerGroup); - } else { - Set workerNodes = registryWorkerGroupNodes - .getOrDefault(workerGroup, new HashSet<>()); - workerNodes.clear(); - workerNodes.addAll(nodes); - registryWorkerGroupNodes.put(workerGroup, workerNodes); - } - } finally { - refreshWorkerGroupNodes(); - } - } - - /** - * refresh worker group nodes - */ - private void refreshWorkerGroupNodes() { - workerGroupWriteLock.lock(); - try { - workerGroupNodes.clear(); - workerGroupNodes.putAll(registryWorkerGroupNodes); - workerGroupNodes.putAll(dbWorkerGroupNodes); - logger.debug("refresh worker group nodes, current list: {}", - Arrays.toString(workerGroupNodes.keySet().toArray())); - } finally { - notifyWorkerInfoChangeListeners(); - workerGroupWriteLock.unlock(); - } - } - public Map> getWorkerGroupNodes() { workerGroupReadLock.lock(); try { @@ -458,8 +398,7 @@ public class ServerNodeManager implements InitializingBean { try { workerNodeInfo.clear(); for (Map.Entry entry : newWorkerNodeInfo.entrySet()) { - workerNodeInfo.put(entry.getKey(), - JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class)); + workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class)); } } finally { workerNodeInfoWriteLock.unlock(); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java index 508ba293aa..2a5b2203f2 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java @@ -17,9 +17,12 @@ package org.apache.dolphinscheduler.service.registry; -import com.google.common.base.Strings; -import lombok.NonNull; -import org.apache.commons.lang3.StringUtils; +import static 
com.google.common.base.Preconditions.checkArgument; +import static org.apache.dolphinscheduler.common.Constants.COLON; +import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; +import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; +import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.NodeType; @@ -31,28 +34,29 @@ import org.apache.dolphinscheduler.registry.api.ConnectionListener; import org.apache.dolphinscheduler.registry.api.Registry; import org.apache.dolphinscheduler.registry.api.RegistryException; import org.apache.dolphinscheduler.registry.api.SubscribeListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; +import org.apache.commons.lang3.StringUtils; + import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; -import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.dolphinscheduler.common.Constants.COLON; -import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING; -import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS; -import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; -import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; +import javax.annotation.PostConstruct; + +import lombok.NonNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import com.google.common.base.Strings; @Component public class RegistryClient { @@ -91,7 +95,7 @@ public class RegistryClient { } public List getServerList(NodeType nodeType) { - Map serverMaps = getServerMaps(nodeType, false); + Map serverMaps = getServerMaps(nodeType); String parentPath = rootNodePath(nodeType); List serverList = new ArrayList<>(); @@ -123,26 +127,24 @@ public class RegistryClient { server.setZkDirectory(parentPath + "/" + serverPath); // set host and port String[] hostAndPort = serverPath.split(COLON); - String[] hosts = hostAndPort[0].split(DIVISION_STRING); // fetch the last one - server.setHost(hosts[hosts.length - 1]); + server.setHost(hostAndPort[0]); server.setPort(Integer.parseInt(hostAndPort[1])); serverList.add(server); } return serverList; } - public Map getServerMaps(NodeType nodeType, boolean hostOnly) { + /** + * Return server host:port -> value + */ + public Map getServerMaps(NodeType nodeType) { Map serverMap = new HashMap<>(); try { String path = rootNodePath(nodeType); Collection serverList = getServerNodes(nodeType); for (String server : serverList) { - String host = server; - if (nodeType == NodeType.WORKER && hostOnly) { - host = server.split(SINGLE_SLASH)[1]; - } - serverMap.putIfAbsent(host, get(path + SINGLE_SLASH + server)); + serverMap.putIfAbsent(server, get(path + SINGLE_SLASH + server)); } } catch (Exception e) { logger.error("get server list failed", e); @@ -152,7 +154,7 @@ public class RegistryClient { } public boolean checkNodeExists(String host, NodeType nodeType) { - return getServerMaps(nodeType, 
true).keySet() + return getServerMaps(nodeType).keySet() .stream() .anyMatch(it -> it.contains(host)); } @@ -161,14 +163,6 @@ public class RegistryClient { return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_MASTERS); } - public Collection getWorkerGroupDirectly() { - return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS); - } - - public Collection getWorkerGroupNodesDirectly(String workerGroup) { - return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup); - } - /** * get host ip:port, path format: parentPath/ip:port * @@ -241,14 +235,9 @@ public class RegistryClient { return registry.children(key); } - public Set getServerNodeSet(NodeType nodeType, boolean hostOnly) { + public Set getServerNodeSet(NodeType nodeType) { try { - return getServerNodes(nodeType).stream().map(server -> { - if (nodeType == NodeType.WORKER && hostOnly) { - return server.split(SINGLE_SLASH)[1]; - } - return server; - }).collect(Collectors.toSet()); + return new HashSet<>(getServerNodes(nodeType)); } catch (Exception e) { throw new RegistryException("Failed to get server node: " + nodeType, e); } @@ -272,13 +261,7 @@ public class RegistryClient { private Collection getServerNodes(NodeType nodeType) { final String path = rootNodePath(nodeType); - final Collection serverList = getChildrenKeys(path); - if (nodeType != NodeType.WORKER) { - return serverList; - } - return serverList.stream().flatMap(group -> getChildrenKeys(path + SINGLE_SLASH + group) - .stream() - .map(it -> group + SINGLE_SLASH + it)).collect(Collectors.toList()); + return getChildrenKeys(path); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index a7e6927415..96f00a1f30 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -17,11 +17,15 @@ package org.apache.dolphinscheduler.server.worker.config; -import com.google.common.collect.Sets; -import lombok.Data; -import org.apache.commons.collections4.CollectionUtils; +import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; + import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties; + +import java.time.Duration; + +import lombok.Data; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.context.properties.ConfigurationProperties; @@ -30,12 +34,6 @@ import org.springframework.validation.Errors; import org.springframework.validation.Validator; import org.springframework.validation.annotation.Validated; -import java.time.Duration; -import java.util.Set; -import java.util.stream.Collectors; - -import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS; - @Data @Validated @Configuration @@ -52,7 +50,6 @@ public class WorkerConfig implements Validator { private boolean tenantDistributedUser = false; private int maxCpuLoadAvg = -1; private double reservedMemory = 0.3; - private Set groups = Sets.newHashSet("default"); private String alertListenHost = "localhost"; private int alertListenPort = 50052; private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties(); @@ -61,7 +58,7 @@ public class WorkerConfig implements Validator 
{ * This field doesn't need to set at config file, it will be calculated by workerIp:listenPort */ private String workerAddress; - private Set workerGroupRegistryPaths; + private String workerRegistryPath; @Override public boolean supports(Class clazz) { @@ -82,17 +79,7 @@ public class WorkerConfig implements Validator { } workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort())); - workerConfig.setGroups(workerConfig.getGroups().stream().map(String::trim).collect(Collectors.toSet())); - if (CollectionUtils.isEmpty(workerConfig.getGroups())) { - errors.rejectValue("groups", null, "should not be empty"); - } - - Set workerRegistryPaths = workerConfig.getGroups() - .stream() - .map(workerGroup -> REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup + "/" + workerConfig.getWorkerAddress()) - .collect(Collectors.toSet()); - - workerConfig.setWorkerGroupRegistryPaths(workerRegistryPaths); + workerConfig.setWorkerRegistryPath(REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerConfig.getWorkerAddress()); printConfig(); } @@ -105,11 +92,10 @@ public class WorkerConfig implements Validator { logger.info("Worker config: tenantDistributedUser -> {}", tenantDistributedUser); logger.info("Worker config: maxCpuLoadAvg -> {}", maxCpuLoadAvg); logger.info("Worker config: reservedMemory -> {}", reservedMemory); - logger.info("Worker config: groups -> {}", groups); logger.info("Worker config: alertListenHost -> {}", alertListenHost); logger.info("Worker config: alertListenPort -> {}", alertListenPort); logger.info("Worker config: registryDisconnectStrategy -> {}", registryDisconnectStrategy); logger.info("Worker config: workerAddress -> {}", registryDisconnectStrategy); - logger.info("Worker config: workerGroupRegistryPaths: {}", workerGroupRegistryPaths); + logger.info("Worker config: workerRegistryPath: {}", workerRegistryPath); } } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java index d2147d1559..169db04a82 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java @@ -17,26 +17,28 @@ package org.apache.dolphinscheduler.server.worker.registry; -import lombok.extern.slf4j.Slf4j; +import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; + import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.model.WorkerHeartBeat; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.registry.api.RegistryException; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.worker.task.WorkerHeartBeatTask; import org.apache.dolphinscheduler.service.registry.RegistryClient; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import javax.annotation.PostConstruct; import java.io.IOException; -import static 
org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; +import javax.annotation.PostConstruct; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; @Slf4j @Service @@ -56,7 +58,6 @@ public class WorkerRegistryClient implements AutoCloseable { private WorkerHeartBeatTask workerHeartBeatTask; - @PostConstruct public void initWorkRegistry() { this.workerHeartBeatTask = new WorkerHeartBeatTask( @@ -80,22 +81,19 @@ public class WorkerRegistryClient implements AutoCloseable { */ private void registry() { WorkerHeartBeat workerHeartBeat = workerHeartBeatTask.getHeartBeat(); + String workerZKPath = workerConfig.getWorkerRegistryPath(); + // remove before persist + registryClient.remove(workerZKPath); + registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString(workerHeartBeat)); + log.info("Worker node: {} registry to ZK {} successfully", workerConfig.getWorkerAddress(), workerZKPath); - for (String workerZKPath : workerConfig.getWorkerGroupRegistryPaths()) { - // remove before persist - registryClient.remove(workerZKPath); - registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString(workerHeartBeat)); - log.info("Worker node: {} registry to ZK {} successfully", workerConfig.getWorkerAddress(), workerZKPath); - } - - while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.WORKER)) { + while (!registryClient.checkNodeExists(workerConfig.getWorkerAddress(), NodeType.WORKER)) { ThreadUtils.sleep(SLEEP_TIME_MILLIS); } // sleep 1s, waiting master failover remove ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS); - workerHeartBeatTask.start(); log.info("Worker node: {} registry finished", workerConfig.getWorkerAddress()); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java index d2a9ca008c..bf81bbb945 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.server.worker.task; -import lombok.NonNull; -import lombok.extern.slf4j.Slf4j; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager; import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask; @@ -30,6 +28,9 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient; import java.util.function.Supplier; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; + @Slf4j public class WorkerHeartBeatTask extends BaseHeartBeatTask { @@ -59,7 +60,8 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask { double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize(); int execThreads = workerConfig.getExecThreads(); int workerWaitingTaskCount = this.workerWaitingTaskCount.get(); - int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg, availablePhysicalMemorySize, reservedMemory, execThreads, workerWaitingTaskCount); + int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg, availablePhysicalMemorySize, reservedMemory, + execThreads, workerWaitingTaskCount); return WorkerHeartBeat.builder() .startupTime(ServerLifeCycleManager.getServerStartupTime()) @@ -82,11 +84,11 @@ public class 
WorkerHeartBeatTask extends BaseHeartBeatTask { @Override public void writeHeartBeat(WorkerHeartBeat workerHeartBeat) { String workerHeartBeatJson = JSONUtils.toJsonString(workerHeartBeat); - for (String workerGroupRegistryPath : workerConfig.getWorkerGroupRegistryPaths()) { - registryClient.persistEphemeral(workerGroupRegistryPath, workerHeartBeatJson); - } - log.info("Success write worker group heartBeatInfo into registry, workGroupPath: {} workerHeartBeatInfo: {}", - workerConfig.getWorkerGroupRegistryPaths(), workerHeartBeatJson); + String workerRegistryPath = workerConfig.getWorkerRegistryPath(); + registryClient.persistEphemeral(workerRegistryPath, workerHeartBeatJson); + log.info( + "Success write worker group heartBeatInfo into registry, workerRegistryPath: {} workerHeartBeatInfo: {}", + workerRegistryPath, workerHeartBeatJson); } public int getServerStatus(double loadAverage, @@ -96,11 +98,13 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask { int workerExecThreadCount, int workerWaitingTaskCount) { if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) { - log.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G", + log.warn( + "current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G", loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory); return Constants.ABNORMAL_NODE_STATUS; } else if (workerWaitingTaskCount > workerExecThreadCount) { - log.warn("current waiting task count {} is large than worker thread count {}, worker is busy", workerWaitingTaskCount, workerExecThreadCount); + log.warn("current waiting task count {} is large than worker thread count {}, worker is busy", + workerWaitingTaskCount, workerExecThreadCount); return Constants.BUSY_NODE_STATUE; } else { return Constants.NORMAL_NODE_STATUS; diff --git a/dolphinscheduler-worker/src/main/resources/application.yaml b/dolphinscheduler-worker/src/main/resources/application.yaml index 4effad23b1..d64f4fde2c 100644 --- a/dolphinscheduler-worker/src/main/resources/application.yaml +++ b/dolphinscheduler-worker/src/main/resources/application.yaml @@ -68,13 +68,6 @@ worker: max-cpu-load-avg: -1 # worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G reserved-memory: 0.3 - # for multiple worker groups, use hyphen before group name, e.g. 
- # groups: - # - default - # - group1 - # - group2 - groups: - - default # alert server listen host alert-listen-host: localhost alert-listen-port: 50052 diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java index 1d8e7ea516..f39b8a9ad7 100644 --- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.registry; import static org.mockito.BDDMockito.given; import org.apache.dolphinscheduler.common.enums.NodeType; +import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.registry.RegistryClient; @@ -27,15 +28,14 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient; import java.time.Duration; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.junit.jupiter.MockitoExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +44,7 @@ import com.google.common.collect.Sets; /** * worker registry test */ -@RunWith(MockitoJUnitRunner.Silent.class) +@ExtendWith(MockitoExtension.class) public class WorkerRegistryClientTest { private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRegistryClientTest.class); @@ -78,28 +78,17 @@ public class WorkerRegistryClientTest { // workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP); } - @Before - public void before() { - given(workerConfig.getGroups()).willReturn(Sets.newHashSet("127.0.0.1")); - // given(heartBeatExecutor.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1")); - // scheduleAtFixedRate - given(heartBeatExecutor.scheduleAtFixedRate(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(), - Mockito.any(TimeUnit.class))).willReturn(null); - - } - @Test public void testStart() { - workerRegistryClient.initWorkRegistry(); - - given(workerManagerThread.getThreadPoolQueueSize()).willReturn(1); - - given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(NodeType.class))).willReturn(true); + given(workerConfig.getWorkerAddress()).willReturn(NetUtils.getAddr(1234)); given(workerConfig.getHeartbeatInterval()).willReturn(Duration.ofSeconds(1)); + given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(NodeType.class))).willReturn(true); + workerRegistryClient.initWorkRegistry(); workerRegistryClient.start(); + Assertions.assertTrue(true); } @Test
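As the hunks above suggest, this change removes the per-group registry layout: a worker now registers a single ephemeral node at `REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerAddress` (see `WorkerConfig#workerRegistryPath`), the `worker.groups` setting disappears from `application.yaml` and the configuration docs, and worker groups are resolved from the database only. Both `WorkerGroupServiceImpl#getWorkerGroups` and `ServerNodeManager#updateWorkerGroupMappings` fall back to a synthetic `default` group containing every active worker when no default group is defined.

The sketch below is a minimal, self-contained illustration of that fallback logic, not the project's actual classes: `WorkerGroup` here is a simplified stand-in for the DAO entity, and `resolveWorkerGroups` only approximates what `updateWorkerGroupMappings` computes from `workerGroupMapper.queryAllWorkerGroup()` and `registryClient.getServerMaps(NodeType.WORKER)`.

```java
import java.util.*;
import java.util.stream.Collectors;

// Simplified stand-in for the DAO entity; field names are illustrative only.
class WorkerGroup {
    final String name;
    final String addrList; // comma-separated "host:port" entries
    WorkerGroup(String name, String addrList) { this.name = name; this.addrList = addrList; }
}

public class WorkerGroupMappingSketch {

    static final String DEFAULT_WORKER_GROUP = "default";

    /**
     * Group membership comes from the database, is filtered down to workers that are alive
     * in the registry, and a synthetic "default" group with every active worker is added
     * when the database does not define one (mirroring updateWorkerGroupMappings above).
     */
    static Map<String, Set<String>> resolveWorkerGroups(List<WorkerGroup> dbGroups,
                                                        Set<String> activeWorkers) {
        Map<String, Set<String>> mappings = new HashMap<>();
        for (WorkerGroup group : dbGroups) {
            Set<String> activeMembers = Arrays.stream(group.addrList.split(","))
                    .map(String::trim)
                    .filter(activeWorkers::contains) // drop addresses that are not registered
                    .collect(Collectors.toSet());
            mappings.put(group.name, activeMembers);
        }
        // No explicit "default" group in the database: every active worker belongs to it.
        mappings.putIfAbsent(DEFAULT_WORKER_GROUP, new HashSet<>(activeWorkers));
        return mappings;
    }

    public static void main(String[] args) {
        List<WorkerGroup> dbGroups =
                Collections.singletonList(new WorkerGroup("gpu", "10.0.0.1:1234,10.0.0.3:1234"));
        Set<String> activeWorkers =
                new HashSet<>(Arrays.asList("10.0.0.1:1234", "10.0.0.2:1234"));
        // Expected content (map iteration order may vary):
        // gpu -> [10.0.0.1:1234], default -> [10.0.0.1:1234, 10.0.0.2:1234]
        System.out.println(resolveWorkerGroups(dbGroups, activeWorkers));
    }
}
```

The operational consequence implied by the deleted docs and `application.yaml` lines is that group membership can no longer be declared on the worker side via `worker.groups`; groups are managed through the worker-group API backed by the database, with `default` always available.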