diff --git a/docs/docs/en/architecture/configuration.md b/docs/docs/en/architecture/configuration.md
index 279411ef75..6a0438aebb 100644
--- a/docs/docs/en/architecture/configuration.md
+++ b/docs/docs/en/architecture/configuration.md
@@ -291,7 +291,6 @@ Location: `worker-server/conf/application.yaml`
|worker.tenant-auto-create|true|tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true.|
|worker.max-cpu-load-avg|-1|worker max CPU load avg, only higher than the system CPU load average, worker server can be dispatched tasks. default value -1: the number of CPU cores * 2|
|worker.reserved-memory|0.3|worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G|
-|worker.groups|default|worker groups separated by comma, e.g., 'worker.groups=default,test'
worker will join corresponding group according to this config when startup|
|worker.alert-listen-host|localhost|the alert listen host of worker|
|worker.alert-listen-port|50052|the alert listen port of worker|
|worker.registry-disconnect-strategy.strategy|stop|Used when the worker disconnect from registry, default value: stop. Optional values include stop, waiting|
diff --git a/docs/docs/zh/architecture/configuration.md b/docs/docs/zh/architecture/configuration.md
index be822c89a2..184053b68a 100644
--- a/docs/docs/zh/architecture/configuration.md
+++ b/docs/docs/zh/architecture/configuration.md
@@ -285,7 +285,6 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn相关的配置
|worker.tenant-auto-create|true|租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。|
|worker.max-cpu-load-avg|-1|worker最大cpuload均值,只有高于系统cpuload均值时,worker服务才能被派发任务. 默认值为-1: cpu cores * 2|
|worker.reserved-memory|0.3|worker预留内存,只有低于系统可用内存时,worker服务才能被派发任务,单位为G|
-|worker.groups|default|worker分组配置,逗号分隔,例如'worker.groups=default,test'
worker启动时会根据该配置自动加入对应的分组|
|worker.alert-listen-host|localhost|alert监听host|
|worker.alert-listen-port|50052|alert监听端口|
|worker.registry-disconnect-strategy.strategy|stop|当Worker与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting|
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
index 4e27998019..497df26ca6 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
@@ -28,8 +28,6 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.UserType;
-import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
@@ -41,13 +39,13 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -174,10 +172,10 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
* @return boolean
*/
private String checkWorkerGroupAddrList(WorkerGroup workerGroup) {
- Map serverMaps = registryClient.getServerMaps(NodeType.WORKER, true);
if (Strings.isNullOrEmpty(workerGroup.getAddrList())) {
return null;
}
+ Map serverMaps = registryClient.getServerMaps(NodeType.WORKER);
for (String addr : workerGroup.getAddrList().split(Constants.COMMA)) {
if (!serverMaps.containsKey(addr)) {
return addr;
@@ -205,11 +203,11 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
Result result = new Result();
List workerGroups;
if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
- workerGroups = getWorkerGroups(true, null);
+ workerGroups = getWorkerGroups(null);
} else {
Set ids = resourcePermissionCheckService
.userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP, loginUser.getId(), logger);
- workerGroups = getWorkerGroups(true, ids.isEmpty() ? Collections.emptyList() : new ArrayList<>(ids));
+ workerGroups = getWorkerGroups(ids.isEmpty() ? Collections.emptyList() : new ArrayList<>(ids));
}
List resultDataList = new ArrayList<>();
int total = 0;
@@ -255,20 +253,15 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
Map result = new HashMap<>();
List workerGroups;
if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
- workerGroups = getWorkerGroups(false, null);
+ workerGroups = getWorkerGroups(null);
} else {
Set ids = resourcePermissionCheckService
.userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP, loginUser.getId(), logger);
- workerGroups = getWorkerGroups(false, ids.isEmpty() ? Collections.emptyList() : new ArrayList<>(ids));
+ workerGroups = getWorkerGroups(ids.isEmpty() ? Collections.emptyList() : new ArrayList<>(ids));
}
List availableWorkerGroupList = workerGroups.stream()
.map(WorkerGroup::getName)
.collect(Collectors.toList());
- int index = availableWorkerGroupList.indexOf(Constants.DEFAULT_WORKER_GROUP);
- if (index > -1) {
- availableWorkerGroupList.remove(index);
- availableWorkerGroupList.add(0, Constants.DEFAULT_WORKER_GROUP);
- }
result.put(Constants.DATA_LIST, availableWorkerGroupList);
putMsg(result, Status.SUCCESS);
return result;
@@ -277,10 +270,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
/**
* get worker groups
*
- * @param isPaging whether paging
* @return WorkerGroup list
*/
- private List getWorkerGroups(boolean isPaging, List ids) {
+ private List getWorkerGroups(List ids) {
// worker groups from database
List workerGroups;
if (ids != null) {
@@ -289,63 +281,23 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
workerGroups = workerGroupMapper.queryAllWorkerGroup();
}
- // worker groups from zookeeper
- String workerPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
- Collection workerGroupList = null;
- try {
- workerGroupList = registryClient.getChildrenKeys(workerPath);
- } catch (Exception e) {
- logger.error("getWorkerGroups exception, workerPath: {}, isPaging: {}", workerPath, isPaging, e);
+        // Check every returned group for the default name. Mapping each group to a boolean and calling
+        // findAny() would only report whether one arbitrary element (in practice the first) is the default
+        // group, so a default group stored later in the list would be duplicated below.
+        boolean containsDefaultWorkerGroup = workerGroups.stream()
+                .anyMatch(workerGroup -> Constants.DEFAULT_WORKER_GROUP.equals(workerGroup.getName()));
+        if (!containsDefaultWorkerGroup) {
+            // The query returned no default worker group: synthesize one whose address list is every
+            // worker node currently present in the registry.
+            Set<String> activeWorkerNodes = registryClient.getServerNodeSet(NodeType.WORKER);
+            WorkerGroup defaultWorkerGroup = new WorkerGroup();
+            defaultWorkerGroup.setName(Constants.DEFAULT_WORKER_GROUP);
+            defaultWorkerGroup.setAddrList(String.join(Constants.COMMA, activeWorkerNodes));
+            defaultWorkerGroup.setCreateTime(new Date());
+            defaultWorkerGroup.setUpdateTime(new Date());
+            defaultWorkerGroup.setSystemDefault(true);
+            workerGroups.add(defaultWorkerGroup);
         }
- if (CollectionUtils.isEmpty(workerGroupList)) {
- if (CollectionUtils.isEmpty(workerGroups) && !isPaging) {
- WorkerGroup wg = new WorkerGroup();
- wg.setName(Constants.DEFAULT_WORKER_GROUP);
- workerGroups.add(wg);
- }
- return workerGroups;
- }
- Map workerGroupsMap = null;
- if (workerGroups.size() != 0) {
- workerGroupsMap = workerGroups.stream().collect(Collectors.toMap(WorkerGroup::getName,
- workerGroupItem -> workerGroupItem, (oldWorkerGroup, newWorkerGroup) -> oldWorkerGroup));
- }
- for (String workerGroup : workerGroupList) {
- String workerGroupPath = workerPath + Constants.SINGLE_SLASH + workerGroup;
- Collection childrenNodes = null;
- try {
- childrenNodes = registryClient.getChildrenKeys(workerGroupPath);
- } catch (Exception e) {
- logger.error("getChildrenNodes exception: {}, workerGroupPath: {}", e.getMessage(), workerGroupPath);
- }
- if (childrenNodes == null || childrenNodes.isEmpty()) {
- continue;
- }
- WorkerGroup wg = new WorkerGroup();
- handleAddrList(wg, workerGroup, childrenNodes);
- wg.setName(workerGroup);
- if (isPaging) {
- String registeredValue =
- registryClient.get(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.iterator().next());
- WorkerHeartBeat workerHeartBeat = JSONUtils.parseObject(registeredValue, WorkerHeartBeat.class);
- wg.setCreateTime(new Date(workerHeartBeat.getStartupTime()));
- wg.setUpdateTime(new Date(workerHeartBeat.getReportTime()));
- wg.setSystemDefault(true);
- if (workerGroupsMap != null && workerGroupsMap.containsKey(workerGroup)) {
- wg.setDescription(workerGroupsMap.get(workerGroup).getDescription());
- workerGroups.remove(workerGroupsMap.get(workerGroup));
- }
- }
- workerGroups.add(wg);
- }
return workerGroups;
}
- protected void handleAddrList(WorkerGroup wg, String workerGroup, Collection childrenNodes) {
- wg.setAddrList(String.join(Constants.COMMA, childrenNodes));
- }
-
/**
* delete worker group by id
*
@@ -368,6 +320,11 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
List processInstances = processInstanceMapper
.queryByWorkerGroupNameAndStatus(workerGroup.getName(), Constants.NOT_TERMINATED_STATES);
if (CollectionUtils.isNotEmpty(processInstances)) {
+ List processInstanceIds =
+ processInstances.stream().map(ProcessInstance::getId).collect(Collectors.toList());
+ logger.warn(
+ "Delete worker group failed because there are {} processInstances are using it, processInstanceIds:{}.",
+ processInstances.size(), processInstanceIds);
putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL, processInstances.size());
return result;
}
@@ -385,7 +342,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
@Override
public Map getWorkerAddressList() {
Map result = new HashMap<>();
- Set serverNodeList = registryClient.getServerNodeSet(NodeType.WORKER, true);
+ Set serverNodeList = registryClient.getServerNodeSet(NodeType.WORKER);
result.put(Constants.DATA_LIST, serverNodeList);
putMsg(result, Status.SUCCESS);
return result;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
index 72bad1e686..afff430388 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java
@@ -67,7 +67,7 @@ public class WorkerGroupControllerTest extends AbstractControllerTest {
Map serverMaps = new HashMap<>();
serverMaps.put("192.168.0.1", "192.168.0.1");
serverMaps.put("192.168.0.2", "192.168.0.2");
- PowerMockito.when(registryClient.getServerMaps(NodeType.WORKER, true)).thenReturn(serverMaps);
+ Mockito.when(registryClient.getServerMaps(NodeType.WORKER)).thenReturn(serverMaps);
MultiValueMap paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("name","cxc_work_group");
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
index 75453c8649..e97738cbdf 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java
@@ -17,10 +17,10 @@
package org.apache.dolphinscheduler.server.master.registry;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
+import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
-import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
@@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
@@ -55,6 +56,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
@@ -87,16 +89,6 @@ public class ServerNodeManager implements InitializingBean {
*/
private final ConcurrentHashMap> workerGroupNodes = new ConcurrentHashMap<>();
- /**
- * worker group nodes from registry center, workerGroup -> ips
- */
- private final ConcurrentHashMap> registryWorkerGroupNodes = new ConcurrentHashMap<>();
-
- /**
- * worker group nodes from db, workerGroup -> ips
- */
- private final ConcurrentHashMap> dbWorkerGroupNodes = new ConcurrentHashMap<>();
-
/**
* master nodes
*/
@@ -112,28 +104,14 @@ public class ServerNodeManager implements InitializingBean {
@Autowired
private RegistryClient registryClient;
- /**
- * eg : /dolphinscheduler/node/worker/group/127.0.0.1:xxx
- */
- private static final int WORKER_LISTENER_CHECK_LENGTH = 5;
-
- /**
- * worker group mapper
- */
@Autowired
private WorkerGroupMapper workerGroupMapper;
private final MasterPriorityQueue masterPriorityQueue = new MasterPriorityQueue();
- /**
- * alert dao
- */
@Autowired
private AlertDao alertDao;
- /**
- * master config
- */
@Autowired
private MasterConfig masterConfig;
@@ -180,13 +158,8 @@ public class ServerNodeManager implements InitializingBean {
public void load() {
// master nodes from zookeeper
updateMasterNodes();
-
- // worker group nodes from zookeeper
- Collection workerGroups = registryClient.getWorkerGroupDirectly();
- for (String workerGroup : workerGroups) {
- syncWorkerGroupNodesFromRegistry(workerGroup,
- registryClient.getWorkerGroupNodesDirectly(workerGroup), Type.ADD);
- }
+ updateWorkerNodes();
+ updateWorkerGroupMappings();
}
/**
@@ -197,42 +170,19 @@ public class ServerNodeManager implements InitializingBean {
@Override
public void run() {
try {
- dbWorkerGroupNodes.clear();
// sync worker node info
- Map registryWorkerNodeMap = registryClient
- .getServerMaps(NodeType.WORKER, true);
- syncAllWorkerNodeInfo(registryWorkerNodeMap);
- // sync worker group nodes from database
- List workerGroupList = workerGroupMapper.queryAllWorkerGroup();
- if (CollectionUtils.isNotEmpty(workerGroupList)) {
- for (WorkerGroup wg : workerGroupList) {
- String workerGroupName = wg.getName();
- Set workerAddress = getWorkerAddressByWorkerGroup(
- registryWorkerNodeMap, wg);
- if (!workerAddress.isEmpty()) {
- Set workerNodes = dbWorkerGroupNodes
- .getOrDefault(workerGroupName, new HashSet<>());
- workerNodes.clear();
- workerNodes.addAll(workerAddress);
- dbWorkerGroupNodes.put(workerGroupName, workerNodes);
- }
- }
- }
+ updateWorkerNodes();
+ updateWorkerGroupMappings();
+ notifyWorkerInfoChangeListeners();
} catch (Exception e) {
logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
- } finally {
- refreshWorkerGroupNodes();
}
}
}
-<<<<<<< HEAD
protected Set getWorkerAddressByWorkerGroup(Map newWorkerNodeInfo,
WorkerGroup wg) {
-=======
- protected Set getWorkerAddressByWorkerGroup(Map newWorkerNodeInfo, WorkerGroup wg) {
->>>>>>> 2e61c76c2 ([Improvement] Add remote task model (#11767))
Set nodes = new HashSet<>();
String[] addrs = wg.getAddrList().split(Constants.COMMA);
for (String addr : addrs) {
@@ -256,30 +206,18 @@ public class ServerNodeManager implements InitializingBean {
if (registryClient.isWorkerPath(path)) {
try {
String[] parts = path.split("/");
- if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
- throw new IllegalArgumentException(
- String.format("worker group path : %s is not valid, ignore", path));
- }
- final String workerGroupName = parts[parts.length - 2];
final String workerAddress = parts[parts.length - 1];
+ // todo: update workerNodeInfo
logger.debug("received subscribe event : {}", event);
- Collection currentNodes = registryClient
- .getWorkerGroupNodesDirectly(workerGroupName);
- syncWorkerGroupNodesFromRegistry(workerGroupName, currentNodes, type);
-
if (type == Type.ADD) {
- logger.info("worker group node : {} added, currentNodes : {}", path,
- currentNodes);
+ logger.info("Worker: {} added, currentNode : {}", path, workerAddress);
} else if (type == Type.REMOVE) {
- logger.info("worker group node : {} down.", path);
+ logger.info("Worker node : {} down.", path);
alertDao.sendServerStoppedAlert(1, path, "WORKER");
} else if (type == Type.UPDATE) {
- syncSingleWorkerNodeInfo(workerAddress,
- JSONUtils.parseObject(data, WorkerHeartBeat.class));
+ syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class));
}
- } catch (IllegalArgumentException ex) {
- logger.warn(ex.getMessage());
} catch (Exception ex) {
logger.error("WorkerGroupListener capture data change and get data failed", ex);
}
@@ -330,6 +268,50 @@ public class ServerNodeManager implements InitializingBean {
}
+    /**
+     * Reload the worker heartbeat cache ({@code workerNodeInfo}) from the registry.
+     * <p>
+     * The cache is replaced rather than merged: workers that are no longer present in the registry are
+     * dropped, so that group mappings derived from this cache never contain nodes that have gone away.
+     */
+    private void updateWorkerNodes() {
+        // Query the registry before taking the lock so the remote call never blocks readers of the cache.
+        Map<String, String> workerNodeMaps = registryClient.getServerMaps(NodeType.WORKER);
+        // workerNodeInfo is read under workerNodeInfoReadLock, so writes must use the matching write lock
+        // (not the worker group lock, which guards workerGroupNodes).
+        workerNodeInfoWriteLock.lock();
+        try {
+            // Remove stale entries first; put() alone would keep workers that left the registry.
+            workerNodeInfo.clear();
+            for (Map.Entry<String, String> entry : workerNodeMaps.entrySet()) {
+                workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
+            }
+        } finally {
+            workerNodeInfoWriteLock.unlock();
+        }
+    }
+
+    /**
+     * Rebuild the worker group -> active worker address mapping from the worker groups stored in the database.
+     * <p>
+     * For each group only the addresses that currently have an entry in {@code workerNodeInfo} are kept; a
+     * group whose address list is null or empty is mapped to an empty set. When no group named
+     * {@link Constants#DEFAULT_WORKER_GROUP} is configured, that group is mapped to every worker present in
+     * {@code workerNodeInfo}. The new mapping replaces {@code workerGroupNodes} and listeners are notified.
+     */
+    private void updateWorkerGroupMappings() {
+        List<WorkerGroup> workerGroups = workerGroupMapper.queryAllWorkerGroup();
+        Map<String, Set<String>> tmpWorkerGroupMappings = new HashMap<>();
+        // Acquire the lock before entering try: if lock() were to fail inside the try block, the finally
+        // clause would call unlock() on a lock this thread does not hold.
+        workerNodeInfoReadLock.lock();
+        try {
+            for (WorkerGroup workerGroup : workerGroups) {
+                String addrList = workerGroup.getAddrList();
+                Set<String> activeWorkerNodes;
+                if (StringUtils.isEmpty(addrList)) {
+                    // No addresses configured (null would otherwise throw on split and abort the refresh).
+                    activeWorkerNodes = new HashSet<>();
+                } else {
+                    activeWorkerNodes = Arrays.stream(addrList.split(Constants.COMMA))
+                            .filter(workerNodeInfo::containsKey)
+                            .collect(Collectors.toSet());
+                }
+                tmpWorkerGroupMappings.put(workerGroup.getName(), activeWorkerNodes);
+            }
+            if (!tmpWorkerGroupMappings.containsKey(Constants.DEFAULT_WORKER_GROUP)) {
+                // Take a snapshot: keySet() is a live view of workerNodeInfo, and publishing it would let
+                // later cache updates change the default group outside of workerGroupWriteLock.
+                tmpWorkerGroupMappings.put(Constants.DEFAULT_WORKER_GROUP, new HashSet<>(workerNodeInfo.keySet()));
+            }
+        } finally {
+            workerNodeInfoReadLock.unlock();
+        }
+
+        workerGroupWriteLock.lock();
+        try {
+            // Swap in the freshly built mapping as a whole so removed groups disappear as well.
+            workerGroupNodes.clear();
+            workerGroupNodes.putAll(tmpWorkerGroupMappings);
+            notifyWorkerInfoChangeListeners();
+        } finally {
+            workerGroupWriteLock.unlock();
+        }
+    }
+
/**
* sync master nodes
*
@@ -349,63 +331,13 @@ public class ServerNodeManager implements InitializingBean {
logger.warn("current addr:{} is not in active master list",
masterConfig.getMasterAddress());
}
-<<<<<<< HEAD
logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE,
MASTER_SLOT, masterConfig.getMasterAddress());
-=======
- logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT,
- masterConfig.getMasterAddress());
->>>>>>> 2e61c76c2 ([Improvement] Add remote task model (#11767))
} finally {
masterLock.unlock();
}
}
- /**
- * sync worker group nodes from registry center
- *
- * @param workerGroup worker group
- * @param nodes worker nodes
- * @param type event type
- */
- private void syncWorkerGroupNodesFromRegistry(String workerGroup, Collection nodes,
- Type type) {
- try {
- if (type == Type.REMOVE) {
- if (!registryWorkerGroupNodes.containsKey(workerGroup)) {
- logger.warn("cannot remove worker group {}, not in active list", workerGroup);
- return;
- }
- registryWorkerGroupNodes.remove(workerGroup);
- } else {
- Set workerNodes = registryWorkerGroupNodes
- .getOrDefault(workerGroup, new HashSet<>());
- workerNodes.clear();
- workerNodes.addAll(nodes);
- registryWorkerGroupNodes.put(workerGroup, workerNodes);
- }
- } finally {
- refreshWorkerGroupNodes();
- }
- }
-
- /**
- * refresh worker group nodes
- */
- private void refreshWorkerGroupNodes() {
- workerGroupWriteLock.lock();
- try {
- workerGroupNodes.clear();
- workerGroupNodes.putAll(registryWorkerGroupNodes);
- workerGroupNodes.putAll(dbWorkerGroupNodes);
- logger.debug("refresh worker group nodes, current list: {}",
- Arrays.toString(workerGroupNodes.keySet().toArray()));
- } finally {
- notifyWorkerInfoChangeListeners();
- workerGroupWriteLock.unlock();
- }
- }
-
public Map> getWorkerGroupNodes() {
workerGroupReadLock.lock();
try {
@@ -466,8 +398,7 @@ public class ServerNodeManager implements InitializingBean {
try {
workerNodeInfo.clear();
for (Map.Entry entry : newWorkerNodeInfo.entrySet()) {
- workerNodeInfo.put(entry.getKey(),
- JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
+ workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
}
} finally {
workerNodeInfoWriteLock.unlock();
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
index 508ba293aa..2a5b2203f2 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java
@@ -17,9 +17,12 @@
package org.apache.dolphinscheduler.service.registry;
-import com.google.common.base.Strings;
-import lombok.NonNull;
-import org.apache.commons.lang3.StringUtils;
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.dolphinscheduler.common.Constants.COLON;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
@@ -31,28 +34,29 @@ import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
-import javax.annotation.PostConstruct;
+import org.apache.commons.lang3.StringUtils;
+
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.dolphinscheduler.common.Constants.COLON;
-import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
-import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
+import javax.annotation.PostConstruct;
+
+import lombok.NonNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import com.google.common.base.Strings;
@Component
public class RegistryClient {
@@ -91,7 +95,7 @@ public class RegistryClient {
}
public List getServerList(NodeType nodeType) {
- Map serverMaps = getServerMaps(nodeType, false);
+ Map serverMaps = getServerMaps(nodeType);
String parentPath = rootNodePath(nodeType);
List serverList = new ArrayList<>();
@@ -123,26 +127,24 @@ public class RegistryClient {
server.setZkDirectory(parentPath + "/" + serverPath);
// set host and port
String[] hostAndPort = serverPath.split(COLON);
- String[] hosts = hostAndPort[0].split(DIVISION_STRING);
// fetch the last one
- server.setHost(hosts[hosts.length - 1]);
+ server.setHost(hostAndPort[0]);
server.setPort(Integer.parseInt(hostAndPort[1]));
serverList.add(server);
}
return serverList;
}
- public Map getServerMaps(NodeType nodeType, boolean hostOnly) {
+ /**
+ * Return server host:port -> value
+ */
+ public Map getServerMaps(NodeType nodeType) {
Map serverMap = new HashMap<>();
try {
String path = rootNodePath(nodeType);
Collection serverList = getServerNodes(nodeType);
for (String server : serverList) {
- String host = server;
- if (nodeType == NodeType.WORKER && hostOnly) {
- host = server.split(SINGLE_SLASH)[1];
- }
- serverMap.putIfAbsent(host, get(path + SINGLE_SLASH + server));
+ serverMap.putIfAbsent(server, get(path + SINGLE_SLASH + server));
}
} catch (Exception e) {
logger.error("get server list failed", e);
@@ -152,7 +154,7 @@ public class RegistryClient {
}
public boolean checkNodeExists(String host, NodeType nodeType) {
- return getServerMaps(nodeType, true).keySet()
+ return getServerMaps(nodeType).keySet()
.stream()
.anyMatch(it -> it.contains(host));
}
@@ -161,14 +163,6 @@ public class RegistryClient {
return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_MASTERS);
}
- public Collection getWorkerGroupDirectly() {
- return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS);
- }
-
- public Collection getWorkerGroupNodesDirectly(String workerGroup) {
- return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup);
- }
-
/**
* get host ip:port, path format: parentPath/ip:port
*
@@ -241,14 +235,9 @@ public class RegistryClient {
return registry.children(key);
}
- public Set getServerNodeSet(NodeType nodeType, boolean hostOnly) {
+ public Set getServerNodeSet(NodeType nodeType) {
try {
- return getServerNodes(nodeType).stream().map(server -> {
- if (nodeType == NodeType.WORKER && hostOnly) {
- return server.split(SINGLE_SLASH)[1];
- }
- return server;
- }).collect(Collectors.toSet());
+ return new HashSet<>(getServerNodes(nodeType));
} catch (Exception e) {
throw new RegistryException("Failed to get server node: " + nodeType, e);
}
@@ -272,13 +261,7 @@ public class RegistryClient {
private Collection getServerNodes(NodeType nodeType) {
final String path = rootNodePath(nodeType);
- final Collection serverList = getChildrenKeys(path);
- if (nodeType != NodeType.WORKER) {
- return serverList;
- }
- return serverList.stream().flatMap(group -> getChildrenKeys(path + SINGLE_SLASH + group)
- .stream()
- .map(it -> group + SINGLE_SLASH + it)).collect(Collectors.toList());
+ return getChildrenKeys(path);
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
index a7e6927415..96f00a1f30 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java
@@ -17,11 +17,15 @@
package org.apache.dolphinscheduler.server.worker.config;
-import com.google.common.collect.Sets;
-import lombok.Data;
-import org.apache.commons.collections4.CollectionUtils;
+import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
+
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
+
+import java.time.Duration;
+
+import lombok.Data;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -30,12 +34,6 @@ import org.springframework.validation.Errors;
import org.springframework.validation.Validator;
import org.springframework.validation.annotation.Validated;
-import java.time.Duration;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
-
@Data
@Validated
@Configuration
@@ -52,7 +50,6 @@ public class WorkerConfig implements Validator {
private boolean tenantDistributedUser = false;
private int maxCpuLoadAvg = -1;
private double reservedMemory = 0.3;
- private Set groups = Sets.newHashSet("default");
private String alertListenHost = "localhost";
private int alertListenPort = 50052;
private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties();
@@ -61,7 +58,7 @@ public class WorkerConfig implements Validator {
* This field doesn't need to set at config file, it will be calculated by workerIp:listenPort
*/
private String workerAddress;
- private Set workerGroupRegistryPaths;
+ private String workerRegistryPath;
@Override
public boolean supports(Class> clazz) {
@@ -82,17 +79,7 @@ public class WorkerConfig implements Validator {
}
workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort()));
- workerConfig.setGroups(workerConfig.getGroups().stream().map(String::trim).collect(Collectors.toSet()));
- if (CollectionUtils.isEmpty(workerConfig.getGroups())) {
- errors.rejectValue("groups", null, "should not be empty");
- }
-
- Set workerRegistryPaths = workerConfig.getGroups()
- .stream()
- .map(workerGroup -> REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup + "/" + workerConfig.getWorkerAddress())
- .collect(Collectors.toSet());
-
- workerConfig.setWorkerGroupRegistryPaths(workerRegistryPaths);
+ workerConfig.setWorkerRegistryPath(REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerConfig.getWorkerAddress());
printConfig();
}
@@ -105,11 +92,10 @@ public class WorkerConfig implements Validator {
logger.info("Worker config: tenantDistributedUser -> {}", tenantDistributedUser);
logger.info("Worker config: maxCpuLoadAvg -> {}", maxCpuLoadAvg);
logger.info("Worker config: reservedMemory -> {}", reservedMemory);
- logger.info("Worker config: groups -> {}", groups);
logger.info("Worker config: alertListenHost -> {}", alertListenHost);
logger.info("Worker config: alertListenPort -> {}", alertListenPort);
logger.info("Worker config: registryDisconnectStrategy -> {}", registryDisconnectStrategy);
logger.info("Worker config: workerAddress -> {}", registryDisconnectStrategy);
- logger.info("Worker config: workerGroupRegistryPaths: {}", workerGroupRegistryPaths);
+ logger.info("Worker config: workerRegistryPath: {}", workerRegistryPath);
}
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
index d2147d1559..169db04a82 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java
@@ -17,26 +17,28 @@
package org.apache.dolphinscheduler.server.worker.registry;
-import lombok.extern.slf4j.Slf4j;
+import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
+
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
-import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.task.WorkerHeartBeatTask;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-import javax.annotation.PostConstruct;
import java.io.IOException;
-import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
+import javax.annotation.PostConstruct;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
@Slf4j
@Service
@@ -56,7 +58,6 @@ public class WorkerRegistryClient implements AutoCloseable {
private WorkerHeartBeatTask workerHeartBeatTask;
-
@PostConstruct
public void initWorkRegistry() {
this.workerHeartBeatTask = new WorkerHeartBeatTask(
@@ -80,22 +81,19 @@ public class WorkerRegistryClient implements AutoCloseable {
*/
private void registry() {
// Publishes the current heartbeat under the single path returned by
// workerConfig.getWorkerRegistryPath() (the removed lines looped over one path per
// worker group), waits until the registry reports the node, then starts the heartbeat task.
WorkerHeartBeat workerHeartBeat = workerHeartBeatTask.getHeartBeat();
+ String workerZKPath = workerConfig.getWorkerRegistryPath();
+ // remove before persist
+ registryClient.remove(workerZKPath);
+ registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString(workerHeartBeat));
+ log.info("Worker node: {} registry to ZK {} successfully", workerConfig.getWorkerAddress(), workerZKPath);
- for (String workerZKPath : workerConfig.getWorkerGroupRegistryPaths()) {
- // remove before persist
- registryClient.remove(workerZKPath);
- registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString(workerHeartBeat));
- log.info("Worker node: {} registry to ZK {} successfully", workerConfig.getWorkerAddress(), workerZKPath);
- }
-
// Poll until checkNodeExists(...) returns true, sleeping SLEEP_TIME_MILLIS between attempts.
// The lookup key changes from NetUtils.getHost() to workerConfig.getWorkerAddress().
// NOTE(review): this loop has no timeout or attempt limit.
- while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.WORKER)) {
+ while (!registryClient.checkNodeExists(workerConfig.getWorkerAddress(), NodeType.WORKER)) {
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
// sleep 1s, waiting master failover remove
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
-
// The heartbeat task is started only after the node has been observed in the registry.
workerHeartBeatTask.start();
log.info("Worker node: {} registry finished", workerConfig.getWorkerAddress());
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
index d2a9ca008c..bf81bbb945 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java
@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.server.worker.task;
-import lombok.NonNull;
-import lombok.extern.slf4j.Slf4j;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
@@ -30,6 +28,9 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.function.Supplier;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+
@Slf4j
public class WorkerHeartBeatTask extends BaseHeartBeatTask {
@@ -59,7 +60,8 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask {
double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
int execThreads = workerConfig.getExecThreads();
int workerWaitingTaskCount = this.workerWaitingTaskCount.get();
- int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg, availablePhysicalMemorySize, reservedMemory, execThreads, workerWaitingTaskCount);
+ int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg, availablePhysicalMemorySize, reservedMemory,
+ execThreads, workerWaitingTaskCount);
return WorkerHeartBeat.builder()
.startupTime(ServerLifeCycleManager.getServerStartupTime())
@@ -82,11 +84,11 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask {
@Override
public void writeHeartBeat(WorkerHeartBeat workerHeartBeat) {
// Serializes the heartbeat to JSON once and writes it through registryClient.persistEphemeral
// to the single path from workerConfig.getWorkerRegistryPath(); the removed lines wrote the
// same JSON to every worker-group path.
String workerHeartBeatJson = JSONUtils.toJsonString(workerHeartBeat);
- for (String workerGroupRegistryPath : workerConfig.getWorkerGroupRegistryPaths()) {
- registryClient.persistEphemeral(workerGroupRegistryPath, workerHeartBeatJson);
- }
- log.info("Success write worker group heartBeatInfo into registry, workGroupPath: {} workerHeartBeatInfo: {}",
- workerConfig.getWorkerGroupRegistryPaths(), workerHeartBeatJson);
+ String workerRegistryPath = workerConfig.getWorkerRegistryPath();
+ registryClient.persistEphemeral(workerRegistryPath, workerHeartBeatJson);
// NOTE(review): the message below still says "worker group" although the path is no longer
// derived from a group — consider rewording in a follow-up.
+ log.info(
+ "Success write worker group heartBeatInfo into registry, workerRegistryPath: {} workerHeartBeatInfo: {}",
+ workerRegistryPath, workerHeartBeatJson);
}
public int getServerStatus(double loadAverage,
@@ -96,11 +98,13 @@ public class WorkerHeartBeatTask extends BaseHeartBeatTask {
int workerExecThreadCount,
int workerWaitingTaskCount) {
if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
- log.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
+ log.warn(
+ "current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);
return Constants.ABNORMAL_NODE_STATUS;
} else if (workerWaitingTaskCount > workerExecThreadCount) {
- log.warn("current waiting task count {} is large than worker thread count {}, worker is busy", workerWaitingTaskCount, workerExecThreadCount);
+ log.warn("current waiting task count {} is large than worker thread count {}, worker is busy",
+ workerWaitingTaskCount, workerExecThreadCount);
return Constants.BUSY_NODE_STATUE;
} else {
return Constants.NORMAL_NODE_STATUS;
diff --git a/dolphinscheduler-worker/src/main/resources/application.yaml b/dolphinscheduler-worker/src/main/resources/application.yaml
index 4effad23b1..d64f4fde2c 100644
--- a/dolphinscheduler-worker/src/main/resources/application.yaml
+++ b/dolphinscheduler-worker/src/main/resources/application.yaml
@@ -68,13 +68,6 @@ worker:
max-cpu-load-avg: -1
# worker reserved memory: tasks can be dispatched to this worker only while the system available memory is not lower than this value. default value 0.3, the unit is G
reserved-memory: 0.3
- # for multiple worker groups, use hyphen before group name, e.g.
- # groups:
- # - default
- # - group1
- # - group2
- groups:
- - default
# alert server listen host
alert-listen-host: localhost
alert-listen-port: 50052
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
index 1d8e7ea516..f39b8a9ad7 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.registry;
import static org.mockito.BDDMockito.given;
import org.apache.dolphinscheduler.common.enums.NodeType;
+import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
@@ -27,15 +28,14 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +44,7 @@ import com.google.common.collect.Sets;
/**
* worker registry test
*/
-@RunWith(MockitoJUnitRunner.Silent.class)
+@ExtendWith(MockitoExtension.class)
public class WorkerRegistryClientTest {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRegistryClientTest.class);
@@ -78,28 +78,17 @@ public class WorkerRegistryClientTest {
// workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP);
}
- @Before
- public void before() {
- given(workerConfig.getGroups()).willReturn(Sets.newHashSet("127.0.0.1"));
- // given(heartBeatExecutor.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1"));
- // scheduleAtFixedRate
- given(heartBeatExecutor.scheduleAtFixedRate(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(),
- Mockito.any(TimeUnit.class))).willReturn(null);
-
- }
-
@Test
public void testStart() {
// Smoke test: stubs the mocks, then calls initWorkRegistry() followed by start().
// All stubbing now happens before initWorkRegistry(); the removed lines called it first.
- workerRegistryClient.initWorkRegistry();
-
- given(workerManagerThread.getThreadPoolQueueSize()).willReturn(1);
-
- given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(NodeType.class))).willReturn(true);
// presumably needed so the address handed to checkNodeExists is non-null and therefore
// matched by Mockito.anyString() — TODO confirm against the start() call path
+ given(workerConfig.getWorkerAddress()).willReturn(NetUtils.getAddr(1234));
given(workerConfig.getHeartbeatInterval()).willReturn(Duration.ofSeconds(1));
+ given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(NodeType.class))).willReturn(true);
+ workerRegistryClient.initWorkRegistry();
workerRegistryClient.start();
// NOTE(review): assertTrue(true) can never fail; the test only proves that the calls above
// return without throwing. Consider verifying an interaction on registryClient instead.
+ Assertions.assertTrue(true);
}
@Test