Rick Cheng
1 year ago
committed by
GitHub
27 changed files with 970 additions and 27 deletions
@ -0,0 +1,14 @@
|
||||
Copyright(c) 2016 Microsoft Corporation |
||||
All rights reserved. |
||||
|
||||
MIT License |
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files(the "Software"), |
||||
to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, |
||||
and / or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions : |
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. |
||||
|
||||
THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
||||
IN THE SOFTWARE. |
@ -0,0 +1,14 @@
|
||||
Copyright(c) 2016 Microsoft Corporation |
||||
All rights reserved. |
||||
|
||||
MIT License |
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files(the "Software"), |
||||
to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, |
||||
and / or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions : |
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. |
||||
|
||||
THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS |
||||
IN THE SOFTWARE. |
@ -0,0 +1,47 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?> |
||||
<!-- |
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more |
||||
~ contributor license agreements. See the NOTICE file distributed with |
||||
~ this work for additional information regarding copyright ownership. |
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
~ (the "License"); you may not use this file except in compliance with |
||||
~ the License. You may obtain a copy of the License at |
||||
~ |
||||
~ http://www.apache.org/licenses/LICENSE-2.0 |
||||
~ |
||||
~ Unless required by applicable law or agreed to in writing, software |
||||
~ distributed under the License is distributed on an "AS IS" BASIS, |
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
~ See the License for the specific language governing permissions and |
||||
~ limitations under the License. |
||||
--> |
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
||||
<modelVersion>4.0.0</modelVersion> |
||||
<parent> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-storage-plugin</artifactId> |
||||
<version>dev-SNAPSHOT</version> |
||||
</parent> |
||||
|
||||
<artifactId>dolphinscheduler-storage-abs</artifactId> |
||||
|
||||
<dependencies> |
||||
<dependency> |
||||
<groupId>com.azure</groupId> |
||||
<artifactId>azure-storage-blob</artifactId> |
||||
</dependency> |
||||
|
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-storage-api</artifactId> |
||||
<version>${project.version}</version> |
||||
</dependency> |
||||
|
||||
<dependency> |
||||
<groupId>org.apache.dolphinscheduler</groupId> |
||||
<artifactId>dolphinscheduler-task-api</artifactId> |
||||
<version>${project.version}</version> |
||||
</dependency> |
||||
</dependencies> |
||||
</project> |
@ -0,0 +1,488 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.storage.abs; |
||||
|
||||
import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING; |
||||
import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR; |
||||
import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S; |
||||
import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_TYPE_FILE; |
||||
import static org.apache.dolphinscheduler.common.constants.Constants.RESOURCE_TYPE_UDF; |
||||
|
||||
import org.apache.dolphinscheduler.common.constants.Constants; |
||||
import org.apache.dolphinscheduler.common.enums.ResUploadType; |
||||
import org.apache.dolphinscheduler.common.utils.PropertyUtils; |
||||
import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity; |
||||
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; |
||||
import org.apache.dolphinscheduler.spi.enums.ResourceType; |
||||
|
||||
import org.apache.commons.lang3.StringUtils; |
||||
|
||||
import java.io.BufferedReader; |
||||
import java.io.ByteArrayInputStream; |
||||
import java.io.Closeable; |
||||
import java.io.File; |
||||
import java.io.FileNotFoundException; |
||||
import java.io.IOException; |
||||
import java.io.InputStreamReader; |
||||
import java.nio.file.Files; |
||||
import java.nio.file.Path; |
||||
import java.nio.file.Paths; |
||||
import java.sql.Date; |
||||
import java.util.ArrayList; |
||||
import java.util.Collections; |
||||
import java.util.LinkedList; |
||||
import java.util.List; |
||||
import java.util.stream.Collectors; |
||||
import java.util.stream.Stream; |
||||
|
||||
import lombok.Data; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import com.azure.core.http.rest.PagedIterable; |
||||
import com.azure.storage.blob.BlobClient; |
||||
import com.azure.storage.blob.BlobContainerClient; |
||||
import com.azure.storage.blob.BlobServiceClient; |
||||
import com.azure.storage.blob.BlobServiceClientBuilder; |
||||
import com.azure.storage.blob.models.BlobContainerItem; |
||||
import com.azure.storage.blob.models.BlobItem; |
||||
import com.azure.storage.blob.specialized.BlockBlobClient; |
||||
|
||||
@Data |
||||
@Slf4j |
||||
public class AbsStorageOperator implements Closeable, StorageOperate { |
||||
|
||||
private BlobContainerClient blobContainerClient; |
||||
|
||||
private BlobServiceClient blobServiceClient; |
||||
|
||||
private String connectionString; |
||||
|
||||
private String storageAccountName; |
||||
|
||||
private String containerName; |
||||
|
||||
public AbsStorageOperator() { |
||||
|
||||
} |
||||
|
||||
public void init() { |
||||
containerName = readContainerName(); |
||||
connectionString = readConnectionString(); |
||||
storageAccountName = readAccountName(); |
||||
blobServiceClient = buildBlobServiceClient(); |
||||
blobContainerClient = buildBlobContainerClient(); |
||||
checkContainerNameExists(); |
||||
} |
||||
|
||||
protected BlobServiceClient buildBlobServiceClient() { |
||||
return new BlobServiceClientBuilder() |
||||
.endpoint("https://" + storageAccountName + ".blob.core.windows.net/") |
||||
.connectionString(connectionString) |
||||
.buildClient(); |
||||
} |
||||
|
||||
protected BlobContainerClient buildBlobContainerClient() { |
||||
return blobServiceClient.getBlobContainerClient(containerName); |
||||
} |
||||
|
||||
protected String readConnectionString() { |
||||
return PropertyUtils.getString(Constants.AZURE_BLOB_STORAGE_CONNECTION_STRING); |
||||
} |
||||
|
||||
protected String readContainerName() { |
||||
return PropertyUtils.getString(Constants.AZURE_BLOB_STORAGE_CONTAINER_NAME); |
||||
} |
||||
|
||||
protected String readAccountName() { |
||||
return PropertyUtils.getString(Constants.AZURE_BLOB_STORAGE_ACCOUNT_NAME); |
||||
} |
||||
|
||||
@Override |
||||
public void createTenantDirIfNotExists(String tenantCode) throws Exception { |
||||
mkdir(tenantCode, getAbsResDir(tenantCode)); |
||||
mkdir(tenantCode, getAbsUdfDir(tenantCode)); |
||||
} |
||||
|
||||
public String getAbsResDir(String tenantCode) { |
||||
return String.format("%s/" + RESOURCE_TYPE_FILE, getAbsTenantDir(tenantCode)); |
||||
} |
||||
|
||||
public String getAbsUdfDir(String tenantCode) { |
||||
return String.format("%s/" + RESOURCE_TYPE_UDF, getAbsTenantDir(tenantCode)); |
||||
} |
||||
|
||||
public String getAbsTenantDir(String tenantCode) { |
||||
return String.format(FORMAT_S_S, getGcsDataBasePath(), tenantCode); |
||||
} |
||||
|
||||
public String getGcsDataBasePath() { |
||||
if (FOLDER_SEPARATOR.equals(RESOURCE_UPLOAD_PATH)) { |
||||
return EMPTY_STRING; |
||||
} else { |
||||
return RESOURCE_UPLOAD_PATH.replaceFirst(FOLDER_SEPARATOR, EMPTY_STRING); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public String getResDir(String tenantCode) { |
||||
return getAbsResDir(tenantCode) + FOLDER_SEPARATOR; |
||||
} |
||||
|
||||
@Override |
||||
public String getUdfDir(String tenantCode) { |
||||
return getAbsUdfDir(tenantCode) + FOLDER_SEPARATOR; |
||||
} |
||||
|
||||
@Override |
||||
public String getResourceFileName(String tenantCode, String fullName) { |
||||
String resDir = getResDir(tenantCode); |
||||
return fullName.replaceFirst(resDir, ""); |
||||
} |
||||
|
||||
@Override |
||||
public String getResourceFullName(String tenantCode, String fileName) { |
||||
if (fileName.startsWith(FOLDER_SEPARATOR)) { |
||||
fileName.replaceFirst(FOLDER_SEPARATOR, EMPTY_STRING); |
||||
} |
||||
return String.format(FORMAT_S_S, getAbsResDir(tenantCode), fileName); |
||||
} |
||||
|
||||
@Override |
||||
public String getFileName(ResourceType resourceType, String tenantCode, String fileName) { |
||||
if (fileName.startsWith(FOLDER_SEPARATOR)) { |
||||
fileName = fileName.replaceFirst(FOLDER_SEPARATOR, EMPTY_STRING); |
||||
} |
||||
return getDir(resourceType, tenantCode) + fileName; |
||||
} |
||||
|
||||
@Override |
||||
public void download(String tenantCode, String srcFilePath, String dstFilePath, |
||||
boolean overwrite) throws IOException { |
||||
File dstFile = new File(dstFilePath); |
||||
if (dstFile.isDirectory()) { |
||||
Files.delete(dstFile.toPath()); |
||||
} else { |
||||
Files.createDirectories(dstFile.getParentFile().toPath()); |
||||
} |
||||
|
||||
BlobClient blobClient = blobContainerClient.getBlobClient(srcFilePath); |
||||
blobClient.downloadToFile(dstFilePath, true); |
||||
} |
||||
|
||||
@Override |
||||
public boolean exists(String fullName) throws IOException { |
||||
return isObjectExists(fullName); |
||||
} |
||||
|
||||
protected boolean isObjectExists(String objectName) { |
||||
return blobContainerClient.getBlobClient(objectName).exists(); |
||||
} |
||||
|
||||
@Override |
||||
public boolean delete(String filePath, boolean recursive) throws IOException { |
||||
try { |
||||
if (isObjectExists(filePath)) { |
||||
blobContainerClient.getBlobClient(filePath).delete(); |
||||
} |
||||
return true; |
||||
} catch (Exception e) { |
||||
log.error("delete the object error,the resource path is {}", filePath); |
||||
return false; |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public boolean delete(String fullName, List<String> childrenPathList, boolean recursive) throws IOException { |
||||
// append the resource fullName to the list for deletion.
|
||||
childrenPathList.add(fullName); |
||||
|
||||
boolean result = true; |
||||
for (String filePath : childrenPathList) { |
||||
if (!delete(filePath, recursive)) { |
||||
result = false; |
||||
} |
||||
} |
||||
|
||||
return result; |
||||
} |
||||
|
||||
@Override |
||||
public boolean copy(String srcPath, String dstPath, boolean deleteSource, boolean overwrite) throws IOException { |
||||
BlobClient srcBlobClient = blobContainerClient.getBlobClient(srcPath); |
||||
BlockBlobClient dstBlobClient = blobContainerClient.getBlobClient(dstPath).getBlockBlobClient(); |
||||
|
||||
dstBlobClient.uploadFromUrl(srcBlobClient.getBlobUrl(), overwrite); |
||||
|
||||
if (deleteSource) { |
||||
srcBlobClient.delete(); |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
public boolean upload(String tenantCode, String srcFile, String dstPath, boolean deleteSource, |
||||
boolean overwrite) throws IOException { |
||||
try { |
||||
BlobClient blobClient = blobContainerClient.getBlobClient(dstPath); |
||||
blobClient.uploadFromFile(srcFile, overwrite); |
||||
|
||||
Path srcPath = Paths.get(srcFile); |
||||
if (deleteSource) { |
||||
Files.delete(srcPath); |
||||
} |
||||
return true; |
||||
} catch (Exception e) { |
||||
log.error("upload failed,the container is {},the filePath is {}", containerName, dstPath); |
||||
return false; |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public List<String> vimFile(String tenantCode, String filePath, int skipLineNums, int limit) throws IOException { |
||||
if (StringUtils.isBlank(filePath)) { |
||||
log.error("file path:{} is blank", filePath); |
||||
return Collections.emptyList(); |
||||
} |
||||
|
||||
BlobClient blobClient = blobContainerClient.getBlobClient(filePath); |
||||
try ( |
||||
BufferedReader bufferedReader = |
||||
new BufferedReader(new InputStreamReader( |
||||
new ByteArrayInputStream(blobClient.downloadContent().toBytes())))) { |
||||
Stream<String> stream = bufferedReader.lines().skip(skipLineNums).limit(limit); |
||||
return stream.collect(Collectors.toList()); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void deleteTenant(String tenantCode) throws Exception { |
||||
deleteTenantCode(tenantCode); |
||||
} |
||||
|
||||
protected void deleteTenantCode(String tenantCode) { |
||||
deleteDirectory(getResDir(tenantCode)); |
||||
deleteDirectory(getUdfDir(tenantCode)); |
||||
} |
||||
|
||||
@Override |
||||
public String getDir(ResourceType resourceType, String tenantCode) { |
||||
switch (resourceType) { |
||||
case UDF: |
||||
return getUdfDir(tenantCode); |
||||
case FILE: |
||||
return getResDir(tenantCode); |
||||
case ALL: |
||||
return getGcsDataBasePath(); |
||||
default: |
||||
return EMPTY_STRING; |
||||
} |
||||
|
||||
} |
||||
|
||||
protected void deleteDirectory(String directoryName) { |
||||
if (isObjectExists(directoryName)) { |
||||
blobContainerClient.getBlobClient(directoryName).delete(); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public boolean mkdir(String tenantCode, String path) throws IOException { |
||||
String objectName = path + FOLDER_SEPARATOR; |
||||
if (!isObjectExists(objectName)) { |
||||
BlobClient blobClient = blobContainerClient.getBlobClient(objectName); |
||||
blobClient.upload(new ByteArrayInputStream(EMPTY_STRING.getBytes()), 0); |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
@Override |
||||
public void close() throws IOException { |
||||
|
||||
} |
||||
|
||||
@Override |
||||
public ResUploadType returnStorageType() { |
||||
return ResUploadType.ABS; |
||||
} |
||||
|
||||
@Override |
||||
public List<StorageEntity> listFilesStatusRecursively(String path, String defaultPath, String tenantCode, |
||||
ResourceType type) { |
||||
List<StorageEntity> storageEntityList = new ArrayList<>(); |
||||
LinkedList<StorageEntity> foldersToFetch = new LinkedList<>(); |
||||
|
||||
StorageEntity initialEntity = null; |
||||
try { |
||||
initialEntity = getFileStatus(path, defaultPath, tenantCode, type); |
||||
} catch (Exception e) { |
||||
log.error("error while listing files status recursively, path: {}", path, e); |
||||
return storageEntityList; |
||||
} |
||||
foldersToFetch.add(initialEntity); |
||||
|
||||
while (!foldersToFetch.isEmpty()) { |
||||
String pathToExplore = foldersToFetch.pop().getFullName(); |
||||
try { |
||||
List<StorageEntity> tempList = listFilesStatus(pathToExplore, defaultPath, tenantCode, type); |
||||
for (StorageEntity temp : tempList) { |
||||
if (temp.isDirectory()) { |
||||
foldersToFetch.add(temp); |
||||
} |
||||
} |
||||
storageEntityList.addAll(tempList); |
||||
} catch (Exception e) { |
||||
log.error("error while listing files stat:wus recursively, path: {}", pathToExplore, e); |
||||
} |
||||
} |
||||
|
||||
return storageEntityList; |
||||
} |
||||
|
||||
@Override |
||||
public List<StorageEntity> listFilesStatus(String path, String defaultPath, String tenantCode, |
||||
ResourceType type) throws Exception { |
||||
List<StorageEntity> storageEntityList = new ArrayList<>(); |
||||
|
||||
PagedIterable<BlobItem> blobItems; |
||||
blobItems = blobContainerClient.listBlobsByHierarchy(path); |
||||
if (blobItems == null) { |
||||
return storageEntityList; |
||||
} |
||||
|
||||
for (BlobItem blobItem : blobItems) { |
||||
if (path.equals(blobItem.getName())) { |
||||
continue; |
||||
} |
||||
if (blobItem.isPrefix()) { |
||||
String suffix = StringUtils.difference(path, blobItem.getName()); |
||||
String fileName = StringUtils.difference(defaultPath, blobItem.getName()); |
||||
StorageEntity entity = new StorageEntity(); |
||||
entity.setAlias(suffix); |
||||
entity.setFileName(fileName); |
||||
entity.setFullName(blobItem.getName()); |
||||
entity.setDirectory(true); |
||||
entity.setDescription(EMPTY_STRING); |
||||
entity.setUserName(tenantCode); |
||||
entity.setType(type); |
||||
entity.setSize(0); |
||||
entity.setCreateTime(null); |
||||
entity.setUpdateTime(null); |
||||
entity.setPfullName(path); |
||||
|
||||
storageEntityList.add(entity); |
||||
} else { |
||||
String[] aliasArr = blobItem.getName().split("/"); |
||||
String alias = aliasArr[aliasArr.length - 1]; |
||||
String fileName = StringUtils.difference(defaultPath, blobItem.getName()); |
||||
|
||||
StorageEntity entity = new StorageEntity(); |
||||
entity.setAlias(alias); |
||||
entity.setFileName(fileName); |
||||
entity.setFullName(blobItem.getName()); |
||||
entity.setDirectory(false); |
||||
entity.setDescription(EMPTY_STRING); |
||||
entity.setUserName(tenantCode); |
||||
entity.setType(type); |
||||
entity.setSize(blobItem.getProperties().getContentLength()); |
||||
entity.setCreateTime(Date.from(blobItem.getProperties().getCreationTime().toInstant())); |
||||
entity.setUpdateTime(Date.from(blobItem.getProperties().getLastModified().toInstant())); |
||||
entity.setPfullName(path); |
||||
|
||||
storageEntityList.add(entity); |
||||
} |
||||
} |
||||
|
||||
return storageEntityList; |
||||
} |
||||
|
||||
@Override |
||||
public StorageEntity getFileStatus(String path, String defaultPath, String tenantCode, |
||||
ResourceType type) throws Exception { |
||||
if (path.endsWith(FOLDER_SEPARATOR)) { |
||||
// the path is a directory that may or may not exist
|
||||
String alias = findDirAlias(path); |
||||
String fileName = StringUtils.difference(defaultPath, path); |
||||
|
||||
StorageEntity entity = new StorageEntity(); |
||||
entity.setAlias(alias); |
||||
entity.setFileName(fileName); |
||||
entity.setFullName(path); |
||||
entity.setDirectory(true); |
||||
entity.setDescription(EMPTY_STRING); |
||||
entity.setUserName(tenantCode); |
||||
entity.setType(type); |
||||
entity.setSize(0); |
||||
|
||||
return entity; |
||||
} else { |
||||
if (isObjectExists(path)) { |
||||
BlobClient blobClient = blobContainerClient.getBlobClient(path); |
||||
|
||||
String[] aliasArr = blobClient.getBlobName().split(FOLDER_SEPARATOR); |
||||
String alias = aliasArr[aliasArr.length - 1]; |
||||
String fileName = StringUtils.difference(defaultPath, blobClient.getBlobName()); |
||||
|
||||
StorageEntity entity = new StorageEntity(); |
||||
entity.setAlias(alias); |
||||
entity.setFileName(fileName); |
||||
entity.setFullName(blobClient.getBlobName()); |
||||
entity.setDirectory(false); |
||||
entity.setDescription(EMPTY_STRING); |
||||
entity.setUserName(tenantCode); |
||||
entity.setType(type); |
||||
entity.setSize(blobClient.getProperties().getBlobSize()); |
||||
entity.setCreateTime(Date.from(blobClient.getProperties().getCreationTime().toInstant())); |
||||
entity.setUpdateTime(Date.from(blobClient.getProperties().getLastModified().toInstant())); |
||||
|
||||
return entity; |
||||
} else { |
||||
throw new FileNotFoundException("Object is not found in ABS container: " + containerName); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private String findDirAlias(String dirPath) { |
||||
if (!dirPath.endsWith(FOLDER_SEPARATOR)) { |
||||
return dirPath; |
||||
} |
||||
|
||||
Path path = Paths.get(dirPath); |
||||
return path.getName(path.getNameCount() - 1) + FOLDER_SEPARATOR; |
||||
} |
||||
|
||||
public void checkContainerNameExists() { |
||||
if (StringUtils.isBlank(containerName)) { |
||||
throw new IllegalArgumentException(containerName + " is blank"); |
||||
} |
||||
|
||||
boolean exist = false; |
||||
for (BlobContainerItem item : blobServiceClient.listBlobContainers()) { |
||||
if (containerName.equals(item.getName())) { |
||||
exist = true; |
||||
break; |
||||
} |
||||
} |
||||
|
||||
if (!exist) { |
||||
throw new IllegalArgumentException( |
||||
"containerName: " + containerName + " is not exists, you need to create them by yourself"); |
||||
} else { |
||||
log.info("containerName: {} has been found", containerName); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,40 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.storage.abs; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate; |
||||
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperateFactory; |
||||
import org.apache.dolphinscheduler.plugin.storage.api.StorageType; |
||||
|
||||
import com.google.auto.service.AutoService; |
||||
|
||||
@AutoService(StorageOperateFactory.class) |
||||
public class AbsStorageOperatorFactory implements StorageOperateFactory { |
||||
|
||||
@Override |
||||
public StorageOperate createStorageOperate() { |
||||
AbsStorageOperator absStorageOperator = new AbsStorageOperator(); |
||||
absStorageOperator.init(); |
||||
return absStorageOperator; |
||||
} |
||||
|
||||
@Override |
||||
public StorageType getStorageOperate() { |
||||
return StorageType.ABS; |
||||
} |
||||
} |
@ -0,0 +1,274 @@
|
||||
/* |
||||
* Licensed to the Apache Software Foundation (ASF) under one or more |
||||
* contributor license agreements. See the NOTICE file distributed with |
||||
* this work for additional information regarding copyright ownership. |
||||
* The ASF licenses this file to You under the Apache License, Version 2.0 |
||||
* (the "License"); you may not use this file except in compliance with |
||||
* the License. You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.apache.dolphinscheduler.plugin.storage.abs; |
||||
|
||||
import static org.apache.dolphinscheduler.common.constants.Constants.FOLDER_SEPARATOR; |
||||
import static org.apache.dolphinscheduler.common.constants.Constants.FORMAT_S_S; |
||||
import static org.mockito.ArgumentMatchers.anyString; |
||||
import static org.mockito.Mockito.doNothing; |
||||
import static org.mockito.Mockito.doReturn; |
||||
import static org.mockito.Mockito.times; |
||||
import static org.mockito.Mockito.verify; |
||||
|
||||
import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity; |
||||
import org.apache.dolphinscheduler.spi.enums.ResourceType; |
||||
|
||||
import java.io.IOException; |
||||
import java.util.Collections; |
||||
import java.util.List; |
||||
|
||||
import org.junit.jupiter.api.Assertions; |
||||
import org.junit.jupiter.api.BeforeEach; |
||||
import org.junit.jupiter.api.Test; |
||||
import org.junit.jupiter.api.extension.ExtendWith; |
||||
import org.mockito.Mock; |
||||
import org.mockito.Mockito; |
||||
import org.mockito.junit.jupiter.MockitoExtension; |
||||
|
||||
import com.azure.storage.blob.BlobClient; |
||||
import com.azure.storage.blob.BlobContainerClient; |
||||
import com.azure.storage.blob.BlobServiceClient; |
||||
import com.azure.storage.blob.specialized.BlockBlobClient; |
||||
|
||||
@ExtendWith(MockitoExtension.class) |
||||
public class AbsStorageOperatorTest { |
||||
|
||||
private static final String CONNECTION_STRING_MOCK = "CONNECTION_STRING_MOCK"; |
||||
|
||||
private static final String ACCOUNT_NAME_MOCK = "ACCOUNT_NAME_MOCK"; |
||||
|
||||
private static final String CONTAINER_NAME_MOCK = "CONTAINER_NAME_MOCK"; |
||||
|
||||
private static final String TENANT_CODE_MOCK = "TENANT_CODE_MOCK"; |
||||
|
||||
private static final String DIR_MOCK = "DIR_MOCK"; |
||||
|
||||
private static final String FILE_NAME_MOCK = "FILE_NAME_MOCK"; |
||||
|
||||
private static final String FILE_PATH_MOCK = "FILE_PATH_MOCK"; |
||||
|
||||
private static final String FULL_NAME = "/tmp/dir1/"; |
||||
|
||||
private static final String DEFAULT_PATH = "/tmp/"; |
||||
|
||||
@Mock |
||||
private BlobContainerClient blobContainerClient; |
||||
|
||||
@Mock |
||||
private BlobServiceClient blobServiceClient; |
||||
|
||||
@Mock |
||||
private BlockBlobClient blockBlobClient; |
||||
|
||||
@Mock |
||||
private BlobClient blobClient; |
||||
|
||||
private AbsStorageOperator absStorageOperator; |
||||
|
||||
@BeforeEach |
||||
public void setUp() throws Exception { |
||||
absStorageOperator = Mockito.spy(AbsStorageOperator.class); |
||||
Mockito.doReturn(CONNECTION_STRING_MOCK).when(absStorageOperator).readConnectionString(); |
||||
Mockito.doReturn(CONTAINER_NAME_MOCK).when(absStorageOperator).readContainerName(); |
||||
Mockito.doReturn(ACCOUNT_NAME_MOCK).when(absStorageOperator).readAccountName(); |
||||
Mockito.doReturn(blobContainerClient).when(absStorageOperator).buildBlobContainerClient(); |
||||
Mockito.doReturn(blobServiceClient).when(absStorageOperator).buildBlobServiceClient(); |
||||
Mockito.doNothing().when(absStorageOperator).checkContainerNameExists(); |
||||
|
||||
absStorageOperator.init(); |
||||
} |
||||
|
||||
@Test |
||||
public void testInit() throws Exception { |
||||
verify(absStorageOperator, times(1)).buildBlobServiceClient(); |
||||
verify(absStorageOperator, times(1)).buildBlobContainerClient(); |
||||
Assertions.assertEquals(CONNECTION_STRING_MOCK, absStorageOperator.getConnectionString()); |
||||
Assertions.assertEquals(CONTAINER_NAME_MOCK, absStorageOperator.getContainerName()); |
||||
Assertions.assertEquals(ACCOUNT_NAME_MOCK, absStorageOperator.getStorageAccountName()); |
||||
} |
||||
|
||||
@Test |
||||
public void createTenantResAndUdfDir() throws Exception { |
||||
doReturn(DIR_MOCK).when(absStorageOperator).getAbsResDir(TENANT_CODE_MOCK); |
||||
doReturn(DIR_MOCK).when(absStorageOperator).getAbsUdfDir(TENANT_CODE_MOCK); |
||||
doReturn(true).when(absStorageOperator).mkdir(TENANT_CODE_MOCK, DIR_MOCK); |
||||
absStorageOperator.createTenantDirIfNotExists(TENANT_CODE_MOCK); |
||||
verify(absStorageOperator, times(2)).mkdir(TENANT_CODE_MOCK, DIR_MOCK); |
||||
} |
||||
|
||||
@Test |
||||
public void getResDir() { |
||||
final String expectedResourceDir = String.format("dolphinscheduler/%s/resources/", TENANT_CODE_MOCK); |
||||
final String dir = absStorageOperator.getResDir(TENANT_CODE_MOCK); |
||||
Assertions.assertEquals(expectedResourceDir, dir); |
||||
} |
||||
|
||||
@Test |
||||
public void getUdfDir() { |
||||
final String expectedUdfDir = String.format("dolphinscheduler/%s/udfs/", TENANT_CODE_MOCK); |
||||
final String dir = absStorageOperator.getUdfDir(TENANT_CODE_MOCK); |
||||
Assertions.assertEquals(expectedUdfDir, dir); |
||||
} |
||||
|
||||
@Test |
||||
public void mkdirWhenDirExists() { |
||||
boolean isSuccess = false; |
||||
try { |
||||
final String key = DIR_MOCK + FOLDER_SEPARATOR; |
||||
Mockito.doReturn(true).when(absStorageOperator).isObjectExists(key); |
||||
isSuccess = absStorageOperator.mkdir(TENANT_CODE_MOCK, DIR_MOCK); |
||||
|
||||
} catch (IOException e) { |
||||
Assertions.fail("test failed due to unexpected IO exception"); |
||||
} |
||||
|
||||
Assertions.assertTrue(isSuccess); |
||||
} |
||||
|
||||
@Test |
||||
public void getResourceFullName() { |
||||
final String expectedResourceFileName = |
||||
String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK); |
||||
final String resourceFileName = absStorageOperator.getResourceFullName(TENANT_CODE_MOCK, FILE_NAME_MOCK); |
||||
Assertions.assertEquals(expectedResourceFileName, resourceFileName); |
||||
} |
||||
|
||||
@Test |
||||
public void getFileName() { |
||||
final String expectedFileName = |
||||
String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK); |
||||
final String fileName = absStorageOperator.getFileName(ResourceType.FILE, TENANT_CODE_MOCK, FILE_NAME_MOCK); |
||||
Assertions.assertEquals(expectedFileName, fileName); |
||||
} |
||||
|
||||
@Test |
||||
public void exists() { |
||||
boolean doesExist = false; |
||||
doReturn(true).when(absStorageOperator).isObjectExists(FILE_NAME_MOCK); |
||||
try { |
||||
doesExist = absStorageOperator.exists(FILE_NAME_MOCK); |
||||
} catch (IOException e) { |
||||
Assertions.fail("unexpected IO exception in unit test"); |
||||
} |
||||
|
||||
Assertions.assertTrue(doesExist); |
||||
} |
||||
|
||||
@Test |
||||
public void delete() { |
||||
boolean isDeleted = false; |
||||
doReturn(true).when(absStorageOperator).isObjectExists(FILE_NAME_MOCK); |
||||
Mockito.doReturn(blobClient).when(blobContainerClient).getBlobClient(Mockito.anyString()); |
||||
try { |
||||
isDeleted = absStorageOperator.delete(FILE_NAME_MOCK, true); |
||||
} catch (IOException e) { |
||||
Assertions.fail("unexpected IO exception in unit test"); |
||||
} |
||||
|
||||
Assertions.assertTrue(isDeleted); |
||||
verify(blobClient, times(1)).delete(); |
||||
} |
||||
|
||||
@Test |
||||
public void copy() { |
||||
boolean isSuccess = false; |
||||
Mockito.doReturn(blobClient).when(blobContainerClient).getBlobClient(Mockito.anyString()); |
||||
Mockito.doReturn(blockBlobClient).when(blobClient).getBlockBlobClient(); |
||||
try { |
||||
isSuccess = absStorageOperator.copy(FILE_PATH_MOCK, FILE_PATH_MOCK, false, false); |
||||
} catch (IOException e) { |
||||
Assertions.fail("unexpected IO exception in unit test"); |
||||
} |
||||
|
||||
Assertions.assertTrue(isSuccess); |
||||
} |
||||
|
||||
@Test |
||||
public void deleteTenant() { |
||||
doNothing().when(absStorageOperator).deleteTenantCode(anyString()); |
||||
try { |
||||
absStorageOperator.deleteTenant(TENANT_CODE_MOCK); |
||||
} catch (Exception e) { |
||||
Assertions.fail("unexpected exception caught in unit test"); |
||||
} |
||||
|
||||
verify(absStorageOperator, times(1)).deleteTenantCode(anyString()); |
||||
} |
||||
|
||||
@Test |
||||
public void getGcsResDir() { |
||||
final String expectedGcsResDir = String.format("dolphinscheduler/%s/resources", TENANT_CODE_MOCK); |
||||
final String gcsResDir = absStorageOperator.getAbsResDir(TENANT_CODE_MOCK); |
||||
Assertions.assertEquals(expectedGcsResDir, gcsResDir); |
||||
} |
||||
|
||||
@Test |
||||
public void getGcsUdfDir() { |
||||
final String expectedGcsUdfDir = String.format("dolphinscheduler/%s/udfs", TENANT_CODE_MOCK); |
||||
final String gcsUdfDir = absStorageOperator.getAbsUdfDir(TENANT_CODE_MOCK); |
||||
Assertions.assertEquals(expectedGcsUdfDir, gcsUdfDir); |
||||
} |
||||
|
||||
@Test |
||||
public void getGcsTenantDir() { |
||||
final String expectedGcsTenantDir = String.format(FORMAT_S_S, DIR_MOCK, TENANT_CODE_MOCK); |
||||
doReturn(DIR_MOCK).when(absStorageOperator).getGcsDataBasePath(); |
||||
final String gcsTenantDir = absStorageOperator.getAbsTenantDir(TENANT_CODE_MOCK); |
||||
Assertions.assertEquals(expectedGcsTenantDir, gcsTenantDir); |
||||
} |
||||
|
||||
@Test |
||||
public void deleteDir() { |
||||
Mockito.doReturn(blobClient).when(blobContainerClient).getBlobClient(Mockito.anyString()); |
||||
doReturn(true).when(absStorageOperator).isObjectExists(Mockito.any()); |
||||
absStorageOperator.deleteDirectory(DIR_MOCK); |
||||
verify(blobClient, times(1)).delete(); |
||||
} |
||||
|
||||
@Test |
||||
public void testGetFileStatus() throws Exception { |
||||
StorageEntity entity = |
||||
absStorageOperator.getFileStatus(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE); |
||||
Assertions.assertEquals(FULL_NAME, entity.getFullName()); |
||||
Assertions.assertEquals("dir1/", entity.getFileName()); |
||||
} |
||||
|
||||
@Test |
||||
public void testListFilesStatus() throws Exception { |
||||
Mockito.doReturn(null).when(blobContainerClient).listBlobsByHierarchy(Mockito.any()); |
||||
List<StorageEntity> result = |
||||
absStorageOperator.listFilesStatus(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, ResourceType.FILE); |
||||
verify(blobContainerClient, times(1)).listBlobsByHierarchy(Mockito.any()); |
||||
} |
||||
|
||||
@Test |
||||
public void testListFilesStatusRecursively() throws Exception { |
||||
StorageEntity entity = new StorageEntity(); |
||||
entity.setFullName(FULL_NAME); |
||||
|
||||
doReturn(entity).when(absStorageOperator).getFileStatus(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, |
||||
ResourceType.FILE); |
||||
doReturn(Collections.EMPTY_LIST).when(absStorageOperator).listFilesStatus(anyString(), anyString(), anyString(), |
||||
Mockito.any(ResourceType.class)); |
||||
|
||||
List<StorageEntity> result = |
||||
absStorageOperator.listFilesStatusRecursively(FULL_NAME, DEFAULT_PATH, TENANT_CODE_MOCK, |
||||
ResourceType.FILE); |
||||
Assertions.assertEquals(0, result.size()); |
||||
} |
||||
} |
Loading…
Reference in new issue