Browse Source

Support get property from env/jvm/common.properties (#14645)

3.2.1-prepare
Wenjun Ruan 1 year ago committed by GitHub
parent
commit
9ce8871158
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
  2. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java
  3. 8
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java
  4. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java
  5. 2
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java
  6. 44
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java
  7. 8
      dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java
  8. 7
      dolphinscheduler-bom/pom.xml
  9. 5
      dolphinscheduler-common/pom.xml
  10. 91
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/IPropertyDelegate.java
  11. 137
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/ImmutablePriorityPropertyDelegate.java
  12. 80
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/ImmutablePropertyDelegate.java
  13. 263
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java
  14. 40
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/config/ImmutablePriorityPropertyDelegateTest.java
  15. 87
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/config/ImmutablePropertyDelegateTest.java
  16. 14
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
  17. 2
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java
  18. 199
      dolphinscheduler-common/src/test/resources/common.properties
  19. 4
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java
  20. 2
      dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtilsTest.java
  21. 2
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java
  22. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java
  23. 2
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java

12
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java

@ -374,9 +374,9 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
throw new ServiceException((String.format("Get file status fail, resource path: %s", resourceFullName))); throw new ServiceException((String.format("Get file status fail, resource path: %s", resourceFullName)));
} }
if (!PropertyUtils.getResUploadStartupState()) { if (!PropertyUtils.isResourceStorageStartup()) {
log.error("Storage does not start up, resource upload startup state: {}.", log.error("Storage does not start up, resource upload startup state: {}.",
PropertyUtils.getResUploadStartupState()); PropertyUtils.isResourceStorageStartup());
putMsg(result, Status.STORAGE_NOT_STARTUP); putMsg(result, Status.STORAGE_NOT_STARTUP);
return result; return result;
} }
@ -1258,9 +1258,9 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
Result<Object> result = new Result<>(); Result<Object> result = new Result<>();
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
// if resource upload startup // if resource upload startup
if (!PropertyUtils.getResUploadStartupState()) { if (!PropertyUtils.isResourceStorageStartup()) {
log.error("Storage does not start up, resource upload startup state: {}.", log.error("Storage does not start up, resource upload startup state: {}.",
PropertyUtils.getResUploadStartupState()); PropertyUtils.isResourceStorageStartup());
putMsg(result, Status.STORAGE_NOT_STARTUP); putMsg(result, Status.STORAGE_NOT_STARTUP);
return result; return result;
} }
@ -1420,9 +1420,9 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
public org.springframework.core.io.Resource downloadResource(User loginUser, public org.springframework.core.io.Resource downloadResource(User loginUser,
String fullName) throws IOException { String fullName) throws IOException {
// if resource upload startup // if resource upload startup
if (!PropertyUtils.getResUploadStartupState()) { if (!PropertyUtils.isResourceStorageStartup()) {
log.warn("Storage does not start up, resource upload startup state: {}.", log.warn("Storage does not start up, resource upload startup state: {}.",
PropertyUtils.getResUploadStartupState()); PropertyUtils.isResourceStorageStartup());
throw new ServiceException("hdfs not startup"); throw new ServiceException("hdfs not startup");
} }

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java

@ -161,7 +161,7 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
tenantMapper.insert(tenant); tenantMapper.insert(tenant);
// if storage startup // if storage startup
if (PropertyUtils.getResUploadStartupState()) { if (PropertyUtils.isResourceStorageStartup()) {
storageOperate.createTenantDirIfNotExists(tenantCode); storageOperate.createTenantDirIfNotExists(tenantCode);
} }
permissionPostHandle(AuthorizationType.TENANT, loginUser.getId(), Collections.singletonList(tenant.getId()), permissionPostHandle(AuthorizationType.TENANT, loginUser.getId(), Collections.singletonList(tenant.getId()),
@ -234,7 +234,7 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
// updateProcessInstance tenant // updateProcessInstance tenant
// if the tenant code is modified, the original resource needs to be copied to the new tenant. // if the tenant code is modified, the original resource needs to be copied to the new tenant.
if (!Objects.equals(existsTenant.getTenantCode(), updateTenant.getTenantCode()) if (!Objects.equals(existsTenant.getTenantCode(), updateTenant.getTenantCode())
&& PropertyUtils.getResUploadStartupState()) { && PropertyUtils.isResourceStorageStartup()) {
storageOperate.createTenantDirIfNotExists(tenantCode); storageOperate.createTenantDirIfNotExists(tenantCode);
} }
int update = tenantMapper.updateById(updateTenant); int update = tenantMapper.updateById(updateTenant);
@ -293,7 +293,7 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService
} }
// if resource upload startup // if resource upload startup
if (PropertyUtils.getResUploadStartupState()) { if (PropertyUtils.isResourceStorageStartup()) {
storageOperate.deleteTenant(tenant.getTenantCode()); storageOperate.deleteTenant(tenant.getTenantCode());
} }

8
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UdfFuncServiceImpl.java

@ -105,9 +105,9 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
return result; return result;
} }
// if resource upload startup // if resource upload startup
if (!PropertyUtils.getResUploadStartupState()) { if (!PropertyUtils.isResourceStorageStartup()) {
log.error("Storage does not start up, resource upload startup state: {}.", log.error("Storage does not start up, resource upload startup state: {}.",
PropertyUtils.getResUploadStartupState()); PropertyUtils.isResourceStorageStartup());
putMsg(result, Status.HDFS_NOT_STARTUP); putMsg(result, Status.HDFS_NOT_STARTUP);
return result; return result;
} }
@ -243,9 +243,9 @@ public class UdfFuncServiceImpl extends BaseServiceImpl implements UdfFuncServic
} }
// if resource upload startup // if resource upload startup
if (!PropertyUtils.getResUploadStartupState()) { if (!PropertyUtils.isResourceStorageStartup()) {
log.error("Storage does not start up, resource upload startup state: {}.", log.error("Storage does not start up, resource upload startup state: {}.",
PropertyUtils.getResUploadStartupState()); PropertyUtils.isResourceStorageStartup());
putMsg(result, Status.HDFS_NOT_STARTUP); putMsg(result, Status.HDFS_NOT_STARTUP);
return result; return result;
} }

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/UsersServiceImpl.java

@ -185,7 +185,7 @@ public class UsersServiceImpl extends BaseServiceImpl implements UsersService {
Tenant tenant = tenantMapper.queryById(tenantId); Tenant tenant = tenantMapper.queryById(tenantId);
// resource upload startup // resource upload startup
if (PropertyUtils.getResUploadStartupState()) { if (PropertyUtils.isResourceStorageStartup()) {
storageOperate.createTenantDirIfNotExists(tenant.getTenantCode()); storageOperate.createTenantDirIfNotExists(tenant.getTenantCode());
} }

2
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/TenantControllerTest.java

@ -54,7 +54,7 @@ public class TenantControllerTest extends AbstractControllerTest {
paramsMap.add("tenantCode", "hayden"); paramsMap.add("tenantCode", "hayden");
paramsMap.add("queueId", "1"); paramsMap.add("queueId", "1");
paramsMap.add("description", "tenant description"); paramsMap.add("description", "tenant description");
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
MvcResult mvcResult = mockMvc.perform(post("/tenants") MvcResult mvcResult = mockMvc.perform(post("/tenants")
.header(SESSION_ID, sessionId) .header(SESSION_ID, sessionId)

44
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ResourcesServiceTest.java

@ -159,13 +159,13 @@ public class ResourcesServiceTest {
@Test @Test
public void testCreateResource() { public void testCreateResource() {
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
User user = new User(); User user = new User();
user.setId(1); user.setId(1);
user.setUserType(UserType.GENERAL_USER); user.setUserType(UserType.GENERAL_USER);
// CURRENT_LOGIN_USER_TENANT_NOT_EXIST // CURRENT_LOGIN_USER_TENANT_NOT_EXIST
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser()); Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(1)).thenReturn(null); Mockito.when(tenantMapper.queryById(1)).thenReturn(null);
Assertions.assertThrows(ServiceException.class, Assertions.assertThrows(ServiceException.class,
@ -177,7 +177,7 @@ public class ResourcesServiceTest {
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant()); Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
// HDFS_NOT_STARTUP // HDFS_NOT_STARTUP
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
Result<Object> result = resourcesService.createResource(user, "ResourcesServiceTest", Result<Object> result = resourcesService.createResource(user, "ResourcesServiceTest",
ResourceType.FILE, null, "/"); ResourceType.FILE, null, "/");
logger.info(result.toString()); logger.info(result.toString());
@ -185,7 +185,7 @@ public class ResourcesServiceTest {
// RESOURCE_FILE_IS_EMPTY // RESOURCE_FILE_IS_EMPTY
MockMultipartFile mockMultipartFile = new MockMultipartFile("test.pdf", "".getBytes()); MockMultipartFile mockMultipartFile = new MockMultipartFile("test.pdf", "".getBytes());
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
result = resourcesService.createResource(user, "ResourcesServiceTest", result = resourcesService.createResource(user, "ResourcesServiceTest",
ResourceType.FILE, mockMultipartFile, "/"); ResourceType.FILE, mockMultipartFile, "/");
logger.info(result.toString()); logger.info(result.toString());
@ -223,7 +223,7 @@ public class ResourcesServiceTest {
@Test @Test
public void testCreateDirecotry() { public void testCreateDirecotry() {
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
User user = new User(); User user = new User();
user.setId(1); user.setId(1);
user.setUserType(UserType.GENERAL_USER); user.setUserType(UserType.GENERAL_USER);
@ -238,14 +238,14 @@ public class ResourcesServiceTest {
user.setTenantId(1); user.setTenantId(1);
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant()); Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser()); Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser());
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
Mockito.when(storageOperate.getResDir("123")).thenReturn("/dolphinscheduler/123/resources/"); Mockito.when(storageOperate.getResDir("123")).thenReturn("/dolphinscheduler/123/resources/");
try { try {
Mockito.when(storageOperate.exists("/dolphinscheduler/123/resources/directoryTest")).thenReturn(true); Mockito.when(storageOperate.exists("/dolphinscheduler/123/resources/directoryTest")).thenReturn(true);
} catch (IOException e) { } catch (IOException e) {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
} }
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
Mockito.when(resourcesMapper.existResource("/directoryTest", 0)).thenReturn(true); Mockito.when(resourcesMapper.existResource("/directoryTest", 0)).thenReturn(true);
result = resourcesService.createDirectory(user, "directoryTest", ResourceType.FILE, -1, "/"); result = resourcesService.createDirectory(user, "directoryTest", ResourceType.FILE, -1, "/");
logger.info(result.toString()); logger.info(result.toString());
@ -260,7 +260,7 @@ public class ResourcesServiceTest {
user.setUserType(UserType.GENERAL_USER); user.setUserType(UserType.GENERAL_USER);
user.setTenantId(1); user.setTenantId(1);
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser()); Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant()); Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
Mockito.when(storageOperate.getResDir("123")).thenReturn("/dolphinscheduler/123/resources/"); Mockito.when(storageOperate.getResDir("123")).thenReturn("/dolphinscheduler/123/resources/");
@ -272,7 +272,7 @@ public class ResourcesServiceTest {
Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg()); Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg());
// USER_NO_OPERATION_PERM // USER_NO_OPERATION_PERM
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
user.setUserType(UserType.GENERAL_USER); user.setUserType(UserType.GENERAL_USER);
// tenant who have access to resource is 123, // tenant who have access to resource is 123,
Tenant tenantWNoPermission = new Tenant(); Tenant tenantWNoPermission = new Tenant();
@ -422,14 +422,14 @@ public class ResourcesServiceTest {
User loginUser = new User(); User loginUser = new User();
loginUser.setId(0); loginUser.setId(0);
loginUser.setUserType(UserType.GENERAL_USER); loginUser.setUserType(UserType.GENERAL_USER);
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
// HDFS_NOT_STARTUP // HDFS_NOT_STARTUP
Result result = resourcesService.delete(loginUser, "", ""); Result result = resourcesService.delete(loginUser, "", "");
logger.info(result.toString()); logger.info(result.toString());
Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg()); Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg());
// TENANT_NOT_EXIST // TENANT_NOT_EXIST
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
loginUser.setUserType(UserType.ADMIN_USER); loginUser.setUserType(UserType.ADMIN_USER);
loginUser.setTenantId(2); loginUser.setTenantId(2);
Mockito.when(userMapper.selectById(loginUser.getId())).thenReturn(loginUser); Mockito.when(userMapper.selectById(loginUser.getId())).thenReturn(loginUser);
@ -482,7 +482,7 @@ public class ResourcesServiceTest {
@Test @Test
public void testReadResource() { public void testReadResource() {
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
// HDFS_NOT_STARTUP // HDFS_NOT_STARTUP
Result result = resourcesService.readResource(getUser(), "", "", 1, 10); Result result = resourcesService.readResource(getUser(), "", "", 1, 10);
@ -490,7 +490,7 @@ public class ResourcesServiceTest {
Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg()); Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg());
// RESOURCE_NOT_EXIST // RESOURCE_NOT_EXIST
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
Mockito.when(userMapper.selectById(getUser().getId())).thenReturn(getUser()); Mockito.when(userMapper.selectById(getUser().getId())).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(getUser().getTenantId())).thenReturn(getTenant()); Mockito.when(tenantMapper.queryById(getUser().getTenantId())).thenReturn(getTenant());
result = resourcesService.readResource(getUser(), "", "", 1, 10); result = resourcesService.readResource(getUser(), "", "", 1, 10);
@ -499,7 +499,7 @@ public class ResourcesServiceTest {
// RESOURCE_SUFFIX_NOT_SUPPORT_VIEW // RESOURCE_SUFFIX_NOT_SUPPORT_VIEW
Mockito.when(FileUtils.getResourceViewSuffixes()).thenReturn("class"); Mockito.when(FileUtils.getResourceViewSuffixes()).thenReturn("class");
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
result = resourcesService.readResource(getUser(), "", "", 1, 10); result = resourcesService.readResource(getUser(), "", "", 1, 10);
logger.info(result.toString()); logger.info(result.toString());
Assertions.assertEquals(Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW.getMsg(), result.getMsg()); Assertions.assertEquals(Status.RESOURCE_SUFFIX_NOT_SUPPORT_VIEW.getMsg(), result.getMsg());
@ -535,7 +535,7 @@ public class ResourcesServiceTest {
@Test @Test
public void testOnlineCreateResource() { public void testOnlineCreateResource() {
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
User user = getUser(); User user = getUser();
user.setId(1); user.setId(1);
Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser()); Mockito.when(userMapper.selectById(user.getId())).thenReturn(getUser());
@ -548,7 +548,7 @@ public class ResourcesServiceTest {
Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg()); Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg());
// RESOURCE_SUFFIX_NOT_SUPPORT_VIEW // RESOURCE_SUFFIX_NOT_SUPPORT_VIEW
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
Mockito.when(FileUtils.getResourceViewSuffixes()).thenReturn("class"); Mockito.when(FileUtils.getResourceViewSuffixes()).thenReturn("class");
result = resourcesService.onlineCreateResource(user, ResourceType.FILE, "test", "jar", "content", result = resourcesService.onlineCreateResource(user, ResourceType.FILE, "test", "jar", "content",
"/"); "/");
@ -568,7 +568,7 @@ public class ResourcesServiceTest {
@Test @Test
public void testCreateOrUpdateResource() throws Exception { public void testCreateOrUpdateResource() throws Exception {
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
User user = getUser(); User user = getUser();
Mockito.when(userMapper.queryByUserNameAccurately(user.getUserName())).thenReturn(getUser()); Mockito.when(userMapper.queryByUserNameAccurately(user.getUserName())).thenReturn(getUser());
@ -592,7 +592,7 @@ public class ResourcesServiceTest {
@Test @Test
public void testUpdateResourceContent() { public void testUpdateResourceContent() {
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
// HDFS_NOT_STARTUP // HDFS_NOT_STARTUP
Result result = resourcesService.updateResourceContent(getUser(), "", "", "content"); Result result = resourcesService.updateResourceContent(getUser(), "", "", "content");
@ -600,7 +600,7 @@ public class ResourcesServiceTest {
Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg()); Assertions.assertEquals(Status.STORAGE_NOT_STARTUP.getMsg(), result.getMsg());
// RESOURCE_NOT_EXIST // RESOURCE_NOT_EXIST
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
Mockito.when(userMapper.selectById(getUser().getId())).thenReturn(getUser()); Mockito.when(userMapper.selectById(getUser().getId())).thenReturn(getUser());
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant()); Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
@ -619,7 +619,7 @@ public class ResourcesServiceTest {
Assertions.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(), result.getMsg()); Assertions.assertEquals(Status.RESOURCE_NOT_EXIST.getMsg(), result.getMsg());
// RESOURCE_SUFFIX_NOT_SUPPORT_VIEW // RESOURCE_SUFFIX_NOT_SUPPORT_VIEW
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
Mockito.when(FileUtils.getResourceViewSuffixes()).thenReturn("class"); Mockito.when(FileUtils.getResourceViewSuffixes()).thenReturn("class");
try { try {
Mockito.when(storageOperate.getFileStatus("", "", "123", ResourceType.FILE)) Mockito.when(storageOperate.getFileStatus("", "", "123", ResourceType.FILE))
@ -668,7 +668,7 @@ public class ResourcesServiceTest {
@Test @Test
public void testDownloadResource() { public void testDownloadResource() {
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant()); Mockito.when(tenantMapper.queryById(1)).thenReturn(getTenant());
Mockito.when(userMapper.selectById(1)).thenReturn(getUser()); Mockito.when(userMapper.selectById(1)).thenReturn(getUser());
org.springframework.core.io.Resource resourceMock = Mockito.mock(org.springframework.core.io.Resource.class); org.springframework.core.io.Resource resourceMock = Mockito.mock(org.springframework.core.io.Resource.class);
@ -888,7 +888,7 @@ public class ResourcesServiceTest {
@Test @Test
public void testCatFile() { public void testCatFile() {
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
// SUCCESS // SUCCESS
try { try {

8
dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/UdfFuncServiceTest.java

@ -109,7 +109,7 @@ public class UdfFuncServiceTest {
Mockito.when( Mockito.when(
resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.UDF, null, 0, serviceLogger)) resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.UDF, null, 0, serviceLogger))
.thenReturn(true); .thenReturn(true);
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
// hdfs not start // hdfs not start
Result result = udfFuncService.createUdfFunction(getLoginUser(), "UdfFuncServiceTest", Result result = udfFuncService.createUdfFunction(getLoginUser(), "UdfFuncServiceTest",
"org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String", "org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String",
@ -117,7 +117,7 @@ public class UdfFuncServiceTest {
logger.info(result.toString()); logger.info(result.toString());
Assertions.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(), result.getMsg()); Assertions.assertEquals(Status.HDFS_NOT_STARTUP.getMsg(), result.getMsg());
// resource not exist // resource not exist
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
result = udfFuncService.createUdfFunction(getLoginUser(), "UdfFuncServiceTest", result = udfFuncService.createUdfFunction(getLoginUser(), "UdfFuncServiceTest",
"org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String", "org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String",
@ -163,7 +163,7 @@ public class UdfFuncServiceTest {
@Test @Test
public void testUpdateUdfFunc() { public void testUpdateUdfFunc() {
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(false); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(false);
Mockito.when(udfFuncMapper.selectUdfById(1)).thenReturn(getUdfFunc()); Mockito.when(udfFuncMapper.selectUdfById(1)).thenReturn(getUdfFunc());
// UDF_FUNCTION_NOT_EXIST // UDF_FUNCTION_NOT_EXIST
@ -192,7 +192,7 @@ public class UdfFuncServiceTest {
Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.UDF, new Object[]{11}, 0, Mockito.when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.UDF, new Object[]{11}, 0,
serviceLogger)).thenReturn(true); serviceLogger)).thenReturn(true);
Mockito.when(udfFuncMapper.selectUdfById(11)).thenReturn(getUdfFunc()); Mockito.when(udfFuncMapper.selectUdfById(11)).thenReturn(getUdfFunc());
Mockito.when(PropertyUtils.getResUploadStartupState()).thenReturn(true); Mockito.when(PropertyUtils.isResourceStorageStartup()).thenReturn(true);
result = udfFuncService.updateUdfFunc(getLoginUser(), 11, "UdfFuncServiceTest", result = udfFuncService.updateUdfFunc(getLoginUser(), 11, "UdfFuncServiceTest",
"org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String", "org.apache.dolphinscheduler.api.service.UdfFuncServiceTest", "String",
"UdfFuncServiceTest", "UdfFuncServiceTest", UdfType.HIVE, ""); "UdfFuncServiceTest", "UdfFuncServiceTest", UdfType.HIVE, "");

7
dolphinscheduler-bom/pom.xml

@ -115,6 +115,7 @@
<casdoor.version>1.6.0</casdoor.version> <casdoor.version>1.6.0</casdoor.version>
<azure-sdk-bom.version>1.2.10</azure-sdk-bom.version> <azure-sdk-bom.version>1.2.10</azure-sdk-bom.version>
<protobuf.version>3.17.2</protobuf.version> <protobuf.version>3.17.2</protobuf.version>
<system-lambda.version>1.2.1</system-lambda.version>
</properties> </properties>
<dependencyManagement> <dependencyManagement>
@ -895,6 +896,12 @@
<artifactId>protobuf-java</artifactId> <artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version> <version>${protobuf.version}</version>
</dependency> </dependency>
<dependency>
<groupId>com.github.stefanbirkner</groupId>
<artifactId>system-lambda</artifactId>
<version>${system-lambda.version}</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
<profiles> <profiles>

5
dolphinscheduler-common/pom.xml

@ -130,6 +130,11 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.github.stefanbirkner</groupId>
<artifactId>system-lambda</artifactId>
</dependency>
<dependency> <dependency>
<groupId>com.google.cloud</groupId> <groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId> <artifactId>google-cloud-storage</artifactId>

91
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/IPropertyDelegate.java

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.config;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
public interface IPropertyDelegate {
String get(String key);
String get(String key, String defaultValue);
Set<String> getPropertyKeys();
default Optional<String> getOptional(String key) {
return getOptional(key, Function.identity());
}
default Integer getInt(String key) {
return get(key, Integer::parseInt);
}
default Integer getInt(String key, Integer defaultValue) {
return get(key, Integer::parseInt, defaultValue);
}
default Long getLong(String key) {
return get(key, Long::parseLong);
}
default Long getLong(String key, Long defaultValue) {
return get(key, Long::parseLong, defaultValue);
}
default Double getDouble(String key) {
return get(key, Double::parseDouble);
}
default Double getDouble(String key, Double defaultValue) {
return get(key, Double::parseDouble, defaultValue);
}
default Boolean getBoolean(String key) {
return get(key, Boolean::parseBoolean);
}
default Boolean getBoolean(String key, Boolean defaultValue) {
return get(key, Boolean::parseBoolean, defaultValue);
}
default <T> T get(String key, Function<String, T> transformFunction) {
String value = get(key);
if (value == null) {
return null;
}
return transformFunction.apply(value);
}
default <T> T get(String key, Function<String, T> transformFunction, T defaultValue) {
String value = get(key);
if (value == null) {
return defaultValue;
}
try {
return transformFunction.apply(value);
} catch (Exception ignored) {
return defaultValue;
}
}
default <T> Optional<T> getOptional(String key, Function<String, T> transformFunction) {
return Optional.ofNullable(get(key, transformFunction));
}
}

137
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/ImmutablePriorityPropertyDelegate.java

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.config;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
/**
* This class will get the property by the priority of the following: env > jvm > properties.
*/
@Slf4j
public class ImmutablePriorityPropertyDelegate extends ImmutablePropertyDelegate {
private static final Map<String, Optional<ConfigValue<String>>> configValueMap = new ConcurrentHashMap<>();
public ImmutablePriorityPropertyDelegate(String propertyAbsolutePath) {
super(propertyAbsolutePath);
}
@Override
public String get(String key) {
Optional<ConfigValue<String>> configValue = configValueMap.computeIfAbsent(key, k -> {
Optional<ConfigValue<String>> value = getConfigValueFromEnv(key);
if (value.isPresent()) {
log.debug("Override config value from env, key: {} actualKey: {}, value: {}",
k,
value.get().getActualKey(), value.get().getValue());
return value;
}
value = getConfigValueFromJvm(key);
if (value.isPresent()) {
log.debug("Override config value from jvm, key: {} actualKey: {}, value: {}",
k, value.get().getActualKey(), value.get().getValue());
return value;
}
value = getConfigValueFromProperties(key);
value.ifPresent(
stringConfigValue -> log.debug("Get config value from properties, key: {} actualKey: {}, value: {}",
k, stringConfigValue.getActualKey(), stringConfigValue.getValue()));
return value;
});
return configValue.map(ConfigValue::getValue).orElse(null);
}
@Override
public String get(String key, String defaultValue) {
String value = get(key);
if (value == null) {
return defaultValue;
}
return value;
}
@Override
public Set<String> getPropertyKeys() {
Set<String> propertyKeys = new HashSet<>();
propertyKeys.addAll(super.getPropertyKeys());
propertyKeys.addAll(System.getProperties().stringPropertyNames());
propertyKeys.addAll(System.getenv().keySet());
return propertyKeys;
}
private Optional<ConfigValue<String>> getConfigValueFromEnv(String key) {
String value = System.getenv(key);
if (value != null) {
return Optional.of(ConfigValue.fromEnv(key, value));
}
String envVarKey = String.valueOf(key).replaceAll("[.-]", "_").toUpperCase();
String envVarVal = System.getenv(envVarKey);
if (envVarVal != null) {
return Optional.of(ConfigValue.fromEnv(key, envVarVal));
}
return Optional.empty();
}
private Optional<ConfigValue<String>> getConfigValueFromJvm(String key) {
String value = System.getProperty(key);
if (value != null) {
return Optional.of(ConfigValue.fromJvm(key, value));
}
return Optional.empty();
}
private Optional<ConfigValue<String>> getConfigValueFromProperties(String key) {
String value = super.get(key);
if (value != null) {
return Optional.of(ConfigValue.fromProperties(key, value));
}
return Optional.empty();
}
@Data
@AllArgsConstructor
public static final class ConfigValue<T> {
private String actualKey;
private T value;
private boolean fromProperties;
private boolean fromJvm;
private boolean fromEnv;
public static <T> ConfigValue<T> fromProperties(String actualKey, T value) {
return new ConfigValue<>(actualKey, value, true, false, false);
}
public static <T> ConfigValue<T> fromJvm(String actualKey, T value) {
return new ConfigValue<>(actualKey, value, false, true, false);
}
public static <T> ConfigValue<T> fromEnv(String actualKey, T value) {
return new ConfigValue<>(actualKey, value, false, false, true);
}
}
}

80
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/ImmutablePropertyDelegate.java

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.config;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
/**
* This class is used to get the properties from the classpath.
*/
@Slf4j
public class ImmutablePropertyDelegate implements IPropertyDelegate {
private static final String COMMON_PROPERTIES_NAME = "/common.properties";
private final Properties properties;
public ImmutablePropertyDelegate() {
this(COMMON_PROPERTIES_NAME);
}
public ImmutablePropertyDelegate(String... propertyAbsolutePath) {
properties = new Properties();
// read from classpath
for (String fileName : propertyAbsolutePath) {
try (InputStream fis = getClass().getResourceAsStream(fileName)) {
Properties subProperties = new Properties();
subProperties.load(fis);
properties.putAll(subProperties);
} catch (IOException e) {
log.error("Load property: {} error, please check if the file exist under classpath",
propertyAbsolutePath, e);
System.exit(1);
}
}
printProperties();
}
public ImmutablePropertyDelegate(Properties properties) {
this.properties = properties;
}
@Override
public String get(String key) {
return properties.getProperty(key);
}
@Override
public String get(String key, String defaultValue) {
return properties.getProperty(key, defaultValue);
}
@Override
public Set<String> getPropertyKeys() {
return properties.stringPropertyNames();
}
private void printProperties() {
properties.forEach((k, v) -> log.debug("Get property {} -> {}", k, v));
}
}

263
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java

@ -18,241 +18,80 @@
package org.apache.dolphinscheduler.common.utils; package org.apache.dolphinscheduler.common.utils;
import static org.apache.dolphinscheduler.common.constants.Constants.COMMON_PROPERTIES_PATH; import static org.apache.dolphinscheduler.common.constants.Constants.COMMON_PROPERTIES_PATH;
import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_STORAGE_TYPE;
import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.config.IPropertyDelegate;
import org.apache.dolphinscheduler.common.config.ImmutablePriorityPropertyDelegate;
import org.apache.dolphinscheduler.common.enums.ResUploadType; import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.function.Function; import java.util.function.Function;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import com.google.common.base.Strings; import com.google.common.base.Strings;
@Slf4j @Slf4j
@UtilityClass
public class PropertyUtils { public class PropertyUtils {
private static final Properties properties = new Properties(); // todo: add another implementation for zookeeper/etcd/consul/xx
private static final IPropertyDelegate propertyDelegate =
private PropertyUtils() { new ImmutablePriorityPropertyDelegate(COMMON_PROPERTIES_PATH);
throw new UnsupportedOperationException("Construct PropertyUtils");
}
static {
loadPropertyFile(COMMON_PROPERTIES_PATH);
}
public static synchronized void loadPropertyFile(String... propertyFiles) {
for (String fileName : propertyFiles) {
try (InputStream fis = PropertyUtils.class.getResourceAsStream(fileName);) {
Properties subProperties = new Properties();
subProperties.load(fis);
subProperties.forEach((k, v) -> {
log.debug("Get property {} -> {}", k, v);
});
properties.putAll(subProperties);
} catch (IOException e) {
log.error(e.getMessage(), e);
System.exit(1);
}
}
// Override from system properties
System.getProperties().forEach((k, v) -> {
final String key = String.valueOf(k);
log.info("Overriding property from system property: {}", key);
PropertyUtils.setValue(key, String.valueOf(v));
});
}
/**
* @return judge whether resource upload startup
*/
public static boolean getResUploadStartupState() {
String resUploadStartupType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(
Strings.isNullOrEmpty(resUploadStartupType) ? ResUploadType.NONE.name() : resUploadStartupType);
return resUploadType != ResUploadType.NONE;
}
/**
* get property value
*
* @param key property name
* @return property value
*/
public static String getString(String key) { public static String getString(String key) {
if (key == null) { return propertyDelegate.get(key.trim());
return null;
}
return properties.getProperty(key.trim());
} }
/**
* get property value with upper case
*
* @param key property name
* @return property value with upper case
*/
public static String getUpperCaseString(String key) {
String val = getString(key);
return Strings.isNullOrEmpty(val) ? val : val.toUpperCase();
}
/**
* get property value
*
* @param key property name
* @param defaultVal default value
* @return property value
*/
public static String getString(String key, String defaultVal) { public static String getString(String key, String defaultVal) {
String val = getString(key); String val = getString(key);
return Strings.isNullOrEmpty(val) ? defaultVal : val; return Strings.isNullOrEmpty(val) ? defaultVal : val;
} }
/** public static String getUpperCaseString(String key) {
* get property value String val = getString(key);
* return Strings.isNullOrEmpty(val) ? val : val.toUpperCase();
* @param key property name
* @return get property int value , if key == null, then return -1
*/
public static int getInt(String key) {
return getInt(key, -1);
}
/**
* @param key key
* @param defaultValue default value
* @return property value
*/
public static int getInt(String key, int defaultValue) {
String value = getString(key);
if (Strings.isNullOrEmpty(value)) {
return defaultValue;
}
try {
return Integer.parseInt(value);
} catch (NumberFormatException e) {
log.info(e.getMessage(), e);
}
return defaultValue;
} }
/** public static boolean isResourceStorageStartup() {
* get property value String resourceStorageType = getString(RESOURCE_STORAGE_TYPE, ResUploadType.NONE.name());
* ResUploadType resUploadType = ResUploadType.valueOf(resourceStorageType.toUpperCase());
* @param key property name return resUploadType != ResUploadType.NONE;
* @return property value
*/
public static boolean getBoolean(String key) {
return getBoolean(key, false);
} }
/** public static Integer getInt(String key) {
* get property value return propertyDelegate.getInt(key.trim());
*
* @param key property name
* @param defaultValue default value
* @return property value
*/
public static Boolean getBoolean(String key, boolean defaultValue) {
String value = getString(key);
return Strings.isNullOrEmpty(value) ? defaultValue : Boolean.parseBoolean(value);
} }
/** public static Integer getInt(String key, int defaultValue) {
* get property long value return propertyDelegate.getInt(key, defaultValue);
*
* @param key key
* @param defaultValue default value
* @return property value
*/
public static long getLong(String key, long defaultValue) {
String value = getString(key);
if (Strings.isNullOrEmpty(value)) {
return defaultValue;
} }
try { public static Boolean getBoolean(String key) {
return Long.parseLong(value); return propertyDelegate.getBoolean(key);
} catch (NumberFormatException e) {
log.info(e.getMessage(), e);
}
return defaultValue;
} }
/** public static Boolean getBoolean(String key, Boolean defaultValue) {
* @param key key return propertyDelegate.getBoolean(key, defaultValue);
* @return property value
*/
public static long getLong(String key) {
return getLong(key, -1);
} }
/** public static Long getLong(String key) {
* @param key key return propertyDelegate.getLong(key);
* @param defaultValue default value
* @return property value
*/
public static double getDouble(String key, double defaultValue) {
String value = getString(key);
if (Strings.isNullOrEmpty(value)) {
return defaultValue;
} }
try { public static Long getLong(String key, Long defaultValue) {
return Double.parseDouble(value); return propertyDelegate.getLong(key, defaultValue);
} catch (NumberFormatException e) {
log.info(e.getMessage(), e);
}
return defaultValue;
} }
/** public static Double getDouble(String key) {
* get array return propertyDelegate.getDouble(key);
*
* @param key property name
* @param splitStr separator
* @return property value through array
*/
public static String[] getArray(String key, String splitStr) {
String value = getString(key);
if (Strings.isNullOrEmpty(value)) {
return new String[0];
}
return value.split(splitStr);
}
/**
* @param key key
* @param type type
* @param defaultValue default value
* @param <T> T
* @return get enum value
*/
public static <T extends Enum<T>> T getEnum(String key, Class<T> type,
T defaultValue) {
String value = getString(key);
if (Strings.isNullOrEmpty(value)) {
return defaultValue;
} }
try { public static Double getDouble(String key, Double defaultValue) {
return Enum.valueOf(type, value); return propertyDelegate.getDouble(key, defaultValue);
} catch (IllegalArgumentException e) {
log.info(e.getMessage(), e);
}
return defaultValue;
} }
/** /**
@ -261,47 +100,17 @@ public class PropertyUtils {
* @param prefix prefix to search * @param prefix prefix to search
* @return all properties with specified prefix * @return all properties with specified prefix
*/ */
public static Map<String, String> getPrefixedProperties(String prefix) { public static Map<String, String> getByPrefix(String prefix) {
Map<String, String> matchedProperties = new HashMap<>(); Map<String, String> matchedProperties = new HashMap<>();
for (String propName : properties.stringPropertyNames()) { for (String propName : propertyDelegate.getPropertyKeys()) {
if (propName.startsWith(prefix)) { if (propName.startsWith(prefix)) {
matchedProperties.put(propName, properties.getProperty(propName)); matchedProperties.put(propName, propertyDelegate.get(propName));
} }
} }
return matchedProperties; return matchedProperties;
} }
/**
* set value
* @param key key
* @param value value
*/
public static void setValue(String key, String value) {
properties.setProperty(key, value);
}
public static Map<String, String> getPropertiesByPrefix(String prefix) {
if (Strings.isNullOrEmpty(prefix)) {
return null;
}
Set<Object> keys = properties.keySet();
if (CollectionUtils.isEmpty(keys)) {
return null;
}
Map<String, String> propertiesMap = new HashMap<>();
keys.forEach(k -> {
if (k.toString().contains(prefix)) {
propertiesMap.put(k.toString().replaceFirst(prefix + ".", ""), properties.getProperty((String) k));
}
});
return propertiesMap;
}
public static <T> Set<T> getSet(String key, Function<String, Set<T>> transformFunction, Set<T> defaultValue) { public static <T> Set<T> getSet(String key, Function<String, Set<T>> transformFunction, Set<T> defaultValue) {
String value = (String) properties.get(key); return propertyDelegate.get(key, transformFunction, defaultValue);
if (StringUtils.isEmpty(value)) {
return defaultValue;
}
return transformFunction.apply(value);
} }
} }

40
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/config/ImmutablePriorityPropertyDelegateTest.java

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.config;
import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
import static org.apache.dolphinscheduler.common.constants.Constants.COMMON_PROPERTIES_PATH;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
class ImmutablePriorityPropertyDelegateTest {
private final ImmutablePriorityPropertyDelegate immutablePriorityPropertyDelegate =
new ImmutablePriorityPropertyDelegate(COMMON_PROPERTIES_PATH);
@Test
void getOverrideFromEnv() throws Exception {
withEnvironmentVariable("string.property", "valueFromEnv")
.execute(() -> {
String value = immutablePriorityPropertyDelegate.get("string.property");
Assertions.assertEquals("valueFromEnv", value);
});
}
}

87
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/config/ImmutablePropertyDelegateTest.java

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.config;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
class ImmutablePropertyDelegateTest {
private static final ImmutablePropertyDelegate immutablePropertyDelegate = new ImmutablePropertyDelegate();
@Test
void get() {
Assertions.assertNull(immutablePropertyDelegate.get("null"));
}
@Test
void testGetDefaultValue() {
Assertions.assertEquals("default", immutablePropertyDelegate.get("null", "default"));
}
@Test
void getPropertyKeys() {
Assertions.assertNotNull(immutablePropertyDelegate.getPropertyKeys());
}
@Test
void getOptional() {
Assertions.assertFalse(immutablePropertyDelegate.getOptional("null").isPresent());
}
@Test
void getInt() {
Assertions.assertEquals(1, immutablePropertyDelegate.getInt("int.property"));
}
@Test
void getIntDefault() {
Assertions.assertEquals(2, immutablePropertyDelegate.getInt("int2.property", 2));
}
@Test
void getLong() {
Assertions.assertEquals(1, immutablePropertyDelegate.getLong("long.property"));
}
@Test
void getLongDefault() {
Assertions.assertEquals(2, immutablePropertyDelegate.getLong("long2.property", 2L));
}
@Test
void getDouble() {
Assertions.assertEquals(1.1, immutablePropertyDelegate.getDouble("double.property"));
}
@Test
void getDoubleDefault() {
Assertions.assertEquals(2.2, immutablePropertyDelegate.getDouble("double2.property", 2.2d));
}
@Test
void getBoolean() {
Assertions.assertEquals(true, immutablePropertyDelegate.getBoolean("boolean.property"));
}
@Test
void getBooleanDefault() {
Assertions.assertEquals(false, immutablePropertyDelegate.getBoolean("boolean2.property", false));
}
}

14
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java

@ -19,8 +19,6 @@ package org.apache.dolphinscheduler.common.utils;
import static org.apache.dolphinscheduler.common.constants.DateConstants.YYYYMMDDHHMMSS; import static org.apache.dolphinscheduler.common.constants.DateConstants.YYYYMMDDHHMMSS;
import org.apache.dolphinscheduler.common.constants.DataSourceConstants;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
@ -65,18 +63,6 @@ public class FileUtilsTest {
} }
} }
@Test
public void testSetValue() {
try {
PropertyUtils.setValue(DataSourceConstants.DATASOURCE_ENCRYPTION_ENABLE, "true");
Assertions.assertTrue(PropertyUtils.getBoolean(DataSourceConstants.DATASOURCE_ENCRYPTION_ENABLE));
PropertyUtils.setValue(DataSourceConstants.DATASOURCE_ENCRYPTION_ENABLE, "false");
Assertions.assertFalse(PropertyUtils.getBoolean(DataSourceConstants.DATASOURCE_ENCRYPTION_ENABLE));
} catch (Exception e) {
Assertions.fail();
}
}
@Test @Test
public void testWriteContent2File() throws FileNotFoundException { public void testWriteContent2File() throws FileNotFoundException {
// file exists, fmt is invalid // file exists, fmt is invalid

2
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java

@ -40,7 +40,7 @@ public class PropertyUtilsTest {
@Test @Test
public void getResUploadStartupState() { public void getResUploadStartupState() {
Assertions.assertTrue(PropertyUtils.getResUploadStartupState()); Assertions.assertTrue(PropertyUtils.isResourceStorageStartup());
} }
@Test @Test

199
dolphinscheduler-common/src/test/resources/common.properties

@ -0,0 +1,199 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
string.property=string
int.property=1
long.property=1
double.property=1.1
boolean.property=true
# user data local directory path, please make sure the directory exists and have read write permissions
data.basedir.path=/tmp/dolphinscheduler
# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
# resource storage type: LOCAL, HDFS, S3, OSS, GCS, ABS, NONE. LOCAL type is a specific type of HDFS with "resource.hdfs.fs.defaultFS = file:///" configuration
# please notice that LOCAL mode does not support reading and writing in distributed mode, which mean you can only use your resource in one machine, unless
# use shared file mount point
resource.storage.type=LOCAL
# resource store on HDFS/S3 path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
resource.storage.upload.base.path=/dolphinscheduler
# The Azure client ID (Azure Application (client) ID)
resource.azure.client.id=minioadmin
# The Azure client secret in the Azure application
resource.azure.client.secret=minioadmin
# The Azure data factory subscription ID
resource.azure.subId=minioadmin
# The Azure tenant id in the Azure Active Directory
resource.azure.tenant.id=minioadmin
# The query interval
resource.query.interval=10000
# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.access.key.id=minioadmin
# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.secret.access.key=minioadmin
# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.region=cn-north-1
# The name of the bucket. You need to create them by yourself. Otherwise, the system cannot start. All buckets in Amazon S3 share a single namespace; ensure the bucket is given a unique name.
resource.aws.s3.bucket.name=dolphinscheduler
# You need to set this parameter when private cloud s3. If S3 uses public cloud, you only need to set resource.aws.region or set to the endpoint of a public cloud such as S3.cn-north-1.amazonaws.com.cn
resource.aws.s3.endpoint=http://localhost:9000
# alibaba cloud access key id, required if you set resource.storage.type=OSS
resource.alibaba.cloud.access.key.id=<your-access-key-id>
# alibaba cloud access key secret, required if you set resource.storage.type=OSS
resource.alibaba.cloud.access.key.secret=<your-access-key-secret>
# alibaba cloud region, required if you set resource.storage.type=OSS
resource.alibaba.cloud.region=cn-hangzhou
# oss bucket name, required if you set resource.storage.type=OSS
resource.alibaba.cloud.oss.bucket.name=dolphinscheduler
# oss bucket endpoint, required if you set resource.storage.type=OSS
resource.alibaba.cloud.oss.endpoint=https://oss-cn-hangzhou.aliyuncs.com
# the location of the google cloud credential, required if you set resource.storage.type=GCS
resource.google.cloud.storage.credential=/path/to/credential
# gcs bucket name, required if you set resource.storage.type=GCS
resource.google.cloud.storage.bucket.name=<your-bucket>
# abs container name, required if you set resource.storage.type=ABS
resource.azure.blob.storage.container.name=<your-container>
# abs account name, required if you set resource.storage.type=ABS
resource.azure.blob.storage.account.name=<your-account-name>
# abs connection string, required if you set resource.storage.type=ABS
resource.azure.blob.storage.connection.string=<your-connection-string>
# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
resource.hdfs.root.user=hdfs
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
resource.hdfs.fs.defaultFS=hdfs://mycluster:8020
# whether to startup kerberos
hadoop.security.authentication.startup.state=false
# java.security.krb5.conf path
java.security.krb5.conf.path=/opt/krb5.conf
# login user from keytab username
login.user.keytab.username=hdfs-mycluster@ESZ.COM
# login user from keytab path
login.user.keytab.path=/opt/hdfs.headless.keytab
# kerberos expire time, the unit is hour
kerberos.expire.time=2
# resourcemanager port, the default value is 8088 if not specified
resource.manager.httpaddress.port=8088
# if resourcemanager HA is enabled, please set the HA IPs; if resourcemanager is single, keep this value empty
yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
# if resourcemanager HA is enabled or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ds1 to actual resourcemanager hostname
yarn.application.status.address=http://ds1:%s/ws/v1/cluster/apps/%s
# job history status url when application number threshold is reached(default 10000, maybe it was set to 1000)
yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s
# datasource encryption enable
datasource.encryption.enable=false
# datasource encryption salt
datasource.encryption.salt=!@#$%^&*
# data quality option
data-quality.jar.name=dolphinscheduler-data-quality-dev-SNAPSHOT.jar
#data-quality.error.output.path=/tmp/data-quality-error-data
# Network IP gets priority, default inner outer
# Whether hive SQL is executed in the same session
support.hive.oneSession=false
# use sudo or not, if set true, executing user is tenant user and deploy user needs sudo permissions; if set false, executing user is the deploy user and doesn't need sudo permissions
sudo.enable=true
# network interface preferred like eth0, default: empty
#dolphin.scheduler.network.interface.preferred=
# network interface restrict like docker0,docker1 , default: docker0
dolphin.scheduler.network.interface.restrict=docker0
# network IP gets priority, default: inner outer
#dolphin.scheduler.network.priority.strategy=default
# system env path
#dolphinscheduler.env.path=dolphinscheduler_env.sh
# development state
development.state=false
# rpc port
alert.rpc.port=50052
# set path of conda.sh
conda.path=/opt/anaconda3/etc/profile.d/conda.sh
# Task resource limit state
task.resource.limit.state=false
# mlflow task plugin preset repository
ml.mlflow.preset_repository=https://github.com/apache/dolphinscheduler-mlflow
# mlflow task plugin preset repository version
ml.mlflow.preset_repository_version="main"
# way to collect applicationId: log(original regex match), aop
appId.collect=log
# The default env list will be load by Shell task, e.g. /etc/profile,~/.bash_profile
shell.env_source_list=
# The interceptor type of Shell task, e.g. bash, sh, cmd
shell.interceptor.type=bash
# Whether to enable remote logging
remote.logging.enable=false
# if remote.logging.enable = true, set the target of remote logging
remote.logging.target=OSS
# if remote.logging.enable = true, set the log base directory
remote.logging.base.dir=logs
# if remote.logging.enable = true, set the number of threads to send logs to remote storage
remote.logging.thread.pool.size=10
# oss access key id, required if you set remote.logging.target=OSS
remote.logging.oss.access.key.id=<access.key.id>
# oss access key secret, required if you set remote.logging.target=OSS
remote.logging.oss.access.key.secret=<access.key.secret>
# oss bucket name, required if you set remote.logging.target=OSS
remote.logging.oss.bucket.name=<bucket.name>
# oss endpoint, required if you set remote.logging.target=OSS
remote.logging.oss.endpoint=<endpoint>
# s3 access key id, required if you set remote.logging.target=S3
remote.logging.s3.access.key.id=<access.key.id>
# s3 access key secret, required if you set remote.logging.target=S3
remote.logging.s3.access.key.secret=<access.key.secret>
# s3 bucket name, required if you set remote.logging.target=S3
remote.logging.s3.bucket.name=<bucket.name>
# s3 endpoint, required if you set remote.logging.target=S3
remote.logging.s3.endpoint=<endpoint>
# s3 region, required if you set remote.logging.target=S3
remote.logging.s3.region=<region>
# the location of the google cloud credential, required if you set remote.logging.target=GCS
remote.logging.google.cloud.storage.credential=/path/to/credential
# gcs bucket name, required if you set remote.logging.target=GCS
remote.logging.google.cloud.storage.bucket.name=<your-bucket>

4
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/main/java/org/apache/dolphinscheduler/plugin/datasource/api/plugin/DataSourceClientProvider.java

@ -43,13 +43,11 @@ import com.google.common.cache.RemovalListener;
@Slf4j @Slf4j
public class DataSourceClientProvider { public class DataSourceClientProvider {
private static final long duration = PropertyUtils.getLong(TaskConstants.KERBEROS_EXPIRE_TIME, 24);
// We use the cache here to avoid creating a new datasource client every time, // We use the cache here to avoid creating a new datasource client every time,
// One DataSourceClient corresponds to one unique datasource. // One DataSourceClient corresponds to one unique datasource.
private static final Cache<String, PooledDataSourceClient> POOLED_DATASOURCE_CLIENT_CACHE = private static final Cache<String, PooledDataSourceClient> POOLED_DATASOURCE_CLIENT_CACHE =
CacheBuilder.newBuilder() CacheBuilder.newBuilder()
.expireAfterWrite(duration, TimeUnit.HOURS) .expireAfterWrite(PropertyUtils.getLong(TaskConstants.KERBEROS_EXPIRE_TIME, 24L), TimeUnit.HOURS)
.removalListener((RemovalListener<String, PooledDataSourceClient>) notification -> { .removalListener((RemovalListener<String, PooledDataSourceClient>) notification -> {
try (PooledDataSourceClient closedClient = notification.getValue()) { try (PooledDataSourceClient closedClient = notification.getValue()) {
log.info("Datasource: {} is removed from cache due to expire", notification.getKey()); log.info("Datasource: {} is removed from cache due to expire", notification.getKey());

2
dolphinscheduler-datasource-plugin/dolphinscheduler-datasource-api/src/test/java/org/apache/dolphinscheduler/plugin/datasource/api/utils/CommonUtilsTest.java

@ -115,8 +115,6 @@ public class CommonUtilsTest {
Mockito.when(PropertyUtils.getBoolean(DATASOURCE_ENCRYPTION_ENABLE, false)) Mockito.when(PropertyUtils.getBoolean(DATASOURCE_ENCRYPTION_ENABLE, false))
.thenAnswer((Answer<Boolean>) invocation -> Boolean.TRUE); .thenAnswer((Answer<Boolean>) invocation -> Boolean.TRUE);
PropertyUtils.setValue(DATASOURCE_ENCRYPTION_ENABLE, "true");
mockedPasswordUtils.when(() -> PasswordUtils.decodePassword("bnVsbE1USXpORFUy")).thenReturn("123456"); mockedPasswordUtils.when(() -> PasswordUtils.decodePassword("bnVsbE1USXpORFUy")).thenReturn("123456");
mockedPasswordUtils.when(() -> PasswordUtils.decodePassword("bnVsbElWRkJXbGhUVjBBPQ==")) mockedPasswordUtils.when(() -> PasswordUtils.decodePassword("bnVsbElWRkJXbGhUVjBBPQ=="))
.thenReturn("!QAZXSW@"); .thenReturn("!QAZXSW@");

2
dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java

@ -140,7 +140,7 @@ public class HdfsStorageOperator implements Closeable, StorageOperate {
// first get key from core-site.xml hdfs-site.xml ,if null ,then try to get from properties file // first get key from core-site.xml hdfs-site.xml ,if null ,then try to get from properties file
// the default is the local file system // the default is the local file system
if (StringUtils.isNotBlank(defaultFS)) { if (StringUtils.isNotBlank(defaultFS)) {
Map<String, String> fsRelatedProps = PropertyUtils.getPrefixedProperties("fs."); Map<String, String> fsRelatedProps = PropertyUtils.getByPrefix("fs.");
configuration.set(Constants.HDFS_DEFAULT_FS, defaultFS); configuration.set(Constants.HDFS_DEFAULT_FS, defaultFS);
fsRelatedProps.forEach((key, value) -> configuration.set(key, value)); fsRelatedProps.forEach((key, value) -> configuration.set(key, value));
} else { } else {

2
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java

@ -120,7 +120,7 @@ public abstract class BaseLinuxShellInterceptorBuilder<T extends BaseLinuxShellI
} }
private List<String> bootstrapCommandInSudoMode() { private List<String> bootstrapCommandInSudoMode() {
if (PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE)) { if (PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE, false)) {
return bootstrapCommandInResourceLimitMode(); return bootstrapCommandInResourceLimitMode();
} }
List<String> bootstrapCommand = new ArrayList<>(); List<String> bootstrapCommand = new ArrayList<>();

2
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java

@ -129,7 +129,7 @@ public class TaskExecutionCheckerUtils {
log.info("file : {} exists ", resFile.getName()); log.info("file : {} exists ", resFile.getName());
} }
}); });
if (!downloadFiles.isEmpty() && !PropertyUtils.getResUploadStartupState()) { if (!downloadFiles.isEmpty() && !PropertyUtils.isResourceStorageStartup()) {
throw new StorageOperateNoConfiguredException("Storage service config does not exist!"); throw new StorageOperateNoConfiguredException("Storage service config does not exist!");
} }

Loading…
Cancel
Save