Browse Source

[Fix-13434][Resource Center] fix the NPE when use OSS as the resource center (#13435)

3.2.0-release
Rick Cheng 2 years ago committed by GitHub
parent
commit
8a59ab44eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
  2. 189
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperator.java
  3. 40
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorTest.java
  4. 41
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java
  5. 2
      dolphinscheduler-ui/src/locales/zh_CN/resource.ts

6
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java

@ -714,6 +714,8 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
String defaultPath = ""; String defaultPath = "";
List<StorageEntity> resourcesList = new ArrayList<>(); List<StorageEntity> resourcesList = new ArrayList<>();
String resourceStorageType =
PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE, ResUploadType.NONE.name());
if (isAdmin(loginUser) && StringUtils.isBlank(fullName)) { if (isAdmin(loginUser) && StringUtils.isBlank(fullName)) {
// list all tenants' resources to admin users in the root directory // list all tenants' resources to admin users in the root directory
@ -735,7 +737,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
logger.error(e.getMessage() + " Resource path: {}", defaultPath, e); logger.error(e.getMessage() + " Resource path: {}", defaultPath, e);
putMsg(result, Status.RESOURCE_NOT_EXIST); putMsg(result, Status.RESOURCE_NOT_EXIST);
throw new ServiceException(String.format(e.getMessage() + throw new ServiceException(String.format(e.getMessage() +
" make sure resource path: %s exists in hdfs", defaultPath)); " make sure resource path: %s exists in %s", defaultPath, resourceStorageType));
} }
} }
} }
@ -755,7 +757,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
logger.error(e.getMessage() + " Resource path: {}", fullName, e); logger.error(e.getMessage() + " Resource path: {}", fullName, e);
putMsg(result, Status.RESOURCE_NOT_EXIST); putMsg(result, Status.RESOURCE_NOT_EXIST);
throw new ServiceException(String.format(e.getMessage() + throw new ServiceException(String.format(e.getMessage() +
" make sure resource path: %s exists in hdfs", defaultPath)); " make sure resource path: %s exists in %s", defaultPath, resourceStorageType));
} }
} }

189
dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperator.java

@ -44,7 +44,11 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -56,8 +60,13 @@ import org.slf4j.LoggerFactory;
import com.aliyun.oss.OSS; import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSException; import com.aliyun.oss.OSSException;
import com.aliyun.oss.ServiceException;
import com.aliyun.oss.model.Bucket; import com.aliyun.oss.model.Bucket;
import com.aliyun.oss.model.DeleteObjectsRequest;
import com.aliyun.oss.model.ListObjectsV2Request;
import com.aliyun.oss.model.ListObjectsV2Result;
import com.aliyun.oss.model.OSSObject; import com.aliyun.oss.model.OSSObject;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectMetadata; import com.aliyun.oss.model.ObjectMetadata;
import com.aliyun.oss.model.PutObjectRequest; import com.aliyun.oss.model.PutObjectRequest;
@ -190,8 +199,20 @@ public class OssStorageOperator implements Closeable, StorageOperate {
} }
@Override @Override
public boolean delete(String filePath, List<String> childrenPathArray, boolean recursive) throws IOException { public boolean delete(String fullName, List<String> childrenPathList, boolean recursive) throws IOException {
return false; // append the resource fullName to the list for deletion.
childrenPathList.add(fullName);
DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(bucketName)
.withKeys(childrenPathList);
try {
ossClient.deleteObjects(deleteObjectsRequest);
} catch (Exception e) {
logger.error("delete objects error", e);
return false;
}
return true;
} }
@Override @Override
@ -288,19 +309,168 @@ public class OssStorageOperator implements Closeable, StorageOperate {
@Override @Override
public List<StorageEntity> listFilesStatusRecursively(String path, String defaultPath, String tenantCode, public List<StorageEntity> listFilesStatusRecursively(String path, String defaultPath, String tenantCode,
ResourceType type) { ResourceType type) {
return null; List<StorageEntity> storageEntityList = new ArrayList<>();
LinkedList<StorageEntity> foldersToFetch = new LinkedList<>();
StorageEntity initialEntity = null;
try {
initialEntity = getFileStatus(path, defaultPath, tenantCode, type);
} catch (Exception e) {
logger.error("error while listing files status recursively, path: {}", path, e);
return storageEntityList;
}
foldersToFetch.add(initialEntity);
while (!foldersToFetch.isEmpty()) {
String pathToExplore = foldersToFetch.pop().getFullName();
try {
List<StorageEntity> tempList = listFilesStatus(pathToExplore, defaultPath, tenantCode, type);
for (StorageEntity temp : tempList) {
if (temp.isDirectory()) {
foldersToFetch.add(temp);
}
}
storageEntityList.addAll(tempList);
} catch (Exception e) {
logger.error("error while listing files status recursively, path: {}", pathToExplore, e);
}
}
return storageEntityList;
} }
@Override @Override
public List<StorageEntity> listFilesStatus(String path, String defaultPath, String tenantCode, public List<StorageEntity> listFilesStatus(String path, String defaultPath, String tenantCode,
ResourceType type) throws Exception { ResourceType type) throws Exception {
return null; List<StorageEntity> storageEntityList = new ArrayList<>();
ListObjectsV2Result result = null;
String nextContinuationToken = null;
do {
try {
ListObjectsV2Request request = new ListObjectsV2Request();
request.setBucketName(bucketName);
request.setPrefix(path);
request.setDelimiter(FOLDER_SEPARATOR);
request.setContinuationToken(nextContinuationToken);
result = ossClient.listObjectsV2(request);
} catch (Exception e) {
throw new ServiceException("Get OSS file list exception", e);
}
List<OSSObjectSummary> summaries = result.getObjectSummaries();
for (OSSObjectSummary summary : summaries) {
if (!summary.getKey().endsWith(FOLDER_SEPARATOR)) {
// the path is a file
String[] aliasArr = summary.getKey().split(FOLDER_SEPARATOR);
String alias = aliasArr[aliasArr.length - 1];
String fileName = StringUtils.difference(defaultPath, summary.getKey());
StorageEntity entity = new StorageEntity();
entity.setAlias(alias);
entity.setFileName(fileName);
entity.setFullName(summary.getKey());
entity.setDirectory(false);
entity.setDescription("");
entity.setUserName(tenantCode);
entity.setType(type);
entity.setSize(summary.getSize());
entity.setCreateTime(summary.getLastModified());
entity.setUpdateTime(summary.getLastModified());
entity.setPfullName(path);
storageEntityList.add(entity);
}
}
for (String commonPrefix : result.getCommonPrefixes()) {
// the paths in commonPrefix are directories
String suffix = StringUtils.difference(path, commonPrefix);
String fileName = StringUtils.difference(defaultPath, commonPrefix);
StorageEntity entity = new StorageEntity();
entity.setAlias(suffix);
entity.setFileName(fileName);
entity.setFullName(commonPrefix);
entity.setDirectory(true);
entity.setDescription("");
entity.setUserName(tenantCode);
entity.setType(type);
entity.setSize(0);
entity.setCreateTime(null);
entity.setUpdateTime(null);
entity.setPfullName(path);
storageEntityList.add(entity);
}
nextContinuationToken = result.getNextContinuationToken();
} while (result.isTruncated());
return storageEntityList;
} }
@Override @Override
public StorageEntity getFileStatus(String path, String defaultPath, String tenantCode, public StorageEntity getFileStatus(String path, String defaultPath, String tenantCode,
ResourceType type) throws Exception { ResourceType type) throws Exception {
return null; ListObjectsV2Request request = new ListObjectsV2Request();
request.setBucketName(bucketName);
request.setPrefix(path);
request.setDelimiter(FOLDER_SEPARATOR);
ListObjectsV2Result result;
try {
result = ossClient.listObjectsV2(request);
} catch (Exception e) {
throw new ServiceException("Get OSS file list exception", e);
}
List<OSSObjectSummary> summaries = result.getObjectSummaries();
if (path.endsWith(FOLDER_SEPARATOR)) {
// the path is a directory that may or may not exist in OSS
String alias = findDirAlias(path);
String fileName = StringUtils.difference(defaultPath, path);
StorageEntity entity = new StorageEntity();
entity.setAlias(alias);
entity.setFileName(fileName);
entity.setFullName(path);
entity.setDirectory(true);
entity.setDescription("");
entity.setUserName(tenantCode);
entity.setType(type);
entity.setSize(0);
return entity;
} else {
// the path is a file
if (summaries.size() > 0) {
OSSObjectSummary summary = summaries.get(0);
String[] aliasArr = summary.getKey().split(FOLDER_SEPARATOR);
String alias = aliasArr[aliasArr.length - 1];
String fileName = StringUtils.difference(defaultPath, summary.getKey());
StorageEntity entity = new StorageEntity();
entity.setAlias(alias);
entity.setFileName(fileName);
entity.setFullName(summary.getKey());
entity.setDirectory(false);
entity.setDescription("");
entity.setUserName(tenantCode);
entity.setType(type);
entity.setSize(summary.getSize());
entity.setCreateTime(summary.getLastModified());
entity.setUpdateTime(summary.getLastModified());
return entity;
}
}
throw new FileNotFoundException("Object is not found in OSS Bucket: " + bucketName);
} }
@Override @Override
@ -360,4 +530,13 @@ public class OssStorageOperator implements Closeable, StorageOperate {
protected OSS buildOssClient() { protected OSS buildOssClient() {
return OssClientFactory.buildOssClient(ossConnection); return OssClientFactory.buildOssClient(ossConnection);
} }
private String findDirAlias(String dirPath) {
if (!dirPath.endsWith(FOLDER_SEPARATOR)) {
return dirPath;
}
Path path = Paths.get(dirPath);
return path.getName(path.getNameCount() - 1) + FOLDER_SEPARATOR;
}
} }

40
dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorTest.java

@ -27,18 +27,24 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import com.aliyun.oss.OSS; import com.aliyun.oss.OSS;
import com.aliyun.oss.model.ListObjectsV2Request;
import com.aliyun.oss.model.ListObjectsV2Result;
@ExtendWith(MockitoExtension.class) @ExtendWith(MockitoExtension.class)
public class OssStorageOperatorTest { public class OssStorageOperatorTest {
@ -53,6 +59,10 @@ public class OssStorageOperatorTest {
private static final String FILE_NAME_MOCK = "FILE_NAME_MOCK"; private static final String FILE_NAME_MOCK = "FILE_NAME_MOCK";
private static final String FILE_PATH_MOCK = "FILE_PATH_MOCK"; private static final String FILE_PATH_MOCK = "FILE_PATH_MOCK";
private static final String FULL_NAME = "/tmp/dir1/";
private static final String DEFAULT_PATH = "/tmp/";
@Mock @Mock
private OSS ossClientMock; private OSS ossClientMock;
@ -248,4 +258,34 @@ public class OssStorageOperatorTest {
ossOperator.deleteDir(DIR_MOCK); ossOperator.deleteDir(DIR_MOCK);
verify(ossClientMock, times(1)).deleteObject(anyString(), anyString()); verify(ossClientMock, times(1)).deleteObject(anyString(), anyString());
} }
@Test
public void testGetFileStatus() throws Exception {
doReturn(new ListObjectsV2Result()).when(ossClientMock).listObjectsV2(Mockito.any(ListObjectsV2Request.class));
StorageEntity entity = ossOperator.getFileStatus(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE);
Assertions.assertEquals(FULL_NAME, entity.getFullName());
Assertions.assertEquals("dir1/", entity.getFileName());
}
@Test
public void testListFilesStatus() throws Exception {
doReturn(new ListObjectsV2Result()).when(ossClientMock).listObjectsV2(Mockito.any(ListObjectsV2Request.class));
List<StorageEntity> result =
ossOperator.listFilesStatus(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE);
Assertions.assertEquals(0, result.size());
}
@Test
public void testListFilesStatusRecursively() throws Exception {
StorageEntity entity = new StorageEntity();
entity.setFullName(FULL_NAME);
doReturn(entity).when(ossOperator).getFileStatus(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE);
doReturn(Collections.EMPTY_LIST).when(ossOperator).listFilesStatus(anyString(), anyString(), anyString(),
Mockito.any(ResourceType.class));
List<StorageEntity> result =
ossOperator.listFilesStatusRecursively(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE);
Assertions.assertEquals(0, result.size());
}
} }

41
dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java

@ -43,6 +43,8 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -409,34 +411,31 @@ public class S3StorageOperator implements Closeable, StorageOperate {
public List<StorageEntity> listFilesStatusRecursively(String path, String defaultPath, String tenantCode, public List<StorageEntity> listFilesStatusRecursively(String path, String defaultPath, String tenantCode,
ResourceType type) { ResourceType type) {
List<StorageEntity> storageEntityList = new ArrayList<>(); List<StorageEntity> storageEntityList = new ArrayList<>();
LinkedList<StorageEntity> foldersToFetch = new LinkedList<>(); LinkedList<StorageEntity> foldersToFetch = new LinkedList<>();
do { StorageEntity initialEntity = null;
String pathToExplore = ""; try {
if (foldersToFetch.size() == 0) { initialEntity = getFileStatus(path, defaultPath, tenantCode, type);
pathToExplore = path; } catch (Exception e) {
} else { logger.error("error while listing files status recursively, path: {}", path, e);
pathToExplore = foldersToFetch.pop().getFullName(); return storageEntityList;
} }
foldersToFetch.add(initialEntity);
while (!foldersToFetch.isEmpty()) {
String pathToExplore = foldersToFetch.pop().getFullName();
try { try {
List<StorageEntity> tempList = listFilesStatus(pathToExplore, defaultPath, tenantCode, type); List<StorageEntity> tempList = listFilesStatus(pathToExplore, defaultPath, tenantCode, type);
for (StorageEntity temp : tempList) { for (StorageEntity temp : tempList) {
if (temp.isDirectory()) { if (temp.isDirectory()) {
foldersToFetch.add(temp); foldersToFetch.add(temp);
} }
} }
storageEntityList.addAll(tempList); storageEntityList.addAll(tempList);
} catch (AmazonServiceException e) { } catch (Exception e) {
logger.error("Resource path: {}", pathToExplore, e); logger.error("error while listing files status recursively, path: {}", pathToExplore, e);
// return the resources fetched before error occurs.
return storageEntityList;
} }
}
} while (foldersToFetch.size() != 0);
return storageEntityList; return storageEntityList;
@ -585,16 +584,12 @@ public class S3StorageOperator implements Closeable, StorageOperate {
* a directory is a path ending with "/" * a directory is a path ending with "/"
*/ */
private String findDirAlias(String myStr) { private String findDirAlias(String myStr) {
if (!myStr.endsWith("/")) { if (!myStr.endsWith(FOLDER_SEPARATOR)) {
// Make sure system won't crush down if someone accidentally misuse the function. // Make sure system won't crush down if someone accidentally misuse the function.
return myStr; return myStr;
} }
int lastIndex = myStr.lastIndexOf("/");
String subbedString = myStr.substring(0, lastIndex);
int secondLastIndex = subbedString.lastIndexOf("/");
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(myStr, secondLastIndex + 1, lastIndex + 1);
return stringBuilder.toString(); Path path = Paths.get(myStr);
return path.getName(path.getNameCount() - 1) + FOLDER_SEPARATOR;
} }
} }

2
dolphinscheduler-ui/src/locales/zh_CN/resource.ts

@ -24,6 +24,7 @@ export default {
enter_keyword_tips: '请输入关键词', enter_keyword_tips: '请输入关键词',
name: '名称', name: '名称',
user_name: '所属用户', user_name: '所属用户',
tenant_name: '所属租户',
whether_directory: '是否文件夹', whether_directory: '是否文件夹',
file_name: '文件名称', file_name: '文件名称',
description: '描述', description: '描述',
@ -58,6 +59,7 @@ export default {
upload_udf_resources: '上传UDF资源', upload_udf_resources: '上传UDF资源',
udf_source_name: 'UDF资源名称', udf_source_name: 'UDF资源名称',
user_name: '所属用户', user_name: '所属用户',
tenant_name: '所属租户',
whether_directory: '是否文件夹', whether_directory: '是否文件夹',
file_name: '文件名称', file_name: '文件名称',
file_size: '文件大小', file_size: '文件大小',

Loading…
Cancel
Save