diff --git a/docs/docs/en/guide/upgrade/incompatible.md b/docs/docs/en/guide/upgrade/incompatible.md index 12dd7fa356..12152c58a1 100644 --- a/docs/docs/en/guide/upgrade/incompatible.md +++ b/docs/docs/en/guide/upgrade/incompatible.md @@ -35,4 +35,5 @@ This document records the incompatible updates between each version. You need to * Remove the `Data Quality` module ([#16794])(https://github.com/apache/dolphinscheduler/pull/16794) * Remove the `registry-disconnect-strategy` in `application.yaml` ([#16821])(https://github.com/apache/dolphinscheduler/pull/16821) * Remove `exec-threads` in worker's `application.yaml`, please use `physical-task-config`;Remove `master-async-task-executor-thread-pool-size` in master's `application.yaml`, please use `logic-task-config` ([#16790])(https://github.com/apache/dolphinscheduler/pull/16790) +* Drop unused column `other_params_json` in `t_ds_worker_group` ([#16860])(https://github.com/apache/dolphinscheduler/pull/16860) diff --git a/docs/docs/zh/guide/upgrade/incompatible.md b/docs/docs/zh/guide/upgrade/incompatible.md index 412a66e4b3..530e006c9c 100644 --- a/docs/docs/zh/guide/upgrade/incompatible.md +++ b/docs/docs/zh/guide/upgrade/incompatible.md @@ -33,4 +33,5 @@ * 移除 `数据质量` 模块 ([#16794])(https://github.com/apache/dolphinscheduler/pull/16794) * 在`application.yaml`中移除`registry-disconnect-strategy`配置 ([#16821])(https://github.com/apache/dolphinscheduler/pull/16821) * 在worker的`application.yaml`中移除`exec-threads`,使用`physical-task-config`替代;在master的`application.yaml`中移除`master-async-task-executor-thread-pool-size`使用`logic-task-config`替代 ([#16790])(https://github.com/apache/dolphinscheduler/pull/16790) +* 在`t_ds_worker_group` 表中移除 无用的`other_params_json`字段 ([#16860])(https://github.com/apache/dolphinscheduler/pull/16860) diff --git a/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/cases/WorkerGroupAPITest.java b/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/cases/WorkerGroupAPITest.java index bf63d23392..36cb88ccb8 100644 --- a/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/cases/WorkerGroupAPITest.java +++ b/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/cases/WorkerGroupAPITest.java @@ -75,8 +75,12 @@ public class WorkerGroupAPITest { @Test @Order(1) public void testSaveWorkerGroup() { - HttpResponse saveWorkerGroupHttpResponse = workerGroupPage - .saveWorkerGroup(loginUser, 1, "test_worker_group", "10.5.0.5:1234", "test", null); + HttpResponse saveWorkerGroupHttpResponse = workerGroupPage.saveWorkerGroup( + loginUser, + 0, + "test_worker_group", + "10.5.0.5:1234", + "test"); Assertions.assertTrue(saveWorkerGroupHttpResponse.getBody().getSuccess()); HttpResponse queryAllWorkerGroupsResponse = workerGroupPage.queryAllWorkerGroups(loginUser); diff --git a/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/pages/security/WorkerGroupPage.java b/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/pages/security/WorkerGroupPage.java index 6ea77b743a..1d2c911969 100644 --- a/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/pages/security/WorkerGroupPage.java +++ b/dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/pages/security/WorkerGroupPage.java @@ -32,15 +32,13 @@ public class WorkerGroupPage { private String sessionId; - public HttpResponse saveWorkerGroup(User loginUser, int id, String name, String addrList, String description, - String otherParamsJson) { + public HttpResponse saveWorkerGroup(User loginUser, int id, String name, String addrList, String description) { Map params = new HashMap<>(); params.put("loginUser", loginUser); params.put("id", id); params.put("name", name); params.put("addrList", addrList); params.put("description", description); - params.put("otherParamsJson", otherParamsJson); Map headers = new HashMap<>(); headers.put(Constants.SESSION_ID_KEY, sessionId); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java index 8d025bd873..fb3212240a 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.api.service.WorkerGroupService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import java.util.Map; @@ -51,16 +52,13 @@ import io.swagger.v3.oas.annotations.Parameters; import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.tags.Tag; -/** - * worker group controller - */ @Tag(name = "WORKER_GROUP_TAG") @RestController @RequestMapping("/worker-groups") public class WorkerGroupController extends BaseController { @Autowired - WorkerGroupService workerGroupService; + private WorkerGroupService workerGroupService; /** * create or update a worker group @@ -77,21 +75,18 @@ public class WorkerGroupController extends BaseController { @Parameter(name = "name", description = "WORKER_GROUP_NAME", required = true, schema = @Schema(implementation = String.class)), @Parameter(name = "addrList", description = "WORKER_ADDR_LIST", required = true, schema = @Schema(implementation = String.class)), @Parameter(name = "description", description = "WORKER_DESC", required = false, schema = @Schema(implementation = String.class)), - @Parameter(name = "otherParamsJson", description = "WORKER_PARAMS_JSON", required = false, schema = @Schema(implementation = String.class)), }) @PostMapping() @ResponseStatus(HttpStatus.OK) @ApiException(SAVE_ERROR) @OperatorLog(auditType = AuditType.WORKER_GROUP_CREATE) - public Result saveWorkerGroup(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @RequestParam(value = "id", required = false, defaultValue = "0") int id, - @RequestParam(value = "name") String name, - @RequestParam(value = "addrList") String addrList, - @RequestParam(value = "description", required = false, defaultValue = "") String description, - @RequestParam(value = "otherParamsJson", required = false, defaultValue = "") String otherParamsJson) { - Map result = - workerGroupService.saveWorkerGroup(loginUser, id, name, addrList, description, otherParamsJson); - return returnDataList(result); + public Result saveWorkerGroup(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "id", required = false, defaultValue = "0") int id, + @RequestParam(value = "name") String name, + @RequestParam(value = "addrList") String addrList, + @RequestParam(value = "description", required = false, defaultValue = "") String description) { + final WorkerGroup workerGroup = workerGroupService.saveWorkerGroup(loginUser, id, name, addrList, description); + return Result.success(workerGroup); } /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java index 9ef215b65d..13cd3c3e35 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import java.util.List; import java.util.Map; @@ -33,11 +34,9 @@ public interface WorkerGroupService { * @param name worker group name * @param addrList addr list * @param description description - * @param otherParamsJson otherParamsJson * @return create or update result code */ - Map saveWorkerGroup(User loginUser, int id, String name, String addrList, String description, - String otherParamsJson); + WorkerGroup saveWorkerGroup(User loginUser, int id, String name, String addrList, String description); /** * Query worker group paging diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java index c55b868797..4173a0c9e4 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -21,6 +21,7 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKER_GROUP_DELETE; import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.WorkerGroupService; import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; @@ -28,6 +29,7 @@ import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.common.model.Server; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation; import org.apache.dolphinscheduler.dao.entity.Schedule; @@ -42,9 +44,10 @@ import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper; import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper; import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; +import org.apache.dolphinscheduler.extract.base.client.Clients; +import org.apache.dolphinscheduler.extract.master.IMasterContainerService; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -55,14 +58,13 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -85,9 +87,6 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro @Autowired private EnvironmentWorkerGroupRelationMapper environmentWorkerGroupRelationMapper; - @Autowired - private ProcessService processService; - @Autowired private ScheduleMapper scheduleMapper; @@ -107,89 +106,53 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro * @return create or update result code */ @Override - @Transactional - public Map saveWorkerGroup(User loginUser, int id, String name, String addrList, String description, - String otherParamsJson) { + public WorkerGroup saveWorkerGroup(User loginUser, + int id, + String name, + String addrList, + String description) { Map result = new HashMap<>(); if (!canOperatorPermissions(loginUser, null, AuthorizationType.WORKER_GROUP, WORKER_GROUP_CREATE)) { - putMsg(result, Status.USER_NO_OPERATION_PERM); - return result; + // todo: add permission exception + throw new ServiceException(Status.USER_NO_OPERATION_PERM); } if (StringUtils.isEmpty(name)) { - log.warn("Parameter name can ot be null."); - putMsg(result, Status.NAME_NULL); - return result; + throw new ServiceException(Status.NAME_NULL); } - Date now = new Date(); - WorkerGroup workerGroup = null; - if (id != 0) { - workerGroup = workerGroupMapper.selectById(id); - if (Objects.nonNull(workerGroup) && !workerGroup.getName().equals(name)) { - if (checkWorkerGroupDependencies(workerGroup, result)) { - return result; + checkWorkerGroupAddrList(addrList); + final Date now = new Date(); + final WorkerGroup workerGroup; + try { + if (id == 0) { + // insert + workerGroup = new WorkerGroup(); + workerGroup.setCreateTime(now); + workerGroup.setName(name); + workerGroup.setAddrList(addrList); + workerGroup.setUpdateTime(now); + workerGroup.setDescription(description); + workerGroupMapper.insert(workerGroup); + } else { + workerGroup = workerGroupMapper.selectById(id); + if (workerGroup == null) { + throw new ServiceException(Status.WORKER_GROUP_NOT_EXIST, id); } + // todo: Can we update the worker name? + if (!workerGroup.getName().equals(name)) { + checkWorkerGroupDependencies(workerGroup, result); + } + workerGroup.setName(name); + workerGroup.setAddrList(addrList); + workerGroup.setUpdateTime(now); + workerGroup.setDescription(description); + workerGroupMapper.updateById(workerGroup); + log.info("Update worker group: {} success .", workerGroup); } + boardCastToMasterThatWorkerGroupChanged(); + return workerGroup; + } catch (DuplicateKeyException duplicateKeyException) { + throw new ServiceException(Status.NAME_EXIST, name); } - if (workerGroup == null) { - workerGroup = new WorkerGroup(); - workerGroup.setCreateTime(now); - } - - workerGroup.setName(name); - workerGroup.setAddrList(addrList); - workerGroup.setUpdateTime(now); - workerGroup.setDescription(description); - - if (checkWorkerGroupNameExists(workerGroup)) { - log.warn("Worker group with the same name already exists, name:{}.", workerGroup.getName()); - putMsg(result, Status.NAME_EXIST, workerGroup.getName()); - return result; - } - String invalidAddr = checkWorkerGroupAddrList(workerGroup); - if (invalidAddr != null) { - log.warn("Worker group address is invalid, invalidAddr:{}.", invalidAddr); - putMsg(result, Status.WORKER_ADDRESS_INVALID, invalidAddr); - return result; - } - - handleDefaultWorkGroup(workerGroupMapper, workerGroup, loginUser, otherParamsJson); - log.info("Worker group save complete, workerGroupName:{}.", workerGroup.getName()); - putMsg(result, Status.SUCCESS); - result.put(Constants.DATA_LIST, workerGroup); - return result; - } - - protected void handleDefaultWorkGroup(WorkerGroupMapper workerGroupMapper, WorkerGroup workerGroup, User loginUser, - String otherParamsJson) { - if (workerGroup.getId() != null) { - workerGroupMapper.updateById(workerGroup); - } else { - workerGroupMapper.insert(workerGroup); - } - } - - /** - * check worker group name exists - * - * @param workerGroup worker group - * @return boolean - */ - private boolean checkWorkerGroupNameExists(WorkerGroup workerGroup) { - // check database - List workerGroupList = workerGroupMapper.queryWorkerGroupByName(workerGroup.getName()); - if (CollectionUtils.isNotEmpty(workerGroupList)) { - // create group, the same group name exists in the database - if (workerGroup.getId() == null) { - return true; - } - // update group, the database exists with the same group name except itself - Optional sameNameWorkGroupOptional = workerGroupList.stream() - .filter(group -> !Objects.equals(group.getId(), workerGroup.getId())).findFirst(); - if (sameNameWorkGroupOptional.isPresent()) { - return true; - } - } - return false; } /** @@ -240,23 +203,16 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro return false; } - /** - * check worker group addr list - * - * @param workerGroup worker group - * @return boolean - */ - private String checkWorkerGroupAddrList(WorkerGroup workerGroup) { - if (Strings.isNullOrEmpty(workerGroup.getAddrList())) { - return null; + private void checkWorkerGroupAddrList(String workerGroupAddress) { + if (Strings.isNullOrEmpty(workerGroupAddress)) { + return; } Map serverMaps = registryClient.getServerMaps(RegistryNodeType.WORKER); - for (String addr : workerGroup.getAddrList().split(Constants.COMMA)) { + for (String addr : workerGroupAddress.split(Constants.COMMA)) { if (!serverMaps.containsKey(addr)) { - return addr; + throw new ServiceException(Status.WORKER_ADDRESS_INVALID); } } - return null; } /** @@ -438,4 +394,20 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro Schedule::getWorkerGroup)); } + private void boardCastToMasterThatWorkerGroupChanged() { + final List masters = registryClient.getServerList(RegistryNodeType.MASTER); + if (CollectionUtils.isEmpty(masters)) { + return; + } + for (Server master : masters) { + try { + Clients.withService(IMasterContainerService.class) + .withHost(master.getHost() + ":" + master.getPort()) + .refreshWorkerGroup(); + } catch (Exception e) { + log.error("Broadcast to master: {} that worker group changed failed", master, e); + } + } + } + } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java index 5cce297d8a..86ea324fd6 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java @@ -17,9 +17,12 @@ package org.apache.dolphinscheduler.api.service; +import static org.apache.dolphinscheduler.api.AssertionsHelper.assertDoesNotThrow; +import static org.apache.dolphinscheduler.api.AssertionsHelper.assertThrowsServiceException; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKER_GROUP_CREATE; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKER_GROUP_DELETE; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService; @@ -40,7 +43,6 @@ import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper; import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; -import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.ArrayList; import java.util.HashMap; @@ -60,6 +62,7 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.dao.DuplicateKeyException; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) @@ -78,9 +81,6 @@ public class WorkerGroupServiceTest { @Mock private WorkflowInstanceMapper workflowInstanceMapper; - @Mock - private ProcessService processService; - @Mock private RegistryClient registryClient; @@ -109,85 +109,78 @@ public class WorkerGroupServiceTest { @Test public void giveNoPermission_whenSaveWorkerGroup_expectNoOperation() { User loginUser = getLoginUser(); - Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1, + when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1, WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(false); - Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, baseServiceLogger)).thenReturn(false); - Map result = - workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", ""); - Assertions.assertEquals(Status.USER_NO_OPERATION_PERM.getCode(), - ((Status) result.get(Constants.STATUS)).getCode()); + assertThrowsServiceException(Status.USER_NO_OPERATION_PERM, () -> { + workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group"); + }); } @Test public void giveNullName_whenSaveWorkerGroup_expectNAME_NULL() { User loginUser = getLoginUser(); - Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1, + when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1, WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true); - Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, baseServiceLogger)).thenReturn(true); - Map result = - workerGroupService.saveWorkerGroup(loginUser, 1, "", "localhost:0000", "test group", ""); - Assertions.assertEquals(Status.NAME_NULL.getCode(), - ((Status) result.get(Constants.STATUS)).getCode()); + assertThrowsServiceException(Status.NAME_NULL, () -> { + workerGroupService.saveWorkerGroup(loginUser, 1, "", "localhost:0000", "test group"); + }); } @Test public void giveSameUserName_whenSaveWorkerGroup_expectNAME_EXIST() { User loginUser = getLoginUser(); - Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1, + when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1, WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true); - Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, baseServiceLogger)).thenReturn(true); - Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null); - List workerGroupList = new ArrayList(); - workerGroupList.add(getWorkerGroup(1)); - Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(workerGroupList); - - Map result = - workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", ""); - Assertions.assertEquals(Status.NAME_EXIST.getCode(), - ((Status) result.get(Constants.STATUS)).getCode()); + + Map serverMaps = new HashMap<>(); + serverMaps.put("localhost:0000", ""); + + when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps); + when(workerGroupMapper.insert(Mockito.any())).thenThrow(DuplicateKeyException.class); + assertThrowsServiceException(Status.NAME_EXIST, () -> { + workerGroupService.saveWorkerGroup(loginUser, 0, GROUP_NAME, "localhost:0000", "test group"); + }); } @Test public void giveInvalidAddress_whenSaveWorkerGroup_expectADDRESS_INVALID() { User loginUser = getLoginUser(); - Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1, + when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1, WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true); - Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, baseServiceLogger)).thenReturn(true); - Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null); - Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null); + when(workerGroupMapper.selectById(1)).thenReturn(null); + when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null); Map serverMaps = new HashMap<>(); serverMaps.put("localhost1:0000", ""); - Mockito.when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps); - - Map result = - workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", ""); - Assertions.assertEquals(Status.WORKER_ADDRESS_INVALID.getCode(), - ((Status) result.get(Constants.STATUS)).getCode()); + when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps); + assertThrowsServiceException(Status.WORKER_ADDRESS_INVALID, () -> { + workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group"); + }); } @Test public void giveValidWorkerGroup_whenSaveWorkerGroup_expectSuccess() { User loginUser = getLoginUser(); - Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1, + when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1, WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true); - Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, baseServiceLogger)).thenReturn(true); - Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null); - Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null); + when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null); Map serverMaps = new HashMap<>(); serverMaps.put("localhost:0000", ""); - Mockito.when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps); - Mockito.when(workerGroupMapper.insert(any())).thenReturn(1); - - Map result = - workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", ""); - Assertions.assertEquals(Status.SUCCESS.getCode(), - ((Status) result.get(Constants.STATUS)).getCode()); + when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps); + when(workerGroupMapper.insert(any())).thenReturn(1); + assertDoesNotThrow(() -> { + workerGroupService.saveWorkerGroup(loginUser, 0, GROUP_NAME, "localhost:0000", "test group"); + }); } @Test @@ -197,13 +190,13 @@ public class WorkerGroupServiceTest { ids.add(1); List workerGroups = new ArrayList<>(); workerGroups.add(getWorkerGroup(1)); - Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP, + when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP, loginUser.getId(), serviceLogger)).thenReturn(ids); - Mockito.when(workerGroupMapper.selectBatchIds(ids)).thenReturn(workerGroups); + when(workerGroupMapper.selectBatchIds(ids)).thenReturn(workerGroups); Set activeWorkerNodes = new HashSet<>(); activeWorkerNodes.add("localhost:12345"); activeWorkerNodes.add("localhost:23456"); - Mockito.when(registryClient.getServerNodeSet(RegistryNodeType.WORKER)).thenReturn(activeWorkerNodes); + when(registryClient.getServerNodeSet(RegistryNodeType.WORKER)).thenReturn(activeWorkerNodes); Result result = workerGroupService.queryAllGroupPaging(loginUser, 1, 1, null); Assertions.assertEquals(result.getCode(), Status.SUCCESS.getCode()); @@ -219,11 +212,11 @@ public class WorkerGroupServiceTest { @Test public void giveNotExistsWorkerGroup_whenDeleteWorkerGroupById_expectNotExists() { User loginUser = getLoginUser(); - Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1, + when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1, WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true); - Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, baseServiceLogger)).thenReturn(true); - Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null); + when(workerGroupMapper.selectById(1)).thenReturn(null); Map notExistResult = workerGroupService.deleteWorkerGroupById(loginUser, 1); Assertions.assertEquals(Status.DELETE_WORKER_GROUP_NOT_EXIST.getCode(), @@ -233,19 +226,19 @@ public class WorkerGroupServiceTest { @Test public void giveRunningProcess_whenDeleteWorkerGroupById_expectFailed() { User loginUser = getLoginUser(); - Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1, + when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1, WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true); - Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, baseServiceLogger)).thenReturn(true); WorkerGroup workerGroup = getWorkerGroup(1); - Mockito.when(workerGroupMapper.selectById(1)).thenReturn(workerGroup); + when(workerGroupMapper.selectById(1)).thenReturn(workerGroup); WorkflowInstance workflowInstance = new WorkflowInstance(); workflowInstance.setId(1); List workflowInstances = new ArrayList(); workflowInstances.add(workflowInstance); - Mockito.when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(), + when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(), WorkflowExecutionStatus.getNotTerminalStatus())) - .thenReturn(workflowInstances); + .thenReturn(workflowInstances); Map deleteFailed = workerGroupService.deleteWorkerGroupById(loginUser, 1); Assertions.assertEquals(Status.DELETE_WORKER_GROUP_BY_ID_FAIL.getCode(), @@ -255,23 +248,23 @@ public class WorkerGroupServiceTest { @Test public void giveValidParams_whenDeleteWorkerGroupById_expectSuccess() { User loginUser = getLoginUser(); - Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1, + when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1, WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true); - Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, baseServiceLogger)).thenReturn(true); WorkerGroup workerGroup = getWorkerGroup(1); - Mockito.when(workerGroupMapper.selectById(1)).thenReturn(workerGroup); - Mockito.when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(), + when(workerGroupMapper.selectById(1)).thenReturn(workerGroup); + when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(), WorkflowExecutionStatus.getNotTerminalStatus())).thenReturn(null); - Mockito.when(workerGroupMapper.deleteById(1)).thenReturn(1); + when(workerGroupMapper.deleteById(1)).thenReturn(1); - Mockito.when(environmentWorkerGroupRelationMapper.queryByWorkerGroupName(workerGroup.getName())) + when(environmentWorkerGroupRelationMapper.queryByWorkerGroupName(workerGroup.getName())) .thenReturn(null); - Mockito.when(taskDefinitionMapper.selectList(Mockito.any())).thenReturn(null); + when(taskDefinitionMapper.selectList(Mockito.any())).thenReturn(null); - Mockito.when(scheduleMapper.selectList(Mockito.any())).thenReturn(null); + when(scheduleMapper.selectList(Mockito.any())).thenReturn(null); Map successResult = workerGroupService.deleteWorkerGroupById(loginUser, 1); Assertions.assertEquals(Status.SUCCESS.getCode(), diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java index 3119a8dddd..df9077602e 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java @@ -52,6 +52,4 @@ public class WorkerGroup { @TableField(exist = false) private boolean systemDefault; - private String otherParamsJson; - } diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index f4cdb31f03..9beb2ffd9b 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -1015,7 +1015,6 @@ CREATE TABLE t_ds_worker_group create_time datetime NULL DEFAULT NULL, update_time datetime NULL DEFAULT NULL, description text NULL DEFAULT NULL, - other_params_json text NULL DEFAULT NULL, PRIMARY KEY (id), UNIQUE KEY name_unique (name) ); diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index 4a63f32e8e..7826be4846 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -1012,7 +1012,6 @@ CREATE TABLE `t_ds_worker_group` ( `create_time` datetime NULL DEFAULT NULL COMMENT 'create time', `update_time` datetime NULL DEFAULT NULL COMMENT 'update time', `description` text NULL DEFAULT NULL COMMENT 'description', - `other_params_json` text NULL DEFAULT NULL COMMENT 'other params json', PRIMARY KEY (`id`), UNIQUE KEY `name_unique` (`name`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin; diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index f2a7aae44b..e31ba9184d 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -926,7 +926,6 @@ CREATE TABLE t_ds_worker_group ( create_time timestamp DEFAULT NULL , update_time timestamp DEFAULT NULL , description text DEFAULT NULL, - other_params_json text DEFAULT NULL, PRIMARY KEY (id) , CONSTRAINT name_unique UNIQUE (name) ) ; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql index c78c33ea8a..935ece7ca8 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql @@ -225,3 +225,22 @@ DROP PROCEDURE drop_data_quality_tables; ALTER TABLE `t_ds_workflow_definition` ADD KEY `idx_project_code` (`project_code`) USING BTREE; ALTER TABLE `t_ds_workflow_definition_log` ADD KEY `idx_project_code` (`project_code`) USING BTREE; + +-- drop_column_t_ds_worker_group other_params_json +DROP PROCEDURE if EXISTS drop_column_t_ds_worker_group_other_params_json; +delimiter d// +CREATE PROCEDURE drop_column_t_ds_worker_group_other_params_json() +BEGIN + IF EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_ds_worker_group' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME ='other_params_json') + THEN +ALTER TABLE `t_ds_worker_group` + DROP COLUMN `other_params_json`; +END IF; +END; +d// +delimiter ; +CALL drop_column_t_ds_worker_group_other_params_json; +DROP PROCEDURE drop_column_t_ds_worker_group_other_params_json; \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql index 31eaf8f645..9b9812033a 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql @@ -228,3 +228,22 @@ DROP FUNCTION IF EXISTS drop_data_quality_tables(); create index workflow_definition_index_project_code on t_ds_workflow_definition (project_code); create index workflow_definition_log_index_project_code on t_ds_workflow_definition_log (project_code); + +-- drop_column_t_ds_worker_group other_params_json +delimiter d// +CREATE OR REPLACE FUNCTION drop_column_t_ds_worker_group_other_params_json() RETURNS void AS $$ +BEGIN + IF EXISTS (SELECT 1 + FROM information_schema.columns + WHERE table_name = 't_ds_worker_group' + AND column_name = 'other_params_json') + THEN +ALTER TABLE t_ds_worker_group +DROP COLUMN "other_params_json"; +END IF; +END; +$$ LANGUAGE plpgsql; +d// + +select drop_column_t_ds_worker_group_other_params_json(); +DROP FUNCTION IF EXISTS drop_column_t_ds_worker_group_other_params_json(); \ No newline at end of file diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapperTest.java index afd474193a..84f140ff0b 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapperTest.java @@ -65,7 +65,6 @@ public class WorkerGroupMapperTest extends BaseDaoTest { workerGroup.setCreateTime(new Date()); workerGroup.setUpdateTime(new Date()); workerGroup.setSystemDefault(true); - workerGroup.setOtherParamsJson(""); workerGroup.setAddrList("localhost"); workerGroupMapper.insert(workerGroup); return workerGroup; diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IMasterContainerService.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IMasterContainerService.java new file mode 100644 index 0000000000..d14f02c047 --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IMasterContainerService.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.master; + +import org.apache.dolphinscheduler.extract.base.RpcMethod; +import org.apache.dolphinscheduler.extract.base.RpcService; + +@RpcService +public interface IMasterContainerService { + + @RpcMethod + void refreshWorkerGroup(); + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java index 41c5fe0f9d..827ebfc576 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java @@ -70,9 +70,9 @@ public class WorkerGroupChangeNotifier { listeners.add(listener); } - void detectWorkerGroupChanges() { + public synchronized void detectWorkerGroupChanges() { try { - MapComparator mapComparator = detectChangedWorkerGroups(); + final MapComparator mapComparator = detectChangedWorkerGroups(); triggerListeners(mapComparator); workerGroupMap = mapComparator.getNewMap(); } catch (Exception ex) { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java index 76be92e36c..25f42a8985 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java @@ -57,7 +57,7 @@ public class MasterConfig implements Validator { private MasterServerLoadProtection serverLoadProtection = new MasterServerLoadProtection(); - private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L); + private Duration workerGroupRefreshInterval = Duration.ofMinutes(5); private CommandFetchStrategy commandFetchStrategy = new CommandFetchStrategy(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterContainerService.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterContainerService.java new file mode 100644 index 0000000000..f42d93a731 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterContainerService.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.rpc; + +import org.apache.dolphinscheduler.extract.master.IMasterContainerService; +import org.apache.dolphinscheduler.server.master.cluster.WorkerGroupChangeNotifier; + +import lombok.extern.slf4j.Slf4j; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class MasterContainerService implements IMasterContainerService { + + @Autowired + private WorkerGroupChangeNotifier workerGroupChangeNotifier; + + @Override + public void refreshWorkerGroup() { + workerGroupChangeNotifier.detectWorkerGroupChanges(); + } +} diff --git a/dolphinscheduler-master/src/main/resources/application.yaml b/dolphinscheduler-master/src/main/resources/application.yaml index 1233d99acc..bb402a1ed8 100644 --- a/dolphinscheduler-master/src/main/resources/application.yaml +++ b/dolphinscheduler-master/src/main/resources/application.yaml @@ -101,7 +101,7 @@ master: max-system-memory-usage-percentage-thresholds: 0.7 # Master max disk usage , when the master's disk usage is smaller then this value, master server can execute workflow. max-disk-usage-percentage-thresholds: 0.7 - worker-group-refresh-interval: 10s + worker-group-refresh-interval: 5m command-fetch-strategy: type: ID_SLOT_BASED config: