Browse Source

[Fix][Resource Center] Remove NONE storage type and set LOCAL type as default (#14661)

3.2.1-prepare
Aaron Wang 1 year ago committed by GitHub
parent
commit
b12df72a0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      docs/docs/en/guide/resource/configuration.md
  2. 2
      docs/docs/zh/guide/resource/configuration.md
  3. 77
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
  4. 14
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
  5. 16
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java
  6. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
  7. 1
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java
  8. 78
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
  9. 23
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java
  10. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java
  11. 8
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java
  12. 2
      dolphinscheduler-common/src/main/resources/common.properties
  13. 5
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java
  14. 2
      dolphinscheduler-common/src/test/resources/common.properties
  15. 25
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java
  16. 4
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperatorFactory.java
  17. 5
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java

4
docs/docs/en/guide/resource/configuration.md

@ -80,8 +80,8 @@ data.basedir.path=/tmp/dolphinscheduler
# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE
resource.storage.type=NONE
# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS
resource.storage.type=LOCAL
# resource store on HDFS/S3/OSS path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
resource.storage.upload.base.path=/tmp/dolphinscheduler

2
docs/docs/zh/guide/resource/configuration.md

@ -79,7 +79,7 @@ resource.aws.s3.endpoint=
# user data local directory path, please make sure the directory exists and have read write permissions
data.basedir.path=/tmp/dolphinscheduler
# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE
# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS
resource.storage.type=LOCAL
# resource store on HDFS/S3/OSS path, resource file will store to this hadoop hdfs path, self configuration,

77
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service.impl;
import static org.apache.dolphinscheduler.common.constants.Constants.ALIAS;
import static org.apache.dolphinscheduler.common.constants.Constants.CONTENT;
import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING;
import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_SS;
import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S;
@ -143,11 +144,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
int pid,
String currentDir) {
Result<Object> result = new Result<>();
result = checkResourceUploadStartupState();
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
if (FileUtils.directoryTraversal(name)) {
log.warn("Parameter name is invalid, name:{}.", RegexUtils.escapeNRT(name));
putMsg(result, Status.VERIFY_PARAMETER_NAME_FAILED);
@ -213,11 +209,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
String currentDir) {
Result<Object> result = new Result<>();
result = checkResourceUploadStartupState();
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
User user = userMapper.selectById(loginUser.getId());
if (user == null) {
log.error("user {} not exists", loginUser.getId());
@ -272,6 +263,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
ApiServerMetrics.recordApiResourceUploadSize(file.getSize());
log.info("Upload resource file complete, resourceName:{}, fileName:{}.",
RegexUtils.escapeNRT(name), RegexUtils.escapeNRT(file.getOriginalFilename()));
putMsg(result, Status.SUCCESS);
return result;
}
@ -343,11 +335,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
MultipartFile file) {
Result<Object> result = new Result<>();
result = checkResourceUploadStartupState();
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
User user = userMapper.selectById(loginUser.getId());
if (user == null) {
log.error("user {} not exists", loginUser.getId());
@ -374,13 +361,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
throw new ServiceException((String.format("Get file status fail, resource path: %s", resourceFullName)));
}
if (!PropertyUtils.isResourceStorageStartup()) {
log.error("Storage does not start up, resource upload startup state: {}.",
PropertyUtils.isResourceStorageStartup());
putMsg(result, Status.STORAGE_NOT_STARTUP);
return result;
}
// TODO: deal with OSS
if (resource.isDirectory() && storageOperate.returnStorageType().equals(ResUploadType.S3)
&& !resource.getFileName().equals(name)) {
@ -459,6 +439,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
try {
log.info("start copy {} -> {}", originFullName, destHdfsFileName);
storageOperate.copy(originFullName, destHdfsFileName, true, true);
putMsg(result, Status.SUCCESS);
} catch (Exception e) {
log.error(MessageFormat.format(" copy {0} -> {1} fail", originFullName, destHdfsFileName), e);
putMsg(result, Status.HDFS_COPY_FAIL);
@ -596,7 +577,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
String defaultPath = "";
List<StorageEntity> resourcesList = new ArrayList<>();
String resourceStorageType =
PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE, ResUploadType.NONE.name());
PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE, ResUploadType.LOCAL.name());
if (isAdmin(loginUser) && StringUtils.isBlank(fullName)) {
// list all tenants' resources to admin users in the root directory
List<User> userList = userMapper.selectList(null);
@ -668,6 +649,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
putMsg(result, Status.STORE_OPERATE_CREATE_ERROR);
// throw new ServiceException(String.format("create resource directory: %s failed.", fullName));
}
putMsg(result, Status.SUCCESS);
} catch (Exception e) {
log.error("create resource directory {} failed", fullName);
putMsg(result, Status.STORE_OPERATE_CREATE_ERROR);
@ -868,11 +850,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
String resTenantCode) throws IOException {
Result<Object> result = new Result<>();
result = checkResourceUploadStartupState();
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
User user = userMapper.selectById(loginUser.getId());
if (user == null) {
log.error("user {} not exists", loginUser.getId());
@ -1089,10 +1066,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
@Override
public Result<Object> readResource(User loginUser, String fullName, String resTenantCode,
int skipLineNum, int limit) {
Result<Object> result = checkResourceUploadStartupState();
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
Result<Object> result = new Result<>();
User user = userMapper.selectById(loginUser.getId());
if (user == null) {
@ -1166,11 +1140,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
String content, String currentDir) {
Result<Object> result = new Result<>();
result = checkResourceUploadStartupState();
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
User user = userMapper.selectById(loginUser.getId());
if (user == null) {
log.error("user {} not exists", loginUser.getId());
@ -1254,19 +1223,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
permissionPostHandle(authorizationType, loginUser.getId(), Collections.singletonList(resourceId), log);
}
private Result<Object> checkResourceUploadStartupState() {
Result<Object> result = new Result<>();
putMsg(result, Status.SUCCESS);
// if resource upload startup
if (!PropertyUtils.isResourceStorageStartup()) {
log.error("Storage does not start up, resource upload startup state: {}.",
PropertyUtils.isResourceStorageStartup());
putMsg(result, Status.STORAGE_NOT_STARTUP);
return result;
}
return result;
}
private Result<Object> verifyResource(User loginUser, ResourceType type, String fullName, int pid) {
Result<Object> result = verifyResourceName(fullName, type, loginUser);
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
@ -1307,11 +1263,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
@Transactional
public Result<Object> updateResourceContent(User loginUser, String fullName, String resTenantCode,
String content) {
Result<Object> result = checkResourceUploadStartupState();
if (!result.getCode().equals(Status.SUCCESS.getCode())) {
return result;
}
Result<Object> result = new Result<>();
User user = userMapper.selectById(loginUser.getId());
if (user == null) {
log.error("user {} not exists", loginUser.getId());
@ -1414,18 +1366,10 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
/**
* download file
* @return resource content
* @throws IOException exception
*/
@Override
public org.springframework.core.io.Resource downloadResource(User loginUser,
String fullName) throws IOException {
// if resource upload startup
if (!PropertyUtils.isResourceStorageStartup()) {
log.warn("Storage does not start up, resource upload startup state: {}.",
PropertyUtils.isResourceStorageStartup());
throw new ServiceException("hdfs not startup");
}
String fullName) {
if (fullName.endsWith("/")) {
log.error("resource id {} is directory,can't download it", fullName);
throw new ServiceException("can't download directory");
@ -1694,6 +1638,11 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
@Override
public Result<Object> queryResourceBaseDir(User loginUser, ResourceType type) {
Result<Object> result = new Result<>();
if (storageOperate == null) {
putMsg(result, Status.SUCCESS);
result.setData(EMPTY_STRING);
return result;
}
User user = userMapper.selectById(loginUser.getId());
if (user == null) {
log.error("user {} not exists", loginUser.getId());

14
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java

@ -31,7 +31,6 @@ import org.apache.dolphinscheduler.api.utils.RegexUtils;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.Schedule;
@ -160,10 +159,7 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
createTenantValid(tenant);
tenantMapper.insert(tenant);
// if storage startup
if (PropertyUtils.isResourceStorageStartup()) {
storageOperate.createTenantDirIfNotExists(tenantCode);
}
storageOperate.createTenantDirIfNotExists(tenantCode);
permissionPostHandle(AuthorizationType.TENANT, loginUser.getId(), Collections.singletonList(tenant.getId()),
log);
result.put(Constants.DATA_LIST, tenant);
@ -233,8 +229,7 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
// updateProcessInstance tenant
// if the tenant code is modified, the original resource needs to be copied to the new tenant.
if (!Objects.equals(existsTenant.getTenantCode(), updateTenant.getTenantCode())
&& PropertyUtils.isResourceStorageStartup()) {
if (!Objects.equals(existsTenant.getTenantCode(), updateTenant.getTenantCode())) {
storageOperate.createTenantDirIfNotExists(tenantCode);
}
int update = tenantMapper.updateById(updateTenant);
@ -292,10 +287,7 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
throw new ServiceException(Status.DELETE_TENANT_BY_ID_FAIL_USERS, userList.size());
}
// if resource upload startup
if (PropertyUtils.isResourceStorageStartup()) {
storageOperate.deleteTenant(tenant.getTenantCode());
}
storageOperate.deleteTenant(tenant.getTenantCode());
int delete = tenantMapper.deleteById(id);
if (delete > 0) {

16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java

@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ResourceMapper;
@ -104,13 +103,6 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR);
return result;
}
// if resource upload startup
if (!PropertyUtils.isResourceStorageStartup()) {
log.error("Storage does not start up, resource upload startup state: {}.",
PropertyUtils.isResourceStorageStartup());
putMsg(result, Status.HDFS_NOT_STARTUP);
return result;
}
// verify udf func name exist
if (checkUdfFuncNameExists(funcName)) {
@ -242,14 +234,6 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
return result;
}
// if resource upload startup
if (!PropertyUtils.isResourceStorageStartup()) {
log.error("Storage does not start up, resource upload startup state: {}.",
PropertyUtils.isResourceStorageStartup());
putMsg(result, Status.HDFS_NOT_STARTUP);
return result;
}
// verify udfFuncName is exist
if (!funcName.equals(udf.getFuncName())) {
if (checkUdfFuncNameExists(funcName)) {

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java

@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.EncryptionUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.dao.entity.AlertGroup;
import org.apache.dolphinscheduler.dao.entity.DatasourceUser;
import org.apache.dolphinscheduler.dao.entity.K8sNamespaceUser;
@ -184,10 +183,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
User user = createUser(userName, userPassword, email, tenantId, phone, queue, state);
Tenant tenant = tenantMapper.queryById(tenantId);
// resource upload startup
if (PropertyUtils.isResourceStorageStartup()) {
storageOperate.createTenantDirIfNotExists(tenant.getTenantCode());
}
storageOperate.createTenantDirIfNotExists(tenant.getTenantCode());
log.info("User is created and id is {}.", user.getId());
result.put(Constants.DATA_LIST, user);

1
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java

@ -54,7 +54,6 @@ public class TenantControllerTest extends AbstractControllerTest {
paramsMap.add("tenantCode", "hayden");
paramsMap.add("queueId", "1");
paramsMap.add("description", "tenant description");
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
MvcResult mvcResult = mockMvc.perform(post("/tenants")
.header(SESSION_ID, sessionId)

78
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java

@ -159,13 +159,11 @@ public class ResourcesServiceTest {
@Test
public void testCreateResource() {
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
User user = new User();
user.setId(1);
user.setUserType(UserType.GENERAL_USER);
// CURRENT_LOGIN_USER_TENANT_NOT_EXIST
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(1)).thenReturn(null);
Assertions.assertThrows(ServiceException.class,
@ -176,17 +174,9 @@ public class ResourcesServiceTest {
user.setTenantId(1);
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
// HDFS_NOT_STARTUP
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
Result<Object> result = resourcesService.createResource(user, "ResourcesServiceTest",
ResourceType.FILE, null, "/");
logger.info(result.toString());
Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg());
// RESOURCE_FILE_IS_EMPTY
MockMultipartFile mockMultipartFile = new MockMultipartFile("test.pdf", "".getBytes());
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
result = resourcesService.createResource(user, "ResourcesServiceTest",
Result result = resourcesService.createResource(user, "ResourcesServiceTest",
ResourceType.FILE, mockMultipartFile, "/");
logger.info(result.toString());
Assertions.assertEquals(Status.RESOURCE_FILE_IS_EMPTY.getMsg(), result.getMsg());
@ -223,62 +213,45 @@ public class ResourcesServiceTest {
@Test
public void testCreateDirecotry() {
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
User user = new User();
user.setId(1);
user.setUserType(UserType.GENERAL_USER);
// HDFS_NOT_STARTUP
Result result = resourcesService.createDirectory(user, "directoryTest",
ResourceType.FILE, -1, "/");
logger.info(result.toString());
Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg());
// RESOURCE_EXIST
user.setId(1);
user.setTenantId(1);
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser());
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
Mockito.when(storageOperate.getResDir("123")).thenReturn("/dolphinscheduler/123/resources/");
try {
Mockito.when(storageOperate.exists("/dolphinscheduler/123/resources/directoryTest")).thenReturn(true);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
Mockito.when(resourcesMapper.existResource("/directoryTest", 0)).thenReturn(true);
result = resourcesService.createDirectory(user, "directoryTest", ResourceType.FILE, -1, "/");
Result result = resourcesService.createDirectory(user, "directoryTest", ResourceType.FILE, -1, "/");
logger.info(result.toString());
Assertions.assertEquals(Status.RESOURCE_EXIST.getMsg(), result.getMsg());
}
@Test
public void testUpdateResource() {
User user = new User();
user.setId(1);
user.setUserType(UserType.GENERAL_USER);
user.setTenantId(1);
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
Mockito.when(storageOperate.getResDir("123")).thenReturn("/dolphinscheduler/123/resources/");
// HDFS_NOT_STARTUP
Result result = resourcesService.updateResource(user, "ResourcesServiceTest",
"123", "ResourcesServiceTest", ResourceType.FILE, null);
logger.info(result.toString());
Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg());
// USER_NO_OPERATION_PERM
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
user.setUserType(UserType.GENERAL_USER);
// tenant who have access to resource is 123,
Tenant tenantWNoPermission = new Tenant();
tenantWNoPermission.setTenantCode("321");
Mockito.when(tenantMapper.queryById(1)).thenReturn(tenantWNoPermission);
result = resourcesService.updateResource(user,
Result result = resourcesService.updateResource(user,
"/dolphinscheduler/123/resources/ResourcesServiceTest",
"123",
"ResourcesServiceTest", ResourceType.FILE, null);
@ -422,14 +395,8 @@ public class ResourcesServiceTest {
User loginUser = new User();
loginUser.setId(0);
loginUser.setUserType(UserType.GENERAL_USER);
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
// HDFS_NOT_STARTUP
Result result = resourcesService.delete(loginUser, "", "");
logger.info(result.toString());
Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg());
// TENANT_NOT_EXIST
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
loginUser.setUserType(UserType.ADMIN_USER);
loginUser.setTenantId(2);
Mockito.when(userMapper.selectById(loginUser.getId())).thenReturn(loginUser);
@ -440,7 +407,7 @@ public class ResourcesServiceTest {
Mockito.when(storageOperate.getFileStatus("/dolphinscheduler/123/resources/ResourcesServiceTest",
null, "123", null))
.thenReturn(getStorageEntityResource());
result = resourcesService.delete(loginUser, "/dolphinscheduler/123/resources/ResNotExist", "123");
Result result = resourcesService.delete(loginUser, "/dolphinscheduler/123/resources/ResNotExist", "123");
logger.info(result.toString());
Assertions.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(), result.getMsg());
@ -482,24 +449,15 @@ public class ResourcesServiceTest {
@Test
public void testReadResource() {
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
// HDFS_NOT_STARTUP
Result result = resourcesService.readResource(getUser(), "", "", 1, 10);
logger.info(result.toString());
Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg());
// RESOURCE_NOT_EXIST
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
Mockito.when(userMapper.selectById(getUser().getId())).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(getUser().getTenantId())).thenReturn(getTenant());
result = resourcesService.readResource(getUser(), "", "", 1, 10);
Result result = resourcesService.readResource(getUser(), "", "", 1, 10);
logger.info(result.toString());
Assertions.assertEquals(Status.RESOURCE_FILE_NOT_EXIST.getCode(), (int) result.getCode());
// RESOURCE_SUFFIX_NOT_SUPPORT_VIEW
Mockito.when(FileUtils.getResourceViewSuffixes()).thenReturn("class");
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
result = resourcesService.readResource(getUser(), "", "", 1, 10);
logger.info(result.toString());
Assertions.assertEquals(Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW.getMsg(), result.getMsg());
@ -535,22 +493,14 @@ public class ResourcesServiceTest {
@Test
public void testOnlineCreateResource() {
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
User user = getUser();
user.setId(1);
Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
// HDFS_NOT_STARTUP
Result result = resourcesService.onlineCreateResource(user, ResourceType.FILE, "test", "jar", "content",
"/");
logger.info(result.toString());
Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg());
// RESOURCE_SUFFIX_NOT_SUPPORT_VIEW
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
Mockito.when(FileUtils.getResourceViewSuffixes()).thenReturn("class");
result = resourcesService.onlineCreateResource(user, ResourceType.FILE, "test", "jar", "content",
Result result = resourcesService.onlineCreateResource(user, ResourceType.FILE, "test", "jar", "content",
"/");
logger.info(result.toString());
Assertions.assertEquals(Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW.getMsg(), result.getMsg());
@ -568,7 +518,6 @@ public class ResourcesServiceTest {
@Test
public void testCreateOrUpdateResource() throws Exception {
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
User user = getUser();
Mockito.when(userMapper.queryByUserNameAccurately(user.getUserName())).thenReturn(getUser());
@ -592,15 +541,7 @@ public class ResourcesServiceTest {
@Test
public void testUpdateResourceContent() {
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
// HDFS_NOT_STARTUP
Result result = resourcesService.updateResourceContent(getUser(), "", "", "content");
logger.info(result.toString());
Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg());
// RESOURCE_NOT_EXIST
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
Mockito.when(userMapper.selectById(getUser().getId())).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
@ -612,14 +553,13 @@ public class ResourcesServiceTest {
logger.error(e.getMessage() + " Resource path: {}", "", e);
}
result = resourcesService.updateResourceContent(getUser(),
Result result = resourcesService.updateResourceContent(getUser(),
"/dolphinscheduler/123/resources/ResourcesServiceTest.jar",
"123", "content");
logger.info(result.toString());
Assertions.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(), result.getMsg());
// RESOURCE_SUFFIX_NOT_SUPPORT_VIEW
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
Mockito.when(FileUtils.getResourceViewSuffixes()).thenReturn("class");
try {
Mockito.when(storageOperate.getFileStatus("", "", "123", ResourceType.FILE))
@ -668,7 +608,6 @@ public class ResourcesServiceTest {
@Test
public void testDownloadResource() {
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
Mockito.when(userMapper.selectById(1)).thenReturn(getUser());
org.springframework.core.io.Resource resourceMock = Mockito.mock(org.springframework.core.io.Resource.class);
@ -887,9 +826,6 @@ public class ResourcesServiceTest {
@Test
public void testCatFile() {
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
// SUCCESS
try {
List<String> list = storageOperate.vimFile(Mockito.any(), Mockito.anyString(), eq(1), eq(10));

23
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java

@ -109,17 +109,8 @@ public class UdfFuncServiceTest {
Mockito.when(
resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.UDF, null, 0, serviceLogger))
.thenReturn(true);
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
// hdfs not start
Result result = udfFuncService.createUdfFunction(getLoginUser(), "UdfFuncServiceTest",
"org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String",
"UdfFuncServiceTest", "UdfFuncServiceTest", "", UdfType.HIVE);
logger.info(result.toString());
Assertions.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(), result.getMsg());
// resource not exist
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
result = udfFuncService.createUdfFunction(getLoginUser(), "UdfFuncServiceTest",
Result result = udfFuncService.createUdfFunction(getLoginUser(), "UdfFuncServiceTest",
"org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String",
"UdfFuncServiceTest", "UdfFuncServiceTest", "", UdfType.HIVE);
logger.info(result.toString());
@ -162,8 +153,6 @@ public class UdfFuncServiceTest {
@Test
public void testUpdateUdfFunc() {
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
Mockito.when(udfFuncMapper.selectUdfById(1)).thenReturn(getUdfFunc());
// UDF_FUNCTION_NOT_EXIST
@ -177,22 +166,12 @@ public class UdfFuncServiceTest {
logger.info(result.toString());
Assertions.assertTrue(Status.UDF_FUNCTION_NOT_EXIST.getCode() == result.getCode());
// HDFS_NOT_STARTUP
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.UDF, new Object[]{1}, 0,
serviceLogger)).thenReturn(true);
result = udfFuncService.updateUdfFunc(getLoginUser(), 1, "UdfFuncServiceTest",
"org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String",
"UdfFuncServiceTest", "UdfFuncServiceTest", UdfType.HIVE, "");
logger.info(result.toString());
Assertions.assertTrue(Status.HDFS_NOT_STARTUP.getCode() == result.getCode());
// RESOURCE_NOT_EXIST
Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.UDF, 1,
ApiFuncIdentificationConstant.UDF_FUNCTION_UPDATE, serviceLogger)).thenReturn(true);
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.UDF, new Object[]{11}, 0,
serviceLogger)).thenReturn(true);
Mockito.when(udfFuncMapper.selectUdfById(11)).thenReturn(getUdfFunc());
Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
result = udfFuncService.updateUdfFunc(getLoginUser(), 11, "UdfFuncServiceTest",
"org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String",
"UdfFuncServiceTest", "UdfFuncServiceTest", UdfType.HIVE, "");

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java

@ -21,5 +21,5 @@ package org.apache.dolphinscheduler.common.enums;
* data base types
*/
public enum ResUploadType {
LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE
LOCAL, HDFS, S3, OSS, GCS, ABS, OBS
}

8
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java

@ -18,11 +18,9 @@
package org.apache.dolphinscheduler.common.utils;
import static org.apache.dolphinscheduler.common.constants.Constants.COMMON_PROPERTIES_PATH;
import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_STORAGE_TYPE;
import org.apache.dolphinscheduler.common.config.IPropertyDelegate;
import org.apache.dolphinscheduler.common.config.ImmutablePriorityPropertyDelegate;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
import java.util.HashMap;
import java.util.Map;
@ -56,12 +54,6 @@ public class PropertyUtils {
return Strings.isNullOrEmpty(val) ? val : val.toUpperCase();
}
public static boolean isResourceStorageStartup() {
String resourceStorageType = getString(RESOURCE_STORAGE_TYPE, ResUploadType.NONE.name());
ResUploadType resUploadType = ResUploadType.valueOf(resourceStorageType.toUpperCase());
return resUploadType != ResUploadType.NONE;
}
public static Integer getInt(String key) {
return propertyDelegate.getInt(key.trim());
}

2
dolphinscheduler-common/src/main/resources/common.properties

@ -21,7 +21,7 @@ data.basedir.path=/tmp/dolphinscheduler
# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE. LOCAL type is a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration
# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS. LOCAL type is default type, and it's a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration
# please notice that LOCAL mode does not support reading and writing in distributed mode, which mean you can only use your resource in one machine, unless
# use shared file mount point
resource.storage.type=LOCAL

5
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java

@ -38,11 +38,6 @@ public class PropertyUtilsTest {
Assertions.assertNotNull(PropertyUtils.getString(Constants.FS_DEFAULT_FS));
}
@Test
public void getResUploadStartupState() {
Assertions.assertTrue(PropertyUtils.isResourceStorageStartup());
}
@Test
public void getSet() {
Set<String> networkInterface = PropertyUtils.getSet("networkInterface", value -> {

2
dolphinscheduler-common/src/test/resources/common.properties

@ -27,7 +27,7 @@ data.basedir.path=/tmp/dolphinscheduler
# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, NONE. LOCAL type is a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration
# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS. LOCAL type is default type, and it's a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration
# please notice that LOCAL mode does not support reading and writing in distributed mode, which mean you can only use your resource in one machine, unless
# use shared file mount point
resource.storage.type=LOCAL

25
dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java

@ -61,16 +61,12 @@ import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@Slf4j
public class HdfsStorageOperator implements Closeable, StorageOperate {
@ -78,17 +74,6 @@ public class HdfsStorageOperator implements Closeable, StorageOperate {
private static HdfsStorageProperties hdfsProperties = new HdfsStorageProperties();
private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY";
private static final LoadingCache<String, HdfsStorageOperator> cache = CacheBuilder
.newBuilder()
.expireAfterWrite(HdfsStorageProperties.getKerberosExpireTime(), TimeUnit.HOURS)
.build(new CacheLoader<String, HdfsStorageOperator>() {
@Override
public HdfsStorageOperator load(String key) {
return new HdfsStorageOperator(hdfsProperties);
}
});
private volatile boolean yarnEnabled = false;
private Configuration configuration;
@ -105,10 +90,6 @@ public class HdfsStorageOperator implements Closeable, StorageOperate {
initHdfsPath();
}
public static HdfsStorageOperator getInstance() {
return cache.getUnchecked(HADOOP_UTILS_KEY);
}
/**
* init dolphinscheduler root path in hdfs
*/
@ -127,7 +108,7 @@ public class HdfsStorageOperator implements Closeable, StorageOperate {
/**
* init hadoop configuration
*/
private void init() throws NullPointerException {
public void init() throws NullPointerException {
try {
configuration = new HdfsConfiguration();
@ -267,8 +248,8 @@ public class HdfsStorageOperator implements Closeable, StorageOperate {
@Override
public void createTenantDirIfNotExists(String tenantCode) throws IOException {
getInstance().mkdir(tenantCode, getHdfsResDir(tenantCode));
getInstance().mkdir(tenantCode, getHdfsUdfDir(tenantCode));
mkdir(tenantCode, getHdfsResDir(tenantCode));
mkdir(tenantCode, getHdfsUdfDir(tenantCode));
}
@Override

4
dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperatorFactory.java

@ -28,7 +28,9 @@ public class HdfsStorageOperatorFactory implements StorageOperateFactory {
@Override
public StorageOperate createStorageOperate() {
return HdfsStorageOperator.getInstance();
HdfsStorageOperator hdfsOperator = new HdfsStorageOperator();
hdfsOperator.init();
return hdfsOperator;
}
@Override

5
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java

@ -18,10 +18,8 @@
package org.apache.dolphinscheduler.server.worker.utils;
import org.apache.dolphinscheduler.common.constants.TenantConstants;
import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -125,9 +123,6 @@ public class TaskExecutionCheckerUtils {
log.warn("Resource file : {} already exists will not download again ", resFile.getName());
}
});
if (!downloadFiles.isEmpty() && !PropertyUtils.isResourceStorageStartup()) {
throw new StorageOperateNoConfiguredException("Storage service config does not exist!");
}
if (CollectionUtils.isNotEmpty(downloadFiles)) {
for (Pair<String, String> fileDownload : downloadFiles) {

Loading…
Cancel
Save