Browse Source

[Improvement][API] Support to check if the worker group has been used by any tasks or schedulers when users delete or rename it. (#14893)

3.2.1-prepare
calvin 10 months ago committed by GitHub
parent
commit
a070aa93a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
  2. 75
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
  3. 17
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java

15
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java

@ -568,7 +568,20 @@ public enum Status {
SCHEDULE_TIME_NUMBER_EXCEED(1400003, "The number of complement dates exceed 100.", "补数日期个数超过100"),
DESCRIPTION_TOO_LONG_ERROR(1400004, "description is too long error", "描述过长"),
DELETE_WORKER_GROUP_BY_ID_FAIL_ENV(1400005,
"delete worker group fail, for there are [{0}] enviroments using:{1}", "删除工作组失败,有 [{0}] 个环境正在使用:{1}");
"delete worker group fail, for there are [{0}] enviroments using:{1}", "删除工作组失败,有 [{0}] 个环境正在使用:{1}"),
WORKER_GROUP_DEPENDENT_TASK_EXISTS(1401000,
"You can not modify or remove this worker group, cause it has [{0}] dependent tasks like :{1}",
"不能修改或删除该Worker组,有 [{0}] 个任务正在使用:{1}"),
WORKER_GROUP_DEPENDENT_SCHEDULER_EXISTS(1401001,
"You can not modify or remove this worker group, cause it has [{0}] dependent workflow timings like :{1}",
"不能修改或删除该Worker组,有 [{0}] 个工作流定时正在使用:{1}"),
WORKER_GROUP_DEPENDENT_ENVIRONMENT_EXISTS(1401002,
"You can not modify or remove this worker group, cause it has [{0}] dependent environments.",
"不能修改或删除该Worker组,有 [{0}] 个环境配置正在使用"),
;
private final int code;
private final String enMsg;
private final String zhMsg;

75
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java

@ -27,15 +27,19 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
@ -61,6 +65,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.facebook.presto.jdbc.internal.guava.base.Strings;
/**
@ -88,6 +93,12 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
@Autowired
private ScheduleMapper scheduleMapper;
@Autowired
private TaskDefinitionMapper taskDefinitionMapper;
@Autowired
private ProcessDefinitionMapper processDefinitionMapper;
/**
* create or update a worker group
*
@ -115,11 +126,17 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
WorkerGroup workerGroup = null;
if (id != 0) {
workerGroup = workerGroupMapper.selectById(id);
if (Objects.nonNull(workerGroup) && !workerGroup.getName().equals(name)) {
if (checkWorkerGroupDependencies(workerGroup, result)) {
return result;
}
}
}
if (workerGroup == null) {
workerGroup = new WorkerGroup();
workerGroup.setCreateTime(now);
}
workerGroup.setName(name);
workerGroup.setAddrList(addrList);
workerGroup.setUpdateTime(now);
@ -136,6 +153,7 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
putMsg(result, Status.WORKER_ADDRESS_INVALID, invalidAddr);
return result;
}
handleDefaultWorkGroup(workerGroupMapper, workerGroup, loginUser, otherParamsJson);
log.info("Worker group save complete, workerGroupName:{}.", workerGroup.getName());
putMsg(result, Status.SUCCESS);
@ -177,6 +195,53 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
return false;
}
/**
* check if the worker group has any dependent tasks,schedulers or environments.
*
* @param workerGroup worker group
* @return boolean
*/
private boolean checkWorkerGroupDependencies(WorkerGroup workerGroup, Map<String, Object> result) {
// check if the worker group has any dependent tasks
List<TaskDefinition> taskDefinitions = taskDefinitionMapper.selectList(
new QueryWrapper<TaskDefinition>().lambda().eq(TaskDefinition::getWorkerGroup, workerGroup.getName()));
if (CollectionUtils.isNotEmpty(taskDefinitions)) {
List<String> taskNames = taskDefinitions.stream().limit(3).map(taskDefinition -> taskDefinition.getName())
.collect(Collectors.toList());
putMsg(result, Status.WORKER_GROUP_DEPENDENT_TASK_EXISTS, taskDefinitions.size(),
JSONUtils.toJsonString(taskNames));
return true;
}
// check if the worker group has any dependent schedulers
List<Schedule> schedules = scheduleMapper
.selectList(new QueryWrapper<Schedule>().lambda().eq(Schedule::getWorkerGroup, workerGroup.getName()));
if (CollectionUtils.isNotEmpty(schedules)) {
List<String> processNames = schedules.stream().limit(3)
.map(schedule -> processDefinitionMapper.queryByCode(schedule.getProcessDefinitionCode()).getName())
.collect(Collectors.toList());
putMsg(result, Status.WORKER_GROUP_DEPENDENT_SCHEDULER_EXISTS, schedules.size(),
JSONUtils.toJsonString(processNames));
return true;
}
// check if the worker group has any dependent environments
List<EnvironmentWorkerGroupRelation> environmentWorkerGroupRelations =
environmentWorkerGroupRelationMapper.selectList(new QueryWrapper<EnvironmentWorkerGroupRelation>()
.lambda().eq(EnvironmentWorkerGroupRelation::getWorkerGroup, workerGroup.getName()));
if (CollectionUtils.isNotEmpty(environmentWorkerGroupRelations)) {
putMsg(result, Status.WORKER_GROUP_DEPENDENT_ENVIRONMENT_EXISTS, environmentWorkerGroupRelations.size());
return true;
}
return false;
}
/**
* check worker group addr list
*
@ -341,15 +406,13 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL, processInstances.size());
return result;
}
List<EnvironmentWorkerGroupRelation> environmentWorkerGroupRelationList =
environmentWorkerGroupRelationMapper.queryByWorkerGroupName(workerGroup.getName());
if (CollectionUtils.isNotEmpty(environmentWorkerGroupRelationList)) {
putMsg(result, Status.DELETE_WORKER_GROUP_BY_ID_FAIL_ENV, environmentWorkerGroupRelationList.size(),
workerGroup.getName());
if (checkWorkerGroupDependencies(workerGroup, result)) {
return result;
}
workerGroupMapper.deleteById(id);
processInstanceMapper.updateProcessInstanceByWorkerGroupName(workerGroup.getName(), "");
log.info("Delete worker group complete, workerGroupName:{}.", workerGroup.getName());
putMsg(result, Status.SUCCESS);
return result;

17
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java

@ -35,6 +35,8 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
@ -90,6 +92,12 @@ public class WorkerGroupServiceTest {
@Mock
private EnvironmentWorkerGroupRelationMapper environmentWorkerGroupRelationMapper;
@Mock
private TaskDefinitionMapper taskDefinitionMapper;
@Mock
private ScheduleMapper scheduleMapper;
private final String GROUP_NAME = "testWorkerGroup";
private User getLoginUser() {
@ -257,11 +265,16 @@ public class WorkerGroupServiceTest {
Mockito.when(workerGroupMapper.selectById(1)).thenReturn(workerGroup);
Mockito.when(processInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
org.apache.dolphinscheduler.service.utils.Constants.NOT_TERMINATED_STATES)).thenReturn(null);
Mockito.when(workerGroupMapper.deleteById(1)).thenReturn(1);
Mockito.when(processInstanceMapper.updateProcessInstanceByWorkerGroupName(workerGroup.getName(), ""))
.thenReturn(1);
Mockito.when(environmentWorkerGroupRelationMapper.queryByWorkerGroupName(workerGroup.getName()))
.thenReturn(null);
Mockito.when(taskDefinitionMapper.selectList(Mockito.any())).thenReturn(null);
Mockito.when(scheduleMapper.selectList(Mockito.any())).thenReturn(null);
Map<String, Object> successResult = workerGroupService.deleteWorkerGroupById(loginUser, 1);
Assertions.assertEquals(Status.SUCCESS.getCode(),
((Status) successResult.get(Constants.STATUS)).getCode());

Loading…
Cancel
Save