
cherry-pick [Improvement][WorkerGroup]Remove workerGroup in registry #12217

3.1.1-release
Wenjun Ruan 2 years ago committed by zhuangchong
parent commit a17d0cc5d2
  1. docs/docs/en/architecture/configuration.md (1 change)
  2. docs/docs/zh/architecture/configuration.md (1 change)
  3. dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java (93 changes)
  4. dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java (2 changes)
  5. dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java (187 changes)
  6. dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java (75 changes)
  7. dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java (34 changes)
  8. dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java (22 changes)
  9. dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java (24 changes)
  10. dolphinscheduler-worker/src/main/resources/application.yaml (7 changes)
  11. dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java (31 changes)

docs/docs/en/architecture/configuration.md (1 change)

@@ -291,7 +291,6 @@ Location: `worker-server/conf/application.yaml`
|worker.tenant-auto-create|true|tenant corresponds to the system user that the worker uses to submit jobs. If the system does not have this user, it will be created automatically when worker.tenant-auto-create is true.|
|worker.max-cpu-load-avg|-1|worker max CPU load average; tasks can be dispatched to the worker only when this value is higher than the current system CPU load average. Default value -1: the number of CPU cores * 2|
|worker.reserved-memory|0.3|worker reserved memory; tasks can be dispatched to the worker only when system available memory is higher than this value. Default value 0.3, the unit is G|
|worker.groups|default|worker groups separated by comma, e.g., 'worker.groups=default,test' <br> worker will join corresponding group according to this config when startup|
|worker.alert-listen-host|localhost|the alert listen host of worker|
|worker.alert-listen-port|50052|the alert listen port of worker|
|worker.registry-disconnect-strategy.strategy|stop|Strategy used when the worker disconnects from the registry, default value: stop. Optional values include stop, waiting|

docs/docs/zh/architecture/configuration.md (1 change)

@@ -285,7 +285,6 @@ common.properties配置文件目前主要是配置hadoop/s3/yarn相关的配置
|worker.tenant-auto-create|true|租户对应于系统的用户,由worker提交作业.如果系统没有该用户,则在参数worker.tenant.auto.create为true后自动创建。|
|worker.max-cpu-load-avg|-1|worker最大cpuload均值,只有高于系统cpuload均值时,worker服务才能被派发任务. 默认值为-1: cpu cores * 2|
|worker.reserved-memory|0.3|worker预留内存,只有低于系统可用内存时,worker服务才能被派发任务,单位为G|
|worker.groups|default|worker分组配置,逗号分隔,例如'worker.groups=default,test' <br> worker启动时会根据该配置自动加入对应的分组|
|worker.alert-listen-host|localhost|alert监听host|
|worker.alert-listen-port|50052|alert监听端口|
|worker.registry-disconnect-strategy.strategy|stop|当Worker与注册中心失联之后采取的策略, 默认值是: stop. 可选值包括: stop, waiting|

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java (93 changes)

@@ -28,8 +28,6 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
@@ -41,13 +39,13 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -174,10 +172,10 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
* @return boolean
*/
private String checkWorkerGroupAddrList(WorkerGroup workerGroup) {
Map<String, String> serverMaps = registryClient.getServerMaps(NodeType.WORKER, true);
if (Strings.isNullOrEmpty(workerGroup.getAddrList())) {
return null;
}
Map<String, String> serverMaps = registryClient.getServerMaps(NodeType.WORKER);
for (String addr : workerGroup.getAddrList().split(Constants.COMMA)) {
if (!serverMaps.containsKey(addr)) {
return addr;
@@ -205,11 +203,11 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
Result result = new Result();
List<WorkerGroup> workerGroups;
if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
workerGroups = getWorkerGroups(true, null);
workerGroups = getWorkerGroups(null);
} else {
Set<Integer> ids = resourcePermissionCheckService
.userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP, loginUser.getId(), logger);
workerGroups = getWorkerGroups(true, ids.isEmpty() ? Collections.emptyList() : new ArrayList<>(ids));
workerGroups = getWorkerGroups(ids.isEmpty() ? Collections.emptyList() : new ArrayList<>(ids));
}
List<WorkerGroup> resultDataList = new ArrayList<>();
int total = 0;
@@ -255,20 +253,15 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
Map<String, Object> result = new HashMap<>();
List<WorkerGroup> workerGroups;
if (loginUser.getUserType().equals(UserType.ADMIN_USER)) {
workerGroups = getWorkerGroups(false, null);
workerGroups = getWorkerGroups(null);
} else {
Set<Integer> ids = resourcePermissionCheckService
.userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP, loginUser.getId(), logger);
workerGroups = getWorkerGroups(false, ids.isEmpty() ? Collections.emptyList() : new ArrayList<>(ids));
workerGroups = getWorkerGroups(ids.isEmpty() ? Collections.emptyList() : new ArrayList<>(ids));
}
List<String> availableWorkerGroupList = workerGroups.stream()
.map(WorkerGroup::getName)
.collect(Collectors.toList());
int index = availableWorkerGroupList.indexOf(Constants.DEFAULT_WORKER_GROUP);
if (index > -1) {
availableWorkerGroupList.remove(index);
availableWorkerGroupList.add(0, Constants.DEFAULT_WORKER_GROUP);
}
result.put(Constants.DATA_LIST, availableWorkerGroupList);
putMsg(result, Status.SUCCESS);
return result;
@@ -277,10 +270,9 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
/**
* get worker groups
*
* @param isPaging whether paging
* @return WorkerGroup list
*/
private List<WorkerGroup> getWorkerGroups(boolean isPaging, List<Integer> ids) {
private List<WorkerGroup> getWorkerGroups(List<Integer> ids) {
// worker groups from database
List<WorkerGroup> workerGroups;
if (ids != null) {
@@ -289,62 +281,22 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
workerGroups = workerGroupMapper.queryAllWorkerGroup();
}
// worker groups from zookeeper
String workerPath = Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
Collection<String> workerGroupList = null;
try {
workerGroupList = registryClient.getChildrenKeys(workerPath);
} catch (Exception e) {
logger.error("getWorkerGroups exception, workerPath: {}, isPaging: {}", workerPath, isPaging, e);
boolean containsDefaultWorkerGroup = workerGroups.stream()
.anyMatch(workerGroup -> Constants.DEFAULT_WORKER_GROUP.equals(workerGroup.getName()));
if (!containsDefaultWorkerGroup) {
// there doesn't exist a default WorkerGroup, we will add all worker to the default worker group.
Set<String> activeWorkerNodes = registryClient.getServerNodeSet(NodeType.WORKER);
WorkerGroup defaultWorkerGroup = new WorkerGroup();
defaultWorkerGroup.setName(Constants.DEFAULT_WORKER_GROUP);
defaultWorkerGroup.setAddrList(String.join(Constants.COMMA, activeWorkerNodes));
defaultWorkerGroup.setCreateTime(new Date());
defaultWorkerGroup.setUpdateTime(new Date());
defaultWorkerGroup.setSystemDefault(true);
workerGroups.add(defaultWorkerGroup);
}
if (CollectionUtils.isEmpty(workerGroupList)) {
if (CollectionUtils.isEmpty(workerGroups) && !isPaging) {
WorkerGroup wg = new WorkerGroup();
wg.setName(Constants.DEFAULT_WORKER_GROUP);
workerGroups.add(wg);
}
return workerGroups;
}
Map<String, WorkerGroup> workerGroupsMap = null;
if (workerGroups.size() != 0) {
workerGroupsMap = workerGroups.stream().collect(Collectors.toMap(WorkerGroup::getName,
workerGroupItem -> workerGroupItem, (oldWorkerGroup, newWorkerGroup) -> oldWorkerGroup));
}
for (String workerGroup : workerGroupList) {
String workerGroupPath = workerPath + Constants.SINGLE_SLASH + workerGroup;
Collection<String> childrenNodes = null;
try {
childrenNodes = registryClient.getChildrenKeys(workerGroupPath);
} catch (Exception e) {
logger.error("getChildrenNodes exception: {}, workerGroupPath: {}", e.getMessage(), workerGroupPath);
}
if (childrenNodes == null || childrenNodes.isEmpty()) {
continue;
}
WorkerGroup wg = new WorkerGroup();
handleAddrList(wg, workerGroup, childrenNodes);
wg.setName(workerGroup);
if (isPaging) {
String registeredValue =
registryClient.get(workerGroupPath + Constants.SINGLE_SLASH + childrenNodes.iterator().next());
WorkerHeartBeat workerHeartBeat = JSONUtils.parseObject(registeredValue, WorkerHeartBeat.class);
wg.setCreateTime(new Date(workerHeartBeat.getStartupTime()));
wg.setUpdateTime(new Date(workerHeartBeat.getReportTime()));
wg.setSystemDefault(true);
if (workerGroupsMap != null && workerGroupsMap.containsKey(workerGroup)) {
wg.setDescription(workerGroupsMap.get(workerGroup).getDescription());
workerGroups.remove(workerGroupsMap.get(workerGroup));
}
}
workerGroups.add(wg);
}
return workerGroups;
}
protected void handleAddrList(WorkerGroup wg, String workerGroup, Collection<String> childrenNodes) {
wg.setAddrList(String.join(Constants.COMMA, childrenNodes));
}
/**
* delete worker group by id
@@ -368,6 +320,11 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
List<ProcessInstance> processInstances = processInstanceMapper
.queryByWorkerGroupNameAndStatus(workerGroup.getName(), Constants.NOT_TERMINATED_STATES);
if (CollectionUtils.isNotEmpty(processInstances)) {
List<Integer> processInstanceIds =
processInstances.stream().map(ProcessInstance::getId).collect(Collectors.toList());
logger.warn(
"Delete worker group failed because there are {} processInstances are using it, processInstanceIds:{}.",
processInstances.size(), processInstanceIds);
putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL, processInstances.size());
return result;
}
@@ -385,7 +342,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
@Override
public Map<String, Object> getWorkerAddressList() {
Map<String, Object> result = new HashMap<>();
Set<String> serverNodeList = registryClient.getServerNodeSet(NodeType.WORKER, true);
Set<String> serverNodeList = registryClient.getServerNodeSet(NodeType.WORKER);
result.put(Constants.DATA_LIST, serverNodeList);
putMsg(result, Status.SUCCESS);
return result;
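
Net effect of the service changes above: worker groups are now read only from the database, and when no row named default exists the service synthesizes a default group whose address list is every worker currently registered. A minimal standalone sketch of that fallback, with the DAO and registry calls stubbed out as plain collections (illustrative names, not the real API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class DefaultGroupSketch {
    static final String DEFAULT_WORKER_GROUP = "default";

    // dbGroups: group names read from the worker-group table;
    // activeWorkers: live host:port addresses from the registry
    static List<String> effectiveGroups(List<String> dbGroups, Set<String> activeWorkers) {
        List<String> groups = new ArrayList<>(dbGroups);
        if (!groups.contains(DEFAULT_WORKER_GROUP)) {
            // no explicit default group in the DB: synthesize one spanning every
            // registered worker, mirroring defaultWorkerGroup in getWorkerGroups above
            groups.add(DEFAULT_WORKER_GROUP); // addrList = String.join(",", activeWorkers)
        }
        return groups;
    }

    public static void main(String[] args) {
        System.out.println(effectiveGroups(List.of("gpu"), Set.of("10.0.0.1:1234", "10.0.0.2:1234")));
        // -> [gpu, default]
    }
}
```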

dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/WorkerGroupControllerTest.java (2 changes)

@@ -67,7 +67,7 @@ public class WorkerGroupControllerTest extends AbstractControllerTest {
Map<String, String> serverMaps = new HashMap<>();
serverMaps.put("192.168.0.1", "192.168.0.1");
serverMaps.put("192.168.0.2", "192.168.0.2");
PowerMockito.when(registryClient.getServerMaps(NodeType.WORKER, true)).thenReturn(serverMaps);
Mockito.when(registryClient.getServerMaps(NodeType.WORKER)).thenReturn(serverMaps);
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
paramsMap.add("name","cxc_work_group");

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/registry/ServerNodeManager.java (187 changes)

@@ -17,10 +17,10 @@
package org.apache.dolphinscheduler.server.master.registry;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.constants.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
@@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.service.queue.MasterPriorityQueue;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
@@ -55,6 +56,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
@@ -87,16 +89,6 @@ public class ServerNodeManager implements InitializingBean {
*/
private final ConcurrentHashMap<String, Set<String>> workerGroupNodes = new ConcurrentHashMap<>();
/**
* worker group nodes from registry center, workerGroup -> ips
*/
private final ConcurrentHashMap<String, Set<String>> registryWorkerGroupNodes = new ConcurrentHashMap<>();
/**
* worker group nodes from db, workerGroup -> ips
*/
private final ConcurrentHashMap<String, Set<String>> dbWorkerGroupNodes = new ConcurrentHashMap<>();
/**
* master nodes
*/
@@ -112,28 +104,14 @@ public class ServerNodeManager implements InitializingBean {
@Autowired
private RegistryClient registryClient;
/**
* eg : /dolphinscheduler/node/worker/group/127.0.0.1:xxx
*/
private static final int WORKER_LISTENER_CHECK_LENGTH = 5;
/**
* worker group mapper
*/
@Autowired
private WorkerGroupMapper workerGroupMapper;
private final MasterPriorityQueue masterPriorityQueue = new MasterPriorityQueue();
/**
* alert dao
*/
@Autowired
private AlertDao alertDao;
/**
* master config
*/
@Autowired
private MasterConfig masterConfig;
@@ -180,13 +158,8 @@ public class ServerNodeManager implements InitializingBean {
public void load() {
// master nodes from zookeeper
updateMasterNodes();
// worker group nodes from zookeeper
Collection<String> workerGroups = registryClient.getWorkerGroupDirectly();
for (String workerGroup : workerGroups) {
syncWorkerGroupNodesFromRegistry(workerGroup,
registryClient.getWorkerGroupNodesDirectly(workerGroup), Type.ADD);
}
updateWorkerNodes();
updateWorkerGroupMappings();
}
/**
@@ -197,42 +170,19 @@ public class ServerNodeManager implements InitializingBean {
@Override
public void run() {
try {
dbWorkerGroupNodes.clear();
// sync worker node info
Map<String, String> registryWorkerNodeMap = registryClient
.getServerMaps(NodeType.WORKER, true);
syncAllWorkerNodeInfo(registryWorkerNodeMap);
// sync worker group nodes from database
List<WorkerGroup> workerGroupList = workerGroupMapper.queryAllWorkerGroup();
if (CollectionUtils.isNotEmpty(workerGroupList)) {
for (WorkerGroup wg : workerGroupList) {
String workerGroupName = wg.getName();
Set<String> workerAddress = getWorkerAddressByWorkerGroup(
registryWorkerNodeMap, wg);
if (!workerAddress.isEmpty()) {
Set<String> workerNodes = dbWorkerGroupNodes
.getOrDefault(workerGroupName, new HashSet<>());
workerNodes.clear();
workerNodes.addAll(workerAddress);
dbWorkerGroupNodes.put(workerGroupName, workerNodes);
}
}
}
updateWorkerNodes();
updateWorkerGroupMappings();
notifyWorkerInfoChangeListeners();
} catch (Exception e) {
logger.error("WorkerNodeInfoAndGroupDbSyncTask error:", e);
} finally {
refreshWorkerGroupNodes();
}
}
}
protected Set<String> getWorkerAddressByWorkerGroup(Map<String, String> newWorkerNodeInfo, WorkerGroup wg) {
Set<String> nodes = new HashSet<>();
String[] addrs = wg.getAddrList().split(Constants.COMMA);
for (String addr : addrs) {
@@ -256,30 +206,18 @@ public class ServerNodeManager implements InitializingBean {
if (registryClient.isWorkerPath(path)) {
try {
String[] parts = path.split("/");
if (parts.length < WORKER_LISTENER_CHECK_LENGTH) {
throw new IllegalArgumentException(
String.format("worker group path : %s is not valid, ignore", path));
}
final String workerGroupName = parts[parts.length - 2];
final String workerAddress = parts[parts.length - 1];
// todo: update workerNodeInfo
logger.debug("received subscribe event : {}", event);
Collection<String> currentNodes = registryClient
.getWorkerGroupNodesDirectly(workerGroupName);
syncWorkerGroupNodesFromRegistry(workerGroupName, currentNodes, type);
if (type == Type.ADD) {
logger.info("worker group node : {} added, currentNodes : {}", path,
currentNodes);
logger.info("Worker: {} added, currentNode : {}", path, workerAddress);
} else if (type == Type.REMOVE) {
logger.info("worker group node : {} down.", path);
logger.info("Worker node : {} down.", path);
alertDao.sendServerStoppedAlert(1, path, "WORKER");
} else if (type == Type.UPDATE) {
syncSingleWorkerNodeInfo(workerAddress,
JSONUtils.parseObject(data, WorkerHeartBeat.class));
syncSingleWorkerNodeInfo(workerAddress, JSONUtils.parseObject(data, WorkerHeartBeat.class));
}
} catch (IllegalArgumentException ex) {
logger.warn(ex.getMessage());
} catch (Exception ex) {
logger.error("WorkerGroupListener capture data change and get data failed", ex);
}
@@ -330,6 +268,50 @@ public class ServerNodeManager implements InitializingBean {
}
private void updateWorkerNodes() {
workerGroupWriteLock.lock();
try {
Map<String, String> workerNodeMaps = registryClient.getServerMaps(NodeType.WORKER);
for (Map.Entry<String, String> entry : workerNodeMaps.entrySet()) {
workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
}
} finally {
workerGroupWriteLock.unlock();
}
}
private void updateWorkerGroupMappings() {
List<WorkerGroup> workerGroups = workerGroupMapper.queryAllWorkerGroup();
Map<String, Set<String>> tmpWorkerGroupMappings = new HashMap<>();
try {
workerNodeInfoReadLock.lock();
for (WorkerGroup workerGroup : workerGroups) {
String workerGroupName = workerGroup.getName();
String[] workerAddresses = workerGroup.getAddrList().split(Constants.COMMA);
if (ArrayUtils.isEmpty(workerAddresses)) {
continue;
}
Set<String> activeWorkerNodes = Arrays.stream(workerAddresses)
.filter(workerNodeInfo::containsKey).collect(Collectors.toSet());
tmpWorkerGroupMappings.put(workerGroupName, activeWorkerNodes);
}
if (!tmpWorkerGroupMappings.containsKey(Constants.DEFAULT_WORKER_GROUP)) {
tmpWorkerGroupMappings.put(Constants.DEFAULT_WORKER_GROUP, workerNodeInfo.keySet());
}
} finally {
workerNodeInfoReadLock.unlock();
}
workerGroupWriteLock.lock();
try {
workerGroupNodes.clear();
workerGroupNodes.putAll(tmpWorkerGroupMappings);
notifyWorkerInfoChangeListeners();
} finally {
workerGroupWriteLock.unlock();
}
}
/**
* sync master nodes
*
@@ -349,63 +331,13 @@ public class ServerNodeManager implements InitializingBean {
logger.warn("current addr:{} is not in active master list",
masterConfig.getMasterAddress());
}
logger.info("update master nodes, master size: {}, slot: {}, addr: {}", MASTER_SIZE, MASTER_SLOT,
masterConfig.getMasterAddress());
} finally {
masterLock.unlock();
}
}
/**
* sync worker group nodes from registry center
*
* @param workerGroup worker group
* @param nodes worker nodes
* @param type event type
*/
private void syncWorkerGroupNodesFromRegistry(String workerGroup, Collection<String> nodes,
Type type) {
try {
if (type == Type.REMOVE) {
if (!registryWorkerGroupNodes.containsKey(workerGroup)) {
logger.warn("cannot remove worker group {}, not in active list", workerGroup);
return;
}
registryWorkerGroupNodes.remove(workerGroup);
} else {
Set<String> workerNodes = registryWorkerGroupNodes
.getOrDefault(workerGroup, new HashSet<>());
workerNodes.clear();
workerNodes.addAll(nodes);
registryWorkerGroupNodes.put(workerGroup, workerNodes);
}
} finally {
refreshWorkerGroupNodes();
}
}
/**
* refresh worker group nodes
*/
private void refreshWorkerGroupNodes() {
workerGroupWriteLock.lock();
try {
workerGroupNodes.clear();
workerGroupNodes.putAll(registryWorkerGroupNodes);
workerGroupNodes.putAll(dbWorkerGroupNodes);
logger.debug("refresh worker group nodes, current list: {}",
Arrays.toString(workerGroupNodes.keySet().toArray()));
} finally {
notifyWorkerInfoChangeListeners();
workerGroupWriteLock.unlock();
}
}
public Map<String, Set<String>> getWorkerGroupNodes() {
workerGroupReadLock.lock();
try {
@@ -466,8 +398,7 @@ public class ServerNodeManager implements InitializingBean {
try {
workerNodeInfo.clear();
for (Map.Entry<String, String> entry : newWorkerNodeInfo.entrySet()) {
workerNodeInfo.put(entry.getKey(),
JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
workerNodeInfo.put(entry.getKey(), JSONUtils.parseObject(entry.getValue(), WorkerHeartBeat.class));
}
} finally {
workerNodeInfoWriteLock.unlock();

dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/registry/RegistryClient.java (75 changes)

@@ -17,9 +17,12 @@
package org.apache.dolphinscheduler.service.registry;
import com.google.common.base.Strings;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.dolphinscheduler.common.Constants.COLON;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
@@ -31,28 +34,29 @@ import org.apache.dolphinscheduler.registry.api.ConnectionListener;
import org.apache.dolphinscheduler.registry.api.Registry;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.registry.api.SubscribeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.dolphinscheduler.common.Constants.COLON;
import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_MASTERS;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import javax.annotation.PostConstruct;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.google.common.base.Strings;
@Component
public class RegistryClient {
@@ -91,7 +95,7 @@ public class RegistryClient {
}
public List<Server> getServerList(NodeType nodeType) {
Map<String, String> serverMaps = getServerMaps(nodeType, false);
Map<String, String> serverMaps = getServerMaps(nodeType);
String parentPath = rootNodePath(nodeType);
List<Server> serverList = new ArrayList<>();
@@ -123,26 +127,24 @@
server.setZkDirectory(parentPath + "/" + serverPath);
// set host and port
String[] hostAndPort = serverPath.split(COLON);
String[] hosts = hostAndPort[0].split(DIVISION_STRING);
// fetch the last one
server.setHost(hosts[hosts.length - 1]);
server.setHost(hostAndPort[0]);
server.setPort(Integer.parseInt(hostAndPort[1]));
serverList.add(server);
}
return serverList;
}
public Map<String, String> getServerMaps(NodeType nodeType, boolean hostOnly) {
/**
* Return server host:port -> value
*/
public Map<String, String> getServerMaps(NodeType nodeType) {
Map<String, String> serverMap = new HashMap<>();
try {
String path = rootNodePath(nodeType);
Collection<String> serverList = getServerNodes(nodeType);
for (String server : serverList) {
String host = server;
if (nodeType == NodeType.WORKER && hostOnly) {
host = server.split(SINGLE_SLASH)[1];
}
serverMap.putIfAbsent(host, get(path + SINGLE_SLASH + server));
serverMap.putIfAbsent(server, get(path + SINGLE_SLASH + server));
}
} catch (Exception e) {
logger.error("get server list failed", e);
@@ -152,7 +154,7 @@
}
public boolean checkNodeExists(String host, NodeType nodeType) {
return getServerMaps(nodeType, true).keySet()
return getServerMaps(nodeType).keySet()
.stream()
.anyMatch(it -> it.contains(host));
}
@@ -161,14 +163,6 @@
return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_MASTERS);
}
public Collection<String> getWorkerGroupDirectly() {
return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS);
}
public Collection<String> getWorkerGroupNodesDirectly(String workerGroup) {
return getChildrenKeys(REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup);
}
/**
* get host ip:port, path format: parentPath/ip:port
*
@@ -241,14 +235,9 @@
return registry.children(key);
}
public Set<String> getServerNodeSet(NodeType nodeType, boolean hostOnly) {
public Set<String> getServerNodeSet(NodeType nodeType) {
try {
return getServerNodes(nodeType).stream().map(server -> {
if (nodeType == NodeType.WORKER && hostOnly) {
return server.split(SINGLE_SLASH)[1];
}
return server;
}).collect(Collectors.toSet());
return new HashSet<>(getServerNodes(nodeType));
} catch (Exception e) {
throw new RegistryException("Failed to get server node: " + nodeType, e);
}
@@ -272,13 +261,7 @@
private Collection<String> getServerNodes(NodeType nodeType) {
final String path = rootNodePath(nodeType);
final Collection<String> serverList = getChildrenKeys(path);
if (nodeType != NodeType.WORKER) {
return serverList;
}
return serverList.stream().flatMap(group -> getChildrenKeys(path + SINGLE_SLASH + group)
.stream()
.map(it -> group + SINGLE_SLASH + it)).collect(Collectors.toList());
return getChildrenKeys(path);
}
}
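
The registry tree this leaves behind is flat: worker ephemeral nodes sit directly under the workers root instead of under a per-group directory, which is why the group-aware helpers could be deleted and getServerNodes reduces to a single children lookup. A before/after sketch with the registry stubbed as a map of paths to children; the root value /nodes/worker is an assumption here, the real one is Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS:

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class ServerNodesSketch {
    static Collection<String> children(Map<String, List<String>> tree, String path) {
        return tree.getOrDefault(path, List.of());
    }

    // Old layout: /nodes/worker/<group>/<host:port>, expanded to "group/host:port" keys.
    static Collection<String> oldWorkerNodes(Map<String, List<String>> tree, String root) {
        return children(tree, root).stream()
                .flatMap(group -> children(tree, root + "/" + group).stream()
                        .map(node -> group + "/" + node))
                .collect(Collectors.toList());
    }

    // New layout: /nodes/worker/<host:port>, so the children are the nodes themselves.
    static Collection<String> newWorkerNodes(Map<String, List<String>> tree, String root) {
        return children(tree, root);
    }

    public static void main(String[] args) {
        Map<String, List<String>> oldTree = Map.of(
                "/nodes/worker", List.of("default"),
                "/nodes/worker/default", List.of("10.0.0.1:1234"));
        Map<String, List<String>> newTree = Map.of(
                "/nodes/worker", List.of("10.0.0.1:1234"));
        System.out.println(oldWorkerNodes(oldTree, "/nodes/worker")); // [default/10.0.0.1:1234]
        System.out.println(newWorkerNodes(newTree, "/nodes/worker")); // [10.0.0.1:1234]
    }
}
```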

dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java (34 changes)

@@ -17,11 +17,15 @@
package org.apache.dolphinscheduler.server.worker.config;
import com.google.common.collect.Sets;
import lombok.Data;
import org.apache.commons.collections4.CollectionUtils;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.ConnectStrategyProperties;
import java.time.Duration;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -30,12 +34,6 @@ import org.springframework.validation.Errors;
import org.springframework.validation.Validator;
import org.springframework.validation.annotation.Validated;
import java.time.Duration;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.common.Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS;
@Data
@Validated
@Configuration
@@ -52,7 +50,6 @@ public class WorkerConfig implements Validator {
private boolean tenantDistributedUser = false;
private int maxCpuLoadAvg = -1;
private double reservedMemory = 0.3;
private Set<String> groups = Sets.newHashSet("default");
private String alertListenHost = "localhost";
private int alertListenPort = 50052;
private ConnectStrategyProperties registryDisconnectStrategy = new ConnectStrategyProperties();
@@ -61,7 +58,7 @@
* This field doesn't need to set at config file, it will be calculated by workerIp:listenPort
*/
private String workerAddress;
private Set<String> workerGroupRegistryPaths;
private String workerRegistryPath;
@Override
public boolean supports(Class<?> clazz) {
@@ -82,17 +79,7 @@
}
workerConfig.setWorkerAddress(NetUtils.getAddr(workerConfig.getListenPort()));
workerConfig.setGroups(workerConfig.getGroups().stream().map(String::trim).collect(Collectors.toSet()));
if (CollectionUtils.isEmpty(workerConfig.getGroups())) {
errors.rejectValue("groups", null, "should not be empty");
}
Set<String> workerRegistryPaths = workerConfig.getGroups()
.stream()
.map(workerGroup -> REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerGroup + "/" + workerConfig.getWorkerAddress())
.collect(Collectors.toSet());
workerConfig.setWorkerGroupRegistryPaths(workerRegistryPaths);
workerConfig.setWorkerRegistryPath(REGISTRY_DOLPHINSCHEDULER_WORKERS + "/" + workerConfig.getWorkerAddress());
printConfig();
}
@@ -105,11 +92,10 @@
logger.info("Worker config: tenantDistributedUser -> {}", tenantDistributedUser);
logger.info("Worker config: maxCpuLoadAvg -> {}", maxCpuLoadAvg);
logger.info("Worker config: reservedMemory -> {}", reservedMemory);
logger.info("Worker config: groups -> {}", groups);
logger.info("Worker config: alertListenHost -> {}", alertListenHost);
logger.info("Worker config: alertListenPort -> {}", alertListenPort);
logger.info("Worker config: registryDisconnectStrategy -> {}", registryDisconnectStrategy);
logger.info("Worker config: workerAddress -> {}", registryDisconnectStrategy);
logger.info("Worker config: workerGroupRegistryPaths: {}", workerGroupRegistryPaths);
logger.info("Worker config: workerRegistryPath: {}", workerRegistryPath);
}
}
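
With a single flat path, the validator derives the registry location directly from the worker address. A tiny sketch of that derivation, again assuming /nodes/worker as the root and with NetUtils.getAddr stubbed by plain string concatenation:

```java
class WorkerConfigSketch {
    static String workerRegistryPath(String host, int listenPort) {
        String workerAddress = host + ":" + listenPort;  // real code: NetUtils.getAddr(listenPort)
        return "/nodes/worker" + "/" + workerAddress;    // assumed REGISTRY_DOLPHINSCHEDULER_WORKERS
    }

    public static void main(String[] args) {
        System.out.println(workerRegistryPath("192.168.0.5", 1234));
        // -> /nodes/worker/192.168.0.5:1234
    }
}
```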

dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClient.java (22 changes)

@@ -17,26 +17,28 @@
package org.apache.dolphinscheduler.server.worker.registry;
import lombok.extern.slf4j.Slf4j;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.IStoppable;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.model.WorkerHeartBeat;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.registry.api.RegistryException;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.task.WorkerHeartBeatTask;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.io.IOException;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@@ -56,7 +58,6 @@ public class WorkerRegistryClient implements AutoCloseable {
private WorkerHeartBeatTask workerHeartBeatTask;
@PostConstruct
public void initWorkRegistry() {
this.workerHeartBeatTask = new WorkerHeartBeatTask(
@@ -80,22 +81,19 @@
*/
private void registry() {
WorkerHeartBeat workerHeartBeat = workerHeartBeatTask.getHeartBeat();
for (String workerZKPath : workerConfig.getWorkerGroupRegistryPaths()) {
String workerZKPath = workerConfig.getWorkerRegistryPath();
// remove before persist
registryClient.remove(workerZKPath);
registryClient.persistEphemeral(workerZKPath, JSONUtils.toJsonString(workerHeartBeat));
log.info("Worker node: {} registry to ZK {} successfully", workerConfig.getWorkerAddress(), workerZKPath);
}
while (!registryClient.checkNodeExists(NetUtils.getHost(), NodeType.WORKER)) {
while (!registryClient.checkNodeExists(workerConfig.getWorkerAddress(), NodeType.WORKER)) {
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
}
// sleep 1s, waiting master failover remove
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);
workerHeartBeatTask.start();
log.info("Worker node: {} registry finished", workerConfig.getWorkerAddress());
}
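
The registration handshake is now: delete any stale node, write one ephemeral node, poll until it is visible, then wait a beat for master failover cleanup before starting the heartbeat task. A sketch with the registry hidden behind a tiny interface (illustrative names, not the real RegistryClient API):

```java
import java.util.HashMap;
import java.util.Map;

interface RegistrySketch {
    void remove(String path);
    void persistEphemeral(String path, String value);
    boolean exists(String path);
}

class WorkerRegisterSketch {
    static void register(RegistrySketch registry, String workerPath, String heartBeatJson)
            throws InterruptedException {
        registry.remove(workerPath);              // drop a stale node left by a previous run
        registry.persistEphemeral(workerPath, heartBeatJson);
        while (!registry.exists(workerPath)) {    // poll until the write is visible
            Thread.sleep(1000L);                  // SLEEP_TIME_MILLIS in the real code
        }
        Thread.sleep(1000L);                      // grace period for master failover removal
        // ...the real client then starts workerHeartBeatTask
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, String> store = new HashMap<>();
        RegistrySketch inMemory = new RegistrySketch() {
            public void remove(String path) { store.remove(path); }
            public void persistEphemeral(String path, String value) { store.put(path, value); }
            public boolean exists(String path) { return store.containsKey(path); }
        };
        register(inMemory, "/nodes/worker/10.0.0.1:1234", "{}");
        System.out.println(store); // {/nodes/worker/10.0.0.1:1234={}}
    }
}
```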

dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/task/WorkerHeartBeatTask.java (24 changes)

@@ -17,8 +17,6 @@
package org.apache.dolphinscheduler.server.worker.task;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.model.BaseHeartBeatTask;
@@ -30,6 +28,9 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.function.Supplier;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class WorkerHeartBeatTask extends BaseHeartBeatTask<WorkerHeartBeat> {
@@ -59,7 +60,8 @@
double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
int execThreads = workerConfig.getExecThreads();
int workerWaitingTaskCount = this.workerWaitingTaskCount.get();
int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg, availablePhysicalMemorySize, reservedMemory, execThreads, workerWaitingTaskCount);
int serverStatus = getServerStatus(loadAverage, maxCpuLoadAvg, availablePhysicalMemorySize, reservedMemory,
execThreads, workerWaitingTaskCount);
return WorkerHeartBeat.builder()
.startupTime(ServerLifeCycleManager.getServerStartupTime())
@@ -82,11 +84,11 @@
@Override
public void writeHeartBeat(WorkerHeartBeat workerHeartBeat) {
String workerHeartBeatJson = JSONUtils.toJsonString(workerHeartBeat);
for (String workerGroupRegistryPath : workerConfig.getWorkerGroupRegistryPaths()) {
registryClient.persistEphemeral(workerGroupRegistryPath, workerHeartBeatJson);
}
log.info("Success write worker group heartBeatInfo into registry, workGroupPath: {} workerHeartBeatInfo: {}",
workerConfig.getWorkerGroupRegistryPaths(), workerHeartBeatJson);
String workerRegistryPath = workerConfig.getWorkerRegistryPath();
registryClient.persistEphemeral(workerRegistryPath, workerHeartBeatJson);
log.info(
"Success write worker group heartBeatInfo into registry, workerRegistryPath: {} workerHeartBeatInfo: {}",
workerRegistryPath, workerHeartBeatJson);
}
public int getServerStatus(double loadAverage,
@@ -96,11 +98,13 @@
int workerExecThreadCount,
int workerWaitingTaskCount) {
if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
log.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
log.warn(
"current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);
return Constants.ABNORMAL_NODE_STATUS;
} else if (workerWaitingTaskCount > workerExecThreadCount) {
log.warn("current waiting task count {} is large than worker thread count {}, worker is busy", workerWaitingTaskCount, workerExecThreadCount);
log.warn("current waiting task count {} is large than worker thread count {}, worker is busy",
workerWaitingTaskCount, workerExecThreadCount);
return Constants.BUSY_NODE_STATUE;
} else {
return Constants.NORMAL_NODE_STATUS;
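
For reference, the status decision in getServerStatus reduces to two threshold checks. A standalone sketch where the integer codes stand in for Constants.NORMAL_NODE_STATUS, ABNORMAL_NODE_STATUS, and BUSY_NODE_STATUE (assumed values):

```java
class ServerStatusSketch {
    static final int NORMAL = 0, ABNORMAL = 1, BUSY = 2; // assumed constant values

    static int status(double loadAverage, double maxCpuLoadAvg,
                      double availableMemoryG, double reservedMemoryG,
                      int execThreads, int waitingTasks) {
        if (loadAverage > maxCpuLoadAvg || availableMemoryG < reservedMemoryG) {
            return ABNORMAL;  // CPU overloaded or memory below the reserve
        }
        if (waitingTasks > execThreads) {
            return BUSY;      // queue deeper than the exec thread pool
        }
        return NORMAL;
    }

    public static void main(String[] args) {
        System.out.println(status(8.0, 4.0, 2.0, 0.3, 10, 3));   // 1 (abnormal: load too high)
        System.out.println(status(1.0, 4.0, 2.0, 0.3, 10, 30));  // 2 (busy: queue > threads)
        System.out.println(status(1.0, 4.0, 2.0, 0.3, 10, 3));   // 0 (normal)
    }
}
```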

dolphinscheduler-worker/src/main/resources/application.yaml (7 changes)

@@ -68,13 +68,6 @@ worker:
max-cpu-load-avg: -1
# worker reserved memory, only lower than system available memory, worker server can be dispatched tasks. default value 0.3, the unit is G
reserved-memory: 0.3
# for multiple worker groups, use hyphen before group name, e.g.
# groups:
# - default
# - group1
# - group2
groups:
- default
# alert server listen host
alert-listen-host: localhost
alert-listen-port: 50052

dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistryClientTest.java (31 changes)

@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.registry;
import static org.mockito.BDDMockito.given;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
@@ -27,15 +28,14 @@ import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +44,7 @@ import com.google.common.collect.Sets;
/**
* worker registry test
*/
@RunWith(MockitoJUnitRunner.Silent.class)
@ExtendWith(MockitoExtension.class)
public class WorkerRegistryClientTest {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRegistryClientTest.class);
@@ -78,28 +78,17 @@ public class WorkerRegistryClientTest {
// workerGroups = Sets.newHashSet(DEFAULT_WORKER_GROUP, TEST_WORKER_GROUP);
}
@Before
public void before() {
given(workerConfig.getGroups()).willReturn(Sets.newHashSet("127.0.0.1"));
// given(heartBeatExecutor.getWorkerGroups()).willReturn(Sets.newHashSet("127.0.0.1"));
// scheduleAtFixedRate
given(heartBeatExecutor.scheduleAtFixedRate(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(),
Mockito.any(TimeUnit.class))).willReturn(null);
}
@Test
public void testStart() {
workerRegistryClient.initWorkRegistry();
given(workerManagerThread.getThreadPoolQueueSize()).willReturn(1);
given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(NodeType.class))).willReturn(true);
given(workerConfig.getWorkerAddress()).willReturn(NetUtils.getAddr(1234));
given(workerConfig.getHeartbeatInterval()).willReturn(Duration.ofSeconds(1));
given(registryClient.checkNodeExists(Mockito.anyString(), Mockito.any(NodeType.class))).willReturn(true);
workerRegistryClient.initWorkRegistry();
workerRegistryClient.start();
Assertions.assertTrue(true);
}
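
Aside from the worker-group stubs going away, the test also moves from JUnit 4's @RunWith(MockitoJUnitRunner.Silent.class) to JUnit 5's @ExtendWith(MockitoExtension.class). A hypothetical minimal skeleton of that migration (requires the mockito-junit-jupiter artifact); note the strict extension fails fast on unused stubbings, which is why the silent runner's leftover @Before stubs were folded into the test body:

```java
import static org.mockito.BDDMockito.given;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)  // replaces @RunWith(MockitoJUnitRunner.Silent.class)
class MigrationSketchTest {

    interface Clock { long now(); }  // hypothetical collaborator to mock

    @Mock
    Clock clock;

    @Test
    void usesStubbedValue() {
        given(clock.now()).willReturn(42L);  // stub inside the test so strict mode is satisfied
        Assertions.assertEquals(42L, clock.now());
    }
}
```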