Browse Source

[Improvement-16850][API] Broadcast to cluster when worker group changed (#16860)

dev
Wenjun Ruan 3 weeks ago committed by GitHub
parent
commit
84f6a0f070
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      docs/docs/en/guide/upgrade/incompatible.md
  2. 1
      docs/docs/zh/guide/upgrade/incompatible.md
  3. 8
      dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/cases/WorkerGroupAPITest.java
  4. 4
      dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/pages/security/WorkerGroupPage.java
  5. 17
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java
  6. 5
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java
  7. 144
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java
  8. 125
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java
  9. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java
  10. 1
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
  11. 1
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
  12. 1
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
  13. 19
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql
  14. 19
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql
  15. 1
      dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapperTest.java
  16. 29
      dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IMasterContainerService.java
  17. 4
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java
  18. 2
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java
  19. 39
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterContainerService.java
  20. 2
      dolphinscheduler-master/src/main/resources/application.yaml

1
docs/docs/en/guide/upgrade/incompatible.md

@ -35,4 +35,5 @@ This document records the incompatible updates between each version. You need to
* Remove the `Data Quality` module ([#16794])(https://github.com/apache/dolphinscheduler/pull/16794)
* Remove the `registry-disconnect-strategy` in `application.yaml` ([#16821])(https://github.com/apache/dolphinscheduler/pull/16821)
* Remove `exec-threads` in worker's `application.yaml`, please use `physical-task-config`;Remove `master-async-task-executor-thread-pool-size` in master's `application.yaml`, please use `logic-task-config` ([#16790])(https://github.com/apache/dolphinscheduler/pull/16790)
* Drop unused column `other_params_json` in `t_ds_worker_group` ([#16860])(https://github.com/apache/dolphinscheduler/pull/16860)

1
docs/docs/zh/guide/upgrade/incompatible.md

@ -33,4 +33,5 @@
* 移除 `数据质量` 模块 ([#16794])(https://github.com/apache/dolphinscheduler/pull/16794)
* 在`application.yaml`中移除`registry-disconnect-strategy`配置 ([#16821])(https://github.com/apache/dolphinscheduler/pull/16821)
* 在worker的`application.yaml`中移除`exec-threads`,使用`physical-task-config`替代;在master的`application.yaml`中移除`master-async-task-executor-thread-pool-size`使用`logic-task-config`替代 ([#16790])(https://github.com/apache/dolphinscheduler/pull/16790)
* 在`t_ds_worker_group` 表中移除 无用的`other_params_json`字段 ([#16860])(https://github.com/apache/dolphinscheduler/pull/16860)

8
dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/cases/WorkerGroupAPITest.java

@ -75,8 +75,12 @@ public class WorkerGroupAPITest {
@Test
@Order(1)
public void testSaveWorkerGroup() {
HttpResponse saveWorkerGroupHttpResponse = workerGroupPage
.saveWorkerGroup(loginUser, 1, "test_worker_group", "10.5.0.5:1234", "test", null);
HttpResponse saveWorkerGroupHttpResponse = workerGroupPage.saveWorkerGroup(
loginUser,
0,
"test_worker_group",
"10.5.0.5:1234",
"test");
Assertions.assertTrue(saveWorkerGroupHttpResponse.getBody().getSuccess());
HttpResponse queryAllWorkerGroupsResponse = workerGroupPage.queryAllWorkerGroups(loginUser);

4
dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/java/org/apache/dolphinscheduler/api/test/pages/security/WorkerGroupPage.java

@ -32,15 +32,13 @@ public class WorkerGroupPage {
private String sessionId;
public HttpResponse saveWorkerGroup(User loginUser, int id, String name, String addrList, String description,
String otherParamsJson) {
public HttpResponse saveWorkerGroup(User loginUser, int id, String name, String addrList, String description) {
Map<String, Object> params = new HashMap<>();
params.put("loginUser", loginUser);
params.put("id", id);
params.put("name", name);
params.put("addrList", addrList);
params.put("description", description);
params.put("otherParamsJson", otherParamsJson);
Map<String, String> headers = new HashMap<>();
headers.put(Constants.SESSION_ID_KEY, sessionId);

17
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/WorkerGroupController.java

@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import java.util.Map;
@ -51,16 +52,13 @@ import io.swagger.v3.oas.annotations.Parameters;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.tags.Tag;
/**
* worker group controller
*/
@Tag(name = "WORKER_GROUP_TAG")
@RestController
@RequestMapping("/worker-groups")
public class WorkerGroupController extends BaseController {
@Autowired
WorkerGroupService workerGroupService;
private WorkerGroupService workerGroupService;
/**
* create or update a worker group
@ -77,21 +75,18 @@ public class WorkerGroupController extends BaseController {
@Parameter(name = "name", description = "WORKER_GROUP_NAME", required = true, schema = @Schema(implementation = String.class)),
@Parameter(name = "addrList", description = "WORKER_ADDR_LIST", required = true, schema = @Schema(implementation = String.class)),
@Parameter(name = "description", description = "WORKER_DESC", required = false, schema = @Schema(implementation = String.class)),
@Parameter(name = "otherParamsJson", description = "WORKER_PARAMS_JSON", required = false, schema = @Schema(implementation = String.class)),
})
@PostMapping()
@ResponseStatus(HttpStatus.OK)
@ApiException(SAVE_ERROR)
@OperatorLog(auditType = AuditType.WORKER_GROUP_CREATE)
public Result saveWorkerGroup(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
public Result<WorkerGroup> saveWorkerGroup(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@RequestParam(value = "id", required = false, defaultValue = "0") int id,
@RequestParam(value = "name") String name,
@RequestParam(value = "addrList") String addrList,
@RequestParam(value = "description", required = false, defaultValue = "") String description,
@RequestParam(value = "otherParamsJson", required = false, defaultValue = "") String otherParamsJson) {
Map<String, Object> result =
workerGroupService.saveWorkerGroup(loginUser, id, name, addrList, description, otherParamsJson);
return returnDataList(result);
@RequestParam(value = "description", required = false, defaultValue = "") String description) {
final WorkerGroup workerGroup = workerGroupService.saveWorkerGroup(loginUser, id, name, addrList, description);
return Result.success(workerGroup);
}
/**

5
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/WorkerGroupService.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import java.util.List;
import java.util.Map;
@ -33,11 +34,9 @@ public interface WorkerGroupService {
* @param name worker group name
* @param addrList addr list
* @param description description
* @param otherParamsJson otherParamsJson
* @return create or update result code
*/
Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String addrList, String description,
String otherParamsJson);
WorkerGroup saveWorkerGroup(User loginUser, int id, String name, String addrList, String description);
/**
* Query worker group paging

144
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java

@ -21,6 +21,7 @@ import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationCon
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKER_GROUP_DELETE;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.WorkerGroupService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
@ -28,6 +29,7 @@ import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation;
import org.apache.dolphinscheduler.dao.entity.Schedule;
@ -42,9 +44,10 @@ import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.extract.base.client.Clients;
import org.apache.dolphinscheduler.extract.master.IMasterContainerService;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@ -55,14 +58,13 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -85,9 +87,6 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
@Autowired
private EnvironmentWorkerGroupRelationMapper environmentWorkerGroupRelationMapper;
@Autowired
private ProcessService processService;
@Autowired
private ScheduleMapper scheduleMapper;
@ -107,89 +106,53 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
* @return create or update result code
*/
@Override
@Transactional
public Map<String, Object> saveWorkerGroup(User loginUser, int id, String name, String addrList, String description,
String otherParamsJson) {
public WorkerGroup saveWorkerGroup(User loginUser,
int id,
String name,
String addrList,
String description) {
Map<String, Object> result = new HashMap<>();
if (!canOperatorPermissions(loginUser, null, AuthorizationType.WORKER_GROUP, WORKER_GROUP_CREATE)) {
putMsg(result, Status.USER_NO_OPERATION_PERM);
return result;
// todo: add permission exception
throw new ServiceException(Status.USER_NO_OPERATION_PERM);
}
if (StringUtils.isEmpty(name)) {
log.warn("Parameter name can ot be null.");
putMsg(result, Status.NAME_NULL);
return result;
}
Date now = new Date();
WorkerGroup workerGroup = null;
if (id != 0) {
workerGroup = workerGroupMapper.selectById(id);
if (Objects.nonNull(workerGroup) && !workerGroup.getName().equals(name)) {
if (checkWorkerGroupDependencies(workerGroup, result)) {
return result;
}
}
}
if (workerGroup == null) {
throw new ServiceException(Status.NAME_NULL);
}
checkWorkerGroupAddrList(addrList);
final Date now = new Date();
final WorkerGroup workerGroup;
try {
if (id == 0) {
// insert
workerGroup = new WorkerGroup();
workerGroup.setCreateTime(now);
}
workerGroup.setName(name);
workerGroup.setAddrList(addrList);
workerGroup.setUpdateTime(now);
workerGroup.setDescription(description);
if (checkWorkerGroupNameExists(workerGroup)) {
log.warn("Worker group with the same name already exists, name:{}.", workerGroup.getName());
putMsg(result, Status.NAME_EXIST, workerGroup.getName());
return result;
}
String invalidAddr = checkWorkerGroupAddrList(workerGroup);
if (invalidAddr != null) {
log.warn("Worker group address is invalid, invalidAddr:{}.", invalidAddr);
putMsg(result, Status.WORKER_ADDRESS_INVALID, invalidAddr);
return result;
}
handleDefaultWorkGroup(workerGroupMapper, workerGroup, loginUser, otherParamsJson);
log.info("Worker group save complete, workerGroupName:{}.", workerGroup.getName());
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, workerGroup);
return result;
}
protected void handleDefaultWorkGroup(WorkerGroupMapper workerGroupMapper, WorkerGroup workerGroup, User loginUser,
String otherParamsJson) {
if (workerGroup.getId() != null) {
workerGroupMapper.updateById(workerGroup);
} else {
workerGroupMapper.insert(workerGroup);
} else {
workerGroup = workerGroupMapper.selectById(id);
if (workerGroup == null) {
throw new ServiceException(Status.WORKER_GROUP_NOT_EXIST, id);
}
// todo: Can we update the worker name?
if (!workerGroup.getName().equals(name)) {
checkWorkerGroupDependencies(workerGroup, result);
}
/**
* check worker group name exists
*
* @param workerGroup worker group
* @return boolean
*/
private boolean checkWorkerGroupNameExists(WorkerGroup workerGroup) {
// check database
List<WorkerGroup> workerGroupList = workerGroupMapper.queryWorkerGroupByName(workerGroup.getName());
if (CollectionUtils.isNotEmpty(workerGroupList)) {
// create group, the same group name exists in the database
if (workerGroup.getId() == null) {
return true;
}
// update group, the database exists with the same group name except itself
Optional<WorkerGroup> sameNameWorkGroupOptional = workerGroupList.stream()
.filter(group -> !Objects.equals(group.getId(), workerGroup.getId())).findFirst();
if (sameNameWorkGroupOptional.isPresent()) {
return true;
workerGroup.setName(name);
workerGroup.setAddrList(addrList);
workerGroup.setUpdateTime(now);
workerGroup.setDescription(description);
workerGroupMapper.updateById(workerGroup);
log.info("Update worker group: {} success .", workerGroup);
}
boardCastToMasterThatWorkerGroupChanged();
return workerGroup;
} catch (DuplicateKeyException duplicateKeyException) {
throw new ServiceException(Status.NAME_EXIST, name);
}
return false;
}
/**
@ -240,23 +203,16 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
return false;
}
/**
* check worker group addr list
*
* @param workerGroup worker group
* @return boolean
*/
private String checkWorkerGroupAddrList(WorkerGroup workerGroup) {
if (Strings.isNullOrEmpty(workerGroup.getAddrList())) {
return null;
private void checkWorkerGroupAddrList(String workerGroupAddress) {
if (Strings.isNullOrEmpty(workerGroupAddress)) {
return;
}
Map<String, String> serverMaps = registryClient.getServerMaps(RegistryNodeType.WORKER);
for (String addr : workerGroup.getAddrList().split(Constants.COMMA)) {
for (String addr : workerGroupAddress.split(Constants.COMMA)) {
if (!serverMaps.containsKey(addr)) {
return addr;
throw new ServiceException(Status.WORKER_ADDRESS_INVALID);
}
}
return null;
}
/**
@ -438,4 +394,20 @@ public class WorkerGroupServiceImpl extends BaseServiceImpl implements WorkerGro
Schedule::getWorkerGroup));
}
private void boardCastToMasterThatWorkerGroupChanged() {
final List<Server> masters = registryClient.getServerList(RegistryNodeType.MASTER);
if (CollectionUtils.isEmpty(masters)) {
return;
}
for (Server master : masters) {
try {
Clients.withService(IMasterContainerService.class)
.withHost(master.getHost() + ":" + master.getPort())
.refreshWorkerGroup();
} catch (Exception e) {
log.error("Broadcast to master: {} that worker group changed failed", master, e);
}
}
}
}

125
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java

@ -17,9 +17,12 @@
package org.apache.dolphinscheduler.api.service;
import static org.apache.dolphinscheduler.api.AssertionsHelper.assertDoesNotThrow;
import static org.apache.dolphinscheduler.api.AssertionsHelper.assertThrowsServiceException;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKER_GROUP_CREATE;
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKER_GROUP_DELETE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
@ -40,7 +43,6 @@ import org.apache.dolphinscheduler.dao.mapper.WorkerGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper;
import org.apache.dolphinscheduler.registry.api.RegistryClient;
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.HashMap;
@ -60,6 +62,7 @@ import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DuplicateKeyException;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
@ -78,9 +81,6 @@ public class WorkerGroupServiceTest {
@Mock
private WorkflowInstanceMapper workflowInstanceMapper;
@Mock
private ProcessService processService;
@Mock
private RegistryClient registryClient;
@ -109,85 +109,78 @@ public class WorkerGroupServiceTest {
@Test
public void giveNoPermission_whenSaveWorkerGroup_expectNoOperation() {
User loginUser = getLoginUser();
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1,
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1,
WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(false);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
baseServiceLogger)).thenReturn(false);
Map<String, Object> result =
workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", "");
Assertions.assertEquals(Status.USER_NO_OPERATION_PERM.getCode(),
((Status) result.get(Constants.STATUS)).getCode());
assertThrowsServiceException(Status.USER_NO_OPERATION_PERM, () -> {
workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group");
});
}
@Test
public void giveNullName_whenSaveWorkerGroup_expectNAME_NULL() {
User loginUser = getLoginUser();
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1,
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1,
WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
baseServiceLogger)).thenReturn(true);
Map<String, Object> result =
workerGroupService.saveWorkerGroup(loginUser, 1, "", "localhost:0000", "test group", "");
Assertions.assertEquals(Status.NAME_NULL.getCode(),
((Status) result.get(Constants.STATUS)).getCode());
assertThrowsServiceException(Status.NAME_NULL, () -> {
workerGroupService.saveWorkerGroup(loginUser, 1, "", "localhost:0000", "test group");
});
}
@Test
public void giveSameUserName_whenSaveWorkerGroup_expectNAME_EXIST() {
User loginUser = getLoginUser();
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1,
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1,
WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
baseServiceLogger)).thenReturn(true);
Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
List<WorkerGroup> workerGroupList = new ArrayList<WorkerGroup>();
workerGroupList.add(getWorkerGroup(1));
Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(workerGroupList);
Map<String, Object> result =
workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", "");
Assertions.assertEquals(Status.NAME_EXIST.getCode(),
((Status) result.get(Constants.STATUS)).getCode());
Map<String, String> serverMaps = new HashMap<>();
serverMaps.put("localhost:0000", "");
when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps);
when(workerGroupMapper.insert(Mockito.any())).thenThrow(DuplicateKeyException.class);
assertThrowsServiceException(Status.NAME_EXIST, () -> {
workerGroupService.saveWorkerGroup(loginUser, 0, GROUP_NAME, "localhost:0000", "test group");
});
}
@Test
public void giveInvalidAddress_whenSaveWorkerGroup_expectADDRESS_INVALID() {
User loginUser = getLoginUser();
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1,
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1,
WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
baseServiceLogger)).thenReturn(true);
Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
when(workerGroupMapper.selectById(1)).thenReturn(null);
when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
Map<String, String> serverMaps = new HashMap<>();
serverMaps.put("localhost1:0000", "");
Mockito.when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps);
Map<String, Object> result =
workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", "");
Assertions.assertEquals(Status.WORKER_ADDRESS_INVALID.getCode(),
((Status) result.get(Constants.STATUS)).getCode());
when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps);
assertThrowsServiceException(Status.WORKER_ADDRESS_INVALID, () -> {
workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group");
});
}
@Test
public void giveValidWorkerGroup_whenSaveWorkerGroup_expectSuccess() {
User loginUser = getLoginUser();
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1,
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1,
WORKER_GROUP_CREATE, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
baseServiceLogger)).thenReturn(true);
Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
Mockito.when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
when(workerGroupMapper.queryWorkerGroupByName(GROUP_NAME)).thenReturn(null);
Map<String, String> serverMaps = new HashMap<>();
serverMaps.put("localhost:0000", "");
Mockito.when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps);
Mockito.when(workerGroupMapper.insert(any())).thenReturn(1);
Map<String, Object> result =
workerGroupService.saveWorkerGroup(loginUser, 1, GROUP_NAME, "localhost:0000", "test group", "");
Assertions.assertEquals(Status.SUCCESS.getCode(),
((Status) result.get(Constants.STATUS)).getCode());
when(registryClient.getServerMaps(RegistryNodeType.WORKER)).thenReturn(serverMaps);
when(workerGroupMapper.insert(any())).thenReturn(1);
assertDoesNotThrow(() -> {
workerGroupService.saveWorkerGroup(loginUser, 0, GROUP_NAME, "localhost:0000", "test group");
});
}
@Test
@ -197,13 +190,13 @@ public class WorkerGroupServiceTest {
ids.add(1);
List<WorkerGroup> workerGroups = new ArrayList<>();
workerGroups.add(getWorkerGroup(1));
Mockito.when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP,
when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.WORKER_GROUP,
loginUser.getId(), serviceLogger)).thenReturn(ids);
Mockito.when(workerGroupMapper.selectBatchIds(ids)).thenReturn(workerGroups);
when(workerGroupMapper.selectBatchIds(ids)).thenReturn(workerGroups);
Set<String> activeWorkerNodes = new HashSet<>();
activeWorkerNodes.add("localhost:12345");
activeWorkerNodes.add("localhost:23456");
Mockito.when(registryClient.getServerNodeSet(RegistryNodeType.WORKER)).thenReturn(activeWorkerNodes);
when(registryClient.getServerNodeSet(RegistryNodeType.WORKER)).thenReturn(activeWorkerNodes);
Result result = workerGroupService.queryAllGroupPaging(loginUser, 1, 1, null);
Assertions.assertEquals(result.getCode(), Status.SUCCESS.getCode());
@ -219,11 +212,11 @@ public class WorkerGroupServiceTest {
@Test
public void giveNotExistsWorkerGroup_whenDeleteWorkerGroupById_expectNotExists() {
User loginUser = getLoginUser();
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1,
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1,
WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
baseServiceLogger)).thenReturn(true);
Mockito.when(workerGroupMapper.selectById(1)).thenReturn(null);
when(workerGroupMapper.selectById(1)).thenReturn(null);
Map<String, Object> notExistResult = workerGroupService.deleteWorkerGroupById(loginUser, 1);
Assertions.assertEquals(Status.DELETE_WORKER_GROUP_NOT_EXIST.getCode(),
@ -233,17 +226,17 @@ public class WorkerGroupServiceTest {
@Test
public void giveRunningProcess_whenDeleteWorkerGroupById_expectFailed() {
User loginUser = getLoginUser();
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1,
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1,
WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
baseServiceLogger)).thenReturn(true);
WorkerGroup workerGroup = getWorkerGroup(1);
Mockito.when(workerGroupMapper.selectById(1)).thenReturn(workerGroup);
when(workerGroupMapper.selectById(1)).thenReturn(workerGroup);
WorkflowInstance workflowInstance = new WorkflowInstance();
workflowInstance.setId(1);
List<WorkflowInstance> workflowInstances = new ArrayList<WorkflowInstance>();
workflowInstances.add(workflowInstance);
Mockito.when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
WorkflowExecutionStatus.getNotTerminalStatus()))
.thenReturn(workflowInstances);
@ -255,23 +248,23 @@ public class WorkerGroupServiceTest {
@Test
public void giveValidParams_whenDeleteWorkerGroupById_expectSuccess() {
User loginUser = getLoginUser();
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1,
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1,
WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
baseServiceLogger)).thenReturn(true);
WorkerGroup workerGroup = getWorkerGroup(1);
Mockito.when(workerGroupMapper.selectById(1)).thenReturn(workerGroup);
Mockito.when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
when(workerGroupMapper.selectById(1)).thenReturn(workerGroup);
when(workflowInstanceMapper.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
WorkflowExecutionStatus.getNotTerminalStatus())).thenReturn(null);
Mockito.when(workerGroupMapper.deleteById(1)).thenReturn(1);
when(workerGroupMapper.deleteById(1)).thenReturn(1);
Mockito.when(environmentWorkerGroupRelationMapper.queryByWorkerGroupName(workerGroup.getName()))
when(environmentWorkerGroupRelationMapper.queryByWorkerGroupName(workerGroup.getName()))
.thenReturn(null);
Mockito.when(taskDefinitionMapper.selectList(Mockito.any())).thenReturn(null);
when(taskDefinitionMapper.selectList(Mockito.any())).thenReturn(null);
Mockito.when(scheduleMapper.selectList(Mockito.any())).thenReturn(null);
when(scheduleMapper.selectList(Mockito.any())).thenReturn(null);
Map<String, Object> successResult = workerGroupService.deleteWorkerGroupById(loginUser, 1);
Assertions.assertEquals(Status.SUCCESS.getCode(),

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkerGroup.java

@ -52,6 +52,4 @@ public class WorkerGroup {
@TableField(exist = false)
private boolean systemDefault;
private String otherParamsJson;
}

1
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql

@ -1015,7 +1015,6 @@ CREATE TABLE t_ds_worker_group
create_time datetime NULL DEFAULT NULL,
update_time datetime NULL DEFAULT NULL,
description text NULL DEFAULT NULL,
other_params_json text NULL DEFAULT NULL,
PRIMARY KEY (id),
UNIQUE KEY name_unique (name)
);

1
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

@ -1012,7 +1012,6 @@ CREATE TABLE `t_ds_worker_group` (
`create_time` datetime NULL DEFAULT NULL COMMENT 'create time',
`update_time` datetime NULL DEFAULT NULL COMMENT 'update time',
`description` text NULL DEFAULT NULL COMMENT 'description',
`other_params_json` text NULL DEFAULT NULL COMMENT 'other params json',
PRIMARY KEY (`id`),
UNIQUE KEY `name_unique` (`name`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE = utf8_bin;

1
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

@ -926,7 +926,6 @@ CREATE TABLE t_ds_worker_group (
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
description text DEFAULT NULL,
other_params_json text DEFAULT NULL,
PRIMARY KEY (id) ,
CONSTRAINT name_unique UNIQUE (name)
) ;

19
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/mysql/dolphinscheduler_ddl.sql

@ -225,3 +225,22 @@ DROP PROCEDURE drop_data_quality_tables;
ALTER TABLE `t_ds_workflow_definition` ADD KEY `idx_project_code` (`project_code`) USING BTREE;
ALTER TABLE `t_ds_workflow_definition_log` ADD KEY `idx_project_code` (`project_code`) USING BTREE;
-- drop_column_t_ds_worker_group other_params_json
DROP PROCEDURE if EXISTS drop_column_t_ds_worker_group_other_params_json;
delimiter d//
CREATE PROCEDURE drop_column_t_ds_worker_group_other_params_json()
BEGIN
IF EXISTS (SELECT 1 FROM information_schema.COLUMNS
WHERE TABLE_NAME='t_ds_worker_group'
AND TABLE_SCHEMA=(SELECT DATABASE())
AND COLUMN_NAME ='other_params_json')
THEN
ALTER TABLE `t_ds_worker_group`
DROP COLUMN `other_params_json`;
END IF;
END;
d//
delimiter ;
CALL drop_column_t_ds_worker_group_other_params_json;
DROP PROCEDURE drop_column_t_ds_worker_group_other_params_json;

19
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.3.0_schema/postgresql/dolphinscheduler_ddl.sql

@ -228,3 +228,22 @@ DROP FUNCTION IF EXISTS drop_data_quality_tables();
create index workflow_definition_index_project_code on t_ds_workflow_definition (project_code);
create index workflow_definition_log_index_project_code on t_ds_workflow_definition_log (project_code);
-- drop_column_t_ds_worker_group other_params_json
delimiter d//
CREATE OR REPLACE FUNCTION drop_column_t_ds_worker_group_other_params_json() RETURNS void AS $$
BEGIN
IF EXISTS (SELECT 1
FROM information_schema.columns
WHERE table_name = 't_ds_worker_group'
AND column_name = 'other_params_json')
THEN
ALTER TABLE t_ds_worker_group
DROP COLUMN "other_params_json";
END IF;
END;
$$ LANGUAGE plpgsql;
d//
select drop_column_t_ds_worker_group_other_params_json();
DROP FUNCTION IF EXISTS drop_column_t_ds_worker_group_other_params_json();

1
dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapperTest.java

@ -65,7 +65,6 @@ public class WorkerGroupMapperTest extends BaseDaoTest {
workerGroup.setCreateTime(new Date());
workerGroup.setUpdateTime(new Date());
workerGroup.setSystemDefault(true);
workerGroup.setOtherParamsJson("");
workerGroup.setAddrList("localhost");
workerGroupMapper.insert(workerGroup);
return workerGroup;

29
dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/IMasterContainerService.java

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.extract.master;
import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.RpcService;
@RpcService
public interface IMasterContainerService {
@RpcMethod
void refreshWorkerGroup();
}

4
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerGroupChangeNotifier.java

@ -70,9 +70,9 @@ public class WorkerGroupChangeNotifier {
listeners.add(listener);
}
void detectWorkerGroupChanges() {
public synchronized void detectWorkerGroupChanges() {
try {
MapComparator<String, WorkerGroup> mapComparator = detectChangedWorkerGroups();
final MapComparator<String, WorkerGroup> mapComparator = detectChangedWorkerGroups();
triggerListeners(mapComparator);
workerGroupMap = mapComparator.getNewMap();
} catch (Exception ex) {

2
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/config/MasterConfig.java

@ -57,7 +57,7 @@ public class MasterConfig implements Validator {
private MasterServerLoadProtection serverLoadProtection = new MasterServerLoadProtection();
private Duration workerGroupRefreshInterval = Duration.ofSeconds(10L);
private Duration workerGroupRefreshInterval = Duration.ofMinutes(5);
private CommandFetchStrategy commandFetchStrategy = new CommandFetchStrategy();

39
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/MasterContainerService.java

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.rpc;
import org.apache.dolphinscheduler.extract.master.IMasterContainerService;
import org.apache.dolphinscheduler.server.master.cluster.WorkerGroupChangeNotifier;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MasterContainerService implements IMasterContainerService {
@Autowired
private WorkerGroupChangeNotifier workerGroupChangeNotifier;
@Override
public void refreshWorkerGroup() {
workerGroupChangeNotifier.detectWorkerGroupChanges();
}
}

2
dolphinscheduler-master/src/main/resources/application.yaml

@ -101,7 +101,7 @@ master:
max-system-memory-usage-percentage-thresholds: 0.7
# Master max disk usage , when the master's disk usage is smaller then this value, master server can execute workflow.
max-disk-usage-percentage-thresholds: 0.7
worker-group-refresh-interval: 10s
worker-group-refresh-interval: 5m
command-fetch-strategy:
type: ID_SLOT_BASED
config:

Loading…
Cancel
Save