diff --git a/docker/build/conf/dolphinscheduler/master.properties.tpl b/docker/build/conf/dolphinscheduler/master.properties.tpl index 7b07dd7d62..3d7910fe67 100644 --- a/docker/build/conf/dolphinscheduler/master.properties.tpl +++ b/docker/build/conf/dolphinscheduler/master.properties.tpl @@ -15,7 +15,7 @@ # limitations under the License. # -# master execute thread num +# master execute thread number master.exec.threads=${MASTER_EXEC_THREADS} # master execute task number in parallel diff --git a/docker/build/conf/dolphinscheduler/worker.properties.tpl b/docker/build/conf/dolphinscheduler/worker.properties.tpl index 8f5907ede6..7dd69355dc 100644 --- a/docker/build/conf/dolphinscheduler/worker.properties.tpl +++ b/docker/build/conf/dolphinscheduler/worker.properties.tpl @@ -15,12 +15,15 @@ # limitations under the License. # -# worker execute thread num +# worker execute thread number worker.exec.threads=${WORKER_EXEC_THREADS} # worker heartbeat interval worker.heartbeat.interval=${WORKER_HEARTBEAT_INTERVAL} +# worker host weight to dispatch tasks, default value 100 +worker.host.weight=${WORKER_HOST_WEIGHT} + # worker tenant auto create worker.tenant.auto.create=true @@ -33,11 +36,8 @@ worker.reserved.memory=${WORKER_RESERVED_MEMORY} # worker listener port worker.listen.port=${WORKER_LISTEN_PORT} -# default worker groups +# default worker groups, if this worker belongs different groups, you can config the following like that 'worker.groups=default,test' worker.groups=${WORKER_GROUPS} -# default worker host weight -worker.host.weight=${WORKER_HOST_WEIGHT} - # alert server listener host alert.listen.host=${ALERT_LISTEN_HOST} diff --git a/docker/kubernetes/dolphinscheduler/README.md b/docker/kubernetes/dolphinscheduler/README.md index a6f474af62..f481cf914e 100644 --- a/docker/kubernetes/dolphinscheduler/README.md +++ b/docker/kubernetes/dolphinscheduler/README.md @@ -134,7 +134,7 @@ The Configuration file is `values.yaml`, and the following tables lists the conf | `master.tolerations` | If specified, the pod's tolerations | `{}` | | `master.resources` | The `resource` limit and request config for master server | `{}` | | `master.configmap.DOLPHINSCHEDULER_OPTS` | The java options for master server | `""` | -| `master.configmap.MASTER_EXEC_THREADS` | Master execute thread num | `100` | +| `master.configmap.MASTER_EXEC_THREADS` | Master execute thread number | `100` | | `master.configmap.MASTER_EXEC_TASK_NUM` | Master execute task number in parallel | `20` | | `master.configmap.MASTER_HEARTBEAT_INTERVAL` | Master heartbeat interval | `10` | | `master.configmap.MASTER_TASK_COMMIT_RETRYTIMES` | Master commit task retry times | `5` | @@ -167,13 +167,13 @@ The Configuration file is `values.yaml`, and the following tables lists the conf | `worker.tolerations` | If specified, the pod's tolerations | `{}` | | `worker.resources` | The `resource` limit and request config for worker server | `{}` | | `worker.configmap.DOLPHINSCHEDULER_OPTS` | The java options for worker server | `""` | -| `worker.configmap.WORKER_EXEC_THREADS` | Worker execute thread num | `100` | +| `worker.configmap.WORKER_EXEC_THREADS` | Worker execute thread number | `100` | | `worker.configmap.WORKER_HEARTBEAT_INTERVAL` | Worker heartbeat interval | `10` | | `worker.configmap.WORKER_MAX_CPULOAD_AVG` | Only less than cpu avg load, worker server can work. default value : the number of cpu cores * 2 | `100` | | `worker.configmap.WORKER_RESERVED_MEMORY` | Only larger than reserved memory, worker server can work. default value : physical memory * 1/10, unit is G | `0.1` | | `worker.configmap.WORKER_LISTEN_PORT` | Worker listen port | `1234` | | `worker.configmap.WORKER_GROUPS` | Worker groups | `default` | -| `worker.configmap.WORKER_HOST_WEIGHT` | Worker host weight | `100` | +| `worker.configmap.WORKER_HOST_WEIGHT` | Worker host weight | `100` | | `worker.livenessProbe.enabled` | Turn on and off liveness probe | `true` | | `worker.livenessProbe.initialDelaySeconds` | Delay before liveness probe is initiated | `30` | | `worker.livenessProbe.periodSeconds` | How often to perform the probe | `30` | diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java index 00418ab64e..c2c74ce3da 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java @@ -17,10 +17,13 @@ package org.apache.dolphinscheduler.api.controller; +import static org.apache.dolphinscheduler.api.enums.Status.DELETE_WORKER_GROUP_FAIL; import static org.apache.dolphinscheduler.api.enums.Status.QUERY_WORKER_GROUP_FAIL; +import static org.apache.dolphinscheduler.api.enums.Status.SAVE_ERROR; import org.apache.dolphinscheduler.api.exceptions.ApiException; import org.apache.dolphinscheduler.api.service.WorkerGroupService; +import org.apache.dolphinscheduler.api.utils.RegexUtils; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.utils.ParameterUtils; @@ -33,6 +36,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestAttribute; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; @@ -58,6 +62,35 @@ public class WorkerGroupController extends BaseController { @Autowired WorkerGroupService workerGroupService; + /** + * create or update a worker group + * + * @param loginUser login user + * @param id worker group id + * @param name worker group name + * @param addrList addr list + * @return create or update result code + */ + @ApiOperation(value = "saveWorkerGroup", notes = "CREATE_WORKER_GROUP_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", dataType = "Int", example = "10", defaultValue = "0"), + @ApiImplicitParam(name = "name", value = "WORKER_GROUP_NAME", required = true, dataType = "String"), + @ApiImplicitParam(name = "addrList", value = "WORKER_ADDR_LIST", required = true, dataType = "String") + }) + @PostMapping(value = "/save") + @ResponseStatus(HttpStatus.OK) + @ApiException(SAVE_ERROR) + public Result saveWorkerGroup(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "id", required = false, defaultValue = "0") int id, + @RequestParam(value = "name") String name, + @RequestParam(value = "addrList") String addrList + ) { + logger.info("save worker group: login user {}, id:{}, name: {}, addrList: {} ", + RegexUtils.escapeNRT(loginUser.getUserName()), id, RegexUtils.escapeNRT(name), RegexUtils.escapeNRT(addrList)); + Map result = workerGroupService.saveWorkerGroup(loginUser, id, name, addrList); + return returnDataList(result); + } + /** * query worker groups paging * @@ -69,21 +102,20 @@ public class WorkerGroupController extends BaseController { */ @ApiOperation(value = "queryAllWorkerGroupsPaging", notes = "QUERY_WORKER_GROUP_PAGING_NOTES") @ApiImplicitParams({ - @ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", dataType = "Int", example = "10", defaultValue = "0"), - @ApiImplicitParam(name = "name", value = "WORKER_GROUP_NAME", required = true, dataType = "String"), - @ApiImplicitParam(name = "ipList", value = "WORKER_IP_LIST", required = true, dataType = "String") + @ApiImplicitParam(name = "pageNo", value = "PAGE_NO", dataType = "Int", example = "1"), + @ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", dataType = "Int", example = "20"), + @ApiImplicitParam(name = "searchVal", value = "SEARCH_VAL", dataType = "String") }) @GetMapping(value = "/list-paging") @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_WORKER_GROUP_FAIL) public Result queryAllWorkerGroupsPaging(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @RequestParam("pageNo") Integer pageNo, - @RequestParam(value = "searchVal", required = false) String searchVal, - @RequestParam("pageSize") Integer pageSize + @RequestParam("pageSize") Integer pageSize, + @RequestParam(value = "searchVal", required = false) String searchVal ) { logger.info("query all worker group paging: login user {}, pageNo:{}, pageSize:{}, searchVal:{}", - loginUser.getUserName(), pageNo, pageSize, searchVal); - + RegexUtils.escapeNRT(loginUser.getUserName()), pageNo, pageSize, searchVal); searchVal = ParameterUtils.handleEscapes(searchVal); Map result = workerGroupService.queryAllGroupPaging(loginUser, pageNo, pageSize, searchVal); return returnDataListPaging(result); @@ -99,14 +131,32 @@ public class WorkerGroupController extends BaseController { @GetMapping(value = "/all-groups") @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_WORKER_GROUP_FAIL) - public Result queryAllWorkerGroups(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser - ) { - logger.info("query all worker group: login user {}", - loginUser.getUserName()); - + public Result queryAllWorkerGroups(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser) { + logger.info("query all worker group: login user {}", RegexUtils.escapeNRT(loginUser.getUserName())); Map result = workerGroupService.queryAllGroup(); return returnDataList(result); } + /** + * delete worker group by id + * + * @param loginUser login user + * @param id group id + * @return delete result code + */ + @ApiOperation(value = "deleteById", notes = "DELETE_WORKER_GROUP_BY_ID_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "id", value = "WORKER_GROUP_ID", required = true, dataType = "Int", example = "10"), + }) + @PostMapping(value = "/delete-by-id") + @ResponseStatus(HttpStatus.OK) + @ApiException(DELETE_WORKER_GROUP_FAIL) + public Result deleteById(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam("id") Integer id + ) { + logger.info("delete worker group: login user {}, id:{} ", RegexUtils.escapeNRT(loginUser.getUserName()), id); + Map result = workerGroupService.deleteWorkerGroupById(loginUser, id); + return returnDataList(result); + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 64a13dde15..f9ebfdfd37 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -205,6 +205,10 @@ public enum Status { DATASOURCE_OTHER_PARAMS_ILLEGAL(10171, "datasource other params illegal", "数据源其他参数不合法"), DATASOURCE_NAME_ILLEGAL(10172, "datasource name illegal", "数据源名称不合法"), DATASOURCE_HOST_ILLEGAL(10173, "datasource host illegal", "数据源HOST不合法"), + DELETE_WORKER_GROUP_NOT_EXIST(10174, "delete worker group not exist ", "删除worker分组不存在"), + CREATE_WORKER_GROUP_FORBIDDEN_IN_DOCKER(10175, "create worker group forbidden in docker ", "创建worker分组在docker中禁止"), + DELETE_WORKER_GROUP_FORBIDDEN_IN_DOCKER(10176, "delete worker group forbidden in docker ", "删除worker分组在docker中禁止"), + WORKER_ADDRESS_INVALID(10177, "worker address {0} invalid", "worker地址[{0}]无效"), UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java index ff73e52c42..b00f914c19 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/UsersService.java @@ -70,6 +70,14 @@ public interface UsersService { */ User queryUser(int id); + /** + * query user by ids + * + * @param ids id list + * @return user list + */ + List queryUser(List ids); + /** * query user * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java index 2f1ea2c857..a8b11f45e8 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java @@ -26,6 +26,17 @@ import java.util.Map; */ public interface WorkerGroupService { + /** + * create or update a worker group + * + * @param loginUser login user + * @param id worker group id + * @param name worker group name + * @param addrList addr list + * @return create or update result code + */ + Map saveWorkerGroup(User loginUser, int id, String name, String addrList); + /** * query worker group paging * @@ -44,4 +55,11 @@ public interface WorkerGroupService { */ Map queryAllGroup(); + /** + * delete worker group by id + * @param id worker group id + * @return delete result code + */ + Map deleteWorkerGroupById(User loginUser, Integer id); + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java index 7f54e3ef95..3cdf1d1192 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/MonitorServiceImpl.java @@ -159,7 +159,7 @@ public class MonitorServiceImpl extends BaseServiceImpl implements MonitorServic checkNotNull(zookeeperMonitor); ZKNodeType zkNodeType = isMaster ? ZKNodeType.MASTER : ZKNodeType.WORKER; - return zookeeperMonitor.getServersList(zkNodeType); + return zookeeperMonitor.getServerList(zkNodeType); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 95e3f5c4a5..54d07f74d9 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -264,10 +264,12 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce project.getCode(), processDefinition.getCode(), searchVal, executorId, statusArray, host, start, end); List processInstances = processInstanceList.getRecords(); + List userIds = CollectionUtils.transformToList(processInstances, ProcessInstance::getExecutorId); + Map idToUserMap = CollectionUtils.collectionToMap(usersService.queryUser(userIds), User::getId); for (ProcessInstance processInstance : processInstances) { processInstance.setDuration(DateUtils.format2Duration(processInstance.getStartTime(), processInstance.getEndTime())); - User executor = usersService.queryUser(processInstance.getExecutorId()); + User executor = idToUserMap.get(processInstance.getExecutorId()); if (null != executor) { processInstance.setExecutorName(executor.getUserName()); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java index 00cb72079f..34ab22d85f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java @@ -245,6 +245,14 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService { return userMapper.selectById(id); } + @Override + public List queryUser(List ids) { + if (CollectionUtils.isEmpty(ids)) { + return new ArrayList<>(); + } + return userMapper.selectByIds(ids); + } + /** * query user * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java index e0282de221..381d1e853b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -17,34 +17,34 @@ package org.apache.dolphinscheduler.api.service.impl; -import static org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP; -import static org.apache.dolphinscheduler.common.Constants.SLASH; - import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.WorkerGroupService; import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; -import org.apache.dolphinscheduler.remote.utils.Host; +import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; - +import org.springframework.transaction.annotation.Transactional; /** * worker group service impl @@ -54,12 +54,115 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro private static final Logger logger = LoggerFactory.getLogger(WorkerGroupServiceImpl.class); + @Autowired + WorkerGroupMapper workerGroupMapper; + @Autowired protected ZookeeperCachedOperator zookeeperCachedOperator; + @Autowired + private ZookeeperMonitor zookeeperMonitor; + @Autowired ProcessInstanceMapper processInstanceMapper; + /** + * create or update a worker group + * + * @param loginUser login user + * @param id worker group id + * @param name worker group name + * @param addrList addr list + * @return create or update result code + */ + @Override + public Map saveWorkerGroup(User loginUser, int id, String name, String addrList) { + Map result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + if (Constants.DOCKER_MODE && !Constants.KUBERNETES_MODE) { + putMsg(result, Status.CREATE_WORKER_GROUP_FORBIDDEN_IN_DOCKER); + return result; + } + if (StringUtils.isEmpty(name)) { + putMsg(result, Status.NAME_NULL); + return result; + } + Date now = new Date(); + WorkerGroup workerGroup; + if (id != 0) { + workerGroup = workerGroupMapper.selectById(id); + // check exist + if (workerGroup == null) { + workerGroup = new WorkerGroup(); + workerGroup.setCreateTime(now); + } + } else { + workerGroup = new WorkerGroup(); + workerGroup.setCreateTime(now); + } + workerGroup.setName(name); + workerGroup.setAddrList(addrList); + workerGroup.setUpdateTime(now); + + if (checkWorkerGroupNameExists(workerGroup)) { + putMsg(result, Status.NAME_EXIST, workerGroup.getName()); + return result; + } + String invalidAddr = checkWorkerGroupAddrList(workerGroup); + if (invalidAddr != null) { + putMsg(result, Status.WORKER_ADDRESS_INVALID, invalidAddr); + return result; + } + if (workerGroup.getId() != 0) { + workerGroupMapper.updateById(workerGroup); + } else { + workerGroupMapper.insert(workerGroup); + } + putMsg(result, Status.SUCCESS); + return result; + } + + /** + * check worker group name exists + * @param workerGroup worker group + * @return boolean + */ + private boolean checkWorkerGroupNameExists(WorkerGroup workerGroup) { + List workerGroupList = workerGroupMapper.queryWorkerGroupByName(workerGroup.getName()); + if (CollectionUtils.isNotEmpty(workerGroupList)) { + // new group has same name + if (workerGroup.getId() == 0) { + return true; + } + // check group id + for (WorkerGroup group : workerGroupList) { + if (group.getId() != workerGroup.getId()) { + return true; + } + } + } + // check zookeeper + String workerGroupPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS + Constants.SLASH + workerGroup.getName(); + return zookeeperCachedOperator.isExisted(workerGroupPath); + } + + /** + * check worker group addr list + * @param workerGroup worker group + * @return boolean + */ + private String checkWorkerGroupAddrList(WorkerGroup workerGroup) { + Map serverMaps = zookeeperMonitor.getServerMaps(ZKNodeType.WORKER, true); + for (String addr : workerGroup.getAddrList().split(Constants.COMMA)) { + if (!serverMaps.containsKey(addr)) { + return addr; + } + } + return null; + } + /** * query worker group paging * @@ -82,7 +185,6 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro } List workerGroups = getWorkerGroups(true); - List resultDataList = new ArrayList<>(); if (CollectionUtils.isNotEmpty(workerGroups)) { @@ -98,10 +200,12 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro searchValDataList = workerGroups; } - if (searchValDataList.size() < pageSize) { - toIndex = (pageNo - 1) * pageSize + searchValDataList.size(); + if (fromIndex < searchValDataList.size()) { + if (toIndex > searchValDataList.size()) { + toIndex = searchValDataList.size(); + } + resultDataList = searchValDataList.subList(fromIndex, toIndex); } - resultDataList = searchValDataList.subList(fromIndex, toIndex); } PageInfo pageInfo = new PageInfo<>(pageNo, pageSize); @@ -121,13 +225,16 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro @Override public Map queryAllGroup() { Map result = new HashMap<>(); - List workerGroups = getWorkerGroups(false); - - Set availableWorkerGroupSet = workerGroups.stream() + List availableWorkerGroupList = workerGroups.stream() .map(WorkerGroup::getName) - .collect(Collectors.toSet()); - result.put(Constants.DATA_LIST, availableWorkerGroupSet); + .collect(Collectors.toList()); + int index = availableWorkerGroupList.indexOf(Constants.DEFAULT_WORKER_GROUP); + if (index > -1) { + availableWorkerGroupList.remove(index); + availableWorkerGroupList.add(0, Constants.DEFAULT_WORKER_GROUP); + } + result.put(Constants.DATA_LIST, availableWorkerGroupList); putMsg(result, Status.SUCCESS); return result; } @@ -139,8 +246,10 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro * @return WorkerGroup list */ private List getWorkerGroups(boolean isPaging) { + // worker groups from database + List workerGroups = workerGroupMapper.queryAllWorkerGroup(); + // worker groups from zookeeper String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot() + Constants.ZOOKEEPER_DOLPHINSCHEDULER_WORKERS; - List workerGroups = new ArrayList<>(); List workerGroupList = null; try { workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath); @@ -148,32 +257,70 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro logger.error("getWorkerGroups exception: {}, workerPath: {}, isPaging: {}", e.getMessage(), workerPath, isPaging); } - if (workerGroupList == null || workerGroupList.isEmpty()) { - if (!isPaging) { + if (CollectionUtils.isEmpty(workerGroupList)) { + if (CollectionUtils.isEmpty(workerGroups) && !isPaging) { WorkerGroup wg = new WorkerGroup(); - wg.setName(DEFAULT_WORKER_GROUP); + wg.setName(Constants.DEFAULT_WORKER_GROUP); workerGroups.add(wg); } return workerGroups; } for (String workerGroup : workerGroupList) { - String workerGroupPath = workerPath + SLASH + workerGroup; - List childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath); - if (CollectionUtils.isEmpty(childrenNodes)) { + String workerGroupPath = workerPath + Constants.SLASH + workerGroup; + List childrenNodes = null; + try { + childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath); + } catch (Exception e) { + logger.error("getChildrenNodes exception: {}, workerGroupPath: {}", e.getMessage(), workerGroupPath); + } + if (childrenNodes == null || childrenNodes.isEmpty()) { continue; } WorkerGroup wg = new WorkerGroup(); wg.setName(workerGroup); if (isPaging) { - wg.setIpList(childrenNodes.stream().map(node -> Host.of(node).getIp()).collect(Collectors.toList())); - String registeredValue = zookeeperCachedOperator.get(workerGroupPath + SLASH + childrenNodes.get(0)); - wg.setCreateTime(DateUtils.stringToDate(registeredValue.split(",")[6])); - wg.setUpdateTime(DateUtils.stringToDate(registeredValue.split(",")[7])); + wg.setAddrList(String.join(Constants.COMMA, childrenNodes)); + String registeredValue = zookeeperCachedOperator.get(workerGroupPath + Constants.SLASH + childrenNodes.get(0)); + wg.setCreateTime(DateUtils.stringToDate(registeredValue.split(Constants.COMMA)[6])); + wg.setUpdateTime(DateUtils.stringToDate(registeredValue.split(Constants.COMMA)[7])); + wg.setSystemDefault(true); } workerGroups.add(wg); } return workerGroups; } + /** + * delete worker group by id + * @param id worker group id + * @return delete result code + */ + @Override + @Transactional(rollbackFor = Exception.class) + public Map deleteWorkerGroupById(User loginUser, Integer id) { + Map result = new HashMap<>(); + if (isNotAdmin(loginUser, result)) { + return result; + } + if (Constants.DOCKER_MODE && !Constants.KUBERNETES_MODE) { + putMsg(result, Status.DELETE_WORKER_GROUP_FORBIDDEN_IN_DOCKER); + return result; + } + WorkerGroup workerGroup = workerGroupMapper.selectById(id); + if (workerGroup == null) { + putMsg(result, Status.DELETE_WORKER_GROUP_NOT_EXIST); + return result; + } + List processInstances = processInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(), Constants.NOT_TERMINATED_STATES); + if (CollectionUtils.isNotEmpty(processInstances)) { + putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL, processInstances.size()); + return result; + } + workerGroupMapper.deleteById(id); + processInstanceMapper.updateProcessInstanceByWorkerGroupName(workerGroup.getName(), ""); + putMsg(result, Status.SUCCESS); + return result; + } + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java index be4754544f..b599695b0d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/ZookeeperMonitor.java @@ -14,92 +14,93 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.api.utils; import org.apache.dolphinscheduler.common.enums.ZKNodeType; -import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.model.Server; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.dao.entity.ZookeeperRecord; import org.apache.dolphinscheduler.service.zk.AbstractZKClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.Date; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; /** - * monitor zookeeper info + * monitor zookeeper info */ @Component public class ZookeeperMonitor extends AbstractZKClient { - private static final Logger LOG = LoggerFactory.getLogger(ZookeeperMonitor.class); - - /** - * - * @return zookeeper info list - */ - public List zookeeperInfoList(){ - String zookeeperServers = getZookeeperQuorum().replaceAll("[\\t\\n\\x0B\\f\\r]", ""); - try{ - return zookeeperInfoList(zookeeperServers); - }catch(Exception e){ - LOG.error(e.getMessage(),e); - } - return null; - } - - /** - * get master servers - * @return master server information - */ - public List getMasterServers(){ - return getServersList(ZKNodeType.MASTER); - } - - /** - * master construct is the same with worker, use the master instead - * @return worker server informations - */ - public List getWorkerServers(){ - return getServersList(ZKNodeType.WORKER); - } - - private static List zookeeperInfoList(String zookeeperServers) { - - List list = new ArrayList<>(5); - - if(StringUtils.isNotBlank(zookeeperServers)){ - String[] zookeeperServersArray = zookeeperServers.split(","); - - for (String zookeeperServer : zookeeperServersArray) { - ZooKeeperState state = new ZooKeeperState(zookeeperServer); - boolean ok = state.ruok(); - if(ok){ - state.getZookeeperInfo(); - } - - int connections = state.getConnections(); - int watches = state.getWatches(); - long sent = state.getSent(); - long received = state.getReceived(); - String mode = state.getMode(); - float minLatency = state.getMinLatency(); - float avgLatency = state.getAvgLatency(); - float maxLatency = state.getMaxLatency(); - int nodeCount = state.getNodeCount(); - int status = ok ? 1 : 0; - Date date = new Date(); - - ZookeeperRecord zookeeperRecord = new ZookeeperRecord(zookeeperServer,connections,watches,sent,received,mode,minLatency,avgLatency,maxLatency,nodeCount,status,date); - list.add(zookeeperRecord); - - } - } - - return list; - } + private static final Logger LOG = LoggerFactory.getLogger(ZookeeperMonitor.class); + + /** + * + * @return zookeeper info list + */ + public List zookeeperInfoList() { + String zookeeperServers = getZookeeperQuorum().replaceAll("[\\t\\n\\x0B\\f\\r]", ""); + try { + return zookeeperInfoList(zookeeperServers); + } catch (Exception e) { + LOG.error(e.getMessage(),e); + } + return null; + } + + /** + * get master servers + * @return master server information + */ + public List getMasterServers() { + return getServerList(ZKNodeType.MASTER); + } + + /** + * master construct is the same with worker, use the master instead + * @return worker server informations + */ + public List getWorkerServers() { + return getServerList(ZKNodeType.WORKER); + } + + private static List zookeeperInfoList(String zookeeperServers) { + + List list = new ArrayList<>(5); + + if (StringUtils.isNotBlank(zookeeperServers)) { + String[] zookeeperServersArray = zookeeperServers.split(","); + + for (String zookeeperServer : zookeeperServersArray) { + ZooKeeperState state = new ZooKeeperState(zookeeperServer); + boolean ok = state.ruok(); + if (ok) { + state.getZookeeperInfo(); + } + + int connections = state.getConnections(); + int watches = state.getWatches(); + long sent = state.getSent(); + long received = state.getReceived(); + String mode = state.getMode(); + float minLatency = state.getMinLatency(); + float avgLatency = state.getAvgLatency(); + float maxLatency = state.getMaxLatency(); + int nodeCount = state.getNodeCount(); + int status = ok ? 1 : 0; + Date date = new Date(); + + ZookeeperRecord zookeeperRecord = new ZookeeperRecord(zookeeperServer,connections,watches,sent,received,mode,minLatency,avgLatency,maxLatency,nodeCount,status,date); + list.add(zookeeperRecord); + + } + } + + return list; + } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java index bba368b32d..fd08ec6f35 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UsersServiceTest.java @@ -180,6 +180,19 @@ public class UsersServiceTest { Assert.assertTrue(queryUser != null); } + @Test + public void testSelectByIds() { + List ids = new ArrayList<>(); + List users = usersService.queryUser(ids); + Assert.assertTrue(users.isEmpty()); + ids.add(1); + List userList = new ArrayList<>(); + userList.add(new User()); + when(userMapper.selectByIds(ids)).thenReturn(userList); + List userList1 = usersService.queryUser(ids); + Assert.assertFalse(userList1.isEmpty()); + } + @Test public void testGetUserIdByName() { User user = new User(); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java index ce9f239d8a..0930a545f3 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java @@ -17,21 +17,25 @@ package org.apache.dolphinscheduler.api.service; +import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.service.impl.WorkerGroupServiceImpl; import org.apache.dolphinscheduler.api.utils.PageInfo; +import org.apache.dolphinscheduler.api.utils.ZookeeperMonitor; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.UserType; +import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator; import org.apache.dolphinscheduler.service.zk.ZookeeperConfig; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import org.junit.Assert; import org.junit.Before; @@ -51,12 +55,20 @@ public class WorkerGroupServiceTest { @InjectMocks private WorkerGroupServiceImpl workerGroupService; + @Mock + private WorkerGroupMapper workerGroupMapper; + @Mock private ProcessInstanceMapper processInstanceMapper; @Mock private ZookeeperCachedOperator zookeeperCachedOperator; + @Mock + private ZookeeperMonitor zookeeperMonitor; + + private String groupName = "groupName000001"; + @Before public void init() { ZookeeperConfig zookeeperConfig = new ZookeeperConfig(); @@ -79,6 +91,33 @@ public class WorkerGroupServiceTest { Mockito.when(zookeeperCachedOperator.get(workerPath + "/default" + "/" + defaultAddressList.get(0))).thenReturn("0.01,0.17,0.03,25.83,8.0,1.0,2020-07-21 11:17:59,2020-07-21 14:39:20,0,13238"); } + /** + * create or update a worker group + */ + @Test + public void testSaveWorkerGroup() { + // worker server maps + Map serverMaps = new HashMap<>(); + serverMaps.put("127.0.0.1:1234", "0.3,0.07,4.4,7.42,16.0,0.3,2021-03-19 20:17:58,2021-03-19 20:25:29,0,79214"); + Mockito.when(zookeeperMonitor.getServerMaps(ZKNodeType.WORKER, true)).thenReturn(serverMaps); + + User user = new User(); + // general user add + user.setUserType(UserType.GENERAL_USER); + Map result = workerGroupService.saveWorkerGroup(user, 0, groupName, "127.0.0.1:1234"); + Assert.assertEquals(Status.USER_NO_OPERATION_PERM.getMsg(), result.get(Constants.MSG)); + + // success + user.setUserType(UserType.ADMIN_USER); + result = workerGroupService.saveWorkerGroup(user, 0, groupName, "127.0.0.1:1234"); + Assert.assertEquals(Status.SUCCESS.getMsg(), result.get(Constants.MSG)); + // group name exist + Mockito.when(workerGroupMapper.selectById(2)).thenReturn(getWorkerGroup(2)); + Mockito.when(workerGroupMapper.queryWorkerGroupByName(groupName)).thenReturn(getList()); + result = workerGroupService.saveWorkerGroup(user, 2, groupName, "127.0.0.1:1234"); + Assert.assertEquals(Status.NAME_EXIST, result.get(Constants.STATUS)); + } + /** * query worker group paging */ @@ -93,17 +132,38 @@ public class WorkerGroupServiceTest { } @Test - public void testQueryAllGroup() throws Exception { + public void testQueryAllGroup() { Map result = workerGroupService.queryAllGroup(); - Set workerGroups = (Set) result.get(Constants.DATA_LIST); + List workerGroups = (List) result.get(Constants.DATA_LIST); Assert.assertEquals(workerGroups.size(), 1); } + /** + * delete group by id + */ + @Test + public void testDeleteWorkerGroupById() { + User user = new User(); + user.setUserType(UserType.ADMIN_USER); + WorkerGroup wg2 = getWorkerGroup(2); + Mockito.when(workerGroupMapper.selectById(2)).thenReturn(wg2); + Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(wg2.getName(), Constants.NOT_TERMINATED_STATES)).thenReturn(getProcessInstanceList()); + Map result = workerGroupService.deleteWorkerGroupById(user, 1); + Assert.assertEquals(Status.DELETE_WORKER_GROUP_NOT_EXIST.getCode(), ((Status) result.get(Constants.STATUS)).getCode()); + result = workerGroupService.deleteWorkerGroupById(user, 2); + Assert.assertEquals(Status.DELETE_WORKER_GROUP_BY_ID_FAIL.getCode(), ((Status) result.get(Constants.STATUS)).getCode()); + // correct + WorkerGroup wg3 = getWorkerGroup(3); + Mockito.when(workerGroupMapper.selectById(3)).thenReturn(wg3); + Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(wg3.getName(), Constants.NOT_TERMINATED_STATES)).thenReturn(new ArrayList<>()); + result = workerGroupService.deleteWorkerGroupById(user, 3); + Assert.assertEquals(Status.SUCCESS.getMsg(), result.get(Constants.MSG)); + } + /** * get processInstances */ private List getProcessInstanceList() { - List processInstances = new ArrayList<>(); processInstances.add(new ProcessInstance()); return processInstances; @@ -112,9 +172,30 @@ public class WorkerGroupServiceTest { @Test public void testQueryAllGroupWithDefault() { Map result = workerGroupService.queryAllGroup(); - Set workerGroups = (Set) result.get(Constants.DATA_LIST); + List workerGroups = (List) result.get(Constants.DATA_LIST); Assert.assertEquals(1, workerGroups.size()); Assert.assertEquals("default", workerGroups.toArray()[0]); } + /** + * get Group + * @return + */ + private WorkerGroup getWorkerGroup(int id) { + WorkerGroup workerGroup = new WorkerGroup(); + workerGroup.setName(groupName); + workerGroup.setId(id); + return workerGroup; + } + + private WorkerGroup getWorkerGroup() { + return getWorkerGroup(1); + } + + private List getList() { + List list = new ArrayList<>(); + list.add(getWorkerGroup()); + return list; + } + } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index a47f3f9545..0270d8fdbe 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -790,8 +790,8 @@ public final class Constants { /** * master/worker server use for zk */ - public static final String MASTER_PREFIX = "master"; - public static final String WORKER_PREFIX = "worker"; + public static final String MASTER_TYPE = "master"; + public static final String WORKER_TYPE = "worker"; public static final String DELETE_ZK_OP = "delete"; public static final String ADD_ZK_OP = "add"; public static final String ALIAS = "alias"; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ZKNodeType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ZKNodeType.java index b4b3c59321..034f880694 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ZKNodeType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ZKNodeType.java @@ -25,7 +25,6 @@ public enum ZKNodeType { * 0 master node; * 1 worker node; * 2 dead_server node; - * 3 task_queue node; */ - MASTER, WORKER, DEAD_SERVER, TASK_QUEUE; + MASTER, WORKER, DEAD_SERVER; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java index e90c606b63..64759c420d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CollectionUtils.java @@ -27,6 +27,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; /** * Provides utility methods and decorators for {@link Collection} instances. @@ -126,6 +128,38 @@ public class CollectionUtils { return map; } + /** + * Transform item in collection + * + * @param collection origin collection + * @param transformFunc transform function + * @param origin item type + * @param target type + * @return transform list + */ + public static List transformToList(Collection collection, Function transformFunc) { + if (isEmpty(collection)) { + return new ArrayList<>(); + } + return collection.stream().map(transformFunc).collect(Collectors.toList()); + } + + /** + * Collect collection to map + * + * @param collection origin collection + * @param keyTransformFunction key transform function + * @param target k type + * @param value + * @return map + */ + public static Map collectionToMap(Collection collection, Function keyTransformFunction) { + if (isEmpty(collection)) { + return new HashMap<>(); + } + return collection.stream().collect(Collectors.toMap(keyTransformFunction, Function.identity())); + } + /** * Helper class to easily access cardinality properties of two collections. * diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java index 99685265e6..ccf8ace802 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/CollectionUtilsTest.java @@ -142,4 +142,20 @@ public class CollectionUtilsTest { Assert.assertEquals(2, cardinalityMap.get(2).intValue()); Assert.assertEquals(3, cardinalityMap.get(3).intValue()); } + + @Test + public void transformToList() { + List stringList = new ArrayList<>(); + stringList.add("1"); + List integers = CollectionUtils.transformToList(stringList, String::length); + Assert.assertFalse(integers.isEmpty()); + } + + @Test + public void collectionToMap() { + List stringList = new ArrayList<>(); + stringList.add("1"); + Map lengthStringMap = CollectionUtils.collectionToMap(stringList, String::length); + Assert.assertFalse(lengthStringMap.isEmpty()); + } } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java index b850219836..b6dc13aaa7 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java @@ -14,30 +14,62 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.dao.entity; +import java.util.Date; + import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.fasterxml.jackson.annotation.JsonFormat; -import java.util.Date; -import java.util.List; - /** * worker group */ +@TableName("t_ds_worker_group") public class WorkerGroup { + @TableId(value = "id", type = IdType.AUTO) + private int id; + private String name; - private List ipList; + private String addrList; - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date createTime; - @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone="GMT+8") + + @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8") private Date updateTime; + @TableField(exist = false) + private boolean systemDefault; + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getAddrList() { + return addrList; + } + + public void setAddrList(String addrList) { + this.addrList = addrList; + } public Date getCreateTime() { return createTime; @@ -55,21 +87,24 @@ public class WorkerGroup { this.updateTime = updateTime; } - public String getName() { - return name; + public boolean getSystemDefault() { + return systemDefault; } - public void setName(String name) { - this.name = name; + public void setSystemDefault(boolean systemDefault) { + this.systemDefault = systemDefault; } - public List getIpList() { - return ipList; + @Override + public String toString() { + return "WorkerGroup{" + + "id= " + id + + ", name= " + name + + ", addrList= " + addrList + + ", createTime= " + createTime + + ", updateTime= " + updateTime + + ", systemDefault= " + systemDefault + + "}"; } - public void setIpList(List ipList) { - this.ipList = ipList; - } - - } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java index 6e53b45eb4..7be58a7422 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java @@ -64,14 +64,12 @@ public interface ProcessInstanceMapper extends BaseMapper { @Param("states") int[] states); /** - * query process instance by worker group and stateArray - * - * @param workerGroupId workerGroupId + * @param workerGroupName workerGroupName * @param states states array * @return process instance list */ - List queryByWorkerGroupIdAndStatus(@Param("workerGroupId") int workerGroupId, - @Param("states") int[] states); + List queryByWorkerGroupNameAndStatus(@Param("workerGroupName") String workerGroupName, + @Param("states") int[] states); /** * process instance page @@ -144,11 +142,12 @@ public interface ProcessInstanceMapper extends BaseMapper { /** * update process instance by worker groupId * - * @param originWorkerGroupId originWorkerGroupId - * @param destWorkerGroupId destWorkerGroupId + * @param originWorkerGroupName originWorkerGroupName + * @param destWorkerGroupName destWorkerGroupName * @return update result */ - int updateProcessInstanceByWorkerGroupId(@Param("originWorkerGroupId") int originWorkerGroupId, @Param("destWorkerGroupId") int destWorkerGroupId); + int updateProcessInstanceByWorkerGroupName(@Param("originWorkerGroupName") String originWorkerGroupName, + @Param("destWorkerGroupName") String destWorkerGroupName); /** * count process instance state by user diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java index 3e766389da..4418363dc2 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/UserMapper.java @@ -116,4 +116,12 @@ public interface UserMapper extends BaseMapper { * @return update rows */ Integer updateUserQueue(@Param("oldQueue") String oldQueue, @Param("newQueue") String newQueue); + + /** + * query user by ids + * + * @param ids id list + * @return user list + */ + List selectByIds(@Param("ids") List ids); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java new file mode 100644 index 0000000000..21af4b38c3 --- /dev/null +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.dao.mapper; + +import org.apache.dolphinscheduler.dao.entity.WorkerGroup; + +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +/** + * worker group mapper interface + */ +public interface WorkerGroupMapper extends BaseMapper { + + /** + * query all worker group + * @return worker group list + */ + List queryAllWorkerGroup(); + + /** + * query worer grouop by name + * @param name name + * @return worker group list + */ + List queryWorkerGroupByName(@Param("name") String name); + +} diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml index a1cece523c..7344f8e995 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml @@ -72,13 +72,13 @@ order by id asc - select from t_ds_process_instance where 1=1 - - and worker_group_id =#{workerGroupId} + + and worker_group =#{workerGroupName} and state in @@ -138,10 +138,10 @@ where tenant_id = #{originTenantId} - + update t_ds_process_instance - set worker_group_id = #{destWorkerGroupId} - where worker_group_id = #{originWorkerGroupId} + set worker_group = #{destWorkerGroupName} + where worker_group = #{originWorkerGroupName} + select * + from t_ds_user + where id in + + #{id} + + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml new file mode 100644 index 0000000000..2665f123cf --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.xml @@ -0,0 +1,31 @@ + + + + + + + + diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java index a0930fbbdd..6dc348c851 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapperTest.java @@ -17,7 +17,9 @@ package org.apache.dolphinscheduler.dao.mapper; -import org.apache.dolphinscheduler.common.enums.CommandType; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; + import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.TaskType; @@ -30,6 +32,7 @@ import java.util.Date; import java.util.List; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; @@ -38,13 +41,10 @@ import org.springframework.test.annotation.Rollback; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.transaction.annotation.Transactional; -import com.baomidou.mybatisplus.core.metadata.IPage; -import com.baomidou.mybatisplus.extension.plugins.pagination.Page; - @RunWith(SpringRunner.class) @SpringBootTest @Transactional -@Rollback(true) +@Rollback public class TaskInstanceMapperTest { @Autowired @@ -59,6 +59,17 @@ public class TaskInstanceMapperTest { @Autowired ProcessInstanceMapMapper processInstanceMapMapper; + private int processInstanceId; + + @Before + public void before() { + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setWarningGroupId(0); + processInstance.setCommandParam(""); + processInstanceMapper.insert(processInstance); + processInstanceId = processInstance.getId(); + } + /** * insert * @@ -100,6 +111,7 @@ public class TaskInstanceMapperTest { taskInstance.setProcessInstanceId(processInstanceId); taskInstance.setTaskType(taskType); taskInstance.setProcessDefinitionCode(1L); +// taskInstance.setProcessDefinitionId(processDefinitionId); taskInstanceMapper.insert(taskInstance); return taskInstance; } @@ -351,10 +363,6 @@ public class TaskInstanceMapperTest { // insert taskInstance TaskInstance task = insertTaskInstance(processInstance.getId()); - task.setProcessDefinitionId(definition.getId()); - task.setProcessInstanceId(processInstance.getId()); - taskInstanceMapper.updateById(task); - Page page = new Page(1, 3); IPage taskInstanceIPage = taskInstanceMapper.queryTaskInstanceListPaging( page, diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java index e573ad77ee..3ee0e6cfce 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/UserMapperTest.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.dao.entity.Queue; import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.User; +import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -315,6 +316,16 @@ public class UserMapperTest { } + @Test + public void selectByIds() { + //insertOne + User user = insertOne(); + List userIds = new ArrayList<>(); + userIds.add(user.getId()); + List users = userMapper.selectByIds(userIds); + Assert.assertFalse(users.isEmpty()); + } + @Test public void testExistUser() { String queueName = "queue"; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java index 7184b9e328..8e1ead38fd 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/CommonHostManager.java @@ -18,18 +18,24 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ZKNodeType; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.ResInfo; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.dao.entity.WorkerGroup; +import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker; import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; +import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import org.springframework.beans.factory.annotation.Autowired; @@ -45,11 +51,23 @@ public abstract class CommonHostManager implements HostManager { protected ZookeeperRegistryCenter registryCenter; /** - * zookeeperNodeManager + * zookeeper node manager */ @Autowired protected ZookeeperNodeManager zookeeperNodeManager; + /** + * zk master client + */ + @Autowired + protected ZKMasterClient zkMasterClient; + + /** + * worker group mapper + */ + @Autowired + protected WorkerGroupMapper workerGroupMapper; + /** * select host * @param context context @@ -57,41 +75,72 @@ public abstract class CommonHostManager implements HostManager { */ @Override public Host select(ExecutionContext context) { - Host host = new Host(); - Collection nodes = null; + List candidates = null; String workerGroup = context.getWorkerGroup(); - // executor type ExecutorType executorType = context.getExecutorType(); switch (executorType) { case WORKER: - nodes = zookeeperNodeManager.getWorkerGroupNodes(workerGroup); + candidates = getHostWorkersFromDatabase(workerGroup); + if (candidates.isEmpty()) { + candidates = getHostWorkersFromZookeeper(workerGroup); + } break; case CLIENT: break; default: throw new IllegalArgumentException("invalid executorType : " + executorType); - } - if (nodes == null || nodes.isEmpty()) { - return host; + + if (CollectionUtils.isEmpty(candidates)) { + return new Host(); } - List candidateHosts = new ArrayList<>(); - nodes.forEach(node -> { - String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup); - String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node); - int hostWeight = Constants.DEFAULT_WORKER_HOST_WEIGHT; - if (StringUtils.isNotEmpty(heartbeat)) { - String[] parts = heartbeat.split(Constants.COMMA); - if (ResInfo.isNewHeartbeatWithWeight(parts)) { - hostWeight = Integer.parseInt(parts[10]); + return select(candidates); + } + + protected abstract HostWorker select(Collection nodes); + + protected List getHostWorkersFromDatabase(String workerGroup) { + List hostWorkers = new ArrayList<>(); + List workerGroups = workerGroupMapper.queryWorkerGroupByName(workerGroup); + if (CollectionUtils.isNotEmpty(workerGroups)) { + Map serverMaps = zkMasterClient.getServerMaps(ZKNodeType.WORKER, true); + for (WorkerGroup wg : workerGroups) { + for (String addr : wg.getAddrList().split(Constants.COMMA)) { + if (serverMaps.containsKey(addr)) { + String heartbeat = serverMaps.get(addr); + int hostWeight = getWorkerHostWeightFromHeartbeat(heartbeat); + hostWorkers.add(HostWorker.of(addr, hostWeight, workerGroup)); + } } } - candidateHosts.add(HostWorker.of(node, hostWeight, workerGroup)); - }); - return select(candidateHosts); + } + return hostWorkers; } - protected abstract HostWorker select(Collection nodes); + protected List getHostWorkersFromZookeeper(String workerGroup) { + List hostWorkers = new ArrayList<>(); + Collection nodes = zookeeperNodeManager.getWorkerGroupNodes(workerGroup); + if (CollectionUtils.isNotEmpty(nodes)) { + for (String node : nodes) { + String workerGroupPath = registryCenter.getWorkerGroupPath(workerGroup); + String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node); + int hostWeight = getWorkerHostWeightFromHeartbeat(heartbeat); + hostWorkers.add(HostWorker.of(node, hostWeight, workerGroup)); + } + } + return hostWorkers; + } + + protected int getWorkerHostWeightFromHeartbeat(String heartbeat) { + int hostWeight = Constants.DEFAULT_WORKER_HOST_WEIGHT; + if (StringUtils.isNotEmpty(heartbeat)) { + String[] parts = heartbeat.split(Constants.COMMA); + if (ResInfo.isNewHeartbeatWithWeight(parts)) { + hostWeight = Integer.parseInt(parts[10]); + } + } + return hostWeight; + } public void setZookeeperNodeManager(ZookeeperNodeManager zookeeperNodeManager) { this.zookeeperNodeManager = zookeeperNodeManager; @@ -100,4 +149,5 @@ public abstract class CommonHostManager implements HostManager { public ZookeeperNodeManager getZookeeperNodeManager() { return zookeeperNodeManager; } + } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java index 458a1ee036..bce1d4ff75 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/HostManagerConfig.java @@ -45,7 +45,7 @@ public class HostManagerConfig { String hostSelector = masterConfig.getHostSelector(); HostSelector selector = HostSelector.of(hostSelector); HostManager hostManager; - switch (selector){ + switch (selector) { case RANDOM: hostManager = new RandomHostManager(); break; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java index ef249a08dc..e0b585a50e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/dispatch/host/LowerWeightHostManager.java @@ -18,9 +18,11 @@ package org.apache.dolphinscheduler.server.master.dispatch.host; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.utils.CollectionUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.ResInfo; +import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory; import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext; @@ -28,9 +30,11 @@ import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWeight import org.apache.dolphinscheduler.server.master.dispatch.host.assign.HostWorker; import org.apache.dolphinscheduler.server.master.dispatch.host.assign.LowerWeightRoundRobin; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -90,7 +94,7 @@ public class LowerWeightHostManager extends CommonHostManager { } @PreDestroy - public void close(){ + public void close() { this.executorService.shutdownNow(); } @@ -137,9 +141,30 @@ public class LowerWeightHostManager extends CommonHostManager { @Override public void run() { try { + Map> workerHostWeights = new HashMap<>(); + // from database + List workerGroups = workerGroupMapper.queryAllWorkerGroup(); + if (CollectionUtils.isNotEmpty(workerGroups)) { + Map serverMaps = zkMasterClient.getServerMaps(ZKNodeType.WORKER, true); + for (WorkerGroup wg : workerGroups) { + String workerGroup = wg.getName(); + List addrs = Arrays.asList(wg.getAddrList().split(Constants.COMMA)); + Set hostWeights = new HashSet<>(addrs.size()); + for (String addr : addrs) { + if (serverMaps.containsKey(addr)) { + String heartbeat = serverMaps.get(addr); + HostWeight hostWeight = getHostWeight(addr, workerGroup, heartbeat); + if (hostWeight != null) { + hostWeights.add(hostWeight); + } + } + } + workerHostWeights.put(workerGroup, hostWeights); + } + } + // from zookeeper Map> workerGroupNodes = zookeeperNodeManager.getWorkerGroupNodes(); Set>> entries = workerGroupNodes.entrySet(); - Map> workerHostWeights = new HashMap<>(); for (Map.Entry> entry : entries) { String workerGroup = entry.getKey(); Set nodes = entry.getValue(); @@ -147,20 +172,8 @@ public class LowerWeightHostManager extends CommonHostManager { Set hostWeights = new HashSet<>(nodes.size()); for (String node : nodes) { String heartbeat = registryCenter.getRegisterOperator().get(workerGroupPath + "/" + node); - if (ResInfo.isValidHeartbeatForZKInfo(heartbeat)) { - String[] parts = heartbeat.split(Constants.COMMA); - int status = Integer.parseInt(parts[8]); - if (status == Constants.ABNORMAL_NODE_STATUS) { - logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", - Double.parseDouble(parts[3]) , Double.parseDouble(parts[2])); - continue; - } - double cpu = Double.parseDouble(parts[0]); - double memory = Double.parseDouble(parts[1]); - double loadAverage = Double.parseDouble(parts[2]); - long startTime = DateUtils.stringToDate(parts[6]).getTime(); - int weight = ResInfo.isNewHeartbeatWithWeight(parts) ? Integer.parseInt(parts[10]) : Constants.DEFAULT_WORKER_HOST_WEIGHT; - HostWeight hostWeight = new HostWeight(HostWorker.of(node, weight, workerGroup), cpu, memory, loadAverage, startTime); + HostWeight hostWeight = getHostWeight(node, workerGroup, heartbeat); + if (hostWeight != null) { hostWeights.add(hostWeight); } } @@ -171,6 +184,25 @@ public class LowerWeightHostManager extends CommonHostManager { logger.error("RefreshResourceTask error", ex); } } + + public HostWeight getHostWeight(String addr, String workerGroup, String heartbeat) { + if (ResInfo.isValidHeartbeatForZKInfo(heartbeat)) { + String[] parts = heartbeat.split(Constants.COMMA); + int status = Integer.parseInt(parts[8]); + if (status == Constants.ABNORMAL_NODE_STATUS) { + logger.warn("load is too high or availablePhysicalMemorySize(G) is too low, it's availablePhysicalMemorySize(G):{},loadAvg:{}", + Double.parseDouble(parts[3]), Double.parseDouble(parts[2])); + return null; + } + double cpu = Double.parseDouble(parts[0]); + double memory = Double.parseDouble(parts[1]); + double loadAverage = Double.parseDouble(parts[2]); + long startTime = DateUtils.stringToDate(parts[6]).getTime(); + int weight = getWorkerHostWeightFromHeartbeat(heartbeat); + return new HostWeight(HostWorker.of(addr, weight, workerGroup), cpu, memory, loadAverage, startTime); + } + return null; + } } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java index f77f00d53a..09e9e223f3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/registry/MasterRegistry.java @@ -81,7 +81,7 @@ public class MasterRegistry { * registry */ public void registry() { - String address = NetUtils.getHost(); + String address = NetUtils.getAddr(masterConfig.getListenPort()); String localNodePath = getMasterPath(); zookeeperRegistryCenter.getRegisterOperator().persistEphemeral(localNodePath, ""); zookeeperRegistryCenter.getRegisterOperator().getZkClient().getConnectionStateListenable().addListener( @@ -101,7 +101,7 @@ public class MasterRegistry { masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory(), Sets.newHashSet(getMasterPath()), - Constants.MASTER_PREFIX, + Constants.MASTER_TYPE, zookeeperRegistryCenter); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, masterHeartbeatInterval, masterHeartbeatInterval, TimeUnit.SECONDS); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java index e4c3d6ab6d..460cc9fcdf 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/HeartBeatTask.java @@ -113,7 +113,7 @@ public class HeartBeatTask implements Runnable { // save process id builder.append(OSUtils.getProcessID()); // worker host weight - if (Constants.WORKER_PREFIX.equals(serverType)) { + if (Constants.WORKER_TYPE.equals(serverType)) { builder.append(Constants.COMMA).append(hostWeight); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java index 591333bffe..fdbcb8fd7d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/registry/ZookeeperRegistryCenter.java @@ -17,10 +17,8 @@ package org.apache.dolphinscheduler.server.registry; -import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX; import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; -import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.IStoppable; @@ -230,9 +228,7 @@ public class ZookeeperRegistryCenter implements InitializingBean { // ip_sequence_no String[] zNodesPath = zNode.split("\\/"); String ipSeqNo = zNodesPath[zNodesPath.length - 1]; - - String type = serverType.equals(MASTER_PREFIX) ? MASTER_PREFIX : WORKER_PREFIX; - String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + type + UNDERLINE + ipSeqNo; + String deadServerPath = getDeadZNodeParentPath() + SINGLE_SLASH + serverType + UNDERLINE + ipSeqNo; return !registerOperator.isExisted(zNode) || registerOperator.isExisted(deadServerPath); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java index 1492899e84..22c19af89f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/config/WorkerConfig.java @@ -35,6 +35,9 @@ public class WorkerConfig { @Value("${worker.heartbeat.interval:10}") private int workerHeartbeatInterval; + @Value("${worker.host.weight:100}") + private int hostWeight; + @Value("${worker.tenant.auto.create:false}") private boolean workerTenantAutoCreate; @@ -44,14 +47,11 @@ public class WorkerConfig { @Value("${worker.reserved.memory:0.3}") private double workerReservedMemory; - @Value("#{'${worker.groups:default}'.split(',')}") - private Set workerGroups; - @Value("${worker.listen.port:1234}") private int listenPort; - @Value("${worker.host.weight:100}") - private int hostWeight; + @Value("#{'${worker.groups:default}'.split(',')}") + private Set workerGroups; @Value("${alert.listen.host:localhost}") private String alertListenHost; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java index 994fb58ff9..0ef995678e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/registry/WorkerRegistry.java @@ -99,7 +99,7 @@ public class WorkerRegistry { * registry */ public void registry() { - String address = NetUtils.getHost(); + String address = NetUtils.getAddr(workerConfig.getListenPort()); Set workerZkPaths = getWorkerZkPaths(); int workerHeartbeatInterval = workerConfig.getWorkerHeartbeatInterval(); @@ -125,7 +125,7 @@ public class WorkerRegistry { workerConfig.getWorkerReservedMemory(), workerConfig.getHostWeight(), workerZkPaths, - Constants.WORKER_PREFIX, + Constants.WORKER_TYPE, zookeeperRegistryCenter); this.heartBeatExecutor.scheduleAtFixedRate(heartBeatTask, workerHeartbeatInterval, workerHeartbeatInterval, TimeUnit.SECONDS); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java index 0f30a5c155..f19bfa2208 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/zk/ZKMasterClient.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.zk; import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.IStoppable; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ZKNodeType; import org.apache.dolphinscheduler.common.model.Server; @@ -30,7 +31,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; -import org.apache.dolphinscheduler.server.master.MasterServer; import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -73,7 +73,7 @@ public class ZKMasterClient extends AbstractZKClient { @Autowired private MasterRegistry masterRegistry; - public void start(MasterServer masterServer) { + public void start(IStoppable stoppable) { InterProcessMutex mutex = null; try { // create distributed lock with the root node path of the lock space as /dolphinscheduler/lock/failover/startup-masters @@ -83,7 +83,7 @@ public class ZKMasterClient extends AbstractZKClient { // master registry masterRegistry.registry(); - masterRegistry.getZookeeperRegistryCenter().setStoppable(masterServer); + masterRegistry.getZookeeperRegistryCenter().setStoppable(stoppable); String registryPath = this.masterRegistry.getMasterPath(); masterRegistry.getZookeeperRegistryCenter().getRegisterOperator().handleDeadServer(registryPath, ZKNodeType.MASTER, Constants.DELETE_ZK_OP); @@ -282,7 +282,7 @@ public class ZKMasterClient extends AbstractZKClient { return false; } Date workerServerStartDate = null; - List workerServers = getServersList(ZKNodeType.WORKER); + List workerServers = getServerList(ZKNodeType.WORKER); for (Server workerServer : workerServers) { if (taskInstance.getHost().equals(workerServer.getHost() + Constants.COLON + workerServer.getPort())) { workerServerStartDate = workerServer.getCreateTime(); diff --git a/dolphinscheduler-server/src/main/resources/master.properties b/dolphinscheduler-server/src/main/resources/master.properties index 01d1189507..bb9877cb2e 100644 --- a/dolphinscheduler-server/src/main/resources/master.properties +++ b/dolphinscheduler-server/src/main/resources/master.properties @@ -15,7 +15,7 @@ # limitations under the License. # -# master execute thread num +# master execute thread number #master.exec.threads=100 # master execute task number in parallel diff --git a/dolphinscheduler-server/src/main/resources/worker.properties b/dolphinscheduler-server/src/main/resources/worker.properties index 88c15a6d5d..c5c379c693 100644 --- a/dolphinscheduler-server/src/main/resources/worker.properties +++ b/dolphinscheduler-server/src/main/resources/worker.properties @@ -15,12 +15,15 @@ # limitations under the License. # -# worker execute thread num +# worker execute thread number #worker.exec.threads=100 # worker heartbeat interval #worker.heartbeat.interval=10 +# worker host weight to dispatch tasks, default value 100 +#worker.host.weight=100 + # worker tenant auto create #worker.tenant.auto.create=false @@ -33,11 +36,8 @@ # worker listener port #worker.listen.port=1234 -# default worker groups +# default worker groups, if this worker belongs different groups, you can config the following like that 'worker.groups=default,test' #worker.groups=default -# default worker host weight -#worker.host.weight=100 - # alert server listener host alert.listen.host=localhost diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java index 74cd2da6c8..14b1c61ce3 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.enums.ResourceType; import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.datasource.SpringConnectionFactory; import org.apache.dolphinscheduler.dao.entity.DataSource; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; @@ -36,10 +37,12 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; +import org.apache.dolphinscheduler.server.master.registry.MasterRegistry; import org.apache.dolphinscheduler.server.registry.DependencyConfig; import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager; import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter; import org.apache.dolphinscheduler.server.zk.SpringZKServer; +import org.apache.dolphinscheduler.server.zk.ZKMasterClient; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.queue.TaskPriority; @@ -66,12 +69,11 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = {DependencyConfig.class, SpringApplicationContext.class, SpringZKServer.class, CuratorZookeeperClient.class, - NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, TaskPriorityQueueConsumer.class, - ZookeeperNodeManager.class, RegisterOperator.class, ZookeeperConfig.class, MasterConfig.class, - CuratorZookeeperClient.class}) + NettyExecutorManager.class, ExecutorDispatcher.class, ZookeeperRegistryCenter.class, ZKMasterClient.class, TaskPriorityQueueConsumer.class, + ZookeeperNodeManager.class, RegisterOperator.class, ZookeeperConfig.class, MasterConfig.class, MasterRegistry.class, + CuratorZookeeperClient.class, SpringConnectionFactory.class}) public class TaskPriorityQueueConsumerTest { - @Autowired private TaskPriorityQueue taskPriorityQueue; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java index 7cdf680090..029dfac002 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/AbstractZKClient.java @@ -76,38 +76,38 @@ public abstract class AbstractZKClient extends RegisterOperator { * @param zkNodeType zookeeper node type * @return server list */ - public List getServersList(ZKNodeType zkNodeType) { - Map masterMap = getServerMaps(zkNodeType); + public List getServerList(ZKNodeType zkNodeType) { + Map serverMaps = getServerMaps(zkNodeType); String parentPath = getZNodeParentPath(zkNodeType); - List masterServers = new ArrayList<>(); - for (Map.Entry entry : masterMap.entrySet()) { - Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue()); - if (masterServer == null) { + List serverList = new ArrayList<>(); + for (Map.Entry entry : serverMaps.entrySet()) { + Server server = ResInfo.parseHeartbeatForZKInfo(entry.getValue()); + if (server == null) { continue; } String key = entry.getKey(); - masterServer.setZkDirectory(parentPath + "/" + key); - //set host and port + server.setZkDirectory(parentPath + "/" + key); + // set host and port String[] hostAndPort = key.split(COLON); String[] hosts = hostAndPort[0].split(DIVISION_STRING); // fetch the last one - masterServer.setHost(hosts[hosts.length - 1]); - masterServer.setPort(Integer.parseInt(hostAndPort[1])); - masterServers.add(masterServer); + server.setHost(hosts[hosts.length - 1]); + server.setPort(Integer.parseInt(hostAndPort[1])); + serverList.add(server); } - return masterServers; + return serverList; } /** - * get master server list map. + * get server list map. * * @param zkNodeType zookeeper node type + * @param hostOnly host only * @return result : {host : resource info} */ - public Map getServerMaps(ZKNodeType zkNodeType) { - - Map masterMap = new HashMap<>(); + public Map getServerMaps(ZKNodeType zkNodeType, boolean hostOnly) { + Map serverMap = new HashMap<>(); try { String path = getZNodeParentPath(zkNodeType); List serverList = super.getChildrenKeys(path); @@ -122,13 +122,27 @@ public abstract class AbstractZKClient extends RegisterOperator { serverList = workerList; } for (String server : serverList) { - masterMap.putIfAbsent(server, super.get(path + Constants.SLASH + server)); + String host = server; + if (zkNodeType == ZKNodeType.WORKER && hostOnly) { + host = server.split(Constants.SLASH)[1]; + } + serverMap.putIfAbsent(host, super.get(path + Constants.SLASH + server)); } } catch (Exception e) { logger.error("get server list failed", e); } - return masterMap; + return serverMap; + } + + /** + * get server list map. + * + * @param zkNodeType zookeeper node type + * @return result : {host : resource info} + */ + public Map getServerMaps(ZKNodeType zkNodeType) { + return getServerMaps(zkNodeType, false); } /** @@ -145,7 +159,7 @@ public abstract class AbstractZKClient extends RegisterOperator { host, zkNodeType); return false; } - Map serverMaps = getServerMaps(zkNodeType); + Map serverMaps = getServerMaps(zkNodeType, true); for (String hostKey : serverMaps.keySet()) { if (hostKey.contains(host)) { return true; diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java index 0c6ac6d0d4..185f9ff523 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/RegisterOperator.java @@ -19,10 +19,10 @@ package org.apache.dolphinscheduler.service.zk; import static org.apache.dolphinscheduler.common.Constants.ADD_ZK_OP; import static org.apache.dolphinscheduler.common.Constants.DELETE_ZK_OP; -import static org.apache.dolphinscheduler.common.Constants.MASTER_PREFIX; +import static org.apache.dolphinscheduler.common.Constants.MASTER_TYPE; import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; import static org.apache.dolphinscheduler.common.Constants.UNDERLINE; -import static org.apache.dolphinscheduler.common.Constants.WORKER_PREFIX; +import static org.apache.dolphinscheduler.common.Constants.WORKER_TYPE; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ZKNodeType; @@ -99,7 +99,7 @@ public class RegisterOperator extends ZookeeperCachedOperator { */ public void handleDeadServer(String zNode, ZKNodeType zkNodeType, String opType) throws Exception { String host = getHostByEventDataPath(zNode); - String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX; + String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_TYPE : WORKER_TYPE; //check server restart, if restart , dead server path in zk should be delete if (opType.equals(DELETE_ZK_OP)) { @@ -130,7 +130,7 @@ public class RegisterOperator extends ZookeeperCachedOperator { */ public void handleDeadServer(Set zNodeSet, ZKNodeType zkNodeType, String opType) throws Exception { - String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_PREFIX : WORKER_PREFIX; + String type = (zkNodeType == ZKNodeType.MASTER) ? MASTER_TYPE : WORKER_TYPE; for (String zNode : zNodeSet) { String host = getHostByEventDataPath(zNode); //check server restart, if restart , dead server path in zk should be delete diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java index 7823c9b1af..cf77080fe3 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/zk/RegisterOperatorTest.java @@ -97,7 +97,7 @@ public class RegisterOperatorTest { testAfterPropertiesSet(); registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER,Constants.ADD_ZK_OP); String path = registerOperator.getDeadZNodeParentPath(); - Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE))); + Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_TYPE,MASTER_NODE))); } @@ -107,10 +107,10 @@ public class RegisterOperatorTest { String path = registerOperator.getDeadZNodeParentPath(); registerOperator.handleDeadServer(MASTER_NODE, ZKNodeType.MASTER,Constants.ADD_ZK_OP); - Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE))); + Assert.assertTrue(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_TYPE,MASTER_NODE))); - registerOperator.removeDeadServerByHost(MASTER_NODE,Constants.MASTER_PREFIX); - Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_PREFIX,MASTER_NODE))); + registerOperator.removeDeadServerByHost(MASTER_NODE,Constants.MASTER_TYPE); + Assert.assertFalse(registerOperator.getChildrenKeys(path).contains(String.format("%s_%s",Constants.MASTER_TYPE,MASTER_NODE))); } @Test diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue index d016c30d4a..3df9ee6c78 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sql.vue @@ -37,9 +37,9 @@
- {{$t('Display query result')}} + {{$t('Log display')}} - ({{$t('Rows')}}) + {{$t('rows of result')}}
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/queue/index.vue b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/queue/index.vue index 2f7bea6ec8..353018e8bb 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/queue/index.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/queue/index.vue @@ -117,11 +117,9 @@ this._debounceGET('false') this.createQueueDialog = false }, - close () { this.createQueueDialog = false }, - _getList (flag) { if (sessionStorage.getItem('isLeft') === 0) { this.isLeft = false diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/tenement/index.vue b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/tenement/index.vue index 89ed41e726..5c82073565 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/tenement/index.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/tenement/index.vue @@ -119,11 +119,9 @@ this._debounceGET('false') this.createTenementDialog = false }, - close () { this.createTenementDialog = false }, - _getList (flag) { if (sessionStorage.getItem('isLeft') === 0) { this.isLeft = false diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/list.vue index 7ecf124a73..615cf9706d 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/security/pages/users/_source/list.vue @@ -48,7 +48,7 @@ {{scope.row.updateTime | formatDate}} - + - +