From ed209bdf82d855a804b68d8833f20595e86bb069 Mon Sep 17 00:00:00 2001 From: Yann Ann Date: Wed, 26 Oct 2022 19:18:32 +0800 Subject: [PATCH] [Refactor] Migrate all workergroup-related interface functions from ProcessServiceImpl (#12493) * Migrate all workergroup-related interface functions from ProcessServiceImpl --- .../api/service/WorkerGroupService.java | 29 +- .../api/service/impl/ExecutorServiceImpl.java | 6 +- .../service/impl/WorkerGroupServiceImpl.java | 39 +++ .../api/service/ExecutorServiceTest.java | 5 +- .../api/service/WorkerGroupServiceTest.java | 313 ++++++++++++++---- .../service/process/ProcessService.java | 4 - .../service/process/ProcessServiceImpl.java | 37 --- 7 files changed, 320 insertions(+), 113 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java index b2f9d78178..2c87e4be2e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java @@ -18,8 +18,10 @@ package org.apache.dolphinscheduler.api.service; import org.apache.dolphinscheduler.api.utils.Result; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; +import java.util.List; import java.util.Map; /** @@ -28,7 +30,7 @@ import java.util.Map; public interface WorkerGroupService { /** - * create or update a worker group + * Create or update a worker group * * @param loginUser login user * @param id worker group id @@ -42,7 +44,7 @@ public interface WorkerGroupService { String otherParamsJson); /** - * query worker group paging + * Query worker group paging * * @param loginUser login user * @param pageNo page number @@ -53,25 +55,40 @@ public interface WorkerGroupService { Result queryAllGroupPaging(User loginUser, Integer pageNo, Integer pageSize, String searchVal); /** - * query all worker group + * Query all worker group * - * @param loginUser + * @param loginUser login user * @return all worker group list */ Map queryAllGroup(User loginUser); /** - * delete worker group by id + * Delete worker group by id + * @param loginUser login user * @param id worker group id * @return delete result code */ Map deleteWorkerGroupById(User loginUser, Integer id); /** - * query all worker address list + * Query all worker address list * * @return all worker address list */ Map getWorkerAddressList(); + /** + * Get task instance's worker group + * @param taskInstance task instance + * @return worker group + */ + String getTaskWorkerGroup(TaskInstance taskInstance); + + /** + * Query worker group by process definition codes + * @param processDefinitionCodeList processDefinitionCodeList + * @return worker group map + */ + Map queryWorkerGroupByProcessDefinitionCodes(List processDefinitionCodeList); + } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java index 6681d12038..bc885f1b82 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java @@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.service.ExecutorService; import org.apache.dolphinscheduler.api.service.MonitorService; import org.apache.dolphinscheduler.api.service.ProjectService; +import org.apache.dolphinscheduler.api.service.WorkerGroupService; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ComplementDependentMode; @@ -150,6 +151,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ @Autowired private TaskGroupQueueMapper taskGroupQueueMapper; + @Autowired + private WorkerGroupService workerGroupService; + /** * execute process instance * @@ -1030,7 +1034,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ .collect(Collectors.toList()); Map processDefinitionWorkerGroupMap = - processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList); + workerGroupService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList); for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) { if (dependentProcessDefinition.getDependentCycle(upstreamProcessDefinitionCode) == processDefinitionCycle) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java index 66b1f5203c..860c410b1f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -29,10 +29,14 @@ import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.Schedule; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; +import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.registry.RegistryClient; import org.apache.commons.collections.CollectionUtils; @@ -74,6 +78,12 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro @Autowired private RegistryClient registryClient; + @Autowired + private ProcessService processService; + + @Autowired + private ScheduleMapper scheduleMapper; + /** * create or update a worker group * @@ -354,4 +364,33 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro return result; } + @Override + public String getTaskWorkerGroup(TaskInstance taskInstance) { + if (taskInstance == null) { + return null; + } + + String workerGroup = taskInstance.getWorkerGroup(); + + if (StringUtils.isNotEmpty(workerGroup)) { + return workerGroup; + } + int processInstanceId = taskInstance.getProcessInstanceId(); + ProcessInstance processInstance = processService.findProcessInstanceById(processInstanceId); + + if (processInstance != null) { + return processInstance.getWorkerGroup(); + } + logger.info("task : {} will use default worker group", taskInstance.getId()); + return Constants.DEFAULT_WORKER_GROUP; + } + + @Override + public Map queryWorkerGroupByProcessDefinitionCodes(List processDefinitionCodeList) { + List processDefinitionScheduleList = + scheduleMapper.querySchedulesByProcessDefinitionCodes(processDefinitionCodeList); + return processDefinitionScheduleList.stream().collect(Collectors.toMap(Schedule::getProcessDefinitionCode, + Schedule::getWorkerGroup)); + } + } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java index dd37a7cdd9..7b01b488c8 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java @@ -106,6 +106,9 @@ public class ExecutorServiceTest { @Mock private CommandService commandService; + @Mock + private WorkerGroupService workerGroupService; + @Mock private ProcessDefinitionMapper processDefinitionMapper; @@ -289,7 +292,7 @@ public class ExecutorServiceTest { Map processDefinitionWorkerGroupMap = new HashMap<>(); processDefinitionWorkerGroupMap.put(1L, Constants.DEFAULT_WORKER_GROUP); - Mockito.when(processService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L))) + Mockito.when(workerGroupService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L))) .thenReturn(processDefinitionWorkerGroupMap); Command command = new Command(); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java index 168f4e78e2..987c718997 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java @@ -17,129 +17,314 @@ package org.apache.dolphinscheduler.api.service; -import org.apache.dolphinscheduler.api.ApiApplicationServer; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKER_GROUP_CREATE; +import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKER_GROUP_DELETE; +import static org.mockito.ArgumentMatchers.any; + import org.apache.dolphinscheduler.api.enums.Status; +import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService; +import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl; import org.apache.dolphinscheduler.api.service.impl.WorkerGroupServiceImpl; +import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.enums.ProfileType; +import org.apache.dolphinscheduler.common.enums.AuthorizationType; +import org.apache.dolphinscheduler.common.enums.NodeType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper; +import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.registry.RegistryClient; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; import org.mockito.Mockito; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.mock.mockito.MockBean; -import org.springframework.test.context.ActiveProfiles; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -@ActiveProfiles(value = {ProfileType.H2}) -@SpringBootTest(classes = ApiApplicationServer.class) +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) public class WorkerGroupServiceTest { - @MockBean(name = "registryClient") - private RegistryClient registryClient; + private static final Logger logger = LoggerFactory.getLogger(WorkerGroupServiceTest.class); + + private static final Logger baseServiceLogger = LoggerFactory.getLogger(BaseServiceImpl.class); - @Autowired + private static final Logger serviceLogger = LoggerFactory.getLogger(WorkerGroupService.class); + + @InjectMocks private WorkerGroupServiceImpl workerGroupService; - @MockBean(name = "workerGroupMapper") + @Mock private WorkerGroupMapper workerGroupMapper; - @MockBean(name = "processInstanceMapper") + @Mock private ProcessInstanceMapper processInstanceMapper; - private String groupName = "groupName000001"; + @Mock + private ProcessService processService; + + @Mock + private RegistryClient registryClient; + + @Mock + private ResourcePermissionCheckService resourcePermissionCheckService; + + private final String GROUP_NAME = "testWorkerGroup"; + + private User getLoginUser() { + User loginUser = new User(); + loginUser.setUserType(UserType.GENERAL_USER); + loginUser.setUserName("workerGroupTestUser"); + loginUser.setId(1); + return loginUser; + } + + @Test + public void giveNoPermission_whenSaveWorkerGroup_expectNoOperation() { + User loginUser = getLoginUser(); + Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(false); + Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + baseServiceLogger)).thenReturn(false); + Map result = + workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", ""); + Assertions.assertEquals(Status.USER_NO_OPERATION_PERM.getCode(), + ((Status) result.get(Constants.STATUS)).getCode()); + } + + @Test + public void giveNullName_whenSaveWorkerGroup_expectNAME_NULL() { + User loginUser = getLoginUser(); + Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true); + Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + baseServiceLogger)).thenReturn(true); + Map result = + workerGroupService.saveWorkerGroup(loginUser, 1, "", "localhost:0000", "test group", ""); + Assertions.assertEquals(Status.NAME_NULL.getCode(), + ((Status) result.get(Constants.STATUS)).getCode()); + } - private User loginUSer; + @Test + public void giveSameUserName_whenSaveWorkerGroup_expectNAME_EXIST() { + User loginUser = getLoginUser(); + Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true); + Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + baseServiceLogger)).thenReturn(true); + Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null); + List workerGroupList = new ArrayList(); + workerGroupList.add(getWorkerGroup(1)); + Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(workerGroupList); + + Map result = + workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", ""); + Assertions.assertEquals(Status.NAME_EXIST.getCode(), + ((Status) result.get(Constants.STATUS)).getCode()); + } + + @Test + public void giveInvalidAddress_whenSaveWorkerGroup_expectADDRESS_INVALID() { + User loginUser = getLoginUser(); + Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true); + Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + baseServiceLogger)).thenReturn(true); + Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null); + Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null); + String workerGroupPath = + Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH + GROUP_NAME; + Mockito.when(registryClient.exists(workerGroupPath)).thenReturn(false); + Map serverMaps = new HashMap<>(); + serverMaps.put("localhost1:0000", ""); + Mockito.when(registryClient.getServerMaps(NodeType.WORKER)).thenReturn(serverMaps); + + Map result = + workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", ""); + Assertions.assertEquals(Status.WORKER_ADDRESS_INVALID.getCode(), + ((Status) result.get(Constants.STATUS)).getCode()); + } + + @Test + public void giveValidWorkerGroup_whenSaveWorkerGroup_expectSuccess() { + User loginUser = getLoginUser(); + Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true); + Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + baseServiceLogger)).thenReturn(true); + + Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null); + Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null); + String workerGroupPath = + Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH + GROUP_NAME; + Mockito.when(registryClient.exists(workerGroupPath)).thenReturn(false); + Map serverMaps = new HashMap<>(); + serverMaps.put("localhost:0000", ""); + Mockito.when(registryClient.getServerMaps(NodeType.WORKER)).thenReturn(serverMaps); + Mockito.when(workerGroupMapper.insert(any())).thenReturn(1); + + Map result = + workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", ""); + Assertions.assertEquals(Status.SUCCESS.getCode(), + ((Status) result.get(Constants.STATUS)).getCode()); + } + + @Test + public void giveValidParams_whenQueryAllGroupPaging_expectSuccess() { + User loginUser = getLoginUser(); + Set ids = new HashSet<>(); + ids.add(1); + List workerGroups = new ArrayList<>(); + workerGroups.add(getWorkerGroup(1)); + Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP, + loginUser.getId(), serviceLogger)).thenReturn(ids); + Mockito.when(workerGroupMapper.selectBatchIds(ids)).thenReturn(workerGroups); + Set activeWorkerNodes = new HashSet<>(); + activeWorkerNodes.add("localhost:12345"); + activeWorkerNodes.add("localhost:23456"); + Mockito.when(registryClient.getServerNodeSet(NodeType.WORKER)).thenReturn(activeWorkerNodes); - @BeforeEach - public void init() { - loginUSer = new User(); - loginUSer.setUserType(UserType.ADMIN_USER); + Result result = workerGroupService.queryAllGroupPaging(loginUser, 1, 1, null); + Assertions.assertEquals(result.getCode(), Status.SUCCESS.getCode()); } @Test public void testQueryAllGroup() { - Map result = workerGroupService.queryAllGroup(loginUSer); + Map result = workerGroupService.queryAllGroup(getLoginUser()); List workerGroups = (List) result.get(Constants.DATA_LIST); Assertions.assertEquals(workerGroups.size(), 1); } - /** - * delete group by id - */ @Test - public void testDeleteWorkerGroupById() { - User user = new User(); - user.setId(1); - user.setUserType(UserType.ADMIN_USER); - WorkerGroup wg2 = getWorkerGroup(2); - Mockito.when(workerGroupMapper.selectById(2)).thenReturn(wg2); - Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(wg2.getName(), - org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES)) - .thenReturn(getProcessInstanceList()); - Map result = workerGroupService.deleteWorkerGroupById(user, 1); + public void giveNotExistsWorkerGroup_whenDeleteWorkerGroupById_expectNotExists() { + User loginUser = getLoginUser(); + Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true); + Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + baseServiceLogger)).thenReturn(true); + Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null); + + Map notExistResult = workerGroupService.deleteWorkerGroupById(loginUser, 1); Assertions.assertEquals(Status.DELETE_WORKER_GROUP_NOT_EXIST.getCode(), - ((Status) result.get(Constants.STATUS)).getCode()); - result = workerGroupService.deleteWorkerGroupById(user, 2); - Assertions.assertEquals(Status.DELETE_WORKER_GROUP_BY_ID_FAIL.getCode(), - ((Status) result.get(Constants.STATUS)).getCode()); - // correct - WorkerGroup wg3 = getWorkerGroup(3); - Mockito.when(workerGroupMapper.selectById(3)).thenReturn(wg3); - Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(wg3.getName(), + ((Status) notExistResult.get(Constants.STATUS)).getCode()); + } + + @Test + public void giveRunningProcess_whenDeleteWorkerGroupById_expectFailed() { + User loginUser = getLoginUser(); + Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true); + Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + baseServiceLogger)).thenReturn(true); + WorkerGroup workerGroup = getWorkerGroup(1); + Mockito.when(workerGroupMapper.selectById(1)).thenReturn(workerGroup); + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(1); + List processInstances = new ArrayList(); + processInstances.add(processInstance); + Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(), org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES)) - .thenReturn(new ArrayList<>()); - result = workerGroupService.deleteWorkerGroupById(user, 3); - Assertions.assertEquals(Status.SUCCESS.getMsg(), result.get(Constants.MSG)); + .thenReturn(processInstances); + + Map deleteFailed = workerGroupService.deleteWorkerGroupById(loginUser, 1); + Assertions.assertEquals(Status.DELETE_WORKER_GROUP_BY_ID_FAIL.getCode(), + ((Status) deleteFailed.get(Constants.STATUS)).getCode()); } - /** - * get processInstances - */ - private List getProcessInstanceList() { - List processInstances = new ArrayList<>(); - processInstances.add(new ProcessInstance()); - return processInstances; + @Test + public void giveValidParams_whenDeleteWorkerGroupById_expectSuccess() { + User loginUser = getLoginUser(); + Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true); + Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + baseServiceLogger)).thenReturn(true); + WorkerGroup workerGroup = getWorkerGroup(1); + Mockito.when(workerGroupMapper.selectById(1)).thenReturn(workerGroup); + Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(), + org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES)).thenReturn(null); + Mockito.when(workerGroupMapper.deleteById(1)).thenReturn(1); + Mockito.when(processInstanceMapper.updateProcessInstanceByWorkerGroupName(workerGroup.getName(), "")) + .thenReturn(1); + + Map successResult = workerGroupService.deleteWorkerGroupById(loginUser, 1); + Assertions.assertEquals(Status.SUCCESS.getCode(), + ((Status) successResult.get(Constants.STATUS)).getCode()); } @Test public void testQueryAllGroupWithDefault() { - Map result = workerGroupService.queryAllGroup(loginUSer); + Map result = workerGroupService.queryAllGroup(getLoginUser()); List workerGroups = (List) result.get(Constants.DATA_LIST); Assertions.assertEquals(1, workerGroups.size()); Assertions.assertEquals("default", workerGroups.toArray()[0]); } + @Test + public void giveNull_whenGetTaskWorkerGroup_expectNull() { + String nullWorkerGroup = workerGroupService.getTaskWorkerGroup(null); + Assertions.assertNull(nullWorkerGroup); + } + + @Test + public void giveCorrectTaskInstance_whenGetTaskWorkerGroup_expectTaskWorkerGroup() { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setWorkerGroup("cluster1"); + + String workerGroup = workerGroupService.getTaskWorkerGroup(taskInstance); + Assertions.assertEquals("cluster1", workerGroup); + } + + @Test + public void giveNullWorkerGroup_whenGetTaskWorkerGroup_expectProcessWorkerGroup() { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setProcessInstanceId(1); + ProcessInstance processInstance = new ProcessInstance(); + processInstance.setId(1); + processInstance.setWorkerGroup("cluster1"); + Mockito.when(processService.findProcessInstanceById(1)).thenReturn(processInstance); + + String workerGroup = workerGroupService.getTaskWorkerGroup(taskInstance); + Assertions.assertEquals("cluster1", workerGroup); + } + + @Test + public void giveNullTaskAndProcessWorkerGroup_whenGetTaskWorkerGroup_expectDefault() { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setProcessInstanceId(1); + Mockito.when(processService.findProcessInstanceById(1)).thenReturn(null); + + String defaultWorkerGroup = workerGroupService.getTaskWorkerGroup(taskInstance); + Assertions.assertEquals(Constants.DEFAULT_WORKER_GROUP, defaultWorkerGroup); + } + /** * get Group */ private WorkerGroup getWorkerGroup(int id) { WorkerGroup workerGroup = new WorkerGroup(); - workerGroup.setName(groupName); + workerGroup.setName(GROUP_NAME); workerGroup.setId(id); return workerGroup; } - private WorkerGroup getWorkerGroup() { - return getWorkerGroup(1); - } - - private List getList() { - List list = new ArrayList<>(); - list.add(getWorkerGroup()); - return list; - } - } diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 76a6e6ea52..74989430e4 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -119,8 +119,6 @@ public interface ProcessService { List queryReleaseSchedulerListByProcessDefinitionCode(long processDefinitionCode); - Map queryWorkerGroupByProcessDefinitionCodes(List processDefinitionCodeList); - List queryDependentProcessDefinitionByProcessDefinitionCode(long processDefinitionCode); List queryNeedFailoverProcessInstances(String host); @@ -150,8 +148,6 @@ public interface ProcessService { ProjectUser queryProjectWithUserByProcessInstanceId(int processInstanceId); - String getTaskWorkerGroup(TaskInstance taskInstance); - List getProjectListHavePerm(int userId); List listUnauthorized(int userId, T[] needChecks, AuthorizationType authorizationType); diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java index 5c6a149657..70d4c52504 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java @@ -1570,20 +1570,6 @@ public class ProcessServiceImpl implements ProcessService { return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode); } - /** - * query Schedule by processDefinitionCode - * - * @param processDefinitionCodeList processDefinitionCodeList - * @see Schedule - */ - @Override - public Map queryWorkerGroupByProcessDefinitionCodes(List processDefinitionCodeList) { - List processDefinitionScheduleList = - scheduleMapper.querySchedulesByProcessDefinitionCodes(processDefinitionCodeList); - return processDefinitionScheduleList.stream().collect(Collectors.toMap(Schedule::getProcessDefinitionCode, - Schedule::getWorkerGroup)); - } - /** * query dependent process definition by process definition code * @@ -1797,29 +1783,6 @@ public class ProcessServiceImpl implements ProcessService { return projectMapper.queryProjectWithUserByProcessInstanceId(processInstanceId); } - /** - * get task worker group - * - * @param taskInstance taskInstance - * @return workerGroupId - */ - @Override - public String getTaskWorkerGroup(TaskInstance taskInstance) { - String workerGroup = taskInstance.getWorkerGroup(); - - if (!Strings.isNullOrEmpty(workerGroup)) { - return workerGroup; - } - int processInstanceId = taskInstance.getProcessInstanceId(); - ProcessInstance processInstance = findProcessInstanceById(processInstanceId); - - if (processInstance != null) { - return processInstance.getWorkerGroup(); - } - logger.info("task : {} will use default worker group", taskInstance.getId()); - return Constants.DEFAULT_WORKER_GROUP; - } - /** * get have perm project list *