diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java index 70b3aecb4f..6f3f32710f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java @@ -16,27 +16,38 @@ */ package org.apache.dolphinscheduler.api.controller; +import static org.apache.dolphinscheduler.api.enums.Status.DELETE_WORKER_GROUP_FAIL; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKER_ADDRESS_LIST_FAIL; +import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKER_GROUP_FAIL; +import static org.apache.dolphinscheduler.api.enums.Status.SAVE_ERROR; import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.WorkerGroupService; +import org.apache.dolphinscheduler.api.utils.RegexUtils; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.dao.entity.User; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiImplicitParam; -import io.swagger.annotations.ApiImplicitParams; -import io.swagger.annotations.ApiOperation; + +import java.util.Map; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; -import org.springframework.web.bind.annotation.*; -import springfox.documentation.annotations.ApiIgnore; - -import java.util.Map; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestAttribute; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.RestController; -import static org.apache.dolphinscheduler.api.enums.Status.*; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; +import io.swagger.annotations.ApiOperation; +import springfox.documentation.annotations.ApiIgnore; /** * worker group controller @@ -51,8 +62,34 @@ public class WorkerGroupController extends BaseController { @Autowired WorkerGroupService workerGroupService; - - + /** + * create or update a worker group + * + * @param loginUser login user + * @param id worker group id + * @param name worker group name + * @param addrList addr list + * @return create or update result code + */ + @ApiOperation(value = "saveWorkerGroup", notes = "CREATE_WORKER_GROUP_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", dataType = "Int", example = "10", defaultValue = "0"), + @ApiImplicitParam(name = "name", value = "WORKER_GROUP_NAME", required = true, dataType = "String"), + @ApiImplicitParam(name = "addrList", value = "WORKER_ADDR_LIST", required = true, dataType = "String") + }) + @PostMapping(value = "/save") + @ResponseStatus(HttpStatus.OK) + @ApiException(SAVE_ERROR) + public Result saveWorkerGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "id", required = false, defaultValue = "0") int id, + @RequestParam(value = "name") String name, + @RequestParam(value = "addrList") String addrList + ) { + logger.info("save worker group: login user {}, id:{}, name: {}, addrList: {} ", + RegexUtils.escapeNRT(loginUser.getUserName()), id, RegexUtils.escapeNRT(name), RegexUtils.escapeNRT(addrList)); + Map result = workerGroupService.saveWorkerGroup(loginUser, id, name, addrList); + return returnDataList(result); + } /** * query worker groups paging @@ -65,21 +102,20 @@ public class WorkerGroupController extends BaseController { */ @ApiOperation(value = "queryAllWorkerGroupsPaging", notes = "QUERY_WORKER_GROUP_PAGING_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", dataType = "Int", example = "10", defaultValue = "0"), - @ApiImplicitParam(name = "name", value = "WORKER_GROUP_NAME", required = true, dataType = "String"), - @ApiImplicitParam(name = "ipList", value = "WORKER_IP_LIST", required = true, dataType = "String") + @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "1"), + @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType = "Int", example = "20"), + @ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType = "String") }) @GetMapping(value = "/list-paging") @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_WORKER_GROUP_FAIL) public Result queryAllWorkerGroupsPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam("pageNo") Integer pageNo, - @RequestParam(value = "searchVal", required = false) String searchVal, - @RequestParam("pageSize") Integer pageSize + @RequestParam("pageSize") Integer pageSize, + @RequestParam(value = "searchVal", required = false) String searchVal ) { logger.info("query all worker group paging: login user {}, pageNo:{}, pageSize:{}, searchVal:{}", - loginUser.getUserName(), pageNo, pageSize, searchVal); - + RegexUtils.escapeNRT(loginUser.getUserName()), pageNo, pageSize, searchVal); searchVal = ParameterUtils.handleEscapes(searchVal); Map result = workerGroupService.queryAllGroupPaging(loginUser, pageNo, pageSize, searchVal); return returnDataListPaging(result); @@ -95,14 +131,48 @@ public class WorkerGroupController extends BaseController { @GetMapping(value = "/all-groups") @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_WORKER_GROUP_FAIL) - public Result queryAllWorkerGroups(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser - ) { - logger.info("query all worker group: login user {}", - loginUser.getUserName()); - + public Result queryAllWorkerGroups(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser) { + logger.info("query all worker group: login user {}", RegexUtils.escapeNRT(loginUser.getUserName())); Map result = workerGroupService.queryAllGroup(); return returnDataList(result); } + /** + * delete worker group by id + * + * @param loginUser login user + * @param id group id + * @return delete result code + */ + @ApiOperation(value = "deleteById", notes = "DELETE_WORKER_GROUP_BY_ID_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", required = true, dataType = "Int", example = "10"), + }) + @PostMapping(value = "/delete-by-id") + @ResponseStatus(HttpStatus.OK) + @ApiException(DELETE_WORKER_GROUP_FAIL) + public Result deleteById(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam("id") Integer id + ) { + logger.info("delete worker group: login user {}, id:{} ", RegexUtils.escapeNRT(loginUser.getUserName()), id); + Map result = workerGroupService.deleteWorkerGroupById(loginUser, id); + return returnDataList(result); + } + + /** + * query worker address list + * + * @param loginUser login user + * @return all worker address list + */ + @ApiOperation(value = "queryWorkerAddressList", notes = "QUERY_WORKER_ADDRESS_LIST_NOTES") + @GetMapping(value = "/worker-address-list") + @ResponseStatus(HttpStatus.OK) + @ApiException(QUERY_WORKER_ADDRESS_LIST_FAIL) + public Result queryWorkerAddressList(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser) { + logger.info("query worker address list: login user {}", RegexUtils.escapeNRT(loginUser.getUserName())); + Map result = workerGroupService.getWorkerAddressList(); + return returnDataList(result); + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 5268b1d1ce..c936329080 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -16,10 +16,10 @@ */ package org.apache.dolphinscheduler.api.enums; -import org.springframework.context.i18n.LocaleContextHolder; - import java.util.Locale; +import org.springframework.context.i18n.LocaleContextHolder; + /** * status enum */ @@ -180,6 +180,11 @@ public enum Status { DATASOURCE_OTHER_PARAMS_ILLEGAL(10171, "datasource other params illegal", "数据源其他参数不合法"), DATASOURCE_NAME_ILLEGAL(10172, "datasource name illegal", "数据源名称不合法"), DATASOURCE_HOST_ILLEGAL(10173, "datasource host illegal", "数据源HOST不合法"), + DELETE_WORKER_GROUP_NOT_EXIST(10174, "delete worker group not exist ", "删除worker分组不存在"), + CREATE_WORKER_GROUP_FORBIDDEN_IN_DOCKER(10175, "create worker group forbidden in docker ", "创建worker分组在docker中禁止"), + DELETE_WORKER_GROUP_FORBIDDEN_IN_DOCKER(10176, "delete worker group forbidden in docker ", "删除worker分组在docker中禁止"), + WORKER_ADDRESS_INVALID(10177, "worker address {0} invalid", "worker地址[{0}]无效"), + QUERY_WORKER_ADDRESS_LIST_FAIL(10178, "query worker address list fail ", "查询worker地址列表失败"), UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java index 55c4fa113b..c653268a7f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/MonitorService.java @@ -18,12 +18,6 @@ package org.apache.dolphinscheduler.api.service; import static org.apache.dolphinscheduler.common.utils.Preconditions.checkNotNull; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Function; -import java.util.stream.Collectors; - import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor; import org.apache.dolphinscheduler.common.Constants; @@ -34,8 +28,16 @@ import org.apache.dolphinscheduler.dao.MonitorDBDao; import org.apache.dolphinscheduler.dao.entity.MonitorRecord; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; + import com.google.common.collect.Sets; /** @@ -150,7 +152,7 @@ public class MonitorService extends BaseService { checkNotNull(zookeeperMonitor); ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER; - return zookeeperMonitor.getServersList(zkNodeType); + return zookeeperMonitor.getServerList(zkNodeType); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java index 26542b2ba7..89fa7b18e0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java @@ -18,25 +18,31 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; -import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; /** * work group service @@ -44,14 +50,115 @@ import org.springframework.stereotype.Service; @Service public class WorkerGroupService extends BaseService { + private static final Logger logger = LoggerFactory.getLogger(WorkerGroupService.class); @Autowired - ProcessInstanceMapper processInstanceMapper; + WorkerGroupMapper workerGroupMapper; @Autowired protected ZookeeperCachedOperator zookeeperCachedOperator; + @Autowired + private ZookeeperMonitor zookeeperMonitor; + @Autowired + ProcessInstanceMapper processInstanceMapper; + + /** + * create or update a worker group + * + * @param loginUser login user + * @param id worker group id + * @param name worker group name + * @param addrList addr list + * @return create or update result code + */ + public Map saveWorkerGroup(User loginUser, int id, String name, String addrList) { + Map result = new HashMap<>(); + if (checkAdmin(loginUser, result)) { + return result; + } + if (Constants.DOCKER_MODE && !Constants.KUBERNETES_MODE) { + putMsg(result, Status.CREATE_WORKER_GROUP_FORBIDDEN_IN_DOCKER); + return result; + } + if (StringUtils.isEmpty(name)) { + putMsg(result, Status.NAME_NULL); + return result; + } + Date now = new Date(); + WorkerGroup workerGroup; + if (id != 0) { + workerGroup = workerGroupMapper.selectById(id); + // check exist + if (workerGroup == null) { + workerGroup = new WorkerGroup(); + workerGroup.setCreateTime(now); + } + } else { + workerGroup = new WorkerGroup(); + workerGroup.setCreateTime(now); + } + workerGroup.setName(name); + workerGroup.setAddrList(addrList); + workerGroup.setUpdateTime(now); + + if (checkWorkerGroupNameExists(workerGroup)) { + putMsg(result, Status.NAME_EXIST, workerGroup.getName()); + return result; + } + String invalidAddr = checkWorkerGroupAddrList(workerGroup); + if (invalidAddr != null) { + putMsg(result, Status.WORKER_ADDRESS_INVALID, invalidAddr); + return result; + } + if (workerGroup.getId() != 0) { + workerGroupMapper.updateById(workerGroup); + } else { + workerGroupMapper.insert(workerGroup); + } + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * check worker group name exists + * @param workerGroup worker group + * @return boolean + */ + private boolean checkWorkerGroupNameExists(WorkerGroup workerGroup) { + List workerGroupList = workerGroupMapper.queryWorkerGroupByName(workerGroup.getName()); + if (CollectionUtils.isNotEmpty(workerGroupList)) { + // new group has same name + if (workerGroup.getId() == 0) { + return true; + } + // check group id + for (WorkerGroup group : workerGroupList) { + if (group.getId() != workerGroup.getId()) { + return true; + } + } + } + // check zookeeper + String workerGroupPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS + Constants.SLASH + workerGroup.getName(); + return zookeeperCachedOperator.isExisted(workerGroupPath); + } + + /** + * check worker group addr list + * @param workerGroup worker group + * @return boolean + */ + private String checkWorkerGroupAddrList(WorkerGroup workerGroup) { + Map serverMaps = zookeeperMonitor.getServerMaps(ZKNodeType.WORKER, true); + for (String addr : workerGroup.getAddrList().split(Constants.COMMA)) { + if (!serverMaps.containsKey(addr)) { + return addr; + } + } + return null; + } /** * query worker group paging @@ -62,39 +169,39 @@ public class WorkerGroupService extends BaseService { * @param pageSize page size * @return worker group list page */ - public Map queryAllGroupPaging(User loginUser, Integer pageNo, Integer pageSize, String searchVal) { - + public Map queryAllGroupPaging(User loginUser, Integer pageNo, Integer pageSize, String searchVal) { // list from index - Integer fromIndex = (pageNo - 1) * pageSize; + int fromIndex = (pageNo - 1) * pageSize; // list to index - Integer toIndex = (pageNo - 1) * pageSize + pageSize; + int toIndex = (pageNo - 1) * pageSize + pageSize; - Map result = new HashMap<>(5); + Map result = new HashMap<>(); if (checkAdmin(loginUser, result)) { return result; } List workerGroups = getWorkerGroups(true); - List resultDataList = new ArrayList<>(); - if (CollectionUtils.isNotEmpty(workerGroups)){ + if (CollectionUtils.isNotEmpty(workerGroups)) { List searchValDataList = new ArrayList<>(); - if (StringUtils.isNotEmpty(searchVal)){ - for (WorkerGroup workerGroup : workerGroups){ - if (workerGroup.getName().contains(searchVal)){ + if (StringUtils.isNotEmpty(searchVal)) { + for (WorkerGroup workerGroup : workerGroups) { + if (workerGroup.getName().contains(searchVal)) { searchValDataList.add(workerGroup); } } - }else { + } else { searchValDataList = workerGroups; } - if (searchValDataList.size() < pageSize){ - toIndex = (pageNo - 1) * pageSize + searchValDataList.size(); + if (fromIndex < searchValDataList.size()) { + if (toIndex > searchValDataList.size()) { + toIndex = searchValDataList.size(); + } + resultDataList = searchValDataList.subList(fromIndex, toIndex); } - resultDataList = searchValDataList.subList(fromIndex, toIndex); } PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); @@ -106,57 +213,121 @@ public class WorkerGroupService extends BaseService { return result; } - - /** * query all worker group * * @return all worker group list */ - public Map queryAllGroup() { + public Map queryAllGroup() { Map result = new HashMap<>(); - List workerGroups = getWorkerGroups(false); - - Set availableWorkerGroupSet = workerGroups.stream() - .map(workerGroup -> workerGroup.getName()) - .collect(Collectors.toSet()); - result.put(Constants.DATA_LIST, availableWorkerGroupSet); + List availableWorkerGroupList = workerGroups.stream() + .map(WorkerGroup::getName) + .collect(Collectors.toList()); + int index = availableWorkerGroupList.indexOf(Constants.DEFAULT_WORKER_GROUP); + if (index > -1) { + availableWorkerGroupList.remove(index); + availableWorkerGroupList.add(0, Constants.DEFAULT_WORKER_GROUP); + } + result.put(Constants.DATA_LIST, availableWorkerGroupList); putMsg(result, Status.SUCCESS); return result; } - /** - * get worker groups + * get worker groups * * @param isPaging whether paging * @return WorkerGroup list */ private List getWorkerGroups(boolean isPaging) { - String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot()+"/nodes" +"/worker"; - List workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath); - - // available workerGroup list - List availableWorkerGroupList = new ArrayList<>(); - List workerGroups = new ArrayList<>(); - - for (String workerGroup : workerGroupList){ - String workerGroupPath= workerPath + "/" + workerGroup; - List childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath); - if (CollectionUtils.isNotEmpty(childrenNodes)){ - availableWorkerGroupList.add(workerGroup); + // worker groups from database + List workerGroups = workerGroupMapper.queryAllWorkerGroup(); + // worker groups from zookeeper + String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS; + List workerGroupList = null; + try { + workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath); + } catch (Exception e) { + logger.error("getWorkerGroups exception: {}, workerPath: {}, isPaging: {}", e.getMessage(), workerPath, isPaging); + } + + if (CollectionUtils.isEmpty(workerGroupList)) { + if (CollectionUtils.isEmpty(workerGroups) && !isPaging) { WorkerGroup wg = new WorkerGroup(); - wg.setName(workerGroup); - if (isPaging){ - wg.setIpList(childrenNodes.stream().map(node -> Host.of(node).getIp()).collect(Collectors.toList())); - String registeredValue = zookeeperCachedOperator.get(workerGroupPath + "/" + childrenNodes.get(0)); - wg.setCreateTime(DateUtils.stringToDate(registeredValue.split(",")[6])); - wg.setUpdateTime(DateUtils.stringToDate(registeredValue.split(",")[7])); - } + wg.setName(Constants.DEFAULT_WORKER_GROUP); workerGroups.add(wg); } + return workerGroups; + } + + for (String workerGroup : workerGroupList) { + String workerGroupPath = workerPath + Constants.SLASH + workerGroup; + List childrenNodes = null; + try { + childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath); + } catch (Exception e) { + logger.error("getChildrenNodes exception: {}, workerGroupPath: {}", e.getMessage(), workerGroupPath); + } + if (childrenNodes == null || childrenNodes.isEmpty()) { + continue; + } + WorkerGroup wg = new WorkerGroup(); + wg.setName(workerGroup); + if (isPaging) { + wg.setAddrList(String.join(Constants.COMMA, childrenNodes)); + String registeredValue = zookeeperCachedOperator.get(workerGroupPath + Constants.SLASH + childrenNodes.get(0)); + wg.setCreateTime(DateUtils.stringToDate(registeredValue.split(Constants.COMMA)[6])); + wg.setUpdateTime(DateUtils.stringToDate(registeredValue.split(Constants.COMMA)[7])); + wg.setSystemDefault(true); + } + workerGroups.add(wg); } return workerGroups; } + + /** + * delete worker group by id + * @param id worker group id + * @return delete result code + */ + @Transactional(rollbackFor = Exception.class) + public Map deleteWorkerGroupById(User loginUser, Integer id) { + Map result = new HashMap<>(); + if (checkAdmin(loginUser, result)) { + return result; + } + if (Constants.DOCKER_MODE && !Constants.KUBERNETES_MODE) { + putMsg(result, Status.DELETE_WORKER_GROUP_FORBIDDEN_IN_DOCKER); + return result; + } + WorkerGroup workerGroup = workerGroupMapper.selectById(id); + if (workerGroup == null) { + putMsg(result, Status.DELETE_WORKER_GROUP_NOT_EXIST); + return result; + } + List processInstances = processInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(), Constants.NOT_TERMINATED_STATES); + if (CollectionUtils.isNotEmpty(processInstances)) { + putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL, processInstances.size()); + return result; + } + workerGroupMapper.deleteById(id); + processInstanceMapper.updateProcessInstanceByWorkerGroupName(workerGroup.getName(), ""); + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * query all worker address list + * + * @return all worker address list + */ + public Map getWorkerAddressList() { + Map result = new HashMap<>(); + List serverNodeList = zookeeperMonitor.getServerNodeList(ZKNodeType.WORKER, true); + result.put(Constants.DATA_LIST, serverNodeList); + putMsg(result, Status.SUCCESS); + return result; + } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java new file mode 100644 index 0000000000..482cb55306 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/RegexUtils.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.utils; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * This is Regex expression utils. + */ +public class RegexUtils { + + /** + * check number regex expression + */ + private static final String CHECK_NUMBER = "^-?\\d+(\\.\\d+)?$"; + + private RegexUtils() { + } + + /** + * check if the input is number + * + * @param str input + * @return + */ + public static boolean isNumeric(String str) { + Pattern pattern = Pattern.compile(CHECK_NUMBER); + Matcher isNum = pattern.matcher(str); + return isNum.matches(); + } + + public static String escapeNRT(String str) { + // Logging should not be vulnerable to injection attacks: Replace pattern-breaking characters + if (str != null && !str.isEmpty()) { + return str.replaceAll("[\n|\r|\t]", "_"); + } + return null; + } + +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java index 7ba704916c..9fd4386d20 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java @@ -17,90 +17,89 @@ package org.apache.dolphinscheduler.api.utils; import org.apache.dolphinscheduler.common.enums.ZKNodeType; -import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord; import org.apache.dolphinscheduler.service.zk.AbstractZKClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.Date; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; /** - * monitor zookeeper info + * monitor zookeeper info */ @Component public class ZookeeperMonitor extends AbstractZKClient { - private static final Logger LOG = LoggerFactory.getLogger(ZookeeperMonitor.class); - - /** - * - * @return zookeeper info list - */ - public List zookeeperInfoList(){ - String zookeeperServers = getZookeeperQuorum().replaceAll("[\\t\\n\\x0B\\f\\r]", ""); - try{ - return zookeeperInfoList(zookeeperServers); - }catch(Exception e){ - LOG.error(e.getMessage(),e); - } - return null; - } - - /** - * get master servers - * @return master server information - */ - public List getMasterServers(){ - return getServersList(ZKNodeType.MASTER); - } - - /** - * master construct is the same with worker, use the master instead - * @return worker server informations - */ - public List getWorkerServers(){ - return getServersList(ZKNodeType.WORKER); - } - - private static List zookeeperInfoList(String zookeeperServers) { - - List list = new ArrayList<>(5); - - if(StringUtils.isNotBlank(zookeeperServers)){ - String[] zookeeperServersArray = zookeeperServers.split(","); - - for (String zookeeperServer : zookeeperServersArray) { - ZooKeeperState state = new ZooKeeperState(zookeeperServer); - boolean ok = state.ruok(); - if(ok){ - state.getZookeeperInfo(); - } - - String hostName = zookeeperServer; - int connections = state.getConnections(); - int watches = state.getWatches(); - long sent = state.getSent(); - long received = state.getReceived(); - String mode = state.getMode(); - float minLatency = state.getMinLatency(); - float avgLatency = state.getAvgLatency(); - float maxLatency = state.getMaxLatency(); - int nodeCount = state.getNodeCount(); - int status = ok ? 1 : 0; - Date date = new Date(); - - ZookeeperRecord zookeeperRecord = new ZookeeperRecord(hostName,connections,watches,sent,received,mode,minLatency,avgLatency,maxLatency,nodeCount,status,date); - list.add(zookeeperRecord); - - } - } - - return list; - } + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperMonitor.class); + + /** + * + * @return zookeeper info list + */ + public List zookeeperInfoList() { + String zookeeperServers = getZookeeperQuorum().replaceAll("[\\t\\n\\x0B\\f\\r]", ""); + try { + return zookeeperInfoList(zookeeperServers); + } catch (Exception e) { + LOG.error(e.getMessage(),e); + } + return null; + } + + /** + * get master servers + * @return master server information + */ + public List getMasterServers() { + return getServerList(ZKNodeType.MASTER); + } + + /** + * master construct is the same with worker, use the master instead + * @return worker server informations + */ + public List getWorkerServers() { + return getServerList(ZKNodeType.WORKER); + } + + private static List zookeeperInfoList(String zookeeperServers) { + + List list = new ArrayList<>(5); + + if(StringUtils.isNotBlank(zookeeperServers)) { + String[] zookeeperServersArray = zookeeperServers.split(","); + + for (String zookeeperServer : zookeeperServersArray) { + ZooKeeperState state = new ZooKeeperState(zookeeperServer); + boolean ok = state.ruok(); + if (ok) { + state.getZookeeperInfo(); + } + + int connections = state.getConnections(); + int watches = state.getWatches(); + long sent = state.getSent(); + long received = state.getReceived(); + String mode = state.getMode(); + float minLatency = state.getMinLatency(); + float avgLatency = state.getAvgLatency(); + float maxLatency = state.getMaxLatency(); + int nodeCount = state.getNodeCount(); + int status = ok ? 1 : 0; + Date date = new Date(); + + ZookeeperRecord zookeeperRecord = new ZookeeperRecord(zookeeperServer,connections,watches,sent,received,mode,minLatency,avgLatency,maxLatency,nodeCount,status,date); + list.add(zookeeperRecord); + + } + } + + return list; + } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java index d3c60cc499..70298f75b6 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java @@ -16,18 +16,25 @@ */ package org.apache.dolphinscheduler.api.service; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.UserType; -import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -35,84 +42,155 @@ import org.junit.runner.RunWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.internal.matchers.Any; import org.mockito.junit.MockitoJUnitRunner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; @RunWith(MockitoJUnitRunner.class) public class WorkerGroupServiceTest { - private static final Logger logger = LoggerFactory.getLogger(WorkerGroupServiceTest.class); - @InjectMocks private WorkerGroupService workerGroupService; + @Mock + private WorkerGroupMapper workerGroupMapper; + @Mock private ProcessInstanceMapper processInstanceMapper; @Mock private ZookeeperCachedOperator zookeeperCachedOperator; + @Mock + private ZookeeperMonitor zookeeperMonitor; + + private String groupName = "groupName000001"; @Before - public void init(){ + public void init() { ZookeeperConfig zookeeperConfig = new ZookeeperConfig(); zookeeperConfig.setDsRoot("/dolphinscheduler_qzw"); Mockito.when(zookeeperCachedOperator.getZookeeperConfig()).thenReturn(zookeeperConfig); - String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot()+"/nodes" +"/worker"; + String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS; List workerGroupStrList = new ArrayList<>(); workerGroupStrList.add("default"); workerGroupStrList.add("test"); Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath)).thenReturn(workerGroupStrList); - List defaultIpList = new ArrayList<>(); - defaultIpList.add("192.168.220.188:1234"); - defaultIpList.add("192.168.220.189:1234"); + List defaultAddressList = new ArrayList<>(); + defaultAddressList.add("192.168.220.188:1234"); + defaultAddressList.add("192.168.220.189:1234"); + + Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath + "/default")).thenReturn(defaultAddressList); - Mockito.when(zookeeperCachedOperator.getChildrenKeys(workerPath + "/default")).thenReturn(defaultIpList); + Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" + defaultAddressList.get(0))).thenReturn("0.01,0.17,0.03,25.83,8.0,1.0,2020-07-21 11:17:59,2020-07-21 14:39:20,0,13238"); + } + + /** + * create or update a worker group + */ + @Test + public void testSaveWorkerGroup() { + // worker server maps + Map serverMaps = new HashMap<>(); + serverMaps.put("127.0.0.1:1234", "0.3,0.07,4.4,7.42,16.0,0.3,2021-03-19 20:17:58,2021-03-19 20:25:29,0,79214"); + Mockito.when(zookeeperMonitor.getServerMaps(ZKNodeType.WORKER, true)).thenReturn(serverMaps); + + User user = new User(); + // general user add + user.setUserType(UserType.GENERAL_USER); + Map result = workerGroupService.saveWorkerGroup(user, 0, groupName, "127.0.0.1:1234"); + Assert.assertEquals(Status.USER_NO_OPERATION_PERM.getMsg(), result.get(Constants.MSG)); - Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" + defaultIpList.get(0))).thenReturn("0.01,0.17,0.03,25.83,8.0,1.0,2020-07-21 11:17:59,2020-07-21 14:39:20,0,13238"); + // success + user.setUserType(UserType.ADMIN_USER); + result = workerGroupService.saveWorkerGroup(user, 0, groupName, "127.0.0.1:1234"); + Assert.assertEquals(Status.SUCCESS.getMsg(), result.get(Constants.MSG)); + // group name exist + Mockito.when(workerGroupMapper.selectById(2)).thenReturn(getWorkerGroup(2)); + Mockito.when(workerGroupMapper.queryWorkerGroupByName(groupName)).thenReturn(getList()); + result = workerGroupService.saveWorkerGroup(user, 2, groupName, "127.0.0.1:1234"); + Assert.assertEquals(Status.NAME_EXIST, result.get(Constants.STATUS)); } /** - * query worker group paging + * query worker group paging */ @Test - public void testQueryAllGroupPaging(){ + public void testQueryAllGroupPaging() { User user = new User(); // general user add user.setUserType(UserType.ADMIN_USER); Map result = workerGroupService.queryAllGroupPaging(user, 1, 10, null); PageInfo pageInfo = (PageInfo) result.get(Constants.DATA_LIST); - Assert.assertEquals(pageInfo.getLists().size(),1); + Assert.assertEquals(pageInfo.getLists().size(), 1); } - @Test - public void testQueryAllGroup() throws Exception { + public void testQueryAllGroup() { Map result = workerGroupService.queryAllGroup(); - Set workerGroups = (Set) result.get(Constants.DATA_LIST); + List workerGroups = (List) result.get(Constants.DATA_LIST); Assert.assertEquals(workerGroups.size(), 1); } + /** + * delete group by id + */ + @Test + public void testDeleteWorkerGroupById() { + User user = new User(); + user.setUserType(UserType.ADMIN_USER); + WorkerGroup wg2 = getWorkerGroup(2); + Mockito.when(workerGroupMapper.selectById(2)).thenReturn(wg2); + Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(wg2.getName(), Constants.NOT_TERMINATED_STATES)).thenReturn(getProcessInstanceList()); + Map result = workerGroupService.deleteWorkerGroupById(user, 1); + Assert.assertEquals(Status.DELETE_WORKER_GROUP_NOT_EXIST.getCode(), ((Status) result.get(Constants.STATUS)).getCode()); + result = workerGroupService.deleteWorkerGroupById(user, 2); + Assert.assertEquals(Status.DELETE_WORKER_GROUP_BY_ID_FAIL.getCode(), ((Status) result.get(Constants.STATUS)).getCode()); + // correct + WorkerGroup wg3 = getWorkerGroup(3); + Mockito.when(workerGroupMapper.selectById(3)).thenReturn(wg3); + Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(wg3.getName(), Constants.NOT_TERMINATED_STATES)).thenReturn(new ArrayList<>()); + result = workerGroupService.deleteWorkerGroupById(user, 3); + Assert.assertEquals(Status.SUCCESS.getMsg(), result.get(Constants.MSG)); + } /** * get processInstances - * @return */ - private List getProcessInstanceList(){ - + private List getProcessInstanceList() { List processInstances = new ArrayList<>(); processInstances.add(new ProcessInstance()); return processInstances; } + @Test + public void testQueryAllGroupWithDefault() { + Map result = workerGroupService.queryAllGroup(); + List workerGroups = (List) result.get(Constants.DATA_LIST); + Assert.assertEquals(1, workerGroups.size()); + Assert.assertEquals("default", workerGroups.toArray()[0]); + } + + /** + * get Group + * @return + */ + private WorkerGroup getWorkerGroup(int id) { + WorkerGroup workerGroup = new WorkerGroup(); + workerGroup.setName(groupName); + workerGroup.setId(id); + return workerGroup; + } + + private WorkerGroup getWorkerGroup() { + return getWorkerGroup(1); + } + + private List getList() { + List list = new ArrayList<>(); + list.add(getWorkerGroup()); + return list; + } + } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 163af3c95a..76f27b8aad 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -760,8 +760,8 @@ public final class Constants { /** * master/worker server use for zk */ - public static final String MASTER_PREFIX = "master"; - public static final String WORKER_PREFIX = "worker"; + public static final String MASTER_TYPE = "master"; + public static final String WORKER_TYPE = "worker"; public static final String DELETE_ZK_OP = "delete"; public static final String ADD_ZK_OP = "add"; public static final String ALIAS = "alias"; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ZKNodeType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ZKNodeType.java index b4b3c59321..034f880694 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ZKNodeType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ZKNodeType.java @@ -25,7 +25,6 @@ public enum ZKNodeType { * 0 master node; * 1 worker node; * 2 dead_server node; - * 3 task_queue node; */ - MASTER, WORKER, DEAD_SERVER, TASK_QUEUE; + MASTER, WORKER, DEAD_SERVER; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java index fa52a91f91..e869e309dc 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/ResInfo.java @@ -112,4 +112,17 @@ public class ResInfo { return masterServer; } + /** + * is valid heartbeat info for zk + * @param heartBeatInfo heartbeat info + * @return heartbeat info is valid + */ + public static boolean isValidHeartbeatForZKInfo(String heartBeatInfo) { + if (StringUtils.isNotEmpty(heartBeatInfo)) { + String[] parts = heartBeatInfo.split(Constants.COMMA); + return parts.length == Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH; + } + return false; + } + } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java index a2cc3fdf6c..f41cf96f2e 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java @@ -16,23 +16,59 @@ */ package org.apache.dolphinscheduler.dao.entity; - import java.util.Date; -import java.util.List; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.annotation.JsonFormat; /** * worker group */ +@TableName("t_ds_worker_group") public class WorkerGroup { + @TableId(value = "id", type = IdType.AUTO) + private int id; + private String name; - private List ipList; + private String addrList; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date createTime; + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date updateTime; + @TableField(exist = false) + private boolean systemDefault; + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getAddrList() { + return addrList; + } + + public void setAddrList(String addrList) { + this.addrList = addrList; + } public Date getCreateTime() { return createTime; @@ -50,21 +86,24 @@ public class WorkerGroup { this.updateTime = updateTime; } - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; + public boolean getSystemDefault() { + return systemDefault; } - public List getIpList() { - return ipList; + public void setSystemDefault(boolean systemDefault) { + this.systemDefault = systemDefault; } - public void setIpList(List ipList) { - this.ipList = ipList; + @Override + public String toString() { + return "WorkerGroup{" + + "id= " + id + + ", name= " + name + + ", addrList= " + addrList + + ", createTime= " + createTime + + ", updateTime= " + updateTime + + ", systemDefault= " + systemDefault + + "}"; } - } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index 5ca192811e..7fe9c4bf05 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -16,17 +16,19 @@ */ package org.apache.dolphinscheduler.dao.mapper; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.dao.entity.ExecuteStatusCount; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; -import com.baomidou.mybatisplus.core.mapper.BaseMapper; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; -import org.apache.dolphinscheduler.common.enums.ExecutionStatus; + import org.apache.ibatis.annotations.Param; import java.util.Date; import java.util.List; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + /** * process instance mapper interface */ @@ -59,12 +61,12 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * query process instance by worker group and stateArray - * @param workerGroupId workerGroupId + * @param workerGroupName workerGroupName * @param states states array * @return process instance list */ - List queryByWorkerGroupIdAndStatus(@Param("workerGroupId") int workerGroupId, - @Param("states") int[] states); + List queryByWorkerGroupNameAndStatus(@Param("workerGroupName") String workerGroupName, + @Param("states") int[] states); /** * process instance page @@ -130,12 +132,13 @@ public interface ProcessInstanceMapper extends BaseMapper { @Param("destTenantId") int destTenantId); /** - * update process instance by worker groupId - * @param originWorkerGroupId originWorkerGroupId - * @param destWorkerGroupId destWorkerGroupId + * update process instance by worker group name + * @param originWorkerGroupName originWorkerGroupName + * @param destWorkerGroupName destWorkerGroupName * @return update result */ - int updateProcessInstanceByWorkerGroupId(@Param("originWorkerGroupId") int originWorkerGroupId, @Param("destWorkerGroupId") int destWorkerGroupId); + int updateProcessInstanceByWorkerGroupName(@Param("originWorkerGroupName") String originWorkerGroupName, + @Param("destWorkerGroupName") String destWorkerGroupName); /** * count process instance state by user diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java new file mode 100644 index 0000000000..0404c17a60 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.dao.mapper; + +import org.apache.dolphinscheduler.dao.entity.WorkerGroup; + +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +/** + * worker group mapper interface + */ +public interface WorkerGroupMapper extends BaseMapper { + + /** + * query all worker group + * @return worker group list + */ + List queryAllWorkerGroup(); + + /** + * query worer grouop by name + * @param name name + * @return worker group list + */ + List queryWorkerGroupByName(@Param("name") String name); + +} diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index 1ab759b00c..5bb952b3f0 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -51,12 +51,12 @@ order by id asc - select * from t_ds_process_instance where 1=1 - - and worker_group_id =#{workerGroupId} + + and worker_group =#{workerGroupName} and state in @@ -116,10 +116,10 @@ where tenant_id = #{originTenantId} - + update t_ds_process_instance - set worker_group_id = #{destWorkerGroupId} - where worker_group_id = #{originWorkerGroupId} + set worker_group = #{destWorkerGroupName} + where worker_group = #{originWorkerGroupName} + select * + from t_ds_worker_group + order by update_time desc + + + diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index e03e8e87b1..e126b16d26 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -28,7 +28,6 @@ import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerService; -import org.apache.dolphinscheduler.server.worker.WorkerServer; import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.quartz.QuartzExecutors; @@ -44,8 +43,15 @@ import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.FilterType; +/** + * master server + */ @ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = { - @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {WorkerServer.class}) + @ComponentScan.Filter(type = FilterType.REGEX, pattern = { + "org.apache.dolphinscheduler.server.worker.*", + "org.apache.dolphinscheduler.server.monitor.*", + "org.apache.dolphinscheduler.server.log.*" + }) }) public class MasterServer implements IStoppable { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java index 7ded3b0056..c2cb677445 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManager.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor; -import org.apache.commons.collections.CollectionUtils; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -29,18 +28,22 @@ import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteExce import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; -import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; +import org.apache.dolphinscheduler.server.registry.ServerNodeManager; + +import org.apache.commons.collections.CollectionUtils; -import javax.annotation.PostConstruct; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Set; +import javax.annotation.PostConstruct; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + /** * netty executor manager */ @@ -50,10 +53,10 @@ public class NettyExecutorManager extends AbstractExecutorManager{ private final Logger logger = LoggerFactory.getLogger(NettyExecutorManager.class); /** - * zookeeper node manager + * server node manager */ @Autowired - private ZookeeperNodeManager zookeeperNodeManager; + private ServerNodeManager serverNodeManager; /** * netty remote client @@ -183,7 +186,7 @@ public class NettyExecutorManager extends AbstractExecutorManager{ ExecutorType executorType = context.getExecutorType(); switch (executorType){ case WORKER: - nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup()); + nodes = serverNodeManager.getWorkerGroupNodes(context.getWorkerGroup()); break; case CLIENT: break; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java index 58006bf7f7..fbc510ede5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java @@ -21,28 +21,25 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; -import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; +import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Set; +import org.springframework.beans.factory.annotation.Autowired; /** - * round robin host manager + * common host manager */ public abstract class CommonHostManager implements HostManager { - private final Logger logger = LoggerFactory.getLogger(CommonHostManager.class); - /** - * zookeeperNodeManager + * server node manager */ @Autowired - protected ZookeeperNodeManager zookeeperNodeManager; + protected ServerNodeManager serverNodeManager; /** * select host @@ -50,39 +47,37 @@ public abstract class CommonHostManager implements HostManager { * @return host */ @Override - public Host select(ExecutionContext context){ - Host host = new Host(); - Collection nodes = null; - /** - * executor type - */ + public Host select(ExecutionContext context) { + List candidates = null; + String workerGroup = context.getWorkerGroup(); ExecutorType executorType = context.getExecutorType(); - switch (executorType){ + switch (executorType) { case WORKER: - nodes = zookeeperNodeManager.getWorkerGroupNodes(context.getWorkerGroup()); + candidates = getWorkerCandidates(workerGroup); break; case CLIENT: break; default: throw new IllegalArgumentException("invalid executorType : " + executorType); - } - if(CollectionUtils.isEmpty(nodes)){ - return host; - } - List candidateHosts = new ArrayList<>(nodes.size()); - nodes.stream().forEach(node -> candidateHosts.add(Host.of(node))); - return select(candidateHosts); + if (CollectionUtils.isEmpty(candidates)) { + return new Host(); + } + return select(candidates); } protected abstract Host select(Collection nodes); - public void setZookeeperNodeManager(ZookeeperNodeManager zookeeperNodeManager) { - this.zookeeperNodeManager = zookeeperNodeManager; + protected List getWorkerCandidates(String workerGroup) { + List hosts = new ArrayList<>(); + Set nodes = serverNodeManager.getWorkerGroupNodes(workerGroup); + if (CollectionUtils.isNotEmpty(nodes)) { + for (String node : nodes) { + hosts.add(Host.of(node)); + } + } + return hosts; } - public ZookeeperNodeManager getZookeeperNodeManager() { - return zookeeperNodeManager; - } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java index 458a1ee036..bce1d4ff75 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java @@ -45,7 +45,7 @@ public class HostManagerConfig { String hostSelector = masterConfig.getHostSelector(); HostSelector selector = HostSelector.of(hostSelector); HostManager hostManager; - switch (selector){ + switch (selector) { case RANDOM: hostManager = new RandomHostManager(); break; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java index ac7d8b0ffc..181cb98467 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java @@ -19,20 +19,18 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.CollectionUtils; -import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.common.utils.ResInfo; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin; -import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import java.util.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -40,27 +38,19 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import static org.apache.dolphinscheduler.common.Constants.COMMA; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * round robin host manager + * lower weight host manager */ public class LowerWeightHostManager extends CommonHostManager { private final Logger logger = LoggerFactory.getLogger(LowerWeightHostManager.class); - /** - * zookeeper registry center - */ - @Autowired - private ZookeeperRegistryCenter registryCenter; - - /** - * round robin host manager - */ - private RoundRobinHostManager roundRobinHostManager; - /** * selector */ @@ -82,18 +72,16 @@ public class LowerWeightHostManager extends CommonHostManager { private ScheduledExecutorService executorService; @PostConstruct - public void init(){ + public void init() { this.selector = new LowerWeightRoundRobin(); this.workerHostWeightsMap = new ConcurrentHashMap<>(); this.lock = new ReentrantLock(); this.executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LowerWeightHostManagerExecutor")); this.executorService.scheduleWithFixedDelay(new RefreshResourceTask(),0, 5, TimeUnit.SECONDS); - this.roundRobinHostManager = new RoundRobinHostManager(); - this.roundRobinHostManager.setZookeeperNodeManager(getZookeeperNodeManager()); } @PreDestroy - public void close(){ + public void close() { this.executorService.shutdownNow(); } @@ -103,9 +91,9 @@ public class LowerWeightHostManager extends CommonHostManager { * @return host */ @Override - public Host select(ExecutionContext context){ + public Host select(ExecutionContext context) { Set workerHostWeights = getWorkerHostWeights(context.getWorkerGroup()); - if(CollectionUtils.isNotEmpty(workerHostWeights)){ + if (CollectionUtils.isNotEmpty(workerHostWeights)) { return selector.select(workerHostWeights).getHost(); } return new Host(); @@ -116,7 +104,7 @@ public class LowerWeightHostManager extends CommonHostManager { throw new UnsupportedOperationException("not support"); } - private void syncWorkerHostWeight(Map> workerHostWeights){ + private void syncWorkerHostWeight(Map> workerHostWeights) { lock.lock(); try { workerHostWeightsMap.clear(); @@ -126,7 +114,7 @@ public class LowerWeightHostManager extends CommonHostManager { } } - private Set getWorkerHostWeights(String workerGroup){ + private Set getWorkerHostWeights(String workerGroup) { lock.lock(); try { return workerHostWeightsMap.get(workerGroup); @@ -135,46 +123,50 @@ public class LowerWeightHostManager extends CommonHostManager { } } - class RefreshResourceTask implements Runnable{ + class RefreshResourceTask implements Runnable { @Override public void run() { try { - Map> workerGroupNodes = zookeeperNodeManager.getWorkerGroupNodes(); - Set>> entries = workerGroupNodes.entrySet(); Map> workerHostWeights = new HashMap<>(); - for(Map.Entry> entry : entries){ + Map> workerGroupNodes = serverNodeManager.getWorkerGroupNodes(); + for (Map.Entry> entry : workerGroupNodes.entrySet()) { String workerGroup = entry.getKey(); Set nodes = entry.getValue(); - String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup); Set hostWeights = new HashSet<>(nodes.size()); - for(String node : nodes){ - String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node); - if(StringUtils.isNotEmpty(heartbeat) - && heartbeat.split(COMMA).length == Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){ - String[] parts = heartbeat.split(COMMA); - - int status = Integer.parseInt(parts[8]); - if (status == Constants.ABNORMAL_NODE_STATUS){ - logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", - Double.parseDouble(parts[3]) , Double.parseDouble(parts[2])); - continue; - } - - double cpu = Double.parseDouble(parts[0]); - double memory = Double.parseDouble(parts[1]); - double loadAverage = Double.parseDouble(parts[2]); - HostWeight hostWeight = new HostWeight(Host.of(node), cpu, memory, loadAverage); + for (String node : nodes) { + String heartbeat = serverNodeManager.getWorkerNodeInfo(node); + HostWeight hostWeight = getHostWeight(node, heartbeat); + if (hostWeight != null) { hostWeights.add(hostWeight); } } - workerHostWeights.put(workerGroup, hostWeights); + if (!hostWeights.isEmpty()) { + workerHostWeights.put(workerGroup, hostWeights); + } } syncWorkerHostWeight(workerHostWeights); - } catch (Throwable ex){ + } catch (Throwable ex) { logger.error("RefreshResourceTask error", ex); } } + + public HostWeight getHostWeight(String addr, String heartbeat) { + if (ResInfo.isValidHeartbeatForZKInfo(heartbeat)) { + String[] parts = heartbeat.split(Constants.COMMA); + int status = Integer.parseInt(parts[8]); + if (status == Constants.ABNORMAL_NODE_STATUS) { + logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", + Double.parseDouble(parts[3]), Double.parseDouble(parts[2])); + return null; + } + double cpu = Double.parseDouble(parts[0]); + double memory = Double.parseDouble(parts[1]); + double loadAverage = Double.parseDouble(parts[2]); + return new HostWeight(Host.of(addr), cpu, memory, loadAverage); + } + return null; + } } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java index ef2b6fd22f..5918abee09 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RandomHostManager.java @@ -23,9 +23,8 @@ import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector; import java.util.Collection; - /** - * round robin host manager + * random host manager */ public class RandomHostManager extends CommonHostManager { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java index e9fef49ecf..f5c1d4a2b4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManager.java @@ -23,7 +23,6 @@ import org.apache.dolphinscheduler.server.master.dispatch.host.assign.Selector; import java.util.Collection; - /** * round robin host manager */ @@ -37,7 +36,7 @@ public class RoundRobinHostManager extends CommonHostManager { /** * set round robin */ - public RoundRobinHostManager(){ + public RoundRobinHostManager() { this.selector = new RoundRobinSelector<>(); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java index c24302a7ab..84f39a8650 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java @@ -81,7 +81,7 @@ public class MasterRegistry { * registry */ public void registry() { - String address = NetUtils.getHost(); + String address = NetUtils.getAddr(masterConfig.getListenPort()); String localNodePath = getMasterPath(); zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, ""); zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener( @@ -98,10 +98,10 @@ public class MasterRegistry { }); int masterHeartbeatInterval = masterConfig.getMasterHeartbeatInterval(); HeartBeatTask heartBeatTask = new HeartBeatTask(startTime, - masterConfig.getMasterReservedMemory(), masterConfig.getMasterMaxCpuloadAvg(), + masterConfig.getMasterReservedMemory(), Sets.newHashSet(getMasterPath()), - Constants.MASTER_PREFIX, + Constants.MASTER_TYPE, zookeeperRegistryCenter); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java index ea187c20db..dcabd0346f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java @@ -48,17 +48,17 @@ public class HeartBeatTask implements Runnable { protected IStoppable stoppable = null; public HeartBeatTask(String startTime, - double reservedMemory, double maxCpuloadAvg, + double reservedMemory, Set heartBeatPaths, String serverType, ZookeeperRegistryCenter zookeeperRegistryCenter) { this.startTime = startTime; - this.reservedMemory = reservedMemory; this.maxCpuloadAvg = maxCpuloadAvg; + this.reservedMemory = reservedMemory; this.heartBeatPaths = heartBeatPaths; - this.zookeeperRegistryCenter = zookeeperRegistryCenter; this.serverType = serverType; + this.zookeeperRegistryCenter = zookeeperRegistryCenter; } @Override diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ServerNodeManager.java similarity index 60% rename from dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java rename to dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ServerNodeManager.java index bae4d141d4..a487c21218 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ServerNodeManager.java @@ -17,63 +17,103 @@ package org.apache.dolphinscheduler.server.registry; -import org.apache.commons.collections.CollectionUtils; -import org.apache.curator.framework.CuratorFramework; - -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.AlertDao; +import org.apache.dolphinscheduler.dao.entity.WorkerGroup; +import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; +import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.service.zk.AbstractListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; +import org.apache.dolphinscheduler.service.zk.AbstractZKClient; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; +import javax.annotation.PreDestroy; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import org.springframework.stereotype.Service; /** - * zookeeper node manager + * server node manager */ @Service -public class ZookeeperNodeManager implements InitializingBean { +public class ServerNodeManager implements InitializingBean { - private final Logger logger = LoggerFactory.getLogger(ZookeeperNodeManager.class); + private final Logger logger = LoggerFactory.getLogger(ServerNodeManager.class); /** - * master lock + * master lock */ private final Lock masterLock = new ReentrantLock(); /** - * worker group lock + * worker group lock */ private final Lock workerGroupLock = new ReentrantLock(); /** - * worker group nodes + * worker node info lock + */ + private final Lock workerNodeInfoLock = new ReentrantLock(); + + /** + * worker group nodes */ private final ConcurrentHashMap> workerGroupNodes = new ConcurrentHashMap<>(); /** - * master nodes + * master nodes */ private final Set masterNodes = new HashSet<>(); + /** + * worker node info + */ + private final Map workerNodeInfo = new HashMap<>(); + + /** + * executor service + */ + private ScheduledExecutorService executorService; + + /** + * zk client + */ + @Autowired + private ZKClient zkClient; + /** * zookeeper registry center */ @Autowired private ZookeeperRegistryCenter registryCenter; + /** + * worker group mapper + */ + @Autowired + private WorkerGroupMapper workerGroupMapper; + /** * alert dao */ @@ -87,9 +127,14 @@ public class ZookeeperNodeManager implements InitializingBean { @Override public void afterPropertiesSet() throws Exception { /** - * load nodes from zookeeper + * load nodes from zookeeper */ load(); + /** + * init executor service + */ + executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ServerNodeManagerExecutor")); + executorService.scheduleWithFixedDelay(new WorkerNodeInfoAndGroupDbSyncTask(), 0, 10, TimeUnit.SECONDS); /** * init MasterNodeListener listener */ @@ -103,22 +148,59 @@ public class ZookeeperNodeManager implements InitializingBean { /** * load nodes from zookeeper */ - private void load(){ + private void load() { /** * master nodes from zookeeper */ - Set masterNodes = registryCenter.getMasterNodesDirectly(); - syncMasterNodes(masterNodes); + Set initMasterNodes = registryCenter.getMasterNodesDirectly(); + syncMasterNodes(initMasterNodes); /** * worker group nodes from zookeeper */ Set workerGroups = registryCenter.getWorkerGroupDirectly(); - for(String workerGroup : workerGroups){ + for (String workerGroup : workerGroups) { syncWorkerGroupNodes(workerGroup, registryCenter.getWorkerGroupNodesDirectly(workerGroup)); } } + /** + * zookeeper client + */ + @Component + static class ZKClient extends AbstractZKClient {} + + /** + * worker node info and worker group db sync task + */ + class WorkerNodeInfoAndGroupDbSyncTask implements Runnable { + + @Override + public void run() { + // sync worker node info + Map newWorkerNodeInfo = zkClient.getServerMaps(ZKNodeType.WORKER, true); + syncWorkerNodeInfo(newWorkerNodeInfo); + + // sync worker group nodes from database + List workerGroupList = workerGroupMapper.queryAllWorkerGroup(); + if (CollectionUtils.isNotEmpty(workerGroupList)) { + for (WorkerGroup wg : workerGroupList) { + String workerGroup = wg.getName(); + Set nodes = new HashSet<>(); + String[] addrs = wg.getAddrList().split(Constants.COMMA); + for (String addr : addrs) { + if (newWorkerNodeInfo.containsKey(addr)) { + nodes.add(addr); + } + } + if (!nodes.isEmpty()) { + syncWorkerGroupNodes(workerGroup, nodes); + } + } + } + } + } + /** * worker group node listener */ @@ -126,44 +208,38 @@ public class ZookeeperNodeManager implements InitializingBean { @Override protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) { - if(registryCenter.isWorkerPath(path)){ + if (registryCenter.isWorkerPath(path)) { try { if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) { logger.info("worker group node : {} added.", path); String group = parseGroup(path); - Set workerNodes = workerGroupNodes.getOrDefault(group, new HashSet<>()); - Set previousNodes = new HashSet<>(workerNodes); Set currentNodes = registryCenter.getWorkerGroupNodesDirectly(group); logger.info("currentNodes : {}", currentNodes); syncWorkerGroupNodes(group, currentNodes); } else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) { logger.info("worker group node : {} down.", path); String group = parseGroup(path); - Set workerNodes = workerGroupNodes.getOrDefault(group, new HashSet<>()); - Set previousNodes = new HashSet<>(workerNodes); Set currentNodes = registryCenter.getWorkerGroupNodesDirectly(group); syncWorkerGroupNodes(group, currentNodes); alertDao.sendServerStopedAlert(1, path, "WORKER"); } - } catch (IllegalArgumentException ignore) { - logger.warn(ignore.getMessage()); + } catch (IllegalArgumentException ex) { + logger.warn(ex.getMessage()); } catch (Exception ex) { logger.error("WorkerGroupListener capture data change and get data failed", ex); } } } - private String parseGroup(String path){ - String[] parts = path.split("\\/"); - if(parts.length < 6){ + private String parseGroup(String path) { + String[] parts = path.split("/"); + if (parts.length < 6) { throw new IllegalArgumentException(String.format("worker group path : %s is not valid, ignore", path)); } - String group = parts[parts.length - 2]; - return group; + return parts[parts.length - 2]; } } - /** * master node listener */ @@ -175,12 +251,10 @@ public class ZookeeperNodeManager implements InitializingBean { try { if (event.getType() == TreeCacheEvent.Type.NODE_ADDED) { logger.info("master node : {} added.", path); - Set previousNodes = new HashSet<>(masterNodes); Set currentNodes = registryCenter.getMasterNodesDirectly(); syncMasterNodes(currentNodes); } else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED) { logger.info("master node : {} down.", path); - Set previousNodes = new HashSet<>(masterNodes); Set currentNodes = registryCenter.getMasterNodesDirectly(); syncMasterNodes(currentNodes); alertDao.sendServerStopedAlert(1, path, "MASTER"); @@ -209,7 +283,7 @@ public class ZookeeperNodeManager implements InitializingBean { * sync master nodes * @param nodes master nodes */ - private void syncMasterNodes(Set nodes){ + private void syncMasterNodes(Set nodes) { masterLock.lock(); try { masterNodes.clear(); @@ -224,7 +298,7 @@ public class ZookeeperNodeManager implements InitializingBean { * @param workerGroup worker group * @param nodes worker nodes */ - private void syncWorkerGroupNodes(String workerGroup, Set nodes){ + private void syncWorkerGroupNodes(String workerGroup, Set nodes) { workerGroupLock.lock(); try { workerGroup = workerGroup.toLowerCase(); @@ -237,7 +311,7 @@ public class ZookeeperNodeManager implements InitializingBean { } } - public Map> getWorkerGroupNodes(){ + public Map> getWorkerGroupNodes() { return Collections.unmodifiableMap(workerGroupNodes); } @@ -246,15 +320,15 @@ public class ZookeeperNodeManager implements InitializingBean { * @param workerGroup workerGroup * @return worker nodes */ - public Set getWorkerGroupNodes(String workerGroup){ + public Set getWorkerGroupNodes(String workerGroup) { workerGroupLock.lock(); try { - if(StringUtils.isEmpty(workerGroup)){ - workerGroup = DEFAULT_WORKER_GROUP; + if (StringUtils.isEmpty(workerGroup)) { + workerGroup = Constants.DEFAULT_WORKER_GROUP; } workerGroup = workerGroup.toLowerCase(); Set nodes = workerGroupNodes.get(workerGroup); - if(CollectionUtils.isNotEmpty(nodes)){ + if (CollectionUtils.isNotEmpty(nodes)) { return Collections.unmodifiableSet(nodes); } return nodes; @@ -264,9 +338,48 @@ public class ZookeeperNodeManager implements InitializingBean { } /** - * close + * get worker node info + * @return worker node info + */ + public Map getWorkerNodeInfo() { + return Collections.unmodifiableMap(workerNodeInfo); + } + + /** + * get worker node info + * @param workerNode worker node + * @return worker node info + */ + public String getWorkerNodeInfo(String workerNode) { + workerNodeInfoLock.lock(); + try { + return workerNodeInfo.getOrDefault(workerNode, null); + } finally { + workerNodeInfoLock.unlock(); + } + } + + /** + * sync worker node info + * @param newWorkerNodeInfo new worker node info */ - public void close(){ + private void syncWorkerNodeInfo(Map newWorkerNodeInfo) { + workerNodeInfoLock.lock(); + try { + workerNodeInfo.clear(); + workerNodeInfo.putAll(newWorkerNodeInfo); + } finally { + workerNodeInfoLock.unlock(); + } + } + + /** + * destroy + */ + @PreDestroy + public void destroy() { + executorService.shutdownNow(); registryCenter.close(); } + } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java index 591333bffe..fdbcb8fd7d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java @@ -17,10 +17,8 @@ package org.apache.dolphinscheduler.server.registry; -import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX; import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; -import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; @@ -230,9 +228,7 @@ public class ZookeeperRegistryCenter implements InitializingBean { // ip_sequence_no String[] zNodesPath = zNode.split("\\/"); String ipSeqNo = zNodesPath[zNodesPath.length - 1]; - - String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX; - String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo; + String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + serverType + UNDERLINE + ipSeqNo; return !registerOperator.isExisted(zNode) || registerOperator.isExisted(deadServerPath); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java index aa5600a6c4..961f88f0ab 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/WorkerServer.java @@ -43,11 +43,20 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.WebApplicationType; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.FilterType; /** * worker server */ -@ComponentScan("org.apache.dolphinscheduler") +@ComponentScan(value = "org.apache.dolphinscheduler", excludeFilters = { + @ComponentScan.Filter(type = FilterType.REGEX, pattern = { + "org.apache.dolphinscheduler.server.master.*", + "org.apache.dolphinscheduler.server.monitor.*", + "org.apache.dolphinscheduler.server.log.*", + "org.apache.dolphinscheduler.server.zk.ZKMasterClient", + "org.apache.dolphinscheduler.server.registry.ServerNodeManager" + }) +}) public class WorkerServer implements IStoppable { /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index e9eb7d3d7a..f9d8153ad9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -98,7 +98,7 @@ public class WorkerRegistry { * registry */ public void registry() { - String address = NetUtils.getHost(); + String address = NetUtils.getAddr(workerConfig.getListenPort()); Set workerZkPaths = getWorkerZkPaths(); int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval(); @@ -120,10 +120,10 @@ public class WorkerRegistry { } HeartBeatTask heartBeatTask = new HeartBeatTask(this.startTime, - this.workerConfig.getWorkerReservedMemory(), this.workerConfig.getWorkerMaxCpuloadAvg(), + this.workerConfig.getWorkerReservedMemory(), workerZkPaths, - Constants.WORKER_PREFIX, + Constants.WORKER_TYPE, this.zookeeperRegistryCenter); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index fc9694908e..cbabbc4c8c 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.zk; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.model.Server; @@ -29,7 +30,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import org.apache.dolphinscheduler.server.master.MasterServer; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -73,7 +73,7 @@ public class ZKMasterClient extends AbstractZKClient { @Autowired private MasterRegistry masterRegistry; - public void start(MasterServer masterServer) { + public void start(IStoppable stoppable) { InterProcessMutex mutex = null; try { // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/master @@ -83,7 +83,7 @@ public class ZKMasterClient extends AbstractZKClient { // master registry masterRegistry.registry(); - masterRegistry.getZookeeperRegistryCenter().setStoppable(masterServer); + masterRegistry.getZookeeperRegistryCenter().setStoppable(stoppable); String registryPath = this.masterRegistry.getMasterPath(); masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registryPath, ZKNodeType.MASTER, Constants.DELETE_ZK_OP); @@ -280,7 +280,7 @@ public class ZKMasterClient extends AbstractZKClient { return false; } Date workerServerStartDate = null; - List workerServers = getServersList(ZKNodeType.WORKER); + List workerServers = getServerList(ZKNodeType.WORKER); for (Server workerServer : workerServers) { if (taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())) { workerServerStartDate = workerServer.getCreateTime(); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index 2c2a1b5e3a..6b0841b37e 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; @@ -36,10 +37,12 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; +import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.registry.DependencyConfig; -import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.SpringZKServer; +import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriority; @@ -47,8 +50,6 @@ import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue; import org.apache.dolphinscheduler.service.zk.RegisterOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; -import org.apache.curator.CuratorZookeeperClient; - import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -67,11 +68,10 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, - NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class, - ZookeeperNodeManager.class, RegisterOperator.class, ZookeeperConfig.class, MasterConfig.class}) + NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, ZKMasterClient.class, TaskPriorityQueueConsumer.class, + ServerNodeManager.class, RegisterOperator.class, ZookeeperConfig.class, MasterConfig.class, MasterRegistry.class, SpringConnectionFactory.class}) public class TaskPriorityQueueConsumerTest { - @Autowired private TaskPriorityQueue taskPriorityQueue; diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java index 958df01cf8..b5d6f27448 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/ExecutorDispatcherTest.java @@ -17,13 +17,14 @@ package org.apache.dolphinscheduler.server.master.dispatch; +import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.registry.DependencyConfig; -import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -33,6 +34,7 @@ import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; + import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; @@ -46,7 +48,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, WorkerRegistry.class, NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, WorkerConfig.class, - ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class}) + ServerNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, SpringConnectionFactory.class}) public class ExecutorDispatcherTest { @Autowired diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java index 1ecc2f302d..b1fd4e8342 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/executor/NettyExecutorManagerTest.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.dispatch.executor; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; @@ -30,7 +31,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionConte import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException; import org.apache.dolphinscheduler.server.registry.DependencyConfig; -import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor; @@ -53,7 +54,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, WorkerRegistry.class, - ZookeeperNodeManager.class, ZookeeperRegistryCenter.class, WorkerConfig.class, + ServerNodeManager.class, ZookeeperRegistryCenter.class, WorkerConfig.class, SpringConnectionFactory.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, SpringApplicationContext.class, NettyExecutorManager.class}) public class NettyExecutorManagerTest { diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java index a06dbce7a1..8923547214 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/dispatch/host/RoundRobinHostManagerTest.java @@ -14,66 +14,51 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.master.dispatch.host; +package org.apache.dolphinscheduler.server.master.dispatch.host; -import org.apache.dolphinscheduler.common.utils.NetUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; -import org.apache.dolphinscheduler.server.registry.DependencyConfig; -import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; -import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; +import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.utils.ExecutionContextTestUtils; -import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; -import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistry; -import org.apache.dolphinscheduler.server.zk.SpringZKServer; -import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; -import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringRunner; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import com.google.common.collect.Sets; /** * round robin host manager test */ -@RunWith(SpringRunner.class) -@ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, WorkerRegistry.class, ZookeeperRegistryCenter.class, WorkerConfig.class, - ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class}) +@RunWith(MockitoJUnitRunner.class) public class RoundRobinHostManagerTest { + @Mock + private ServerNodeManager serverNodeManager; - @Autowired - private ZookeeperNodeManager zookeeperNodeManager; - - @Autowired - private WorkerRegistry workerRegistry; - - @Autowired - private WorkerConfig workerConfig; + @InjectMocks + RoundRobinHostManager roundRobinHostManager; @Test - public void testSelectWithEmptyResult(){ - RoundRobinHostManager roundRobinHostManager = new RoundRobinHostManager(); - roundRobinHostManager.setZookeeperNodeManager(zookeeperNodeManager); + public void testSelectWithEmptyResult() { + Mockito.when(serverNodeManager.getWorkerGroupNodes("default")).thenReturn(null); ExecutionContext context = ExecutionContextTestUtils.getExecutionContext(10000); Host emptyHost = roundRobinHostManager.select(context); Assert.assertTrue(StringUtils.isEmpty(emptyHost.getAddress())); } @Test - public void testSelectWithResult(){ - workerRegistry.registry(); - RoundRobinHostManager roundRobinHostManager = new RoundRobinHostManager(); - roundRobinHostManager.setZookeeperNodeManager(zookeeperNodeManager); + public void testSelectWithResult() { + Mockito.when(serverNodeManager.getWorkerGroupNodes("default")).thenReturn(Sets.newHashSet("192.168.1.1:22")); ExecutionContext context = ExecutionContextTestUtils.getExecutionContext(10000); Host host = roundRobinHostManager.select(context); Assert.assertTrue(StringUtils.isNotEmpty(host.getAddress())); - Assert.assertTrue(host.getAddress().equalsIgnoreCase(NetUtils.getAddr(workerConfig.getListenPort()))); + Assert.assertTrue(host.getAddress().equalsIgnoreCase("192.168.1.1:22")); } } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java index 31222ff64e..094684835c 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseServiceTest.java @@ -18,12 +18,16 @@ package org.apache.dolphinscheduler.server.master.processor.queue; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; import org.apache.dolphinscheduler.server.registry.DependencyConfig; -import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; + +import java.util.Date; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -31,11 +35,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import java.util.Date; - @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, TaskResponseService.class, ZookeeperRegistryCenter.class, - ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class, TaskResponseService.class}) + ZookeeperCachedOperator.class, ZookeeperConfig.class, ServerNodeManager.class, TaskResponseService.class, + SpringConnectionFactory.class}) public class TaskResponseServiceTest { @Autowired diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java index 83095a371a..e2b8d813e0 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/runner/MasterTaskExecThreadTest.java @@ -18,18 +18,23 @@ package org.apache.dolphinscheduler.server.master.runner; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; import org.apache.dolphinscheduler.server.registry.DependencyConfig; -import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.SpringZKServer; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; + +import java.util.HashSet; +import java.util.Set; + import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mockito; @@ -37,13 +42,10 @@ import org.springframework.context.ApplicationContext; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; -import java.util.HashSet; -import java.util.Set; - @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class, - ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class}) + ServerNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class, SpringConnectionFactory.class}) public class MasterTaskExecThreadTest { diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManagerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ServerNodeManagerTest.java similarity index 81% rename from dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManagerTest.java rename to dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ServerNodeManagerTest.java index faa7b9b9d5..867a15c7e4 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ZookeeperNodeManagerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/registry/ServerNodeManagerTest.java @@ -14,11 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.registry; +package org.apache.dolphinscheduler.server.registry; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.NetUtils; +import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -38,16 +39,16 @@ import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /** - * zookeeper node manager test + * server node manager test */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes={DependencyConfig.class, SpringZKServer.class, MasterRegistry.class,WorkerRegistry.class, - ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, - ZookeeperCachedOperator.class, ZookeeperConfig.class, ZookeeperNodeManager.class}) -public class ZookeeperNodeManagerTest { + ZookeeperRegistryCenter.class, MasterConfig.class, WorkerConfig.class, SpringConnectionFactory.class, + ZookeeperCachedOperator.class, ZookeeperConfig.class, ServerNodeManager.class}) +public class ServerNodeManagerTest { @Autowired - private ZookeeperNodeManager zookeeperNodeManager; + private ServerNodeManager serverNodeManager; @Autowired private MasterRegistry masterRegistry; @@ -68,11 +69,11 @@ public class ZookeeperNodeManagerTest { public void testGetMasterNodes(){ masterRegistry.registry(); try { - //let the zookeeperNodeManager catch the registry event + //let the serverNodeManager catch the registry event Thread.sleep(2000); } catch (InterruptedException ignore) { } - Set masterNodes = zookeeperNodeManager.getMasterNodes(); + Set masterNodes = serverNodeManager.getMasterNodes(); Assert.assertTrue(CollectionUtils.isNotEmpty(masterNodes)); Assert.assertEquals(1, masterNodes.size()); Assert.assertEquals(NetUtils.getAddr(masterConfig.getListenPort()), masterNodes.iterator().next()); @@ -82,11 +83,11 @@ public class ZookeeperNodeManagerTest { public void testGetWorkerGroupNodes(){ workerRegistry.registry(); try { - //let the zookeeperNodeManager catch the registry event + //let the serverNodeManager catch the registry event Thread.sleep(2000); } catch (InterruptedException ignore) { } - Map> workerGroupNodes = zookeeperNodeManager.getWorkerGroupNodes(); + Map> workerGroupNodes = serverNodeManager.getWorkerGroupNodes(); Assert.assertEquals(1, workerGroupNodes.size()); Assert.assertEquals("default".trim(), workerGroupNodes.keySet().iterator().next()); } @@ -95,12 +96,11 @@ public class ZookeeperNodeManagerTest { public void testGetWorkerGroupNodesWithParam(){ workerRegistry.registry(); try { - //let the zookeeperNodeManager catch the registry event + //let the serverNodeManager catch the registry event Thread.sleep(3000); } catch (InterruptedException ignore) { } - Map> workerGroupNodes = zookeeperNodeManager.getWorkerGroupNodes(); - Set workerNodes = zookeeperNodeManager.getWorkerGroupNodes("default"); + Set workerNodes = serverNodeManager.getWorkerGroupNodes("default"); Assert.assertTrue(CollectionUtils.isNotEmpty(workerNodes)); Assert.assertEquals(1, workerNodes.size()); Assert.assertEquals(NetUtils.getAddr(workerConfig.getListenPort()), workerNodes.iterator().next()); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index f228802f8f..7d1d24d36e 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.worker.processor; import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; @@ -31,7 +32,7 @@ import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor; import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor; import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; -import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; +import org.apache.dolphinscheduler.server.registry.ServerNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; @@ -61,24 +62,11 @@ import io.netty.channel.Channel; */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = { - TaskCallbackServiceTestConfig.class, - SpringZKServer.class, - SpringApplicationContext.class, - MasterRegistry.class, - WorkerRegistry.class, - ZookeeperRegistryCenter.class, - MasterConfig.class, - WorkerConfig.class, - RegisterOperator.class, - ZookeeperConfig.class, - ZookeeperNodeManager.class, - TaskCallbackService.class, - TaskResponseService.class, - TaskAckProcessor.class, - TaskResponseProcessor.class, - TaskExecuteProcessor.class, - CuratorZookeeperClient.class, - TaskExecutionContextCacheManagerImpl.class}) + TaskCallbackServiceTestConfig.class, SpringZKServer.class, SpringApplicationContext.class, + SpringConnectionFactory.class, MasterRegistry.class, WorkerRegistry.class, ZookeeperRegistryCenter.class, + MasterConfig.class, WorkerConfig.class, RegisterOperator.class, ZookeeperConfig.class, ServerNodeManager.class, + TaskCallbackService.class, TaskResponseService.class, TaskAckProcessor.class, TaskResponseProcessor.class, + TaskExecuteProcessor.class, CuratorZookeeperClient.class, TaskExecutionContextCacheManagerImpl.class}) public class TaskCallbackServiceTest { @Autowired diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java index 24cdb89a06..f4501bb00e 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java @@ -17,14 +17,8 @@ package org.apache.dolphinscheduler.service.zk; -import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP; import static org.apache.dolphinscheduler.common.Constants.COLON; -import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP; import static org.apache.dolphinscheduler.common.Constants.DIVISION_STRING; -import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX; -import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; -import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; -import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ZKNodeType; @@ -35,9 +29,12 @@ import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,59 +79,123 @@ public abstract class AbstractZKClient extends RegisterOperator { * @param zkNodeType zookeeper node type * @return server list */ - public List getServersList(ZKNodeType zkNodeType) { - Map masterMap = getServerMaps(zkNodeType); + public List getServerList(ZKNodeType zkNodeType) { + Map serverMaps = getServerMaps(zkNodeType); String parentPath = getZNodeParentPath(zkNodeType); - List masterServers = new ArrayList<>(); - for (Map.Entry entry : masterMap.entrySet()) { - Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue()); - if (masterServer == null) { + List serverList = new ArrayList<>(); + for (Map.Entry entry : serverMaps.entrySet()) { + Server server = ResInfo.parseHeartbeatForZKInfo(entry.getValue()); + if (server == null) { continue; } String key = entry.getKey(); - masterServer.setZkDirectory(parentPath + "/" + key); - //set host and port + server.setZkDirectory(parentPath + "/" + key); + // set host and port String[] hostAndPort = key.split(COLON); String[] hosts = hostAndPort[0].split(DIVISION_STRING); // fetch the last one - masterServer.setHost(hosts[hosts.length - 1]); - masterServer.setPort(Integer.parseInt(hostAndPort[1])); - masterServers.add(masterServer); + server.setHost(hosts[hosts.length - 1]); + server.setPort(Integer.parseInt(hostAndPort[1])); + serverList.add(server); } - return masterServers; + return serverList; } /** - * get master server list map. + * get server zk nodes. * * @param zkNodeType zookeeper node type - * @return result : {host : resource info} + * @return result : list */ - public Map getServerMaps(ZKNodeType zkNodeType) { + public List getServerZkNodes(ZKNodeType zkNodeType) { + String path = getZNodeParentPath(zkNodeType); + List serverList = super.getChildrenKeys(path); + if (zkNodeType == ZKNodeType.WORKER) { + List workerList = new ArrayList<>(); + for (String group : serverList) { + List groupServers = super.getChildrenKeys(path + Constants.SLASH + group); + for (String groupServer : groupServers) { + workerList.add(group + Constants.SLASH + groupServer); + } + } + serverList = workerList; + } + return serverList; + } - Map masterMap = new HashMap<>(); + /** + * get server list map. + * + * @param zkNodeType zookeeper node type + * @param hostOnly host only + * @return result : {host : resource info} + */ + public Map getServerMaps(ZKNodeType zkNodeType, boolean hostOnly) { + Map serverMap = new HashMap<>(); try { String path = getZNodeParentPath(zkNodeType); - List serverList = super.getChildrenKeys(path); - if (zkNodeType == ZKNodeType.WORKER) { - List workerList = new ArrayList<>(); - for (String group : serverList) { - List groupServers = super.getChildrenKeys(path + Constants.SLASH + group); - for (String groupServer : groupServers) { - workerList.add(group + Constants.SLASH + groupServer); - } + List serverList = getServerZkNodes(zkNodeType); + for (String server : serverList) { + String host = server; + if (zkNodeType == ZKNodeType.WORKER && hostOnly) { + host = server.split(Constants.SLASH)[1]; } - serverList = workerList; + serverMap.putIfAbsent(host, super.get(path + Constants.SLASH + server)); } + } catch (Exception e) { + logger.error("get server list failed", e); + } + + return serverMap; + } + + /** + * get server list map. + * + * @param zkNodeType zookeeper node type + * @return result : {host : resource info} + */ + public Map getServerMaps(ZKNodeType zkNodeType) { + return getServerMaps(zkNodeType, false); + } + + /** + * get server node set. + * + * @param zkNodeType zookeeper node type + * @param hostOnly host only + * @return result : set + */ + public Set getServerNodeSet(ZKNodeType zkNodeType, boolean hostOnly) { + Set serverSet = new HashSet<>(); + try { + List serverList = getServerZkNodes(zkNodeType); for (String server : serverList) { - masterMap.putIfAbsent(server, super.get(path + Constants.SLASH + server)); + String host = server; + if (zkNodeType == ZKNodeType.WORKER && hostOnly) { + host = server.split(Constants.SLASH)[1]; + } + serverSet.add(host); } } catch (Exception e) { - logger.error("get server list failed", e); + logger.error("get server node set failed", e); } + return serverSet; + } - return masterMap; + /** + * get server node list. + * + * @param zkNodeType zookeeper node type + * @param hostOnly host only + * @return result : list + */ + public List getServerNodeList(ZKNodeType zkNodeType, boolean hostOnly) { + Set serverSet = getServerNodeSet(zkNodeType, hostOnly); + List serverList = new ArrayList<>(serverSet); + Collections.sort(serverList); + return serverList; } /** @@ -151,7 +212,7 @@ public abstract class AbstractZKClient extends RegisterOperator { host, zkNodeType); return false; } - Map serverMaps = getServerMaps(zkNodeType); + Map serverMaps = getServerMaps(zkNodeType, true); for (String hostKey : serverMaps.keySet()) { if (hostKey.contains(host)) { return true; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java index 0fd4a4fa92..dd0b27d359 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java @@ -19,10 +19,10 @@ package org.apache.dolphinscheduler.service.zk; import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP; import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP; -import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX; +import static org.apache.dolphinscheduler.common.Constants.MASTER_TYPE; import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; -import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX; +import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ZKNodeType; @@ -99,7 +99,7 @@ public class RegisterOperator extends ZookeeperCachedOperator { */ public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception { String host = getHostByEventDataPath(zNode); - String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX; + String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_TYPE : WORKER_TYPE; //check server restart, if restart , dead server path in zk should be delete if (opType.equals(DELETE_ZK_OP)) { @@ -130,7 +130,7 @@ public class RegisterOperator extends ZookeeperCachedOperator { */ public void handleDeadServer(Set zNodeSet, ZKNodeType zkNodeType, String opType) throws Exception { - String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX; + String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_TYPE : WORKER_TYPE; for (String zNode : zNodeSet) { String host = getHostByEventDataPath(zNode); //check server restart, if restart , dead server path in zk should be delete diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java index 7823c9b1af..cf77080fe3 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java @@ -97,7 +97,7 @@ public class RegisterOperatorTest { testAfterPropertiesSet(); registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER,Constants.ADD_ZK_OP); String path = registerOperator.getDeadZNodeParentPath(); - Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE))); + Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_TYPE,MASTER_NODE))); } @@ -107,10 +107,10 @@ public class RegisterOperatorTest { String path = registerOperator.getDeadZNodeParentPath(); registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER,Constants.ADD_ZK_OP); - Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE))); + Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_TYPE,MASTER_NODE))); - registerOperator.removeDeadServerByHost(MASTER_NODE,Constants.MASTER_PREFIX); - Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE))); + registerOperator.removeDeadServerByHost(MASTER_NODE,Constants.MASTER_TYPE); + Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_TYPE,MASTER_NODE))); } @Test diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/queue/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/queue/_source/list.vue index dc3e8b007c..de737ed404 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/queue/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/queue/_source/list.vue @@ -34,7 +34,7 @@ {{$t('Update Time')}} - + {{$t('Operation')}} diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/tenement/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/tenement/_source/list.vue index d0029d27fd..3b490dfeda 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/tenement/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/tenement/_source/list.vue @@ -40,7 +40,7 @@ {{$t('Update Time')}} - + {{$t('Operation')}} diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/list.vue index ead385c109..20cc35f07c 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/list.vue @@ -47,7 +47,7 @@ {{$t('Update Time')}} - + {{$t('Operation')}} diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningGroups/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningGroups/_source/list.vue index 528693af7d..58344edbc4 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningGroups/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/warningGroups/_source/list.vue @@ -37,7 +37,7 @@ {{$t('Update Time')}} - + {{$t('Operation')}} diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/_source/createWorker.vue b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/_source/createWorker.vue index dd42565475..f594d96ea5 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/_source/createWorker.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/_source/createWorker.vue @@ -34,17 +34,9 @@ - + @@ -56,6 +48,8 @@ import store from '@/conf/home/store' import mPopup from '@/module/components/popup/popup' import mListBoxF from '@/module/components/listBoxF/listBoxF' + import Treeselect from '@riophae/vue-treeselect' + import '@riophae/vue-treeselect/dist/vue-treeselect.css' export default { name: 'create-warning', @@ -64,11 +58,12 @@ store, id: 0, name: '', - ipList: '' + addrList: [] } }, props: { - item: Object + item: Object, + workerAddressList: Object }, methods: { _ok () { @@ -77,28 +72,14 @@ this._submit() } }, - checkIsIps(ips) { - let reg = /^(\d{1,2}|1\d\d|2[0-4]\d|25[0-5])\.(\d{1,2}|1\d\d|2[0-4]\d|25[0-5])\.(\d{1,2}|1\d\d|2[0-4]\d|25[0-5])\.(\d{1,2}|1\d\d|2[0-4]\d|25[0-5])$/ - let valdata = ips.split(','); - for(let i=0;i - \ No newline at end of file + \ No newline at end of file diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/_source/list.vue index 5991196389..1c84957a3d 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/_source/list.vue @@ -26,7 +26,7 @@ {{$t('Group')}} - IPList + {{$t('Addresses')}} {{$t('Create Time')}} @@ -34,18 +34,19 @@ {{$t('Update Time')}} + + {{$t('Operation')}} + {{parseInt(pageNo === 1 ? ($index + 1) : (($index + 1) + (pageSize * (pageNo - 1))))}} - - {{item.name}} - + {{item.name}} - {{item.ipList.join(',')}} + {{item.addrList}} {{item.createTime | formatDate}} @@ -55,6 +56,24 @@ {{item.updateTime | formatDate}} - + + + +

{{$t('Delete?')}}

+
+ {{$t('Cancel')}} + {{$t('Confirm')}} +
+ +
+ diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/index.vue b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/index.vue index 32bb8620a6..933d352899 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/index.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/workerGroups/index.vue @@ -17,7 +17,11 @@