Browse Source

Use bootstrap user as the default tenant (#13825)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
9df89cf0cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      docs/docs/en/guide/start/quick-start.md
  2. 4
      docs/docs/zh/guide/start/quick-start.md
  3. 28
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
  4. 133
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
  5. 90
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
  6. 25
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/TenantConstants.java
  7. 6
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql
  8. 7
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql
  9. 8
      dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql
  10. 19
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_dml.sql
  11. 2
      dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_dml.sql
  12. 37
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  13. 1
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java
  14. 2
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java
  15. 30
      dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java
  16. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
  17. 1
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
  18. 2
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java
  19. 27
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java

2
docs/docs/en/guide/start/quick-start.md

@ -43,6 +43,8 @@ The brief is as follows:
We can create a tenant in DolphinScheduler `Security -> Tenant Manage` page.
> NOTE: The user will bind to a default tenant when it is created, if you use the default tenant, the task will be executed by worker's bootstrap user.
![create-tenant](../../../../img/start/create-tenant.gif)
#### Assign Tenant to User

4
docs/docs/zh/guide/start/quick-start.md

@ -33,7 +33,9 @@ Tenant是使用DolphinScheduler时绕不开的一个概念,所以先简单介
我们可以在 DolphinScheduler `Security -> Tenant Manage` 页面创建租户。
![create-tenant](../../../../img/start/create-tenant.gif)
> ![create-tenant](../../../../img/start/create-tenant.gif)
>
> 注意:如果没有关联租户,则会使用默认租户,默认租户为default,会使用程序启动用户执行任务。
#### 将租户分配给用户

28
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java

@ -70,7 +70,6 @@ import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
@ -241,13 +240,6 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
processDefinition.getVersion());
// check current version whether include startNodeList
checkStartNodeList(startNodeList, processDefinitionCode, processDefinition.getVersion());
if (!checkTenantSuitable(processDefinition)) {
log.error(
"There is not any valid tenant for the process definition, processDefinitionCode:{}, processDefinitionName:{}.",
processDefinition.getCode(), processDefinition.getName());
putMsg(result, Status.TENANT_NOT_SUITABLE);
return result;
}
checkScheduleTimeNumExceed(commandType, cronTime);
checkMasterExists();
@ -466,14 +458,6 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
this.checkProcessDefinitionValid(projectCode, processDefinition, processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
if (!checkTenantSuitable(processDefinition)) {
log.error(
"There is not any valid tenant for the process definition, processDefinitionId:{}, processDefinitionCode:{}, ",
processDefinition.getId(), processDefinition.getName());
putMsg(response, Status.TENANT_NOT_SUITABLE);
return response;
}
// get the startParams user specified at the first starting while repeat running is needed
long startNodeListLong;
@ -554,18 +538,6 @@ public class ExecutorServiceImpl extends BaseServiceImpl implements ExecutorServ
return forceStart(processInstance, taskGroupQueue);
}
/**
* check tenant suitable
*
* @param processDefinition process definition
* @return true if tenant suitable, otherwise return false
*/
private boolean checkTenantSuitable(ProcessDefinition processDefinition) {
Tenant tenant =
processService.getTenantForProcess(processDefinition.getTenantId(), processDefinition.getUserId());
return tenant != null;
}
public void checkStartNodeList(String startNodeList, Long processDefinitionCode, int version) {
if (StringUtils.isNotEmpty(startNodeList)) {
List<ProcessTaskRelation> processTaskRelations =

133
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java

@ -161,14 +161,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (tenant == null) {
log.error("tenant not exists");
putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
return result;
}
String tenantCode = tenant.getTenantCode();
String tenantCode = getTenantCode(user);
if (!isUserTenantValid(isAdmin(loginUser), tenantCode, "")) {
log.error("current user does not have permission");
@ -240,14 +233,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (tenant == null) {
log.error("tenant not exists");
putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
return result;
}
String tenantCode = tenant.getTenantCode();
String tenantCode = getTenantCode(user);
if (!isUserTenantValid(isAdmin(loginUser), tenantCode, "")) {
log.error("current user does not have permission");
@ -378,14 +364,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (tenant == null) {
log.error("tenant not exists");
putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
return result;
}
String tenantCode = tenant.getTenantCode();
String tenantCode = getTenantCode(user);
if (!isUserTenantValid(isAdmin(loginUser), tenantCode, resTenantCode)) {
log.error("current user does not have permission");
@ -594,14 +573,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (tenant == null) {
log.error("tenant not exists");
putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
return result;
}
String tenantCode = tenant.getTenantCode();
String tenantCode = getTenantCode(user);
if (!isUserTenantValid(isAdmin(loginUser), tenantCode, resTenantCode)) {
log.error("current user does not have permission");
@ -619,7 +591,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
List<User> userList = userMapper.selectList(null);
Set<String> visitedTenantEntityCode = new HashSet<>();
for (User userEntity : userList) {
String tenantEntityCode = tenantMapper.queryById(userEntity.getTenantId()).getTenantCode();
String tenantEntityCode = getTenantCode(userEntity);
if (!visitedTenantEntityCode.contains(tenantEntityCode)) {
defaultPath = storageOperate.getResDir(tenantEntityCode);
if (type.equals(ResourceType.UDF)) {
@ -723,7 +695,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return false;
}
// query tenant
String tenantCode = tenantMapper.queryById(loginUser.getTenantId()).getTenantCode();
String tenantCode = getTenantCode(loginUser);
// random file name
String localFilename = FileUtils.getUploadFilename(tenantCode, UUID.randomUUID().toString());
@ -768,13 +740,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return null;
}
Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (tenant == null) {
log.error("tenant not exists");
putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
return null;
}
String tenantCode = tenant.getTenantCode();
String tenantCode = getTenantCode(user);
String defaultPath = "";
List<StorageEntity> resourcesList = new ArrayList<>();
@ -785,7 +751,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
Set<String> visitedTenantEntityCode = new HashSet<>();
for (User userEntity : userList) {
String tenantEntityCode = tenantMapper.queryById(userEntity.getTenantId()).getTenantCode();
String tenantEntityCode = getTenantCode(userEntity);
if (!visitedTenantEntityCode.contains(tenantEntityCode)) {
defaultPath = storageOperate.getResDir(tenantEntityCode);
if (type.equals(ResourceType.UDF)) {
@ -911,14 +877,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (tenant == null) {
log.error("tenant not exists");
putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
return result;
}
String tenantCode = tenant.getTenantCode();
String tenantCode = getTenantCode(user);
if (!isUserTenantValid(isAdmin(loginUser), tenantCode, resTenantCode)) {
log.error("current user does not have permission");
@ -1042,14 +1001,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (tenant == null) {
log.error("tenant not exists");
putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
return result;
}
String tenantCode = tenant.getTenantCode();
String tenantCode = getTenantCode(user);
if (!isUserTenantValid(isAdmin(loginUser), tenantCode, resTenantCode)) {
log.error("current user does not have permission");
@ -1095,14 +1047,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (tenant == null) {
log.error("tenant not exists");
putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
return result;
}
String tenantCode = tenant.getTenantCode();
String tenantCode = getTenantCode(user);
if (!isUserTenantValid(isAdmin(loginUser), tenantCode, resTenantCode)) {
log.error("current user does not have permission");
@ -1153,14 +1098,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (tenant == null) {
log.error("tenant not exists");
putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
return result;
}
String tenantCode = tenant.getTenantCode();
String tenantCode = getTenantCode(user);
if (!isUserTenantValid(isAdmin(loginUser), tenantCode, resTenantCode)) {
log.error("current user does not have permission");
@ -1236,14 +1174,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (tenant == null) {
log.error("tenant not exists");
putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
return result;
}
String tenantCode = tenant.getTenantCode();
String tenantCode = getTenantCode(user);
if (!isUserTenantValid(isAdmin(loginUser), tenantCode, "")) {
log.error("current user does not have permission");
@ -1384,14 +1315,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (tenant == null) {
log.error("tenant not exists");
putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
return result;
}
String tenantCode = tenant.getTenantCode();
String tenantCode = getTenantCode(user);
if (!isUserTenantValid(isAdmin(loginUser), tenantCode, resTenantCode)) {
log.error("current user does not have permission");
@ -1510,17 +1434,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
throw new ServiceException(String.format("Resource owner id %d does not exist", userId));
}
String tenantCode = "";
if (user.getTenantId() != 0) {
Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (tenant == null) {
log.error("Tenant id {} not exists", user.getTenantId());
throw new ServiceException(
String.format("The tenant id %d of resource owner not exist", user.getTenantId()));
}
tenantCode = tenant.getTenantCode();
}
String tenantCode = getTenantCode(user);
String[] aliasArr = fullName.split("/");
String alias = aliasArr[aliasArr.length - 1];
@ -1598,14 +1512,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result;
}
Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (tenant == null) {
log.error("tenant not exists");
putMsg(result, Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
return result;
}
String tenantCode = tenant.getTenantCode();
String tenantCode = getTenantCode(user);
String baseFolder = storageOperate.getResourceFileName(tenantCode, "DATA_TRANSFER");
@ -1875,4 +1782,12 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return true;
}
private String getTenantCode(User user) {
Tenant tenant = tenantMapper.queryById(user.getTenantId());
if (tenant == null) {
throw new ServiceException(Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST);
}
return tenant.getTenantCode();
}
}

90
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java

@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.eq;
import org.apache.dolphinscheduler.api.dto.resources.DeleteDataTransferResponse;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.permission.ResourcePermissionCheckService;
import org.apache.dolphinscheduler.api.service.impl.BaseServiceImpl;
import org.apache.dolphinscheduler.api.service.impl.ResourcesServiceImpl;
@ -154,27 +155,26 @@ public class ResourcesServiceTest {
user.setUserType(UserType.GENERAL_USER);
// CURRENT_LOGIN_USER_TENANT_NOT_EXIST
MockMultipartFile mockMultipartFile = new MockMultipartFile("test.pdf", "test.pdf", "pdf", "test".getBytes());
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(1)).thenReturn(null);
Result result = resourcesService.createResource(user, "ResourcesServiceTest", "ResourcesServiceTest",
ResourceType.FILE, mockMultipartFile, "/");
logger.info(result.toString());
Assertions.assertEquals(Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST.getMsg(), result.getMsg());
Assertions.assertThrows(ServiceException.class,
() -> resourcesService.createResource(user, "ResourcesServiceTest", "ResourcesServiceTest",
ResourceType.FILE, new MockMultipartFile("test.pdf", "test.pdf", "pdf", "test".getBytes()),
"/"));
// set tenant for user
user.setTenantId(1);
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
// HDFS_NOT_STARTUP
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
result = resourcesService.createResource(user, "ResourcesServiceTest", "ResourcesServiceTest",
Result<Object> result = resourcesService.createResource(user, "ResourcesServiceTest", "ResourcesServiceTest",
ResourceType.FILE, null, "/");
logger.info(result.toString());
Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg());
// RESOURCE_FILE_IS_EMPTY
mockMultipartFile = new MockMultipartFile("test.pdf", "".getBytes());
MockMultipartFile mockMultipartFile = new MockMultipartFile("test.pdf", "".getBytes());
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
result = resourcesService.createResource(user, "ResourcesServiceTest", "ResourcesServiceTest",
ResourceType.FILE, mockMultipartFile, "/");
@ -328,10 +328,9 @@ public class ResourcesServiceTest {
// TENANT_NOT_EXIST
Mockito.when(tenantMapper.queryById(Mockito.anyInt())).thenReturn(null);
result = resourcesService.updateResource(user, "ResourcesServiceTest1.jar",
"", "ResourcesServiceTest", "", ResourceType.UDF, null);
logger.info(result.toString());
Assertions.assertEquals(Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST.getMsg(), result.getMsg());
Assertions.assertThrows(ServiceException.class,
() -> resourcesService.updateResource(user, "ResourcesServiceTest1.jar",
"", "ResourcesServiceTest", "", ResourceType.UDF, null));
// SUCCESS
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
@ -414,47 +413,40 @@ public class ResourcesServiceTest {
}
@Test
public void testDelete() {
public void testDelete() throws Exception {
User loginUser = new User();
loginUser.setId(0);
loginUser.setUserType(UserType.GENERAL_USER);
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false);
try {
// HDFS_NOT_STARTUP
Result result = resourcesService.delete(loginUser, "", "");
logger.info(result.toString());
Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg());
// TENANT_NOT_EXIST
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
loginUser.setUserType(UserType.ADMIN_USER);
loginUser.setTenantId(2);
Mockito.when(userMapper.selectById(loginUser.getId())).thenReturn(loginUser);
result = resourcesService.delete(loginUser, "", "");
logger.info(result.toString());
Assertions.assertEquals(Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST.getMsg(), result.getMsg());
// HDFS_NOT_STARTUP
Result result = resourcesService.delete(loginUser, "", "");
logger.info(result.toString());
Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg());
// RESOURCE_NOT_EXIST
Mockito.when(tenantMapper.queryById(Mockito.anyInt())).thenReturn(getTenant());
Mockito.when(storageOperate.getFileStatus("/dolphinscheduler/123/resources/ResourcesServiceTest",
null, "123", null))
.thenReturn(getStorageEntityResource());
result = resourcesService.delete(loginUser, "/dolphinscheduler/123/resources/ResNotExist", "123");
logger.info(result.toString());
Assertions.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(), result.getMsg());
// TENANT_NOT_EXIST
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true);
loginUser.setUserType(UserType.ADMIN_USER);
loginUser.setTenantId(2);
Mockito.when(userMapper.selectById(loginUser.getId())).thenReturn(loginUser);
Assertions.assertThrows(ServiceException.class, () -> resourcesService.delete(loginUser, "", ""));
// SUCCESS
loginUser.setTenantId(1);
result = resourcesService.delete(loginUser, "/dolphinscheduler/123/resources/ResourcesServiceTest",
"123");
logger.info(result.toString());
Assertions.assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
// RESOURCE_NOT_EXIST
Mockito.when(tenantMapper.queryById(Mockito.anyInt())).thenReturn(getTenant());
Mockito.when(storageOperate.getFileStatus("/dolphinscheduler/123/resources/ResourcesServiceTest",
null, "123", null))
.thenReturn(getStorageEntityResource());
result = resourcesService.delete(loginUser, "/dolphinscheduler/123/resources/ResNotExist", "123");
logger.info(result.toString());
Assertions.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(), result.getMsg());
// SUCCESS
loginUser.setTenantId(1);
result = resourcesService.delete(loginUser, "/dolphinscheduler/123/resources/ResourcesServiceTest",
"123");
logger.info(result.toString());
Assertions.assertEquals(Status.SUCCESS.getMsg(), result.getMsg());
} catch (Exception e) {
logger.error("delete error", e);
Assertions.assertTrue(false);
}
}
@Test
@ -519,9 +511,8 @@ public class ResourcesServiceTest {
// TENANT_NOT_EXIST
Mockito.when(userMapper.selectById(getUser().getId())).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(getUser().getTenantId())).thenReturn(null);
result = resourcesService.readResource(getUser(), "", "", 1, 10);
logger.info(result.toString());
Assertions.assertEquals(Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST.getMsg(), result.getMsg());
Assertions.assertThrows(ServiceException.class,
() -> resourcesService.readResource(getUser(), "", "", 1, 10));
// SUCCESS
Mockito.when(userMapper.selectById(getUser().getId())).thenReturn(getUser());
@ -646,9 +637,8 @@ public class ResourcesServiceTest {
// TENANT_NOT_EXIST
Mockito.when(userMapper.selectById(getUser().getId())).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(1)).thenReturn(null);
result = resourcesService.updateResourceContent(getUser(), "", "123", "content");
logger.info(result.toString());
Assertions.assertTrue(Status.CURRENT_LOGIN_USER_TENANT_NOT_EXIST.getCode() == result.getCode());
Assertions.assertThrows(ServiceException.class,
() -> resourcesService.updateResourceContent(getUser(), "", "123", "content"));
// SUCCESS
try {

25
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/TenantConstants.java

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.constants;
public class TenantConstants {
public static final String DEFAULT_TENANT_CODE = "default";
public static final String BOOTSTRAPT_SYSTEM_USER = System.getProperty("user.name");
}

6
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql

@ -916,6 +916,8 @@ CREATE TABLE t_ds_tenant
-- ----------------------------
-- Records of t_ds_tenant
-- ----------------------------
INSERT IGNORE INTO `t_ds_tenant`
VALUES ('-1', 'default', 'default tenant', '1', current_timestamp, current_timestamp);
-- ----------------------------
-- Table structure for t_ds_udfs
@ -955,7 +957,7 @@ CREATE TABLE t_ds_user
user_type tinyint(4) DEFAULT NULL,
email varchar(64) DEFAULT NULL,
phone varchar(11) DEFAULT NULL,
tenant_id int(11) DEFAULT NULL,
tenant_id int(11) DEFAULT -1,
create_time datetime DEFAULT NULL,
update_time datetime DEFAULT NULL,
queue varchar(64) DEFAULT NULL,
@ -1020,7 +1022,7 @@ VALUES (NULL, 1, 'default admin warning group', 'default admin warning group', '
-- Records of t_ds_user
-- ----------------------------
INSERT INTO t_ds_user
VALUES ('1', 'admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', '', '0', '2018-03-27 15:48:50',
VALUES ('1', 'admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', '', '-1', '2018-03-27 15:48:50',
'2018-10-24 17:40:22', null, 1, null);
-- ----------------------------

7
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

@ -914,6 +914,9 @@ CREATE TABLE `t_ds_tenant` (
-- Records of t_ds_tenant
-- ----------------------------
INSERT IGNORE INTO `t_ds_tenant`
VALUES ('-1', 'default', 'default tenant', '1', current_timestamp, current_timestamp);
-- ----------------------------
-- Table structure for t_ds_udfs
-- ----------------------------
@ -950,7 +953,7 @@ CREATE TABLE `t_ds_user` (
`user_type` tinyint(4) DEFAULT NULL COMMENT 'user type, 0:administrator,1:ordinary user',
`email` varchar(64) DEFAULT NULL COMMENT 'email',
`phone` varchar(11) DEFAULT NULL COMMENT 'phone',
`tenant_id` int(11) DEFAULT NULL COMMENT 'tenant id',
`tenant_id` int(11) DEFAULT -1 COMMENT 'tenant id',
`create_time` datetime DEFAULT NULL COMMENT 'create time',
`update_time` datetime DEFAULT NULL COMMENT 'update time',
`queue` varchar(64) DEFAULT NULL COMMENT 'queue',
@ -1011,7 +1014,7 @@ VALUES (NULL, 1, 'default admin warning group', 'default admin warning group', c
-- Records of t_ds_user
-- ----------------------------
INSERT IGNORE INTO `t_ds_user`
VALUES ('1', 'admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', '', '0', current_timestamp, current_timestamp, null, 1, null);
VALUES ('1', 'admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', '', '-1', current_timestamp, current_timestamp, null, 1, null);
-- ----------------------------
-- Table structure for t_ds_plugin_define

8
dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

@ -847,7 +847,7 @@ CREATE TABLE t_ds_user (
user_type int DEFAULT NULL ,
email varchar(64) DEFAULT NULL ,
phone varchar(11) DEFAULT NULL ,
tenant_id int DEFAULT NULL ,
tenant_id int DEFAULT -1 ,
create_time timestamp DEFAULT NULL ,
update_time timestamp DEFAULT NULL ,
queue varchar(64) DEFAULT NULL ,
@ -977,7 +977,11 @@ ALTER TABLE t_ds_worker_group ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_worker_g
-- Records of t_ds_user?user : admin , password : dolphinscheduler123
INSERT INTO t_ds_user(user_name, user_password, user_type, email, phone, tenant_id, state, create_time, update_time, time_zone)
VALUES ('admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', '', '0', 1, '2018-03-27 15:48:50', '2018-10-24 17:40:22', null);
VALUES ('admin', '7ad2410b2f4c074479a8937a28a22b8f', '0', 'xxx@qq.com', '', '-1', 1, '2018-03-27 15:48:50', '2018-10-24 17:40:22', null);
-- Records of t_ds_tenant
INSERT INTO t_ds_tenant(id, tenant_code, description, queue_id, create_time, update_time)
VALUES (-1, 'default', 'default tenant', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22');
-- Records of t_ds_alertgroup, default admin warning group
INSERT INTO t_ds_alertgroup(alert_instance_ids, create_user_id, group_name, description, create_time, update_time)

19
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/mysql/dolphinscheduler_dml.sql

@ -14,3 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- ############################# t_ds_tenant #############################
drop PROCEDURE if EXISTS dolphin_t_ds_tenant_insert_default;
delimiter d//
CREATE PROCEDURE dolphin_t_ds_tenant_insert_default()
BEGIN
IF
NOT EXISTS(SELECT 1
FROM t_ds_tenant
WHERE id = -1)
THEN
INSERT INTO `t_ds_tenant` VALUES ('-1', 'default', 'default tenant', '1', current_timestamp, current_timestamp);
END IF;
END;
d//
delimiter ;
CALL dolphin_t_ds_tenant_insert_default();
DROP PROCEDURE dolphin_t_ds_tenant_insert_default;

2
dolphinscheduler-dao/src/main/resources/sql/upgrade/3.2.0_schema/postgresql/dolphinscheduler_dml.sql

@ -14,3 +14,5 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
INSERT INTO t_ds_tenant(id, tenant_code, description, queue_id, create_time, update_time) VALUES (-1, 'default', 'default tenant', '0', '2018-03-27 15:48:50', '2018-10-24 17:40:22');

37
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -104,6 +104,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected final Logger log =
LoggerFactory.getLogger(BaseTaskProcessor.class);
private String tenantCode;
protected boolean killed = false;
protected boolean paused = false;
@ -147,6 +149,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
this.processInstance = processInstance;
this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes();
this.commitInterval = masterConfig.getTaskCommitInterval().toMillis();
this.tenantCode = getTenantCode();
}
protected javax.sql.DataSource defaultDataSource =
@ -305,20 +308,13 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
* @return TaskExecutionContext
*/
protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance) {
int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
// verify tenant is null
if (verifyTenantIsNull(tenant, taskInstance)) {
if (tenantCode == null) {
log.info("Task state changes to {}", TaskExecutionStatus.FAILURE);
taskInstance.setState(TaskExecutionStatus.FAILURE);
taskInstanceDao.upsertTaskInstance(taskInstance);
return null;
}
// set queue for process instance, user-specified queue takes precedence over tenant queue
String userQueue = processService.queryUserQueueByProcessInstance(taskInstance.getProcessInstance());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
taskInstance.getProcessInstance().setTenantCode(tenantCode);
taskInstance.setResources(getResourceFullNames(taskInstance));
TaskChannel taskChannel = taskPluginManager.getTaskChannel(taskInstance.getTaskType());
@ -329,7 +325,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
DataQualityTaskExecutionContext dataQualityTaskExecutionContext = null;
if (TASK_TYPE_DATA_QUALITY.equalsIgnoreCase(taskInstance.getTaskType())) {
dataQualityTaskExecutionContext = new DataQualityTaskExecutionContext();
setDataQualityTaskRelation(dataQualityTaskExecutionContext, taskInstance, tenant.getTenantCode());
setDataQualityTaskRelation(dataQualityTaskExecutionContext, taskInstance, tenantCode);
}
K8sTaskExecutionContext k8sTaskExecutionContext = setK8sTaskRelation(taskInstance);
@ -601,21 +597,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
}
}
/**
* whehter tenant is null
*
* @param tenant tenant
* @param taskInstance taskInstance
* @return result
*/
protected boolean verifyTenantIsNull(Tenant tenant, TaskInstance taskInstance) {
if (tenant == null) {
log.error("Tenant does not exists");
return true;
}
return false;
}
/**
* get resource map key is full name and value is tenantCode
*/
@ -671,4 +652,10 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
}
return k8sTaskExecutionContext;
}
private String getTenantCode() {
int userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
Tenant tenant = processService.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(), userId);
return tenant == null ? null : tenant.getTenantCode();
}
}

1
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/BlockingTaskTest.java

@ -216,6 +216,7 @@ public class BlockingTaskTest {
Mockito.when(
taskInstanceDao.findValidTaskListByProcessId(processInstance.getId(), processInstance.getTestFlag()))
.thenReturn(conditions);
taskInstance.setProcessInstance(processInstance);
return taskInstance;
}

2
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/SubProcessTaskTest.java

@ -106,7 +106,7 @@ public class SubProcessTaskTest {
Mockito.when(processService
.findSubProcessInstance(processInstance.getId(), taskInstance.getId()))
.thenReturn(subProcessInstance);
taskInstance.setProcessInstance(processInstance);
return taskInstance;
}

30
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessorTest.java

@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Resource;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -38,7 +37,6 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
@ -149,34 +147,6 @@ public class CommonTaskProcessorTest {
}
@Test
public void testVerifyTenantIsNull() {
Tenant tenant = null;
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setTaskType("SHELL");
taskInstance.setProcessInstanceId(1);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setId(1);
taskInstance.setProcessInstance(processInstance);
boolean res = commonTaskProcessor.verifyTenantIsNull(tenant, taskInstance);
Assertions.assertFalse(res);
tenant = new Tenant();
tenant.setId(1);
tenant.setTenantCode("journey");
tenant.setDescription("journey");
tenant.setQueueId(1);
tenant.setCreateTime(new Date());
tenant.setUpdateTime(new Date());
res = commonTaskProcessor.verifyTenantIsNull(tenant, taskInstance);
Assertions.assertFalse(res);
}
@Test
public void testReplaceTestDatSource() {
CommonTaskProcessor commonTaskProcessor1 = new CommonTaskProcessor();

4
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.api;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
import org.apache.dolphinscheduler.common.constants.TenantConstants;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
@ -135,7 +136,8 @@ public abstract class AbstractCommandExecutor {
processBuilder.redirectErrorStream(true);
// if sudo.enable=true,setting up user to run commands
if (OSUtils.isSudoEnable()) {
// todo: Create a ShellExecuteClass to generate the shell and execute shell commands
if (OSUtils.isSudoEnable() && !TenantConstants.DEFAULT_TENANT_CODE.equals(taskRequest.getTenantCode())) {
if (SystemUtils.IS_OS_LINUX
&& PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE)) {
generateCgroupCommand(command);

1
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java

@ -81,7 +81,6 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
log.error("task execute request command content is null");
return;
}
final String workflowMasterAddress = taskDispatchMessage.getMessageSenderAddress();
log.info("Receive task dispatch request, command: {}", taskDispatchMessage);
TaskExecutionContext taskExecutionContext = taskDispatchMessage.getTaskExecutionContext();

2
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java

@ -193,7 +193,7 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
log.info("Set task status to {}", TaskExecutionStatus.RUNNING_EXECUTION);
TaskExecutionCheckerUtils.checkTenantExist(workerConfig, taskExecutionContext);
log.info("TenantCode:{} check success", taskExecutionContext.getTenantCode());
log.info("TenantCode: {} check success", taskExecutionContext.getTenantCode());
TaskExecutionCheckerUtils.createProcessLocalPathIfAbsent(taskExecutionContext);
log.info("ProcessExecDir:{} check success", taskExecutionContext.getExecutePath());

27
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.utils;
import org.apache.dolphinscheduler.common.constants.TenantConstants;
import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
@ -44,13 +45,21 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
@Slf4j
public class TaskExecutionCheckerUtils {
public static void checkTenantExist(WorkerConfig workerConfig, TaskExecutionContext taskExecutionContext) {
try {
String tenantCode = taskExecutionContext.getTenantCode();
if (TenantConstants.DEFAULT_TENANT_CODE.equals(tenantCode)) {
log.warn("Current tenant is default tenant, will use {} to execute the task",
TenantConstants.BOOTSTRAPT_SYSTEM_USER);
return;
}
boolean osUserExistFlag;
// if Using distributed is true and Currently supported systems are linux,Should not let it
// automatically
@ -89,8 +98,11 @@ public class TaskExecutionCheckerUtils {
taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setExecutePath(execLocalPath);
taskExecutionContext.setAppInfoPath(FileUtils.getAppInfoPath(execLocalPath));
createDirectoryWithOwner(Paths.get(taskExecutionContext.getExecutePath()),
taskExecutionContext.getTenantCode());
Path executePath = Paths.get(taskExecutionContext.getExecutePath());
createDirectory(executePath);
if (!TenantConstants.DEFAULT_TENANT_CODE.equals(taskExecutionContext.getTenantCode())) {
setOwner(executePath, taskExecutionContext.getTenantCode());
}
} catch (Throwable ex) {
throw new TaskException("Cannot create process execute dir", ex);
}
@ -143,12 +155,19 @@ public class TaskExecutionCheckerUtils {
}
}
private static void createDirectoryWithOwner(Path filePath, String tenant) {
private static void createDirectory(Path filePath) {
if (Files.exists(filePath)) {
return;
}
try {
Files.createDirectories(filePath);
} catch (IOException e) {
throw new TaskException("Create directory " + filePath + " failed ", e);
}
}
private static void setOwner(Path filePath, String tenant) {
try {
if (!OSUtils.isSudoEnable()) {
// we need to open sudo, then we can change the owner.
return;
@ -158,7 +177,7 @@ public class TaskExecutionCheckerUtils {
UserPrincipal tenantPrincipal = userPrincipalLookupService.lookupPrincipalByName(tenant);
Files.setOwner(filePath, tenantPrincipal);
} catch (IOException e) {
throw new TaskException("Set tenant directory permission failed, tenant: " + tenant, e);
throw new TaskException("Set tenant directory " + filePath + " permission failed, tenant: " + tenant, e);
}
}
}

Loading…
Cancel
Save