Browse Source

[improv][Resource Center] Implement getResourceFileName in StorageOperator (#14097)

3.2.0-release
Aaron Wang 2 years ago committed by GitHub
parent
commit
1e3ae7b942
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
  2. 2
      dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java
  3. 10
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageOperate.java
  4. 7
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/main/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperator.java
  5. 13
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/test/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperatorTest.java
  6. 24
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java
  7. 7
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperator.java
  8. 13
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorTest.java
  9. 24
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java
  10. 13
      dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/test/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorTest.java
  11. 14
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java
  12. 6
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java

2
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java

@ -1534,7 +1534,7 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
String tenantCode = getTenantCode(user); String tenantCode = getTenantCode(user);
String baseFolder = storageOperate.getResourceFileName(tenantCode, "DATA_TRANSFER"); String baseFolder = storageOperate.getResourceFullName(tenantCode, "DATA_TRANSFER");
LocalDateTime now = LocalDateTime.now(); LocalDateTime now = LocalDateTime.now();
now = now.minus(days, ChronoUnit.DAYS); now = now.minus(days, ChronoUnit.DAYS);

2
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskCacheUtils.java

@ -178,7 +178,7 @@ public class TaskCacheUtils {
public static String getValCheckSum(Property fileProperty, TaskExecutionContext context, public static String getValCheckSum(Property fileProperty, TaskExecutionContext context,
StorageOperate storageOperate) { StorageOperate storageOperate) {
String resourceCRCPath = fileProperty.getValue() + CRC_SUFFIX; String resourceCRCPath = fileProperty.getValue() + CRC_SUFFIX;
String resourceCRCWholePath = storageOperate.getResourceFileName(context.getTenantCode(), resourceCRCPath); String resourceCRCWholePath = storageOperate.getResourceFullName(context.getTenantCode(), resourceCRCPath);
String targetPath = String.format("%s/%s", context.getExecutePath(), resourceCRCPath); String targetPath = String.format("%s/%s", context.getExecutePath(), resourceCRCPath);
log.info("{} --- Remote:{} to Local:{}", "CRC file", resourceCRCWholePath, targetPath); log.info("{} --- Remote:{} to Local:{}", "CRC file", resourceCRCWholePath, targetPath);
String crcString = ""; String crcString = "";

10
dolphinscheduler-storage-plugin/dolphinscheduler-storage-api/src/main/java/org/apache/dolphinscheduler/plugin/storage/api/StorageOperate.java

@ -60,19 +60,19 @@ public interface StorageOperate {
boolean mkdir(String tenantCode, String path) throws IOException; boolean mkdir(String tenantCode, String path) throws IOException;
/** /**
* get the path of the resource file * get the path of the resource file (fullName)
* @param tenantCode * @param tenantCode
* @param fullName * @param fileName
* @return * @return
*/ */
String getResourceFileName(String tenantCode, String fullName); String getResourceFullName(String tenantCode, String fileName);
/** /**
* get the path of the resource file excluding the base path. * get the path of the resource file excluding the base path (fileName)
* @param fullName * @param fullName
* @return * @return
*/ */
String getResourceFileName(String fullName); String getResourceFileName(String tenantCode, String fullName);
/** /**
* get the path of the file * get the path of the file

7
dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/main/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperator.java

@ -122,7 +122,7 @@ public class GcsStorageOperator implements Closeable, StorageOperate {
} }
@Override @Override
public String getResourceFileName(String tenantCode, String fileName) { public String getResourceFullName(String tenantCode, String fileName) {
if (fileName.startsWith(FOLDER_SEPARATOR)) { if (fileName.startsWith(FOLDER_SEPARATOR)) {
fileName.replaceFirst(FOLDER_SEPARATOR, EMPTY_STRING); fileName.replaceFirst(FOLDER_SEPARATOR, EMPTY_STRING);
} }
@ -130,8 +130,9 @@ public class GcsStorageOperator implements Closeable, StorageOperate {
} }
@Override @Override
public String getResourceFileName(String fullName) { public String getResourceFileName(String tenantCode, String fullName) {
return null; String resDir = getResDir(tenantCode);
return fullName.replaceFirst(resDir, "");
} }
@Override @Override

13
dolphinscheduler-storage-plugin/dolphinscheduler-storage-gcs/src/test/java/org/apache/dolphinscheduler/plugin/storage/gcs/GcsStorageOperatorTest.java

@ -146,11 +146,20 @@ public class GcsStorageOperatorTest {
Assertions.assertTrue(isSuccess); Assertions.assertTrue(isSuccess);
} }
@Test
public void getResourceFullName() {
final String expectedResourceFullName =
String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK);
final String resourceFullName = gcsStorageOperator.getResourceFullName(TENANT_CODE_MOCK, FILE_NAME_MOCK);
Assertions.assertEquals(expectedResourceFullName, resourceFullName);
}
@Test @Test
public void getResourceFileName() { public void getResourceFileName() {
final String expectedResourceFileName = final String expectedResourceFileName = FILE_NAME_MOCK;
final String resourceFullName =
String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK); String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK);
final String resourceFileName = gcsStorageOperator.getResourceFileName(TENANT_CODE_MOCK, FILE_NAME_MOCK); final String resourceFileName = gcsStorageOperator.getResourceFileName(TENANT_CODE_MOCK, resourceFullName);
Assertions.assertEquals(expectedResourceFileName, resourceFileName); Assertions.assertEquals(expectedResourceFileName, resourceFileName);
} }

24
dolphinscheduler-storage-plugin/dolphinscheduler-storage-hdfs/src/main/java/org/apache/dolphinscheduler/plugin/storage/hdfs/HdfsStorageOperator.java

@ -56,7 +56,6 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.LinkedList; import java.util.LinkedList;
@ -69,7 +68,6 @@ import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Joiner;
import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader; import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache; import com.google.common.cache.LoadingCache;
@ -298,28 +296,14 @@ public class HdfsStorageOperator implements Closeable, StorageOperate {
} }
@Override @Override
public String getResourceFileName(String tenantCode, String fullName) { public String getResourceFullName(String tenantCode, String fullName) {
return getHdfsResourceFileName(tenantCode, fullName); return getHdfsResourceFileName(tenantCode, fullName);
} }
@Override @Override
public String getResourceFileName(String fullName) { public String getResourceFileName(String tenantCode, String fullName) {
// here is a quick fix here to get fileName. We get the resource upload path and String resDir = getResDir(tenantCode);
// get the index of the first appearance of resource upload path. The index is put return fullName.replaceFirst(resDir, "");
// in the start index of the substring function and get the result substring containing
// tenantcode and "resource" directory and the fileName.
// Then we split the result substring
// with "/" and join all elements except the first two elements because they are
// tenantCode and "resource" directory.
String resourceUploadPath =
RESOURCE_UPLOAD_PATH.endsWith(FOLDER_SEPARATOR) ? StringUtils.chop(RESOURCE_UPLOAD_PATH)
: RESOURCE_UPLOAD_PATH;
// +1 because we want to skip the "/" after resource upload path as well.
String pathContainingTenantNResource = fullName.substring(
fullName.indexOf(resourceUploadPath)
+ resourceUploadPath.length() + 1);
String[] fileNameArr = pathContainingTenantNResource.split(FOLDER_SEPARATOR);
return Joiner.on(FOLDER_SEPARATOR).join(Arrays.stream(fileNameArr).skip(2).collect(Collectors.toList()));
} }
@Override @Override

7
dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/main/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperator.java

@ -175,7 +175,7 @@ public class OssStorageOperator implements Closeable, StorageOperate {
} }
@Override @Override
public String getResourceFileName(String tenantCode, String fileName) { public String getResourceFullName(String tenantCode, String fileName) {
if (fileName.startsWith(FOLDER_SEPARATOR)) { if (fileName.startsWith(FOLDER_SEPARATOR)) {
fileName = fileName.replaceFirst(FOLDER_SEPARATOR, ""); fileName = fileName.replaceFirst(FOLDER_SEPARATOR, "");
} }
@ -183,8 +183,9 @@ public class OssStorageOperator implements Closeable, StorageOperate {
} }
@Override @Override
public String getResourceFileName(String fullName) { public String getResourceFileName(String tenantCode, String fullName) {
return null; String resDir = getResDir(tenantCode);
return fullName.replaceFirst(resDir, "");
} }
@Override @Override

13
dolphinscheduler-storage-plugin/dolphinscheduler-storage-oss/src/test/java/org/apache/dolphinscheduler/plugin/storage/oss/OssStorageOperatorTest.java

@ -158,11 +158,20 @@ public class OssStorageOperatorTest {
Assertions.assertTrue(isSuccess); Assertions.assertTrue(isSuccess);
} }
@Test
public void getResourceFullName() {
final String expectedResourceFullName =
String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK);
final String resourceFullName = ossOperator.getResourceFullName(TENANT_CODE_MOCK, FILE_NAME_MOCK);
Assertions.assertEquals(expectedResourceFullName, resourceFullName);
}
@Test @Test
public void getResourceFileName() { public void getResourceFileName() {
final String expectedResourceFileName = final String expectedResourceFileName = FILE_NAME_MOCK;
final String resourceFullName =
String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK); String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK);
final String resourceFileName = ossOperator.getResourceFileName(TENANT_CODE_MOCK, FILE_NAME_MOCK); final String resourceFileName = ossOperator.getResourceFileName(TENANT_CODE_MOCK, resourceFullName);
Assertions.assertEquals(expectedResourceFileName, resourceFileName); Assertions.assertEquals(expectedResourceFileName, resourceFileName);
} }

24
dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/main/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperator.java

@ -46,7 +46,6 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -76,7 +75,6 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.transfer.MultipleFileDownload; import com.amazonaws.services.s3.transfer.MultipleFileDownload;
import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder; import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.google.common.base.Joiner;
@Slf4j @Slf4j
@Data @Data
@ -182,7 +180,7 @@ public class S3StorageOperator implements Closeable, StorageOperate {
} }
@Override @Override
public String getResourceFileName(String tenantCode, String fileName) { public String getResourceFullName(String tenantCode, String fileName) {
if (fileName.startsWith(FOLDER_SEPARATOR)) { if (fileName.startsWith(FOLDER_SEPARATOR)) {
fileName = fileName.replaceFirst(FOLDER_SEPARATOR, ""); fileName = fileName.replaceFirst(FOLDER_SEPARATOR, "");
} }
@ -190,23 +188,9 @@ public class S3StorageOperator implements Closeable, StorageOperate {
} }
@Override @Override
public String getResourceFileName(String fullName) { public String getResourceFileName(String tenantCode, String fullName) {
// here is a quick fix here to get fileName. We get the resource upload path and String resDir = getResDir(tenantCode);
// get the index of the first appearance of resource upload path. The index is put return fullName.replaceFirst(resDir, "");
// in the start index of the substring function and get the result substring containing
// tenantcode and "resource" directory and the fileName.
// Then we split the result substring
// with "/" and join all elements except the first two elements because they are
// tenantCode and "resource" directory.
String resourceUploadPath =
RESOURCE_UPLOAD_PATH.endsWith(FOLDER_SEPARATOR) ? StringUtils.chop(RESOURCE_UPLOAD_PATH)
: RESOURCE_UPLOAD_PATH;
// +1 because we want to skip the "/" after resource upload path as well.
String pathContainingTenantNResource = fullName.substring(
fullName.indexOf(resourceUploadPath)
+ resourceUploadPath.length() + 1);
String[] fileNameArr = pathContainingTenantNResource.split(FOLDER_SEPARATOR);
return Joiner.on(FOLDER_SEPARATOR).join(Arrays.stream(fileNameArr).skip(2).collect(Collectors.toList()));
} }
@Override @Override

13
dolphinscheduler-storage-plugin/dolphinscheduler-storage-s3/src/test/java/org/apache/dolphinscheduler/plugin/storage/s3/S3StorageOperatorTest.java

@ -164,11 +164,20 @@ public class S3StorageOperatorTest {
Assertions.assertTrue(isSuccess); Assertions.assertTrue(isSuccess);
} }
@Test
public void getResourceFullName() {
final String expectedResourceFullName =
String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK);
final String resourceFullName = s3StorageOperator.getResourceFullName(TENANT_CODE_MOCK, FILE_NAME_MOCK);
Assertions.assertEquals(expectedResourceFullName, resourceFullName);
}
@Test @Test
public void getResourceFileName() { public void getResourceFileName() {
final String expectedResourceFileName = final String expectedResourceFileName = FILE_NAME_MOCK;
final String resourceFullName =
String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK); String.format("dolphinscheduler/%s/resources/%s", TENANT_CODE_MOCK, FILE_NAME_MOCK);
final String resourceFileName = s3StorageOperator.getResourceFileName(TENANT_CODE_MOCK, FILE_NAME_MOCK); final String resourceFileName = s3StorageOperator.getResourceFileName(TENANT_CODE_MOCK, resourceFullName);
Assertions.assertEquals(expectedResourceFileName, resourceFileName); Assertions.assertEquals(expectedResourceFileName, resourceFileName);
} }

14
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskExecutionCheckerUtils.java

@ -109,16 +109,19 @@ public class TaskExecutionCheckerUtils {
public static void downloadResourcesIfNeeded(StorageOperate storageOperate, public static void downloadResourcesIfNeeded(StorageOperate storageOperate,
TaskExecutionContext taskExecutionContext) { TaskExecutionContext taskExecutionContext) {
String execLocalPath = taskExecutionContext.getExecutePath(); String execLocalPath = taskExecutionContext.getExecutePath();
String tenant = taskExecutionContext.getTenantCode();
Map<String, String> projectRes = taskExecutionContext.getResources(); Map<String, String> projectRes = taskExecutionContext.getResources();
if (MapUtils.isEmpty(projectRes)) { if (MapUtils.isEmpty(projectRes)) {
return; return;
} }
List<Pair<String, String>> downloadFiles = new ArrayList<>(); List<Pair<String, String>> downloadFiles = new ArrayList<>();
projectRes.forEach((key, value) -> { projectRes.keySet().forEach(fullName -> {
File resFile = new File(execLocalPath, key); String fileName = storageOperate.getResourceFileName(tenant, fullName);
projectRes.put(fullName, fileName);
File resFile = new File(execLocalPath, fileName);
boolean notExist = !resFile.exists(); boolean notExist = !resFile.exists();
if (notExist) { if (notExist) {
downloadFiles.add(Pair.of(key, value)); downloadFiles.add(Pair.of(fullName, fileName));
} else { } else {
log.info("file : {} exists ", resFile.getName()); log.info("file : {} exists ", resFile.getName());
} }
@ -131,10 +134,7 @@ public class TaskExecutionCheckerUtils {
for (Pair<String, String> fileDownload : downloadFiles) { for (Pair<String, String> fileDownload : downloadFiles) {
try { try {
String fullName = fileDownload.getLeft(); String fullName = fileDownload.getLeft();
// we do not actually get & need tenantCode with this implementation right now. String fileName = fileDownload.getRight();
String tenantCode = fileDownload.getRight();
// TODO: Need a better way to get fileName because this implementation is tricky.
String fileName = storageOperate.getResourceFileName(fullName);
log.info("get resource file from path:{}", fullName); log.info("get resource file from path:{}", fullName);
long resourceDownloadStartTime = System.currentTimeMillis(); long resourceDownloadStartTime = System.currentTimeMillis();

6
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java

@ -101,9 +101,9 @@ public class TaskFilesTransferUtils {
try { try {
// upload file to storage // upload file to storage
String resourceWholePath = String resourceWholePath =
storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath); storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourcePath);
String resourceCRCWholePath = String resourceCRCWholePath =
storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourceCRCPath); storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourceCRCPath);
log.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath); log.info("{} --- Local:{} to Remote:{}", property, srcPath, resourceWholePath);
storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true); storageOperate.upload(taskExecutionContext.getTenantCode(), srcPath, resourceWholePath, false, true);
log.info("{} --- Local:{} to Remote:{}", "CRC file", srcCRCPath, resourceCRCWholePath); log.info("{} --- Local:{} to Remote:{}", "CRC file", srcCRCPath, resourceCRCWholePath);
@ -176,7 +176,7 @@ public class TaskFilesTransferUtils {
try { try {
String resourceWholePath = String resourceWholePath =
storageOperate.getResourceFileName(taskExecutionContext.getTenantCode(), resourcePath); storageOperate.getResourceFullName(taskExecutionContext.getTenantCode(), resourcePath);
log.info("{} --- Remote:{} to Local:{}", property, resourceWholePath, downloadPath); log.info("{} --- Remote:{} to Local:{}", property, resourceWholePath, downloadPath);
storageOperate.download(taskExecutionContext.getTenantCode(), resourceWholePath, downloadPath, false, storageOperate.download(taskExecutionContext.getTenantCode(), resourceWholePath, downloadPath, false,
true); true);

Loading…
Cancel
Save