From b12df72a0c59e3552dcb9af9971a897f725d602a Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Thu, 7 Sep 2023 14:09:15 +0800 Subject: [PATCH] [Fix][Resource Center] Remove NONE storage type and set LOCAL type as default (#14661) --- docs/docs/en/guide/resource/configuration.md | 4 +- docs/docs/zh/guide/resource/configuration.md | 2 +- .../service/impl/ResourcesServiceImpl.java | 77 ++++-------------- .../api/service/impl/TenantServiceImpl.java | 14 +--- .../api/service/impl/UdfFuncServiceImpl.java | 16 ---- .../api/service/impl/UsersServiceImpl.java | 6 +- .../api/controller/TenantControllerTest.java | 1 - .../api/service/ResourcesServiceTest.java | 78 ++----------------- .../api/service/UdfFuncServiceTest.java | 23 +----- .../common/enums/ResUploadType.java | 2 +- .../common/utils/PropertyUtils.java | 8 -- .../src/main/resources/common.properties | 2 +- .../common/utils/PropertyUtilsTest.java | 5 -- .../src/test/resources/common.properties | 2 +- .../storage/hdfs/HdfsStorageOperator.java | 25 +----- .../hdfs/HdfsStorageOperatorFactory.java | 4 +- .../utils/TaskExecutionCheckerUtils.java | 5 -- 17 files changed, 37 insertions(+), 237 deletions(-) diff --git a/docs/docs/en/guide/resource/configuration.md b/docs/docs/en/guide/resource/configuration.md index a4bb8cef42..112b5458cc 100644 --- a/docs/docs/en/guide/resource/configuration.md +++ b/docs/docs/en/guide/resource/configuration.md @@ -80,8 +80,8 @@ data.basedir.path=/tmp/dolphinscheduler # resource view suffixs #resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js -# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE -resource.storage.type=NONE +# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS +resource.storage.type=LOCAL # resource store on HDFS/S3/OSS path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended resource.storage.upload.base.path=/tmp/dolphinscheduler diff --git a/docs/docs/zh/guide/resource/configuration.md b/docs/docs/zh/guide/resource/configuration.md index 25bacf90a0..c5c7f85620 100644 --- a/docs/docs/zh/guide/resource/configuration.md +++ b/docs/docs/zh/guide/resource/configuration.md @@ -79,7 +79,7 @@ resource.aws.s3.endpoint= # user data local directory path, please make sure the directory exists and have read write permissions data.basedir.path=/tmp/dolphinscheduler -# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE +# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS resource.storage.type=LOCAL # resource store on HDFS/S3/OSS path, resource file will store to this hadoop hdfs path, self configuration, diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java index b18bd685b6..b4a9bba80e 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.api.service.impl; import static org.apache.dolphinscheduler.common.constants.Constants.ALIAS; import static org.apache.dolphinscheduler.common.constants.Constants.CONTENT; +import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING; import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR; import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_SS; import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S; @@ -143,11 +144,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe int pid, String currentDir) { Result result = new Result<>(); - - result = checkResourceUploadStartupState(); - if (!result.getCode().equals(Status.SUCCESS.getCode())) { - return result; - } if (FileUtils.directoryTraversal(name)) { log.warn("Parameter name is invalid, name:{}.", RegexUtils.escapeNRT(name)); putMsg(result, Status.VERIFY_PARAMETER_NAME_FAILED); @@ -213,11 +209,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe String currentDir) { Result result = new Result<>(); - result = checkResourceUploadStartupState(); - if (!result.getCode().equals(Status.SUCCESS.getCode())) { - return result; - } - User user = userMapper.selectById(loginUser.getId()); if (user == null) { log.error("user {} not exists", loginUser.getId()); @@ -272,6 +263,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe ApiServerMetrics.recordApiResourceUploadSize(file.getSize()); log.info("Upload resource file complete, resourceName:{}, fileName:{}.", RegexUtils.escapeNRT(name), RegexUtils.escapeNRT(file.getOriginalFilename())); + putMsg(result, Status.SUCCESS); return result; } @@ -343,11 +335,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe MultipartFile file) { Result result = new Result<>(); - result = checkResourceUploadStartupState(); - if (!result.getCode().equals(Status.SUCCESS.getCode())) { - return result; - } - User user = userMapper.selectById(loginUser.getId()); if (user == null) { log.error("user {} not exists", loginUser.getId()); @@ -374,13 +361,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe throw new ServiceException((String.format("Get file status fail, resource path: %s", resourceFullName))); } - if (!PropertyUtils.isResourceStorageStartup()) { - log.error("Storage does not start up, resource upload startup state: {}.", - PropertyUtils.isResourceStorageStartup()); - putMsg(result, Status.STORAGE_NOT_STARTUP); - return result; - } - // TODO: deal with OSS if (resource.isDirectory() && storageOperate.returnStorageType().equals(ResUploadType.S3) && !resource.getFileName().equals(name)) { @@ -459,6 +439,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe try { log.info("start copy {} -> {}", originFullName, destHdfsFileName); storageOperate.copy(originFullName, destHdfsFileName, true, true); + putMsg(result, Status.SUCCESS); } catch (Exception e) { log.error(MessageFormat.format(" copy {0} -> {1} fail", originFullName, destHdfsFileName), e); putMsg(result, Status.HDFS_COPY_FAIL); @@ -596,7 +577,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe String defaultPath = ""; List resourcesList = new ArrayList<>(); String resourceStorageType = - PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE, ResUploadType.NONE.name()); + PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE, ResUploadType.LOCAL.name()); if (isAdmin(loginUser) && StringUtils.isBlank(fullName)) { // list all tenants' resources to admin users in the root directory List userList = userMapper.selectList(null); @@ -668,6 +649,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe putMsg(result, Status.STORE_OPERATE_CREATE_ERROR); // throw new ServiceException(String.format("create resource directory: %s failed.", fullName)); } + putMsg(result, Status.SUCCESS); } catch (Exception e) { log.error("create resource directory {} failed", fullName); putMsg(result, Status.STORE_OPERATE_CREATE_ERROR); @@ -868,11 +850,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe String resTenantCode) throws IOException { Result result = new Result<>(); - result = checkResourceUploadStartupState(); - if (!result.getCode().equals(Status.SUCCESS.getCode())) { - return result; - } - User user = userMapper.selectById(loginUser.getId()); if (user == null) { log.error("user {} not exists", loginUser.getId()); @@ -1089,10 +1066,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe @Override public Result readResource(User loginUser, String fullName, String resTenantCode, int skipLineNum, int limit) { - Result result = checkResourceUploadStartupState(); - if (!result.getCode().equals(Status.SUCCESS.getCode())) { - return result; - } + Result result = new Result<>(); User user = userMapper.selectById(loginUser.getId()); if (user == null) { @@ -1166,11 +1140,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe String content, String currentDir) { Result result = new Result<>(); - result = checkResourceUploadStartupState(); - if (!result.getCode().equals(Status.SUCCESS.getCode())) { - return result; - } - User user = userMapper.selectById(loginUser.getId()); if (user == null) { log.error("user {} not exists", loginUser.getId()); @@ -1254,19 +1223,6 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe permissionPostHandle(authorizationType, loginUser.getId(), Collections.singletonList(resourceId), log); } - private Result checkResourceUploadStartupState() { - Result result = new Result<>(); - putMsg(result, Status.SUCCESS); - // if resource upload startup - if (!PropertyUtils.isResourceStorageStartup()) { - log.error("Storage does not start up, resource upload startup state: {}.", - PropertyUtils.isResourceStorageStartup()); - putMsg(result, Status.STORAGE_NOT_STARTUP); - return result; - } - return result; - } - private Result verifyResource(User loginUser, ResourceType type, String fullName, int pid) { Result result = verifyResourceName(fullName, type, loginUser); if (!result.getCode().equals(Status.SUCCESS.getCode())) { @@ -1307,11 +1263,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe @Transactional public Result updateResourceContent(User loginUser, String fullName, String resTenantCode, String content) { - Result result = checkResourceUploadStartupState(); - if (!result.getCode().equals(Status.SUCCESS.getCode())) { - return result; - } - + Result result = new Result<>(); User user = userMapper.selectById(loginUser.getId()); if (user == null) { log.error("user {} not exists", loginUser.getId()); @@ -1414,18 +1366,10 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe /** * download file * @return resource content - * @throws IOException exception */ @Override public org.springframework.core.io.Resource downloadResource(User loginUser, - String fullName) throws IOException { - // if resource upload startup - if (!PropertyUtils.isResourceStorageStartup()) { - log.warn("Storage does not start up, resource upload startup state: {}.", - PropertyUtils.isResourceStorageStartup()); - throw new ServiceException("hdfs not startup"); - } - + String fullName) { if (fullName.endsWith("/")) { log.error("resource id {} is directory,can't download it", fullName); throw new ServiceException("can't download directory"); @@ -1694,6 +1638,11 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe @Override public Result queryResourceBaseDir(User loginUser, ResourceType type) { Result result = new Result<>(); + if (storageOperate == null) { + putMsg(result, Status.SUCCESS); + result.setData(EMPTY_STRING); + return result; + } User user = userMapper.selectById(loginUser.getId()); if (user == null) { log.error("user {} not exists", loginUser.getId()); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java index cde330e7f6..fc5fc73159 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java @@ -31,7 +31,6 @@ import org.apache.dolphinscheduler.api.utils.RegexUtils; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.enums.AuthorizationType; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Queue; import org.apache.dolphinscheduler.dao.entity.Schedule; @@ -160,10 +159,7 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService createTenantValid(tenant); tenantMapper.insert(tenant); - // if storage startup - if (PropertyUtils.isResourceStorageStartup()) { - storageOperate.createTenantDirIfNotExists(tenantCode); - } + storageOperate.createTenantDirIfNotExists(tenantCode); permissionPostHandle(AuthorizationType.TENANT, loginUser.getId(), Collections.singletonList(tenant.getId()), log); result.put(Constants.DATA_LIST, tenant); @@ -233,8 +229,7 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService // updateProcessInstance tenant // if the tenant code is modified, the original resource needs to be copied to the new tenant. - if (!Objects.equals(existsTenant.getTenantCode(), updateTenant.getTenantCode()) - && PropertyUtils.isResourceStorageStartup()) { + if (!Objects.equals(existsTenant.getTenantCode(), updateTenant.getTenantCode())) { storageOperate.createTenantDirIfNotExists(tenantCode); } int update = tenantMapper.updateById(updateTenant); @@ -292,10 +287,7 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService throw new ServiceException(Status.DELETE_TENANT_BY_ID_FAIL_USERS, userList.size()); } - // if resource upload startup - if (PropertyUtils.isResourceStorageStartup()) { - storageOperate.deleteTenant(tenant.getTenantCode()); - } + storageOperate.deleteTenant(tenant.getTenantCode()); int delete = tenantMapper.deleteById(id); if (delete > 0) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java index 071827d579..2bd9f3845b 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java @@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.api.utils.PageInfo; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.UdfType; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.dao.entity.UdfFunc; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ResourceMapper; @@ -104,13 +103,6 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic putMsg(result, Status.DESCRIPTION_TOO_LONG_ERROR); return result; } - // if resource upload startup - if (!PropertyUtils.isResourceStorageStartup()) { - log.error("Storage does not start up, resource upload startup state: {}.", - PropertyUtils.isResourceStorageStartup()); - putMsg(result, Status.HDFS_NOT_STARTUP); - return result; - } // verify udf func name exist if (checkUdfFuncNameExists(funcName)) { @@ -242,14 +234,6 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic return result; } - // if resource upload startup - if (!PropertyUtils.isResourceStorageStartup()) { - log.error("Storage does not start up, resource upload startup state: {}.", - PropertyUtils.isResourceStorageStartup()); - putMsg(result, Status.HDFS_NOT_STARTUP); - return result; - } - // verify udfFuncName is exist if (!funcName.equals(udf.getFuncName())) { if (checkUdfFuncNameExists(funcName)) { diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java index 91a5a78afc..27f8e5f8ef 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java @@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.utils.EncryptionUtils; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.dao.entity.AlertGroup; import org.apache.dolphinscheduler.dao.entity.DatasourceUser; import org.apache.dolphinscheduler.dao.entity.K8sNamespaceUser; @@ -184,10 +183,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService { User user = createUser(userName, userPassword, email, tenantId, phone, queue, state); Tenant tenant = tenantMapper.queryById(tenantId); - // resource upload startup - if (PropertyUtils.isResourceStorageStartup()) { - storageOperate.createTenantDirIfNotExists(tenant.getTenantCode()); - } + storageOperate.createTenantDirIfNotExists(tenant.getTenantCode()); log.info("User is created and id is {}.", user.getId()); result.put(Constants.DATA_LIST, user); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java index 6273360639..310a413345 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java @@ -54,7 +54,6 @@ public class TenantControllerTest extends AbstractControllerTest { paramsMap.add("tenantCode", "hayden"); paramsMap.add("queueId", "1"); paramsMap.add("description", "tenant description"); - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false); MvcResult mvcResult = mockMvc.perform(post("/tenants") .header(SESSION_ID, sessionId) diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java index d682008e35..1295a8f3f3 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java @@ -159,13 +159,11 @@ public class ResourcesServiceTest { @Test public void testCreateResource() { - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false); User user = new User(); user.setId(1); user.setUserType(UserType.GENERAL_USER); // CURRENT_LOGIN_USER_TENANT_NOT_EXIST - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true); Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser()); Mockito.when(tenantMapper.queryById(1)).thenReturn(null); Assertions.assertThrows(ServiceException.class, @@ -176,17 +174,9 @@ public class ResourcesServiceTest { user.setTenantId(1); Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant()); - // HDFS_NOT_STARTUP - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false); - Result result = resourcesService.createResource(user, "ResourcesServiceTest", - ResourceType.FILE, null, "/"); - logger.info(result.toString()); - Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg()); - // RESOURCE_FILE_IS_EMPTY MockMultipartFile mockMultipartFile = new MockMultipartFile("test.pdf", "".getBytes()); - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true); - result = resourcesService.createResource(user, "ResourcesServiceTest", + Result result = resourcesService.createResource(user, "ResourcesServiceTest", ResourceType.FILE, mockMultipartFile, "/"); logger.info(result.toString()); Assertions.assertEquals(Status.RESOURCE_FILE_IS_EMPTY.getMsg(), result.getMsg()); @@ -223,62 +213,45 @@ public class ResourcesServiceTest { @Test public void testCreateDirecotry() { - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false); User user = new User(); user.setId(1); user.setUserType(UserType.GENERAL_USER); - // HDFS_NOT_STARTUP - Result result = resourcesService.createDirectory(user, "directoryTest", - ResourceType.FILE, -1, "/"); - logger.info(result.toString()); - Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg()); // RESOURCE_EXIST user.setId(1); user.setTenantId(1); Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant()); Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser()); - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true); Mockito.when(storageOperate.getResDir("123")).thenReturn("/dolphinscheduler/123/resources/"); try { Mockito.when(storageOperate.exists("/dolphinscheduler/123/resources/directoryTest")).thenReturn(true); } catch (IOException e) { logger.error(e.getMessage(), e); } - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true); Mockito.when(resourcesMapper.existResource("/directoryTest", 0)).thenReturn(true); - result = resourcesService.createDirectory(user, "directoryTest", ResourceType.FILE, -1, "/"); + Result result = resourcesService.createDirectory(user, "directoryTest", ResourceType.FILE, -1, "/"); logger.info(result.toString()); Assertions.assertEquals(Status.RESOURCE_EXIST.getMsg(), result.getMsg()); } @Test public void testUpdateResource() { - User user = new User(); user.setId(1); user.setUserType(UserType.GENERAL_USER); user.setTenantId(1); - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false); Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser()); Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant()); Mockito.when(storageOperate.getResDir("123")).thenReturn("/dolphinscheduler/123/resources/"); - // HDFS_NOT_STARTUP - Result result = resourcesService.updateResource(user, "ResourcesServiceTest", - "123", "ResourcesServiceTest", ResourceType.FILE, null); - logger.info(result.toString()); - Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg()); - // USER_NO_OPERATION_PERM - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true); user.setUserType(UserType.GENERAL_USER); // tenant who have access to resource is 123, Tenant tenantWNoPermission = new Tenant(); tenantWNoPermission.setTenantCode("321"); Mockito.when(tenantMapper.queryById(1)).thenReturn(tenantWNoPermission); - result = resourcesService.updateResource(user, + Result result = resourcesService.updateResource(user, "/dolphinscheduler/123/resources/ResourcesServiceTest", "123", "ResourcesServiceTest", ResourceType.FILE, null); @@ -422,14 +395,8 @@ public class ResourcesServiceTest { User loginUser = new User(); loginUser.setId(0); loginUser.setUserType(UserType.GENERAL_USER); - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false); - // HDFS_NOT_STARTUP - Result result = resourcesService.delete(loginUser, "", ""); - logger.info(result.toString()); - Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg()); // TENANT_NOT_EXIST - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true); loginUser.setUserType(UserType.ADMIN_USER); loginUser.setTenantId(2); Mockito.when(userMapper.selectById(loginUser.getId())).thenReturn(loginUser); @@ -440,7 +407,7 @@ public class ResourcesServiceTest { Mockito.when(storageOperate.getFileStatus("/dolphinscheduler/123/resources/ResourcesServiceTest", null, "123", null)) .thenReturn(getStorageEntityResource()); - result = resourcesService.delete(loginUser, "/dolphinscheduler/123/resources/ResNotExist", "123"); + Result result = resourcesService.delete(loginUser, "/dolphinscheduler/123/resources/ResNotExist", "123"); logger.info(result.toString()); Assertions.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(), result.getMsg()); @@ -482,24 +449,15 @@ public class ResourcesServiceTest { @Test public void testReadResource() { - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false); - - // HDFS_NOT_STARTUP - Result result = resourcesService.readResource(getUser(), "", "", 1, 10); - logger.info(result.toString()); - Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg()); - // RESOURCE_NOT_EXIST - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true); Mockito.when(userMapper.selectById(getUser().getId())).thenReturn(getUser()); Mockito.when(tenantMapper.queryById(getUser().getTenantId())).thenReturn(getTenant()); - result = resourcesService.readResource(getUser(), "", "", 1, 10); + Result result = resourcesService.readResource(getUser(), "", "", 1, 10); logger.info(result.toString()); Assertions.assertEquals(Status.RESOURCE_FILE_NOT_EXIST.getCode(), (int) result.getCode()); // RESOURCE_SUFFIX_NOT_SUPPORT_VIEW Mockito.when(FileUtils.getResourceViewSuffixes()).thenReturn("class"); - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true); result = resourcesService.readResource(getUser(), "", "", 1, 10); logger.info(result.toString()); Assertions.assertEquals(Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW.getMsg(), result.getMsg()); @@ -535,22 +493,14 @@ public class ResourcesServiceTest { @Test public void testOnlineCreateResource() { - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false); User user = getUser(); user.setId(1); Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser()); Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant()); - // HDFS_NOT_STARTUP - Result result = resourcesService.onlineCreateResource(user, ResourceType.FILE, "test", "jar", "content", - "/"); - logger.info(result.toString()); - Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg()); - // RESOURCE_SUFFIX_NOT_SUPPORT_VIEW - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true); Mockito.when(FileUtils.getResourceViewSuffixes()).thenReturn("class"); - result = resourcesService.onlineCreateResource(user, ResourceType.FILE, "test", "jar", "content", + Result result = resourcesService.onlineCreateResource(user, ResourceType.FILE, "test", "jar", "content", "/"); logger.info(result.toString()); Assertions.assertEquals(Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW.getMsg(), result.getMsg()); @@ -568,7 +518,6 @@ public class ResourcesServiceTest { @Test public void testCreateOrUpdateResource() throws Exception { - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false); User user = getUser(); Mockito.when(userMapper.queryByUserNameAccurately(user.getUserName())).thenReturn(getUser()); @@ -592,15 +541,7 @@ public class ResourcesServiceTest { @Test public void testUpdateResourceContent() { - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false); - - // HDFS_NOT_STARTUP - Result result = resourcesService.updateResourceContent(getUser(), "", "", "content"); - logger.info(result.toString()); - Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg()); - // RESOURCE_NOT_EXIST - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true); Mockito.when(userMapper.selectById(getUser().getId())).thenReturn(getUser()); Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant()); @@ -612,14 +553,13 @@ public class ResourcesServiceTest { logger.error(e.getMessage() + " Resource path: {}", "", e); } - result = resourcesService.updateResourceContent(getUser(), + Result result = resourcesService.updateResourceContent(getUser(), "/dolphinscheduler/123/resources/ResourcesServiceTest.jar", "123", "content"); logger.info(result.toString()); Assertions.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(), result.getMsg()); // RESOURCE_SUFFIX_NOT_SUPPORT_VIEW - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true); Mockito.when(FileUtils.getResourceViewSuffixes()).thenReturn("class"); try { Mockito.when(storageOperate.getFileStatus("", "", "123", ResourceType.FILE)) @@ -668,7 +608,6 @@ public class ResourcesServiceTest { @Test public void testDownloadResource() { - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true); Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant()); Mockito.when(userMapper.selectById(1)).thenReturn(getUser()); org.springframework.core.io.Resource resourceMock = Mockito.mock(org.springframework.core.io.Resource.class); @@ -887,9 +826,6 @@ public class ResourcesServiceTest { @Test public void testCatFile() { - - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false); - // SUCCESS try { List list = storageOperate.vimFile(Mockito.any(), Mockito.anyString(), eq(1), eq(10)); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java index a47dfc7b52..e32d68a0e0 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java @@ -109,17 +109,8 @@ public class UdfFuncServiceTest { Mockito.when( resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.UDF, null, 0, serviceLogger)) .thenReturn(true); - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false); - // hdfs not start - Result result = udfFuncService.createUdfFunction(getLoginUser(), "UdfFuncServiceTest", - "org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String", - "UdfFuncServiceTest", "UdfFuncServiceTest", "", UdfType.HIVE); - logger.info(result.toString()); - Assertions.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(), result.getMsg()); // resource not exist - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true); - - result = udfFuncService.createUdfFunction(getLoginUser(), "UdfFuncServiceTest", + Result result = udfFuncService.createUdfFunction(getLoginUser(), "UdfFuncServiceTest", "org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String", "UdfFuncServiceTest", "UdfFuncServiceTest", "", UdfType.HIVE); logger.info(result.toString()); @@ -162,8 +153,6 @@ public class UdfFuncServiceTest { @Test public void testUpdateUdfFunc() { - - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false); Mockito.when(udfFuncMapper.selectUdfById(1)).thenReturn(getUdfFunc()); // UDF_FUNCTION_NOT_EXIST @@ -177,22 +166,12 @@ public class UdfFuncServiceTest { logger.info(result.toString()); Assertions.assertTrue(Status.UDF_FUNCTION_NOT_EXIST.getCode() == result.getCode()); - // HDFS_NOT_STARTUP - Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.UDF, new Object[]{1}, 0, - serviceLogger)).thenReturn(true); - result = udfFuncService.updateUdfFunc(getLoginUser(), 1, "UdfFuncServiceTest", - "org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String", - "UdfFuncServiceTest", "UdfFuncServiceTest", UdfType.HIVE, ""); - logger.info(result.toString()); - Assertions.assertTrue(Status.HDFS_NOT_STARTUP.getCode() == result.getCode()); - // RESOURCE_NOT_EXIST Mockito.when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.UDF, 1, ApiFuncIdentificationConstant.UDF_FUNCTION_UPDATE, serviceLogger)).thenReturn(true); Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.UDF, new Object[]{11}, 0, serviceLogger)).thenReturn(true); Mockito.when(udfFuncMapper.selectUdfById(11)).thenReturn(getUdfFunc()); - Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true); result = udfFuncService.updateUdfFunc(getLoginUser(), 11, "UdfFuncServiceTest", "org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String", "UdfFuncServiceTest", "UdfFuncServiceTest", UdfType.HIVE, ""); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java index 143cb12a35..d60a657002 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ResUploadType.java @@ -21,5 +21,5 @@ package org.apache.dolphinscheduler.common.enums; * data base types */ public enum ResUploadType { - LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE + LOCAL, HDFS, S3, OSS, GCS, ABS, OBS } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java index 668e6ea9fd..8289b14479 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java @@ -18,11 +18,9 @@ package org.apache.dolphinscheduler.common.utils; import static org.apache.dolphinscheduler.common.constants.Constants.COMMON_PROPERTIES_PATH; -import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_STORAGE_TYPE; import org.apache.dolphinscheduler.common.config.IPropertyDelegate; import org.apache.dolphinscheduler.common.config.ImmutablePriorityPropertyDelegate; -import org.apache.dolphinscheduler.common.enums.ResUploadType; import java.util.HashMap; import java.util.Map; @@ -56,12 +54,6 @@ public class PropertyUtils { return Strings.isNullOrEmpty(val) ? val : val.toUpperCase(); } - public static boolean isResourceStorageStartup() { - String resourceStorageType = getString(RESOURCE_STORAGE_TYPE, ResUploadType.NONE.name()); - ResUploadType resUploadType = ResUploadType.valueOf(resourceStorageType.toUpperCase()); - return resUploadType != ResUploadType.NONE; - } - public static Integer getInt(String key) { return propertyDelegate.getInt(key.trim()); } diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties index ea4c126beb..28ebf4571d 100644 --- a/dolphinscheduler-common/src/main/resources/common.properties +++ b/dolphinscheduler-common/src/main/resources/common.properties @@ -21,7 +21,7 @@ data.basedir.path=/tmp/dolphinscheduler # resource view suffixs #resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js -# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS, NONE. LOCAL type is a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration +# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS. LOCAL type is default type, and it's a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration # please notice that LOCAL mode does not support reading and writing in distributed mode, which mean you can only use your resource in one machine, unless # use shared file mount point resource.storage.type=LOCAL diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java index a67f867ee7..c915197844 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java @@ -38,11 +38,6 @@ public class PropertyUtilsTest { Assertions.assertNotNull(PropertyUtils.getString(Constants.FS_DEFAULT_FS)); } - @Test - public void getResUploadStartupState() { - Assertions.assertTrue(PropertyUtils.isResourceStorageStartup()); - } - @Test public void getSet() { Set networkInterface = PropertyUtils.getSet("networkInterface", value -> { diff --git a/dolphinscheduler-common/src/test/resources/common.properties b/dolphinscheduler-common/src/test/resources/common.properties index 15e17d837c..ef6cc3710e 100644 --- a/dolphinscheduler-common/src/test/resources/common.properties +++ b/dolphinscheduler-common/src/test/resources/common.properties @@ -27,7 +27,7 @@ data.basedir.path=/tmp/dolphinscheduler # resource view suffixs #resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js -# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, NONE. LOCAL type is a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration +# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, OBS. LOCAL type is default type, and it's a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration # please notice that LOCAL mode does not support reading and writing in distributed mode, which mean you can only use your resource in one machine, unless # use shared file mount point resource.storage.type=LOCAL diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java index 659a85c727..04449fd0c3 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java @@ -61,16 +61,12 @@ import java.util.Date; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; @Slf4j public class HdfsStorageOperator implements Closeable, StorageOperate { @@ -78,17 +74,6 @@ public class HdfsStorageOperator implements Closeable, StorageOperate { private static HdfsStorageProperties hdfsProperties = new HdfsStorageProperties(); private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY"; - private static final LoadingCache cache = CacheBuilder - .newBuilder() - .expireAfterWrite(HdfsStorageProperties.getKerberosExpireTime(), TimeUnit.HOURS) - .build(new CacheLoader() { - - @Override - public HdfsStorageOperator load(String key) { - return new HdfsStorageOperator(hdfsProperties); - } - }); - private volatile boolean yarnEnabled = false; private Configuration configuration; @@ -105,10 +90,6 @@ public class HdfsStorageOperator implements Closeable, StorageOperate { initHdfsPath(); } - public static HdfsStorageOperator getInstance() { - return cache.getUnchecked(HADOOP_UTILS_KEY); - } - /** * init dolphinscheduler root path in hdfs */ @@ -127,7 +108,7 @@ public class HdfsStorageOperator implements Closeable, StorageOperate { /** * init hadoop configuration */ - private void init() throws NullPointerException { + public void init() throws NullPointerException { try { configuration = new HdfsConfiguration(); @@ -267,8 +248,8 @@ public class HdfsStorageOperator implements Closeable, StorageOperate { @Override public void createTenantDirIfNotExists(String tenantCode) throws IOException { - getInstance().mkdir(tenantCode, getHdfsResDir(tenantCode)); - getInstance().mkdir(tenantCode, getHdfsUdfDir(tenantCode)); + mkdir(tenantCode, getHdfsResDir(tenantCode)); + mkdir(tenantCode, getHdfsUdfDir(tenantCode)); } @Override diff --git a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperatorFactory.java b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperatorFactory.java index 169a645f2d..d2a6ef0262 100644 --- a/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperatorFactory.java +++ b/dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperatorFactory.java @@ -28,7 +28,9 @@ public class HdfsStorageOperatorFactory implements StorageOperateFactory { @Override public StorageOperate createStorageOperate() { - return HdfsStorageOperator.getInstance(); + HdfsStorageOperator hdfsOperator = new HdfsStorageOperator(); + hdfsOperator.init(); + return hdfsOperator; } @Override diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java index a8beaa8356..4b9a5e0cee 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java @@ -18,10 +18,8 @@ package org.apache.dolphinscheduler.server.worker.utils; import org.apache.dolphinscheduler.common.constants.TenantConstants; -import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException; import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; @@ -125,9 +123,6 @@ public class TaskExecutionCheckerUtils { log.warn("Resource file : {} already exists will not download again ", resFile.getName()); } }); - if (!downloadFiles.isEmpty() && !PropertyUtils.isResourceStorageStartup()) { - throw new StorageOperateNoConfiguredException("Storage service config does not exist!"); - } if (CollectionUtils.isNotEmpty(downloadFiles)) { for (Pair fileDownload : downloadFiles) {