diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java index 98d7bfae36..d7439053ce 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java @@ -66,16 +66,7 @@ import org.springframework.web.multipart.MultipartFile; import java.io.IOException; import java.rmi.ServerException; import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.regex.Matcher; import java.util.stream.Collectors; @@ -327,6 +318,11 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe return result; } + if (!PropertyUtils.getResUploadStartupState()){ + putMsg(result, Status.STORAGE_NOT_STARTUP); + return result; + } + if (resource.isDirectory() && storageOperate.returnStorageType().equals(ResUploadType.S3) && !resource.getFileName().equals(name)) { putMsg(result, Status.S3_CANNOT_RENAME); return result; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/StoreConfiguration.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/StoreConfiguration.java index 9a7ea53fe0..14a74d69f0 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/StoreConfiguration.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/StoreConfiguration.java @@ -25,7 +25,9 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; -import static org.apache.dolphinscheduler.common.Constants.*; +import static org.apache.dolphinscheduler.common.Constants.RESOURCE_STORAGE_TYPE; +import static org.apache.dolphinscheduler.common.Constants.STORAGE_HDFS; +import static org.apache.dolphinscheduler.common.Constants.STORAGE_S3; /** diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/exception/StorageOperateNoConfiguredException.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/exception/StorageOperateNoConfiguredException.java new file mode 100644 index 0000000000..3aaf7980d0 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/exception/StorageOperateNoConfiguredException.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.exception; + +/** + * exception for store + */ +public class StorageOperateNoConfiguredException extends RuntimeException { + + public StorageOperateNoConfiguredException() { + } + + public StorageOperateNoConfiguredException(String message) { + super(message); + } + + public StorageOperateNoConfiguredException(String message, Throwable cause) { + super(message, cause); + } + + public StorageOperateNoConfiguredException(Throwable cause) { + super(cause); + } + + public StorageOperateNoConfiguredException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperateManager.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperateManager.java deleted file mode 100644 index 184a264899..0000000000 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperateManager.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.common.storage; - -import org.apache.dolphinscheduler.common.enums.ResUploadType; - -import java.util.EnumMap; -import java.util.Objects; -import java.util.ServiceLoader; -/** - * @author Storage Operate Manager - */ -public class StorageOperateManager { - - public static final EnumMap OPERATE_MAP = new EnumMap<>(ResUploadType.class); - - static { - ServiceLoader load = ServiceLoader.load(StorageOperate.class); - for (StorageOperate storageOperate : load) { - OPERATE_MAP.put(storageOperate.returnStorageType(), storageOperate); - } - } - - public static StorageOperate getStorageOperate(ResUploadType resUploadType) { - if (Objects.isNull(resUploadType)){ - resUploadType = ResUploadType.HDFS; - } - StorageOperate storageOperate = OPERATE_MAP.get(resUploadType); - if (Objects.isNull(storageOperate)){ - storageOperate = OPERATE_MAP.get(ResUploadType.HDFS); - } - return storageOperate; - } -} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java index 8ac51c67ba..ec4286e8a1 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java @@ -30,14 +30,16 @@ import org.apache.dolphinscheduler.common.storage.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.client.cli.RMAdminCLI; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; import java.io.*; import java.nio.charset.StandardCharsets; @@ -50,13 +52,15 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.dolphinscheduler.common.Constants.*; +import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR; +import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S; +import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_FILE; +import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_UDF; /** * hadoop utils * single instance */ -@Component public class HadoopUtils implements Closeable, StorageOperate { private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java index 8c316c158e..d253392cdf 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java @@ -69,7 +69,7 @@ public class PropertyUtils { */ public static boolean getResUploadStartupState() { String resUploadStartupType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE); - ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType); + ResUploadType resUploadType = ResUploadType.valueOf(StringUtils.isEmpty(resUploadStartupType) ? ResUploadType.NONE.name() : resUploadStartupType); return resUploadType != ResUploadType.NONE; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java index 80fb464661..13e17c3bba 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java @@ -24,7 +24,11 @@ import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import com.amazonaws.services.s3.model.*; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; import com.amazonaws.services.s3.transfer.MultipleFileDownload; import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.TransferManagerBuilder; @@ -36,17 +40,30 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.jets3t.service.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; -import java.io.*; +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.dolphinscheduler.common.Constants.*; +import static org.apache.dolphinscheduler.common.Constants.AWS_END_POINT; +import static org.apache.dolphinscheduler.common.Constants.BUCKET_NAME; +import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR; +import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S; +import static org.apache.dolphinscheduler.common.Constants.RESOURCE_STORAGE_TYPE; +import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_FILE; +import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_UDF; +import static org.apache.dolphinscheduler.common.Constants.STORAGE_S3; -@Component public class S3Utils implements Closeable, StorageOperate { private static final Logger logger = LoggerFactory.getLogger(S3Utils.class); diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/exception/ExceptionTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/exception/ExceptionTest.java new file mode 100644 index 0000000000..6a670c9b61 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/exception/ExceptionTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.exception; + + +import org.junit.Assert; +import org.junit.Test; + +public class ExceptionTest { + + @Test + public void testException(){ + final String message = "Test"; + RuntimeException time = new RuntimeException(message); + + Assert.assertNull(new BaseException().getMessage()); + Assert.assertNotNull(new BaseException(message).getMessage()); + Assert.assertNotNull(new BaseException(message, time).getMessage()); + Assert.assertNotNull(new BaseException(time).getCause()); + Assert.assertNotNull(new BaseException(message, time, false, false).getMessage()); + + Assert.assertNull(new StorageOperateNoConfiguredException().getMessage()); + Assert.assertNotNull(new StorageOperateNoConfiguredException(message).getMessage()); + Assert.assertNotNull(new StorageOperateNoConfiguredException(message, time).getMessage()); + Assert.assertNotNull(new StorageOperateNoConfiguredException(time).getCause()); + Assert.assertNotNull(new StorageOperateNoConfiguredException(message, time, false, false).getMessage()); + } +} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/storage/StorageOperateManagerTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/storage/StorageOperateManagerTest.java deleted file mode 100644 index 05f2e45a73..0000000000 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/storage/StorageOperateManagerTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dolphinscheduler.common.storage; - -import org.apache.dolphinscheduler.common.enums.ResUploadType; -import org.apache.dolphinscheduler.common.utils.HadoopUtils; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.junit.MockitoJUnitRunner; - -import java.util.EnumMap; - -/** - * @author StorageOperateManagerTest - */ -@RunWith(MockitoJUnitRunner.class) -public class StorageOperateManagerTest { - - @Mock - private HadoopUtils hadoopUtils; - - @Test - public void testManager() { - StorageOperateManager mock = Mockito.mock(StorageOperateManager.class); - Assert.assertNotNull(mock); - - EnumMap storageOperateMap = StorageOperateManager.OPERATE_MAP; - storageOperateMap.put(ResUploadType.HDFS, hadoopUtils); - - StorageOperate storageOperate = StorageOperateManager.getStorageOperate(ResUploadType.HDFS); - Assert.assertNotNull(storageOperate); - } -} diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java index 14279b33d5..df8050af41 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java @@ -18,6 +18,8 @@ package org.apache.dolphinscheduler.common.utils; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.spi.enums.ResUploadType; +import org.junit.Assert; import org.junit.Test; import static org.junit.Assert.assertNotNull; @@ -28,4 +30,9 @@ public class PropertyUtilsTest { public void getString() { assertNotNull(PropertyUtils.getString(Constants.FS_DEFAULT_FS)); } + + @Test + public void getResUploadStartupState(){ + Assert.assertFalse(PropertyUtils.getResUploadStartupState()); + } } \ No newline at end of file diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index 0d2e4880aa..a376587aad 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -18,8 +18,6 @@ package org.apache.dolphinscheduler.server.worker.processor; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.ResUploadType; -import org.apache.dolphinscheduler.common.storage.StorageOperateManager; import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; @@ -163,7 +161,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { } // submit task to manager - boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager, StorageOperateManager.getStorageOperate(ResUploadType.HDFS))); + boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager)); if (!offer) { logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}", workerManager.getDelayQueueSize(), taskExecutionContext.getTaskInstanceId()); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index b58675b899..cc15eb62f4 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -19,13 +19,12 @@ package org.apache.dolphinscheduler.server.worker.runner; import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; +import org.apache.commons.lang3.tuple.Pair; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException; import org.apache.dolphinscheduler.common.storage.StorageOperate; -import org.apache.dolphinscheduler.common.utils.CommonUtils; -import org.apache.dolphinscheduler.common.utils.DateUtils; -import org.apache.dolphinscheduler.common.utils.JSONUtils; -import org.apache.dolphinscheduler.common.utils.LoggerUtils; +import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; @@ -44,11 +43,7 @@ import org.apache.commons.lang.StringUtils; import java.io.File; import java.io.IOException; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -115,13 +110,11 @@ public class TaskExecuteThread implements Runnable, Delayed { public TaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService, AlertClientService alertClientService, - TaskPluginManager taskPluginManager, - StorageOperate storageOperate) { + TaskPluginManager taskPluginManager) { this.taskExecutionContext = taskExecutionContext; this.taskCallbackService = taskCallbackService; this.alertClientService = alertClientService; this.taskPluginManager = taskPluginManager; - this.storageOperate = storageOperate; } @Override @@ -147,7 +140,10 @@ public class TaskExecuteThread implements Runnable, Delayed { taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext); // copy hdfs/minio file to local - downloadResource(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources(), logger); + List> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources()); + if (!fileDownloads.isEmpty()){ + downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads); + } taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath()); taskExecutionContext.setDefinedParams(getGlobalParamsMap()); @@ -277,34 +273,49 @@ public class TaskExecuteThread implements Runnable, Delayed { * download resource file * * @param execLocalPath execLocalPath - * @param projectRes projectRes + * @param fileDownloads projectRes * @param logger logger */ - private void downloadResource(String execLocalPath, Map projectRes, Logger logger) { - if (MapUtils.isEmpty(projectRes)) { - return; + public void downloadResource(String execLocalPath, Logger logger, List> fileDownloads) { + for (Pair fileDownload : fileDownloads) { + try { + // query the tenant code of the resource according to the name of the resource + String fullName = fileDownload.getLeft(); + String tenantCode = fileDownload.getRight(); + String resHdfsPath = storageOperate.getResourceFileName(tenantCode, fullName); + logger.info("get resource file from hdfs :{}", resHdfsPath); + storageOperate.download(tenantCode, resHdfsPath, execLocalPath + File.separator + fullName, false, true); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new ServiceException(e.getMessage()); + } } + } - Set> resEntries = projectRes.entrySet(); - - for (Map.Entry resource : resEntries) { - String fullName = resource.getKey(); - String tenantCode = resource.getValue(); - File resFile = new File(execLocalPath, fullName); - if (!resFile.exists()) { - try { - // query the tenant code of the resource according to the name of the resource - String resHdfsPath = storageOperate.getResourceFileName(tenantCode, fullName); - logger.info("get resource file from hdfs :{}", resHdfsPath); - storageOperate.download(tenantCode, resHdfsPath, execLocalPath + File.separator + fullName, false, true); - } catch (Exception e) { - logger.error(e.getMessage(), e); - throw new ServiceException(e.getMessage()); - } - } else { + /** + * download resource check + * @param execLocalPath + * @param projectRes + * @return + */ + public List> downloadCheck(String execLocalPath, Map projectRes){ + if (MapUtils.isEmpty(projectRes)) { + return Collections.emptyList(); + } + List> downloadFile = new ArrayList<>(); + projectRes.forEach((key, value) -> { + File resFile = new File(execLocalPath, key); + boolean notExist = !resFile.exists(); + if (notExist){ + downloadFile.add(Pair.of(key, value)); + } else{ logger.info("file : {} exists ", resFile.getName()); } + }); + if (!downloadFile.isEmpty() && !PropertyUtils.getResUploadStartupState()){ + throw new StorageOperateNoConfiguredException("Storage service config does not exist!"); } + return downloadFile; } /** diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java new file mode 100644 index 0000000000..f847690a6c --- /dev/null +++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; +import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClientTest; +import org.apache.dolphinscheduler.service.alert.AlertClientService; +import org.apache.dolphinscheduler.service.task.TaskPluginManager; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.powermock.modules.junit4.PowerMockRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@RunWith(PowerMockRunner.class) +public class TaskExecuteThreadTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRegistryClientTest.class); + + @Mock + private TaskExecutionContext taskExecutionContext; + + @Mock + private TaskCallbackService taskCallbackService; + + @Mock + private AlertClientService alertClientService; + + @Mock + private TaskPluginManager taskPluginManager; + + @Test + public void checkTest(){ + TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager); + + String path = "/"; + Map projectRes = new HashMap<>(); + projectRes.put("shell", "shell.sh"); + List> downloads = new ArrayList<>(); + try{ + downloads = taskExecuteThread.downloadCheck(path, projectRes); + }catch (Exception e){ + Assert.assertNotNull(e); + } + downloads.add(Pair.of("shell", "shell.sh")); + try{ + taskExecuteThread.downloadResource(path, LOGGER, downloads); + }catch (Exception e){ + + } + } +}