Browse Source

[Improvement-9609][Worker]The resource download method is selected according to the configurati… (#9636)

* The resource download method is selected according to the configuration and the service startup verification is added.

* common check CI fix

* Startup check changed to running check

* code smell

* Coordinate resources to increase test coverage.

* Split resource download method.

* Unit Test Coverage

Co-authored-by: WangJPLeo <wangjipeng@whaleops.com>
3.0.0/version-upgrade
WangJPLeo 3 years ago committed by GitHub
parent
commit
996790ce9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java
  2. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/StoreConfiguration.java
  3. 43
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/exception/StorageOperateNoConfiguredException.java
  4. 48
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperateManager.java
  5. 12
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
  6. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java
  7. 27
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java
  8. 43
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/exception/ExceptionTest.java
  9. 50
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/storage/StorageOperateManagerTest.java
  10. 7
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java
  11. 4
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  12. 63
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
  13. 76
      dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java

16
dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ResourcesServiceImpl.java

@ -66,16 +66,7 @@ import org.springframework.web.multipart.MultipartFile;
import java.io.IOException; import java.io.IOException;
import java.rmi.ServerException; import java.rmi.ServerException;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.ArrayList; import java.util.*;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -327,6 +318,11 @@ public class ResourcesServiceImpl extends BaseServiceImpl implements ResourcesSe
return result; return result;
} }
if (!PropertyUtils.getResUploadStartupState()){
putMsg(result, Status.STORAGE_NOT_STARTUP);
return result;
}
if (resource.isDirectory() && storageOperate.returnStorageType().equals(ResUploadType.S3) && !resource.getFileName().equals(name)) { if (resource.isDirectory() && storageOperate.returnStorageType().equals(ResUploadType.S3) && !resource.getFileName().equals(name)) {
putMsg(result, Status.S3_CANNOT_RENAME); putMsg(result, Status.S3_CANNOT_RENAME);
return result; return result;

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/config/StoreConfiguration.java

@ -25,7 +25,9 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import static org.apache.dolphinscheduler.common.Constants.*; import static org.apache.dolphinscheduler.common.Constants.RESOURCE_STORAGE_TYPE;
import static org.apache.dolphinscheduler.common.Constants.STORAGE_HDFS;
import static org.apache.dolphinscheduler.common.Constants.STORAGE_S3;
/** /**

43
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/exception/StorageOperateNoConfiguredException.java

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.exception;
/**
* exception for store
*/
public class StorageOperateNoConfiguredException extends RuntimeException {
public StorageOperateNoConfiguredException() {
}
public StorageOperateNoConfiguredException(String message) {
super(message);
}
public StorageOperateNoConfiguredException(String message, Throwable cause) {
super(message, cause);
}
public StorageOperateNoConfiguredException(Throwable cause) {
super(cause);
}
public StorageOperateNoConfiguredException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

48
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperateManager.java

@ -1,48 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.storage;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
import java.util.EnumMap;
import java.util.Objects;
import java.util.ServiceLoader;
/**
* @author Storage Operate Manager
*/
public class StorageOperateManager {
public static final EnumMap<ResUploadType, StorageOperate> OPERATE_MAP = new EnumMap<>(ResUploadType.class);
static {
ServiceLoader<StorageOperate> load = ServiceLoader.load(StorageOperate.class);
for (StorageOperate storageOperate : load) {
OPERATE_MAP.put(storageOperate.returnStorageType(), storageOperate);
}
}
public static StorageOperate getStorageOperate(ResUploadType resUploadType) {
if (Objects.isNull(resUploadType)){
resUploadType = ResUploadType.HDFS;
}
StorageOperate storageOperate = OPERATE_MAP.get(resUploadType);
if (Objects.isNull(storageOperate)){
storageOperate = OPERATE_MAP.get(ResUploadType.HDFS);
}
return storageOperate;
}
}

12
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java

@ -30,14 +30,16 @@ import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.client.cli.RMAdminCLI; import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.*; import java.io.*;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -50,13 +52,15 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.apache.dolphinscheduler.common.Constants.*; import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_FILE;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_UDF;
/** /**
* hadoop utils * hadoop utils
* single instance * single instance
*/ */
@Component
public class HadoopUtils implements Closeable, StorageOperate { public class HadoopUtils implements Closeable, StorageOperate {
private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class); private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class);

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java

@ -69,7 +69,7 @@ public class PropertyUtils {
*/ */
public static boolean getResUploadStartupState() { public static boolean getResUploadStartupState() {
String resUploadStartupType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE); String resUploadStartupType = PropertyUtils.getUpperCaseString(Constants.RESOURCE_STORAGE_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType); ResUploadType resUploadType = ResUploadType.valueOf(StringUtils.isEmpty(resUploadStartupType) ? ResUploadType.NONE.name() : resUploadStartupType);
return resUploadType != ResUploadType.NONE; return resUploadType != ResUploadType.NONE;
} }

27
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java

@ -24,7 +24,11 @@ import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions; import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.*; import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.transfer.MultipleFileDownload; import com.amazonaws.services.s3.transfer.MultipleFileDownload;
import com.amazonaws.services.s3.transfer.TransferManager; import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder; import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
@ -36,17 +40,30 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.jets3t.service.ServiceException; import org.jets3t.service.ServiceException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.*; import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static org.apache.dolphinscheduler.common.Constants.*; import static org.apache.dolphinscheduler.common.Constants.AWS_END_POINT;
import static org.apache.dolphinscheduler.common.Constants.BUCKET_NAME;
import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_STORAGE_TYPE;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_FILE;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_UDF;
import static org.apache.dolphinscheduler.common.Constants.STORAGE_S3;
@Component
public class S3Utils implements Closeable, StorageOperate { public class S3Utils implements Closeable, StorageOperate {
private static final Logger logger = LoggerFactory.getLogger(S3Utils.class); private static final Logger logger = LoggerFactory.getLogger(S3Utils.class);

43
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/exception/ExceptionTest.java

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.exception;
import org.junit.Assert;
import org.junit.Test;
public class ExceptionTest {
@Test
public void testException(){
final String message = "Test";
RuntimeException time = new RuntimeException(message);
Assert.assertNull(new BaseException().getMessage());
Assert.assertNotNull(new BaseException(message).getMessage());
Assert.assertNotNull(new BaseException(message, time).getMessage());
Assert.assertNotNull(new BaseException(time).getCause());
Assert.assertNotNull(new BaseException(message, time, false, false).getMessage());
Assert.assertNull(new StorageOperateNoConfiguredException().getMessage());
Assert.assertNotNull(new StorageOperateNoConfiguredException(message).getMessage());
Assert.assertNotNull(new StorageOperateNoConfiguredException(message, time).getMessage());
Assert.assertNotNull(new StorageOperateNoConfiguredException(time).getCause());
Assert.assertNotNull(new StorageOperateNoConfiguredException(message, time, false, false).getMessage());
}
}

50
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/storage/StorageOperateManagerTest.java

@ -1,50 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.storage;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.EnumMap;
/**
* @author StorageOperateManagerTest
*/
@RunWith(MockitoJUnitRunner.class)
public class StorageOperateManagerTest {
@Mock
private HadoopUtils hadoopUtils;
@Test
public void testManager() {
StorageOperateManager mock = Mockito.mock(StorageOperateManager.class);
Assert.assertNotNull(mock);
EnumMap<ResUploadType, StorageOperate> storageOperateMap = StorageOperateManager.OPERATE_MAP;
storageOperateMap.put(ResUploadType.HDFS, hadoopUtils);
StorageOperate storageOperate = StorageOperateManager.getStorageOperate(ResUploadType.HDFS);
Assert.assertNotNull(storageOperate);
}
}

7
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/PropertyUtilsTest.java

@ -18,6 +18,8 @@
package org.apache.dolphinscheduler.common.utils; package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.spi.enums.ResUploadType;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
@ -28,4 +30,9 @@ public class PropertyUtilsTest {
public void getString() { public void getString() {
assertNotNull(PropertyUtils.getString(Constants.FS_DEFAULT_FS)); assertNotNull(PropertyUtils.getString(Constants.FS_DEFAULT_FS));
} }
@Test
public void getResUploadStartupState(){
Assert.assertFalse(PropertyUtils.getResUploadStartupState());
}
} }

4
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -18,8 +18,6 @@
package org.apache.dolphinscheduler.server.worker.processor; package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.dolphinscheduler.common.storage.StorageOperateManager;
import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.FileUtils;
@ -163,7 +161,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
} }
// submit task to manager // submit task to manager
boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager, StorageOperateManager.getStorageOperate(ResUploadType.HDFS))); boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager));
if (!offer) { if (!offer) {
logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}", logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}",
workerManager.getDelayQueueSize(), taskExecutionContext.getTaskInstanceId()); workerManager.getDelayQueueSize(), taskExecutionContext.getTaskInstanceId());

63
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -19,13 +19,12 @@ package org.apache.dolphinscheduler.server.worker.runner;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH; import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException;
import org.apache.dolphinscheduler.common.storage.StorageOperate; import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel; import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -44,11 +43,7 @@ import org.apache.commons.lang.StringUtils;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Date; import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Delayed; import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -115,13 +110,11 @@ public class TaskExecuteThread implements Runnable, Delayed {
public TaskExecuteThread(TaskExecutionContext taskExecutionContext, public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
TaskCallbackService taskCallbackService, TaskCallbackService taskCallbackService,
AlertClientService alertClientService, AlertClientService alertClientService,
TaskPluginManager taskPluginManager, TaskPluginManager taskPluginManager) {
StorageOperate storageOperate) {
this.taskExecutionContext = taskExecutionContext; this.taskExecutionContext = taskExecutionContext;
this.taskCallbackService = taskCallbackService; this.taskCallbackService = taskCallbackService;
this.alertClientService = alertClientService; this.alertClientService = alertClientService;
this.taskPluginManager = taskPluginManager; this.taskPluginManager = taskPluginManager;
this.storageOperate = storageOperate;
} }
@Override @Override
@ -147,7 +140,10 @@ public class TaskExecuteThread implements Runnable, Delayed {
taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext); taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext);
// copy hdfs/minio file to local // copy hdfs/minio file to local
downloadResource(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources(), logger); List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources());
if (!fileDownloads.isEmpty()){
downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
}
taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath()); taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
taskExecutionContext.setDefinedParams(getGlobalParamsMap()); taskExecutionContext.setDefinedParams(getGlobalParamsMap());
@ -277,23 +273,15 @@ public class TaskExecuteThread implements Runnable, Delayed {
* download resource file * download resource file
* *
* @param execLocalPath execLocalPath * @param execLocalPath execLocalPath
* @param projectRes projectRes * @param fileDownloads projectRes
* @param logger logger * @param logger logger
*/ */
private void downloadResource(String execLocalPath, Map<String, String> projectRes, Logger logger) { public void downloadResource(String execLocalPath, Logger logger, List<Pair<String, String>> fileDownloads) {
if (MapUtils.isEmpty(projectRes)) { for (Pair<String, String> fileDownload : fileDownloads) {
return;
}
Set<Map.Entry<String, String>> resEntries = projectRes.entrySet();
for (Map.Entry<String, String> resource : resEntries) {
String fullName = resource.getKey();
String tenantCode = resource.getValue();
File resFile = new File(execLocalPath, fullName);
if (!resFile.exists()) {
try { try {
// query the tenant code of the resource according to the name of the resource // query the tenant code of the resource according to the name of the resource
String fullName = fileDownload.getLeft();
String tenantCode = fileDownload.getRight();
String resHdfsPath = storageOperate.getResourceFileName(tenantCode, fullName); String resHdfsPath = storageOperate.getResourceFileName(tenantCode, fullName);
logger.info("get resource file from hdfs :{}", resHdfsPath); logger.info("get resource file from hdfs :{}", resHdfsPath);
storageOperate.download(tenantCode, resHdfsPath, execLocalPath + File.separator + fullName, false, true); storageOperate.download(tenantCode, resHdfsPath, execLocalPath + File.separator + fullName, false, true);
@ -301,10 +289,33 @@ public class TaskExecuteThread implements Runnable, Delayed {
logger.error(e.getMessage(), e); logger.error(e.getMessage(), e);
throw new ServiceException(e.getMessage()); throw new ServiceException(e.getMessage());
} }
}
}
/**
* download resource check
* @param execLocalPath
* @param projectRes
* @return
*/
public List<Pair<String, String>> downloadCheck(String execLocalPath, Map<String, String> projectRes){
if (MapUtils.isEmpty(projectRes)) {
return Collections.emptyList();
}
List<Pair<String, String>> downloadFile = new ArrayList<>();
projectRes.forEach((key, value) -> {
File resFile = new File(execLocalPath, key);
boolean notExist = !resFile.exists();
if (notExist){
downloadFile.add(Pair.of(key, value));
} else{ } else{
logger.info("file : {} exists ", resFile.getName()); logger.info("file : {} exists ", resFile.getName());
} }
});
if (!downloadFile.isEmpty() && !PropertyUtils.getResUploadStartupState()){
throw new StorageOperateNoConfiguredException("Storage service config does not exist!");
} }
return downloadFile;
} }
/** /**

76
dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThreadTest.java

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClientTest;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@RunWith(PowerMockRunner.class)
public class TaskExecuteThreadTest {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRegistryClientTest.class);
@Mock
private TaskExecutionContext taskExecutionContext;
@Mock
private TaskCallbackService taskCallbackService;
@Mock
private AlertClientService alertClientService;
@Mock
private TaskPluginManager taskPluginManager;
@Test
public void checkTest(){
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager);
String path = "/";
Map<String, String> projectRes = new HashMap<>();
projectRes.put("shell", "shell.sh");
List<Pair<String, String>> downloads = new ArrayList<>();
try{
downloads = taskExecuteThread.downloadCheck(path, projectRes);
}catch (Exception e){
Assert.assertNotNull(e);
}
downloads.add(Pair.of("shell", "shell.sh"));
try{
taskExecuteThread.downloadResource(path, LOGGER, downloads);
}catch (Exception e){
}
}
}
Loading…
Cancel
Save