Browse Source

[Refactor] Migrate all workergroup-related interface functions from ProcessServiceImpl (#12493)

* Migrate all workergroup-related interface functions from ProcessServiceImpl
3.2.0-release
Yann Ann 2 years ago committed by GitHub
parent
commit
ed209bdf82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
  2. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  3. 39
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
  4. 5
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
  5. 313
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
  6. 4
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
  7. 37
      dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

29
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java

@ -18,8 +18,10 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import java.util.List;
import java.util.Map;
/**
@ -28,7 +30,7 @@ import java.util.Map;
public interface WorkerGroupService {
/**
* create or update a worker group
* Create or update a worker group
*
* @param loginUser login user
* @param id worker group id
@ -42,7 +44,7 @@ public interface WorkerGroupService {
String otherParamsJson);
/**
* query worker group paging
* Query worker group paging
*
* @param loginUser login user
* @param pageNo page number
@ -53,25 +55,40 @@ public interface WorkerGroupService {
Result queryAllGroupPaging(User loginUser, Integer pageNo, Integer pageSize, String searchVal);
/**
* query all worker group
* Query all worker group
*
* @param loginUser
* @param loginUser login user
* @return all worker group list
*/
Map<String, Object> queryAllGroup(User loginUser);
/**
* delete worker group by id
* Delete worker group by id
* @param loginUser login user
* @param id worker group id
* @return delete result code
*/
Map<String, Object> deleteWorkerGroupById(User loginUser, Integer id);
/**
* query all worker address list
* Query all worker address list
*
* @return all worker address list
*/
Map<String, Object> getWorkerAddressList();
/**
* Get task instance's worker group
* @param taskInstance task instance
* @return worker group
*/
String getTaskWorkerGroup(TaskInstance taskInstance);
/**
* Query worker group by process definition codes
* @param processDefinitionCodeList processDefinitionCodeList
* @return worker group map
*/
Map<Long, String> queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList);
}

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.api.service.MonitorService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
@ -150,6 +151,9 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
@Autowired
private TaskGroupQueueMapper taskGroupQueueMapper;
@Autowired
private WorkerGroupService workerGroupService;
/**
* execute process instance
*
@ -1030,7 +1034,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
.collect(Collectors.toList());
Map<Long, String> processDefinitionWorkerGroupMap =
processService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
workerGroupService.queryWorkerGroupByProcessDefinitionCodes(processDefinitionCodeList);
for (DependentProcessDefinition dependentProcessDefinition : dependentProcessDefinitionList) {
if (dependentProcessDefinition.getDependentCycle(upstreamProcessDefinitionCode) == processDefinitionCycle) {

39
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java

@ -29,10 +29,14 @@ import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import org.apache.commons.collections.CollectionUtils;
@ -74,6 +78,12 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
@Autowired
private RegistryClient registryClient;
@Autowired
private ProcessService processService;
@Autowired
private ScheduleMapper scheduleMapper;
/**
* create or update a worker group
*
@ -354,4 +364,33 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
return result;
}
@Override
public String getTaskWorkerGroup(TaskInstance taskInstance) {
if (taskInstance == null) {
return null;
}
String workerGroup = taskInstance.getWorkerGroup();
if (StringUtils.isNotEmpty(workerGroup)) {
return workerGroup;
}
int processInstanceId = taskInstance.getProcessInstanceId();
ProcessInstance processInstance = processService.findProcessInstanceById(processInstanceId);
if (processInstance != null) {
return processInstance.getWorkerGroup();
}
logger.info("task : {} will use default worker group", taskInstance.getId());
return Constants.DEFAULT_WORKER_GROUP;
}
@Override
public Map<Long, String> queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList) {
List<Schedule> processDefinitionScheduleList =
scheduleMapper.querySchedulesByProcessDefinitionCodes(processDefinitionCodeList);
return processDefinitionScheduleList.stream().collect(Collectors.toMap(Schedule::getProcessDefinitionCode,
Schedule::getWorkerGroup));
}
}

5
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java

@ -106,6 +106,9 @@ public class ExecutorServiceTest {
@Mock
private CommandService commandService;
@Mock
private WorkerGroupService workerGroupService;
@Mock
private ProcessDefinitionMapper processDefinitionMapper;
@ -289,7 +292,7 @@ public class ExecutorServiceTest {
Map<Long, String> processDefinitionWorkerGroupMap = new HashMap<>();
processDefinitionWorkerGroupMap.put(1L, Constants.DEFAULT_WORKER_GROUP);
Mockito.when(processService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L)))
Mockito.when(workerGroupService.queryWorkerGroupByProcessDefinitionCodes(Lists.newArrayList(1L)))
.thenReturn(processDefinitionWorkerGroupMap);
Command command = new Command();

313
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java

@ -17,129 +17,314 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.ApiApplicationServer;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKER_GROUP_CREATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKER_GROUP_DELETE;
import static org.mockito.ArgumentMatchers.any;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.WorkerGroupServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ProfileType;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.registry.RegistryClient;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.ActiveProfiles;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ActiveProfiles(value = {ProfileType.H2})
@SpringBootTest(classes = ApiApplicationServer.class)
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class WorkerGroupServiceTest {
@MockBean(name = "registryClient")
private RegistryClient registryClient;
private static final Logger logger = LoggerFactory.getLogger(WorkerGroupServiceTest.class);
private static final Logger baseServiceLogger = LoggerFactory.getLogger(BaseServiceImpl.class);
@Autowired
private static final Logger serviceLogger = LoggerFactory.getLogger(WorkerGroupService.class);
@InjectMocks
private WorkerGroupServiceImpl workerGroupService;
@MockBean(name = "workerGroupMapper")
@Mock
private WorkerGroupMapper workerGroupMapper;
@MockBean(name = "processInstanceMapper")
@Mock
private ProcessInstanceMapper processInstanceMapper;
private String groupName = "groupName000001";
@Mock
private ProcessService processService;
@Mock
private RegistryClient registryClient;
@Mock
private ResourcePermissionCheckService resourcePermissionCheckService;
private final String GROUP_NAME = "testWorkerGroup";
private User getLoginUser() {
User loginUser = new User();
loginUser.setUserType(UserType.GENERAL_USER);
loginUser.setUserName("workerGroupTestUser");
loginUser.setId(1);
return loginUser;
}
@Test
public void giveNoPermission_whenSaveWorkerGroup_expectNoOperation() {
User loginUser = getLoginUser();
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(false);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
baseServiceLogger)).thenReturn(false);
Map<String, Object> result =
workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", "");
Assertions.assertEquals(Status.USER_NO_OPERATION_PERM.getCode(),
((Status) result.get(Constants.STATUS)).getCode());
}
@Test
public void giveNullName_whenSaveWorkerGroup_expectNAME_NULL() {
User loginUser = getLoginUser();
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
baseServiceLogger)).thenReturn(true);
Map<String, Object> result =
workerGroupService.saveWorkerGroup(loginUser, 1, "", "localhost:0000", "test group", "");
Assertions.assertEquals(Status.NAME_NULL.getCode(),
((Status) result.get(Constants.STATUS)).getCode());
}
private User loginUSer;
@Test
public void giveSameUserName_whenSaveWorkerGroup_expectNAME_EXIST() {
User loginUser = getLoginUser();
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
baseServiceLogger)).thenReturn(true);
Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
List<WorkerGroup> workerGroupList = new ArrayList<WorkerGroup>();
workerGroupList.add(getWorkerGroup(1));
Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(workerGroupList);
Map<String, Object> result =
workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", "");
Assertions.assertEquals(Status.NAME_EXIST.getCode(),
((Status) result.get(Constants.STATUS)).getCode());
}
@Test
public void giveInvalidAddress_whenSaveWorkerGroup_expectADDRESS_INVALID() {
User loginUser = getLoginUser();
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
baseServiceLogger)).thenReturn(true);
Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
String workerGroupPath =
Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH + GROUP_NAME;
Mockito.when(registryClient.exists(workerGroupPath)).thenReturn(false);
Map<String, String> serverMaps = new HashMap<>();
serverMaps.put("localhost1:0000", "");
Mockito.when(registryClient.getServerMaps(NodeType.WORKER)).thenReturn(serverMaps);
Map<String, Object> result =
workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", "");
Assertions.assertEquals(Status.WORKER_ADDRESS_INVALID.getCode(),
((Status) result.get(Constants.STATUS)).getCode());
}
@Test
public void giveValidWorkerGroup_whenSaveWorkerGroup_expectSuccess() {
User loginUser = getLoginUser();
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
baseServiceLogger)).thenReturn(true);
Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
String workerGroupPath =
Constants.REGISTRY_DOLPHINSCHEDULER_WORKERS + Constants.SINGLE_SLASH + GROUP_NAME;
Mockito.when(registryClient.exists(workerGroupPath)).thenReturn(false);
Map<String, String> serverMaps = new HashMap<>();
serverMaps.put("localhost:0000", "");
Mockito.when(registryClient.getServerMaps(NodeType.WORKER)).thenReturn(serverMaps);
Mockito.when(workerGroupMapper.insert(any())).thenReturn(1);
Map<String, Object> result =
workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", "");
Assertions.assertEquals(Status.SUCCESS.getCode(),
((Status) result.get(Constants.STATUS)).getCode());
}
@Test
public void giveValidParams_whenQueryAllGroupPaging_expectSuccess() {
User loginUser = getLoginUser();
Set<Integer> ids = new HashSet<>();
ids.add(1);
List<WorkerGroup> workerGroups = new ArrayList<>();
workerGroups.add(getWorkerGroup(1));
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP,
loginUser.getId(), serviceLogger)).thenReturn(ids);
Mockito.when(workerGroupMapper.selectBatchIds(ids)).thenReturn(workerGroups);
Set<String> activeWorkerNodes = new HashSet<>();
activeWorkerNodes.add("localhost:12345");
activeWorkerNodes.add("localhost:23456");
Mockito.when(registryClient.getServerNodeSet(NodeType.WORKER)).thenReturn(activeWorkerNodes);
@BeforeEach
public void init() {
loginUSer = new User();
loginUSer.setUserType(UserType.ADMIN_USER);
Result result = workerGroupService.queryAllGroupPaging(loginUser, 1, 1, null);
Assertions.assertEquals(result.getCode(), Status.SUCCESS.getCode());
}
@Test
public void testQueryAllGroup() {
Map<String, Object> result = workerGroupService.queryAllGroup(loginUSer);
Map<String, Object> result = workerGroupService.queryAllGroup(getLoginUser());
List<String> workerGroups = (List<String>) result.get(Constants.DATA_LIST);
Assertions.assertEquals(workerGroups.size(), 1);
}
/**
* delete group by id
*/
@Test
public void testDeleteWorkerGroupById() {
User user = new User();
user.setId(1);
user.setUserType(UserType.ADMIN_USER);
WorkerGroup wg2 = getWorkerGroup(2);
Mockito.when(workerGroupMapper.selectById(2)).thenReturn(wg2);
Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(wg2.getName(),
org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES))
.thenReturn(getProcessInstanceList());
Map<String, Object> result = workerGroupService.deleteWorkerGroupById(user, 1);
public void giveNotExistsWorkerGroup_whenDeleteWorkerGroupById_expectNotExists() {
User loginUser = getLoginUser();
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
baseServiceLogger)).thenReturn(true);
Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
Map<String, Object> notExistResult = workerGroupService.deleteWorkerGroupById(loginUser, 1);
Assertions.assertEquals(Status.DELETE_WORKER_GROUP_NOT_EXIST.getCode(),
((Status) result.get(Constants.STATUS)).getCode());
result = workerGroupService.deleteWorkerGroupById(user, 2);
Assertions.assertEquals(Status.DELETE_WORKER_GROUP_BY_ID_FAIL.getCode(),
((Status) result.get(Constants.STATUS)).getCode());
// correct
WorkerGroup wg3 = getWorkerGroup(3);
Mockito.when(workerGroupMapper.selectById(3)).thenReturn(wg3);
Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(wg3.getName(),
((Status) notExistResult.get(Constants.STATUS)).getCode());
}
@Test
public void giveRunningProcess_whenDeleteWorkerGroupById_expectFailed() {
User loginUser = getLoginUser();
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
baseServiceLogger)).thenReturn(true);
WorkerGroup workerGroup = getWorkerGroup(1);
Mockito.when(workerGroupMapper.selectById(1)).thenReturn(workerGroup);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
List<ProcessInstance> processInstances = new ArrayList<ProcessInstance>();
processInstances.add(processInstance);
Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES))
.thenReturn(new ArrayList<>());
result = workerGroupService.deleteWorkerGroupById(user, 3);
Assertions.assertEquals(Status.SUCCESS.getMsg(), result.get(Constants.MSG));
.thenReturn(processInstances);
Map<String, Object> deleteFailed = workerGroupService.deleteWorkerGroupById(loginUser, 1);
Assertions.assertEquals(Status.DELETE_WORKER_GROUP_BY_ID_FAIL.getCode(),
((Status) deleteFailed.get(Constants.STATUS)).getCode());
}
/**
* get processInstances
*/
private List<ProcessInstance> getProcessInstanceList() {
List<ProcessInstance> processInstances = new ArrayList<>();
processInstances.add(new ProcessInstance());
return processInstances;
@Test
public void giveValidParams_whenDeleteWorkerGroupById_expectSuccess() {
User loginUser = getLoginUser();
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
baseServiceLogger)).thenReturn(true);
WorkerGroup workerGroup = getWorkerGroup(1);
Mockito.when(workerGroupMapper.selectById(1)).thenReturn(workerGroup);
Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES)).thenReturn(null);
Mockito.when(workerGroupMapper.deleteById(1)).thenReturn(1);
Mockito.when(processInstanceMapper.updateProcessInstanceByWorkerGroupName(workerGroup.getName(), ""))
.thenReturn(1);
Map<String, Object> successResult = workerGroupService.deleteWorkerGroupById(loginUser, 1);
Assertions.assertEquals(Status.SUCCESS.getCode(),
((Status) successResult.get(Constants.STATUS)).getCode());
}
@Test
public void testQueryAllGroupWithDefault() {
Map<String, Object> result = workerGroupService.queryAllGroup(loginUSer);
Map<String, Object> result = workerGroupService.queryAllGroup(getLoginUser());
List<String> workerGroups = (List<String>) result.get(Constants.DATA_LIST);
Assertions.assertEquals(1, workerGroups.size());
Assertions.assertEquals("default", workerGroups.toArray()[0]);
}
@Test
public void giveNull_whenGetTaskWorkerGroup_expectNull() {
String nullWorkerGroup = workerGroupService.getTaskWorkerGroup(null);
Assertions.assertNull(nullWorkerGroup);
}
@Test
public void giveCorrectTaskInstance_whenGetTaskWorkerGroup_expectTaskWorkerGroup() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setWorkerGroup("cluster1");
String workerGroup = workerGroupService.getTaskWorkerGroup(taskInstance);
Assertions.assertEquals("cluster1", workerGroup);
}
@Test
public void giveNullWorkerGroup_whenGetTaskWorkerGroup_expectProcessWorkerGroup() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setProcessInstanceId(1);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
processInstance.setWorkerGroup("cluster1");
Mockito.when(processService.findProcessInstanceById(1)).thenReturn(processInstance);
String workerGroup = workerGroupService.getTaskWorkerGroup(taskInstance);
Assertions.assertEquals("cluster1", workerGroup);
}
@Test
public void giveNullTaskAndProcessWorkerGroup_whenGetTaskWorkerGroup_expectDefault() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setProcessInstanceId(1);
Mockito.when(processService.findProcessInstanceById(1)).thenReturn(null);
String defaultWorkerGroup = workerGroupService.getTaskWorkerGroup(taskInstance);
Assertions.assertEquals(Constants.DEFAULT_WORKER_GROUP, defaultWorkerGroup);
}
/**
* get Group
*/
private WorkerGroup getWorkerGroup(int id) {
WorkerGroup workerGroup = new WorkerGroup();
workerGroup.setName(groupName);
workerGroup.setName(GROUP_NAME);
workerGroup.setId(id);
return workerGroup;
}
private WorkerGroup getWorkerGroup() {
return getWorkerGroup(1);
}
private List<WorkerGroup> getList() {
List<WorkerGroup> list = new ArrayList<>();
list.add(getWorkerGroup());
return list;
}
}

4
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java

@ -119,8 +119,6 @@ public interface ProcessService {
List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(long processDefinitionCode);
Map<Long, String> queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList);
List<DependentProcessDefinition> queryDependentProcessDefinitionByProcessDefinitionCode(long processDefinitionCode);
List<ProcessInstance> queryNeedFailoverProcessInstances(String host);
@ -150,8 +148,6 @@ public interface ProcessService {
ProjectUser queryProjectWithUserByProcessInstanceId(int processInstanceId);
String getTaskWorkerGroup(TaskInstance taskInstance);
List<Project> getProjectListHavePerm(int userId);
<T> List<T> listUnauthorized(int userId, T[] needChecks, AuthorizationType authorizationType);

37
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessServiceImpl.java

@ -1570,20 +1570,6 @@ public class ProcessServiceImpl implements ProcessService {
return scheduleMapper.queryReleaseSchedulerListByProcessDefinitionCode(processDefinitionCode);
}
/**
* query Schedule by processDefinitionCode
*
* @param processDefinitionCodeList processDefinitionCodeList
* @see Schedule
*/
@Override
public Map<Long, String> queryWorkerGroupByProcessDefinitionCodes(List<Long> processDefinitionCodeList) {
List<Schedule> processDefinitionScheduleList =
scheduleMapper.querySchedulesByProcessDefinitionCodes(processDefinitionCodeList);
return processDefinitionScheduleList.stream().collect(Collectors.toMap(Schedule::getProcessDefinitionCode,
Schedule::getWorkerGroup));
}
/**
* query dependent process definition by process definition code
*
@ -1797,29 +1783,6 @@ public class ProcessServiceImpl implements ProcessService {
return projectMapper.queryProjectWithUserByProcessInstanceId(processInstanceId);
}
/**
* get task worker group
*
* @param taskInstance taskInstance
* @return workerGroupId
*/
@Override
public String getTaskWorkerGroup(TaskInstance taskInstance) {
String workerGroup = taskInstance.getWorkerGroup();
if (!Strings.isNullOrEmpty(workerGroup)) {
return workerGroup;
}
int processInstanceId = taskInstance.getProcessInstanceId();
ProcessInstance processInstance = findProcessInstanceById(processInstanceId);
if (processInstance != null) {
return processInstance.getWorkerGroup();
}
logger.info("task : {} will use default worker group", taskInstance.getId());
return Constants.DEFAULT_WORKER_GROUP;
}
/**
* get have perm project list
*

Loading…
Cancel
Save