
cherry-pick S3 Resource center supports bucket customization (#12022)

Co-authored-by: guodong <guodongym@163.com>
Branch: 3.0.1-release
Author: Kerwin, committed via GitHub
Commit: 827eb60dbf
12 changed files:
1. docs/docs/en/architecture/configuration.md (27 lines changed)
2. docs/docs/en/faq.md (2 lines changed)
3. docs/docs/en/guide/resource/configuration.md (30 lines changed)
4. docs/docs/zh/architecture/configuration.md (38 lines changed)
5. docs/docs/zh/faq.md (2 lines changed)
6. docs/docs/zh/guide/resource/configuration.md (29 lines changed)
7. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java (17 lines changed)
8. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java (38 lines changed)
9. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java (189 lines changed)
10. dolphinscheduler-common/src/main/resources/common.properties (34 lines changed)
11. dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/file-manage/common.properties (18 lines changed)
12. dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java (12 lines changed)

docs/docs/en/architecture/configuration.md (27 lines changed)

@ -179,7 +179,32 @@ Note that DolphinScheduler also supports zookeeper related configuration through
### common.properties [hadoop, s3, yarn config properties]
Currently, common.properties mainly configures Hadoop and s3a related settings. Configuration file location:
Currently, common.properties mainly configures Hadoop and s3a related settings.
| Parameters | Default value | Description |
|--|--|--|
data.basedir.path | /tmp/dolphinscheduler | local directory used to store temp files
resource.storage.type | NONE | type of resource files: HDFS, S3, NONE
resource.storage.upload.base.path | /dolphinscheduler | storage path of resource files
resource.aws.access.key.id | minioadmin | access key id of S3
resource.aws.secret.access.key | minioadmin | secret access key of S3
resource.aws.region |us-east-1 | region of S3
resource.aws.s3.bucket.name | dolphinscheduler | bucket name of S3
resource.aws.s3.endpoint | http://minio:9000 | endpoint of S3
resource.hdfs.root.user | hdfs | configure users with corresponding permissions if storage type is HDFS
resource.hdfs.fs.defaultFS | hdfs://mycluster:8020 | If resource.storage.type=S3, then the request url would be similar to 's3a://dolphinscheduler'. Otherwise if resource.storage.type=HDFS and hadoop supports HA, copy core-site.xml and hdfs-site.xml into 'conf' directory
hadoop.security.authentication.startup.state | false | whether Hadoop enables Kerberos authentication
java.security.krb5.conf.path | /opt/krb5.conf | kerberos config directory
login.user.keytab.username | hdfs-mycluster@ESZ.COM | kerberos username
login.user.keytab.path | /opt/hdfs.headless.keytab | kerberos user keytab
kerberos.expire.time | 2 | kerberos expire time, integer, the unit is hours
yarn.resourcemanager.ha.rm.ids | | specify the yarn resourcemanager URL. If resourcemanager supports HA, enter the HA IP addresses (comma separated); leave the value empty for standalone
yarn.application.status.address | http://ds1:8088/ws/v1/cluster/apps/%s | keep the default if ResourceManager supports HA or is not used; otherwise, in standalone mode, replace ds1 with the ResourceManager hostname
dolphinscheduler.env.path | env/dolphinscheduler_env.sh | load environment variables configs [eg: JAVA_HOME,HADOOP_HOME, HIVE_HOME ...]
development.state | false | specify whether in development state
task.resource.limit.state | false | specify whether in resource limit state
### application-api.properties [API-service log config]
|Service| Configuration file |
|--|--|
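
For reference, a minimal sketch (not part of this commit) that loads a service's `common.properties` with the standard `java.util.Properties` API and prints the renamed keys from the table above; the file path is an assumption and should point at whichever service (api-server, worker-server, ...) you are checking:

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class CommonPropertiesCheck {
    public static void main(String[] args) throws IOException {
        // Assumed path; adjust to the service you are checking,
        // e.g. api-server/conf/common.properties or worker-server/conf/common.properties.
        String path = args.length > 0 ? args[0] : "api-server/conf/common.properties";
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        // Keys introduced/renamed by this change (resource.aws.* and resource.hdfs.*).
        String[] keys = {
                "resource.storage.type",
                "resource.storage.upload.base.path",
                "resource.aws.access.key.id",
                "resource.aws.secret.access.key",
                "resource.aws.region",
                "resource.aws.s3.bucket.name",
                "resource.aws.s3.endpoint",
                "resource.hdfs.root.user",
                "resource.hdfs.fs.defaultFS"
        };
        for (String key : keys) {
            System.out.printf("%s=%s%n", key, props.getProperty(key, "<not set>"));
        }
    }
}
```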

docs/docs/en/faq.md (2 lines changed)

@ -126,7 +126,7 @@ A: 1, if the replacement variable contains special characters, **use the \ tra
4, monitorServerState = "false", whether the service monitoring script is started, the default is not to start the service monitoring script. **If the service monitoring script is started, the master and worker services are monitored every 5 minutes, and if the machine is down, it will automatically restart.**
5, hdfsStartupSate="false", whether to enable HDFS resource upload function. The default is not enabled. **If it is not enabled, the resource center cannot be used.** If enabled, you need to configure the configuration of fs.defaultFS and yarn in conf/common/hadoop/hadoop.properties. If you use namenode HA, you need to copy core-site.xml and hdfs-site.xml to the conf root directory.
5, hdfsStartupSate="false", whether to enable HDFS resource upload function. The default is not enabled. **If it is not enabled, the resource center cannot be used.** If enabled, you need to configure the configuration of resource.hdfs.fs.defaultFS and yarn in conf/common/hadoop/hadoop.properties. If you use namenode HA, you need to copy core-site.xml and hdfs-site.xml to the conf root directory.
Note: **The 1.0.x version does not automatically create the hdfs root directory, you need to create it yourself, and you need to deploy the user with hdfs operation permission.**

docs/docs/en/guide/resource/configuration.md (30 lines changed)

@ -17,7 +17,11 @@ If you deploy DolphinScheduler in `Standalone` mode, you only need to configure
> NOTE: Please modify the value of `resource.storage.upload.base.path` if you do not want to use the default value as the base path.
## Use HDFS or Remote Object Storage
## HDFS Resource Configuration
When it is necessary to use the Resource Center to create or upload relevant files, all files and resources will be stored on HDFS. Therefore the following configuration is required.
## Configuring the common.properties
After version 3.0.0-alpha, if you want to upload resources to `Resource Center` connected to `HDFS` or `S3`, you need to configure `api-server/conf/common.properties` and `worker-server/conf/common.properties`.
@ -61,10 +65,26 @@ resource.aws.s3.bucket.name=dolphinscheduler
# You need to set this parameter when private cloud s3. If S3 uses public cloud, you only need to set resource.aws.region or set to the endpoint of a public cloud such as S3.cn-north-1.amazonaws.com.cn
resource.aws.s3.endpoint=http://localhost:9000
# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration,
# please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
resource.storage.upload.base.path=/tmp/dolphinscheduler
# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.access.key.id=minioadmin
# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.secret.access.key=minioadmin
# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.region=cn-north-1
# The name of the bucket. You need to create them by yourself. Otherwise, the system cannot start. All buckets in Amazon S3 share a single namespace; ensure the bucket is given a unique name.
resource.aws.s3.bucket.name=dolphinscheduler
# You need to set this parameter when private cloud s3. If S3 uses public cloud, you only need to set resource.aws.region or set to the endpoint of a public cloud such as S3.cn-north-1.amazonaws.com.cn
resource.aws.s3.endpoint=http://localhost:9000
# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
resource.hdfs.root.user=hdfs
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
resource.hdfs.fs.defaultFS=hdfs://mycluster:8020
resource.hdfs.root.user=root
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler;
# if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
resource.hdfs.fs.defaultFS=hdfs://localhost:8020
# whether to startup kerberos
hadoop.security.authentication.startup.state=false
@ -81,6 +101,8 @@ login.user.keytab.path=/opt/hdfs.headless.keytab
# kerberos expire time, the unit is hour
kerberos.expire.time=2
# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
# resourcemanager port, the default value is 8088 if not specified
resource.manager.httpaddress.port=8088
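
To sanity-check an S3/MinIO configuration like the snippet above before starting the services, here is a minimal sketch (not part of this commit) using the same AWS SDK v1 calls that `S3Utils.java` uses further down; the endpoint, region, credentials and bucket name are the sample values from the snippet and must be replaced with your own:

```java
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class S3ConfigSmokeTest {
    public static void main(String[] args) {
        // Sample values from the snippet above; replace with your own configuration.
        String endpoint = "http://localhost:9000";  // resource.aws.s3.endpoint (private cloud / MinIO)
        String region = "cn-north-1";               // resource.aws.region
        String accessKey = "minioadmin";            // resource.aws.access.key.id
        String secretKey = "minioadmin";            // resource.aws.secret.access.key
        String bucketName = "dolphinscheduler";     // resource.aws.s3.bucket.name

        AmazonS3 client = AmazonS3ClientBuilder.standard()
                .withPathStyleAccessEnabled(true)
                .withEndpointConfiguration(
                        new AwsClientBuilder.EndpointConfiguration(endpoint, Regions.fromName(region).getName()))
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)))
                .build();

        // This commit requires the bucket to exist already; verify it before starting the services.
        System.out.println("bucket exists: " + client.doesBucketExistV2(bucketName));
    }
}
```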

docs/docs/zh/architecture/configuration.md (38 lines changed)

@ -169,19 +169,35 @@ DolphinScheduler uses Zookeeper for cluster management, fault tolerance, event listening and other functions
DolphinScheduler can likewise configure Zookeeper related settings through `bin/env/dolphinscheduler_env.sh`.
## common.properties [hadoop, s3, yarn configuration]
The common.properties file currently mainly holds hadoop/s3/yarn related configuration. Configuration file locations:
|Service| Configuration file |
|--|--|
|Master Server | `master-server/conf/common.properties`|
|Api Server| `api-server/conf/common.properties`|
|Worker Server| `worker-server/conf/common.properties`|
|Alert Server| `alert-server/conf/common.properties`|
The default configuration is as follows:
## 4.common.properties [hadoop, s3, yarn configuration]
The common.properties file currently mainly holds hadoop/s3a related configuration.
| Parameters | Default value | Description |
|--|--|--|
data.basedir.path | /tmp/dolphinscheduler | local working directory, used to store temporary files
resource.storage.type | NONE | resource file storage type: HDFS, S3, NONE
resource.storage.upload.base.path | /dolphinscheduler | storage path of resource files
resource.aws.access.key.id | minioadmin | S3 access key
resource.aws.secret.access.key | minioadmin | S3 secret access key
resource.aws.region | us-east-1 | S3 region
resource.aws.s3.bucket.name | dolphinscheduler | S3 bucket name
resource.aws.s3.endpoint | http://minio:9000 | S3 endpoint address
resource.hdfs.root.user | hdfs | if the storage type is HDFS, configure a user with the corresponding operation permissions
resource.hdfs.fs.defaultFS | hdfs://mycluster:8020 | request address; if resource.storage.type=S3, the value is similar to s3a://dolphinscheduler. If resource.storage.type=HDFS and Hadoop has HA enabled, copy core-site.xml and hdfs-site.xml into the conf directory
hadoop.security.authentication.startup.state | false | whether Hadoop enables Kerberos authentication
java.security.krb5.conf.path | /opt/krb5.conf | kerberos configuration directory
login.user.keytab.username | hdfs-mycluster@ESZ.COM | kerberos login user
login.user.keytab.path | /opt/hdfs.headless.keytab | keytab of the kerberos login user
kerberos.expire.time | 2 | kerberos expiration time, integer, in hours
yarn.resourcemanager.ha.rm.ids | | yarn resourcemanager address; if resourcemanager has HA enabled, enter the HA IP addresses (comma separated); if resourcemanager is a single node, leave this value empty
yarn.application.status.address | http://ds1:8088/ws/v1/cluster/apps/%s | keep the default value if resourcemanager has HA enabled or resourcemanager is not used; if resourcemanager is a single node, replace ds1 with the resourcemanager hostname
dolphinscheduler.env.path | env/dolphinscheduler_env.sh | environment variable configuration file loaded when running scripts [e.g. JAVA_HOME, HADOOP_HOME, HIVE_HOME ...]
development.state | false | whether in development mode
task.resource.limit.state | false | whether the resource limit mode is enabled
## 5.application-api.properties [API service configuration]
|Parameters |Default value| Description|
|--|--|--|
|data.basedir.path | /tmp/dolphinscheduler | local working directory, used to store temporary files|
|resource.storage.type | NONE | resource file storage type: HDFS, S3, NONE|
|resource.upload.path | /dolphinscheduler | storage path of resource files|

docs/docs/zh/faq.md (2 lines changed)

@ -118,7 +118,7 @@ A: 1, if the replacement variable contains special characters, **use the \ escape
4, monitorServerState="false", whether the service monitoring script is started; by default the service monitoring script is not started. **If the service monitoring script is started, the master and worker services are checked every 5 minutes, and if a machine is down it will be restarted automatically**
5, hdfsStartupSate="false", whether to enable the HDFS resource upload function. It is disabled by default. **If it is not enabled, the resource center cannot be used.** If enabled, you need to configure fs.defaultFS and the related yarn settings in conf/common/hadoop/hadoop.properties. If namenode HA is used, you need to copy core-site.xml and hdfs-site.xml to the conf root directory
5, hdfsStartupSate="false", whether to enable the HDFS resource upload function. It is disabled by default. **If it is not enabled, the resource center cannot be used.** If enabled, you need to configure resource.hdfs.fs.defaultFS and the related yarn settings in conf/common/hadoop/hadoop.properties. If namenode HA is used, you need to copy core-site.xml and hdfs-site.xml to the conf root directory
Note: **the 1.0.x versions do not automatically create the hdfs root directory; you need to create it yourself, and the deployment user needs hdfs operation permissions**

docs/docs/zh/guide/resource/configuration.md (29 lines changed)

@ -51,7 +51,24 @@ resource.storage.type=HDFS
# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration,
# please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
resource.upload.path=/tmp/dolphinscheduler
resource.storage.upload.base.path=/tmp/dolphinscheduler
# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.access.key.id=minioadmin
# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.secret.access.key=minioadmin
# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.region=cn-north-1
# The name of the bucket. You need to create them by yourself. Otherwise, the system cannot start. All buckets in Amazon S3 share a single namespace; ensure the bucket is given a unique name.
resource.aws.s3.bucket.name=dolphinscheduler
# You need to set this parameter when private cloud s3. If S3 uses public cloud, you only need to set resource.aws.region or set to the endpoint of a public cloud such as S3.cn-north-1.amazonaws.com.cn
resource.aws.s3.endpoint=http://localhost:9000
# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
resource.hdfs.root.user=root
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler;
# if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
resource.hdfs.fs.defaultFS=hdfs://localhost:8020
# whether to startup kerberos
hadoop.security.authentication.startup.state=false
@ -69,15 +86,7 @@ login.user.keytab.path=/opt/hdfs.headless.keytab
kerberos.expire.time=2
# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
hdfs.root.user=root
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler;
# if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
fs.defaultFS=hdfs://localhost:8020
aws.access.key.id=minioadmin
aws.secret.access.key=minioadmin
aws.region=us-east-1
aws.endpoint=http://localhost:9000
# resourcemanager port, the default value is 8088 if not specified
resource.manager.httpaddress.port=8088
# if resourcemanager HA is enabled, please set the HA IPs; if resourcemanager is single, keep this value empty

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java (17 lines changed)

@ -63,14 +63,12 @@ public final class Constants {
public static final String STORAGE_HDFS = "HDFS";
public static final String BUCKET_NAME = "dolphinscheduler-test";
public static final String EMPTY_STRING = "";
/**
* fs.defaultFS
* resource.hdfs.fs.defaultFS
*/
public static final String FS_DEFAULT_FS = "fs.defaultFS";
public static final String FS_DEFAULT_FS = "resource.hdfs.fs.defaultFS";
/**
@ -103,15 +101,15 @@ public final class Constants {
/**
* hdfs configuration
* hdfs.root.user
* resource.hdfs.root.user
*/
public static final String HDFS_ROOT_USER = "hdfs.root.user";
public static final String HDFS_ROOT_USER = "resource.hdfs.root.user";
/**
* hdfs/s3 configuration
* resource.upload.path
* resource.storage.upload.base.path
*/
public static final String RESOURCE_UPLOAD_PATH = "resource.upload.path";
public static final String RESOURCE_UPLOAD_PATH = "resource.storage.upload.base.path";
/**
* data basedir path
@ -155,7 +153,8 @@ public final class Constants {
*/
public static final String RESOURCE_STORAGE_TYPE = "resource.storage.type";
public static final String AWS_END_POINT = "aws.endpoint";
public static final String AWS_S3_BUCKET_NAME = "resource.aws.s3.bucket.name";
public static final String AWS_END_POINT = "resource.aws.s3.endpoint";
/**
* comma ,
*/
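
Downstream code resolves these constants through `PropertyUtils`, as `S3Utils.java` does below. A hedged sketch of that lookup pattern, assuming `dolphinscheduler-common` and its `PropertyUtils` helper are on the classpath:

```java
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;

public class ResourceConfigExample {
    public static void main(String[] args) {
        // After this change the lookups resolve the new resource.-prefixed keys.
        String defaultFs = PropertyUtils.getString(Constants.FS_DEFAULT_FS);       // resource.hdfs.fs.defaultFS
        String hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER);       // resource.hdfs.root.user
        String basePath = PropertyUtils.getString(Constants.RESOURCE_UPLOAD_PATH); // resource.storage.upload.base.path
        String bucket = PropertyUtils.getString(Constants.AWS_S3_BUCKET_NAME);     // resource.aws.s3.bucket.name
        String endpoint = PropertyUtils.getString(Constants.AWS_END_POINT);        // resource.aws.s3.endpoint
        System.out.printf("fs=%s user=%s base=%s bucket=%s endpoint=%s%n",
                defaultFs, hdfsUser, basePath, bucket, endpoint);
    }
}
```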

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java (38 lines changed)

@ -17,11 +17,11 @@
package org.apache.dolphinscheduler.common.utils;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.commons.io.IOUtils;
import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_FILE;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_UDF;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
@ -29,6 +29,8 @@ import org.apache.dolphinscheduler.common.exception.BaseException;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@ -37,10 +39,31 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
@ -152,10 +175,9 @@ public class HadoopUtils implements Closeable, StorageOperate {
return true;
});
} else {
logger.warn("hdfs.root.user is not set value!");
logger.warn("resource.hdfs.root.user is not set value!");
fs = FileSystem.get(configuration);
}
//
} catch (Exception e) {
logger.error(e.getMessage(), e);
@ -278,7 +300,7 @@ public class HadoopUtils implements Closeable, StorageOperate {
* @throws IOException errors
*/
@Override
public boolean mkdir(String bucketName, String hdfsPath) throws IOException {
public boolean mkdir(String tenantCode, String hdfsPath) throws IOException {
return fs.mkdirs(new Path(hdfsPath));
}
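
The renamed `mkdir(tenantCode, hdfsPath)` still ignores its first argument for HDFS and simply creates the directory. A minimal sketch (not part of this commit) of the equivalent raw Hadoop call; the `fs.defaultFS` value and target path are assumptions and should match your `resource.hdfs.fs.defaultFS` and upload base path:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsMkdirExample {
    public static void main(String[] args) throws Exception {
        // Assumed value; use your resource.hdfs.fs.defaultFS setting.
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:8020");
        try (FileSystem fs = FileSystem.get(conf)) {
            // Matches HadoopUtils.mkdir(tenantCode, hdfsPath): for HDFS the tenant/bucket
            // argument is unused and the directory is created directly.
            boolean created = fs.mkdirs(new Path("/dolphinscheduler/default/resources"));
            System.out.println("created: " + created);
        }
    }
}
```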

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java (189 lines changed)

@ -33,13 +33,19 @@ import com.amazonaws.services.s3.transfer.MultipleFileDownload;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import org.apache.commons.lang.StringUtils;
import static org.apache.dolphinscheduler.common.Constants.AWS_END_POINT;
import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_STORAGE_TYPE;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_FILE;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_UDF;
import static org.apache.dolphinscheduler.common.Constants.STORAGE_S3;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.jets3t.service.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
@ -56,14 +62,25 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.dolphinscheduler.common.Constants.AWS_END_POINT;
import static org.apache.dolphinscheduler.common.Constants.BUCKET_NAME;
import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_STORAGE_TYPE;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_FILE;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_UDF;
import static org.apache.dolphinscheduler.common.Constants.STORAGE_S3;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.Bucket;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.transfer.MultipleFileDownload;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
public class S3Utils implements Closeable, StorageOperate {
@ -75,6 +92,8 @@ public class S3Utils implements Closeable, StorageOperate {
public static final String REGION = PropertyUtils.getString(TaskConstants.AWS_REGION);
public static final String BUCKET_NAME = PropertyUtils.getString(Constants.AWS_S3_BUCKET_NAME);
private AmazonS3 s3Client = null;
@ -83,19 +102,19 @@ public class S3Utils implements Closeable, StorageOperate {
if (!StringUtils.isEmpty(PropertyUtils.getString(AWS_END_POINT))) {
s3Client = AmazonS3ClientBuilder
.standard()
.withPathStyleAccessEnabled(true)
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(PropertyUtils.getString(AWS_END_POINT), Regions.fromName(REGION).getName()))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID)))
.build();
.standard()
.withPathStyleAccessEnabled(true)
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(PropertyUtils.getString(AWS_END_POINT), Regions.fromName(REGION).getName()))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID)))
.build();
} else {
s3Client = AmazonS3ClientBuilder
.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID)))
.withRegion(Regions.fromName(REGION))
.build();
.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID)))
.withRegion(Regions.fromName(REGION))
.build();
}
checkBucketNameIfNotPresent(BUCKET_NAME);
checkBucketNameExists(BUCKET_NAME);
}
}
@ -126,25 +145,32 @@ public class S3Utils implements Closeable, StorageOperate {
}
@Override
public void createTenantDirIfNotExists(String tenantCode) throws ServiceException {
createFolder(tenantCode+ FOLDER_SEPARATOR +RESOURCE_TYPE_UDF);
createFolder(tenantCode+ FOLDER_SEPARATOR +RESOURCE_TYPE_FILE);
public void createTenantDirIfNotExists(String tenantCode) throws Exception {
getInstance().mkdir(tenantCode, getS3ResDir(tenantCode));
getInstance().mkdir(tenantCode, getS3UdfDir(tenantCode));
}
@Override
public String getResDir(String tenantCode) {
return tenantCode+ FOLDER_SEPARATOR +RESOURCE_TYPE_FILE+FOLDER_SEPARATOR;
return getS3ResDir(tenantCode) + FOLDER_SEPARATOR;
}
@Override
public String getUdfDir(String tenantCode) {
return tenantCode+ FOLDER_SEPARATOR +RESOURCE_TYPE_UDF+FOLDER_SEPARATOR;
return getS3UdfDir(tenantCode) + FOLDER_SEPARATOR;
}
@Override
public boolean mkdir(String tenantCode, String path) throws IOException {
createFolder(path);
return true;
String objectName = path + FOLDER_SEPARATOR;
if (!s3Client.doesObjectExist(BUCKET_NAME, objectName)) {
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(0);
InputStream emptyContent = new ByteArrayInputStream(new byte[0]);
PutObjectRequest putObjectRequest = new PutObjectRequest(BUCKET_NAME, objectName, emptyContent, metadata);
s3Client.putObject(putObjectRequest);
}
return true;
}
@Override
@ -152,14 +178,15 @@ public class S3Utils implements Closeable, StorageOperate {
if (fileName.startsWith(FOLDER_SEPARATOR)) {
fileName = fileName.replaceFirst(FOLDER_SEPARATOR, "");
}
return String.format(FORMAT_S_S, tenantCode+FOLDER_SEPARATOR+RESOURCE_TYPE_FILE, fileName);
return String.format(FORMAT_S_S, getS3ResDir(tenantCode), fileName);
}
@Override
public String getFileName(ResourceType resourceType, String tenantCode, String fileName) {
if (fileName.startsWith(FOLDER_SEPARATOR)) {
fileName = fileName.replaceFirst(FOLDER_SEPARATOR, "");
}
return getDir(resourceType, tenantCode)+fileName;
return getDir(resourceType, tenantCode) + fileName;
}
@Override
@ -172,7 +199,7 @@ public class S3Utils implements Closeable, StorageOperate {
}
S3Object o = s3Client.getObject(BUCKET_NAME, srcFilePath);
try (S3ObjectInputStream s3is = o.getObjectContent();
FileOutputStream fos = new FileOutputStream(dstFilePath)) {
FileOutputStream fos = new FileOutputStream(dstFile)) {
byte[] readBuf = new byte[1024];
int readLen;
while ((readLen = s3is.read(readBuf)) > 0) {
@ -217,7 +244,7 @@ public class S3Utils implements Closeable, StorageOperate {
case FILE:
return getResDir(tenantCode);
default:
return tenantCode+ FOLDER_SEPARATOR ;
return "";
}
}
@ -228,33 +255,21 @@ public class S3Utils implements Closeable, StorageOperate {
s3Client.putObject(BUCKET_NAME, dstPath, new File(srcFile));
return true;
} catch (AmazonServiceException e) {
logger.error("upload failed,the bucketName is {},the dstPath is {}", BUCKET_NAME, tenantCode+ FOLDER_SEPARATOR +dstPath);
logger.error("upload failed,the bucketName is {},the filePath is {}", BUCKET_NAME, dstPath);
return false;
}
}
@Override
public List<String> vimFile(String tenantCode,String filePath, int skipLineNums, int limit) throws IOException {
public List<String> vimFile(String tenantCode, String filePath, int skipLineNums, int limit) throws IOException {
if (StringUtils.isBlank(filePath)) {
logger.error("file path:{} is blank", filePath);
return Collections.emptyList();
}
S3Object s3Object=s3Client.getObject(BUCKET_NAME,filePath);
try(BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(s3Object.getObjectContent()))){
Stream<String> stream = bufferedReader.lines().skip(skipLineNums).limit(limit);
return stream.collect(Collectors.toList());
}
}
private void
createFolder( String folderName) {
if (!s3Client.doesObjectExist(BUCKET_NAME, folderName + FOLDER_SEPARATOR)) {
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(0);
InputStream emptyContent = new ByteArrayInputStream(new byte[0]);
PutObjectRequest putObjectRequest = new PutObjectRequest(BUCKET_NAME, folderName + FOLDER_SEPARATOR, emptyContent, metadata);
s3Client.putObject(putObjectRequest);
S3Object s3Object = s3Client.getObject(BUCKET_NAME, filePath);
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(s3Object.getObjectContent()))) {
Stream<String> stream = bufferedReader.lines().skip(skipLineNums).limit(limit);
return stream.collect(Collectors.toList());
}
}
@ -263,6 +278,47 @@ public class S3Utils implements Closeable, StorageOperate {
deleteTenantCode(tenantCode);
}
/**
* S3 resource dir
*
* @param tenantCode tenant code
* @return S3 resource dir
*/
public static String getS3ResDir(String tenantCode) {
return String.format("%s/" + RESOURCE_TYPE_FILE, getS3TenantDir(tenantCode));
}
/**
* S3 udf dir
*
* @param tenantCode tenant code
* @return get udf dir on S3
*/
public static String getS3UdfDir(String tenantCode) {
return String.format("%s/" + RESOURCE_TYPE_UDF, getS3TenantDir(tenantCode));
}
/**
* @param tenantCode tenant code
* @return file directory of tenants on S3
*/
public static String getS3TenantDir(String tenantCode) {
return String.format(FORMAT_S_S, getS3DataBasePath(), tenantCode);
}
/**
* get data S3 path
*
* @return data S3 path
*/
public static String getS3DataBasePath() {
if (FOLDER_SEPARATOR.equals(RESOURCE_UPLOAD_PATH)) {
return "";
} else {
return RESOURCE_UPLOAD_PATH.replaceFirst(FOLDER_SEPARATOR, "");
}
}
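
To make the resulting bucket layout concrete, here is a small sketch that reproduces the path composition above, assuming `FORMAT_S_S` is `"%s/%s"`, `RESOURCE_TYPE_FILE` is `"resources"` and `RESOURCE_TYPE_UDF` is `"udfs"` (how these constants are defined elsewhere in `Constants.java`); the base path and tenant code are sample values:

```java
public class S3PathLayoutSketch {
    // Assumed values; in DolphinScheduler these come from Constants and common.properties.
    private static final String RESOURCE_UPLOAD_PATH = "/dolphinscheduler"; // resource.storage.upload.base.path
    private static final String RESOURCE_TYPE_FILE = "resources";
    private static final String RESOURCE_TYPE_UDF = "udfs";

    public static void main(String[] args) {
        String tenantCode = "default";
        // Mirrors getS3DataBasePath(): strip the leading "/" unless the base path is just "/".
        String base = "/".equals(RESOURCE_UPLOAD_PATH) ? "" : RESOURCE_UPLOAD_PATH.replaceFirst("/", "");
        String tenantDir = String.format("%s/%s", base, tenantCode);           // getS3TenantDir
        String resDir = String.format("%s/" + RESOURCE_TYPE_FILE, tenantDir);  // getS3ResDir
        String udfDir = String.format("%s/" + RESOURCE_TYPE_UDF, tenantDir);   // getS3UdfDir
        // createTenantDirIfNotExists() then creates these two keys (with a trailing "/") in the bucket:
        System.out.println(resDir + "/"); // dolphinscheduler/default/resources/
        System.out.println(udfDir + "/"); // dolphinscheduler/default/udfs/
    }
}
```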
private void deleteTenantCode(String tenantCode) {
deleteDirectory(getResDir(tenantCode));
deleteDirectory(getUdfDir(tenantCode));
@ -271,25 +327,27 @@ public class S3Utils implements Closeable, StorageOperate {
/**
* xxx untest
* upload local directory to S3
*
* @param tenantCode
* @param keyPrefix the name of directory
* @param strPath
*/
private void uploadDirectory(String tenantCode, String keyPrefix, String strPath) {
s3Client.putObject(BUCKET_NAME, tenantCode+ FOLDER_SEPARATOR +keyPrefix, new File(strPath));
s3Client.putObject(BUCKET_NAME, tenantCode + FOLDER_SEPARATOR + keyPrefix, new File(strPath));
}
/**
* xxx untest
* download S3 Directory to local
*
* @param tenantCode
* @param keyPrefix the name of directory
* @param srcPath
*/
private void downloadDirectory(String tenantCode, String keyPrefix, String srcPath){
TransferManager tm= TransferManagerBuilder.standard().withS3Client(s3Client).build();
try{
private void downloadDirectory(String tenantCode, String keyPrefix, String srcPath) {
TransferManager tm = TransferManagerBuilder.standard().withS3Client(s3Client).build();
try {
MultipleFileDownload download = tm.downloadDirectory(BUCKET_NAME, tenantCode + FOLDER_SEPARATOR + keyPrefix, new File(srcPath));
download.waitForCompletion();
} catch (AmazonS3Exception | InterruptedException e) {
@ -300,15 +358,26 @@ public class S3Utils implements Closeable, StorageOperate {
}
}
public void checkBucketNameIfNotPresent(String bucketName) {
if (!s3Client.doesBucketExistV2(bucketName)) {
logger.info("the current regionName is {}", s3Client.getRegionName());
s3Client.createBucket(bucketName);
public void checkBucketNameExists(String bucketName) {
if (StringUtils.isBlank(bucketName)) {
throw new IllegalArgumentException("resource.aws.s3.bucket.name is blank");
}
Bucket existsBucket = s3Client.listBuckets()
.stream()
.filter(
bucket -> bucket.getName().equals(bucketName)
)
.findFirst()
.orElseThrow(() -> {
return new IllegalArgumentException("bucketName: " + bucketName + " is not exists, you need to create them by yourself");
});
logger.info("bucketName: {} has been found, the current regionName is {}", existsBucket.getName(), s3Client.getRegionName());
}
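
Since the bucket is no longer auto-created, it has to exist before the services start. A hedged sketch (not part of this commit) of a one-time manual creation step with the same SDK, using the sample MinIO values from `common.properties`; in practice the MinIO or AWS console works just as well:

```java
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class CreateBucketOnce {
    public static void main(String[] args) {
        // Sample MinIO-style values; replace with your resource.aws.* configuration.
        AmazonS3 client = AmazonS3ClientBuilder.standard()
                .withPathStyleAccessEnabled(true)
                .withEndpointConfiguration(
                        new AwsClientBuilder.EndpointConfiguration("http://localhost:9000", "cn-north-1"))
                .withCredentials(new AWSStaticCredentialsProvider(
                        new BasicAWSCredentials("minioadmin", "minioadmin")))
                .build();
        String bucketName = "dolphinscheduler"; // must match resource.aws.s3.bucket.name
        if (!client.doesBucketExistV2(bucketName)) {
            client.createBucket(bucketName);    // one-time manual step; the services no longer do this
        }
    }
}
```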
/*
only delete the object of directory ,it`s better to delete the files in it -r
/**
* only delete the object of directory ,it`s better to delete the files in it -r
*/
private void deleteDirectory(String directoryName) {
if (s3Client.doesObjectExist(BUCKET_NAME, directoryName)) {

dolphinscheduler-common/src/main/resources/common.properties (34 lines changed)

@ -18,11 +18,29 @@
# user data local directory path, please make sure the directory exists and have read write permissions
data.basedir.path=/tmp/dolphinscheduler
# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
# resource storage type: HDFS, S3, NONE
resource.storage.type=NONE
# resource store on HDFS/S3 path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
resource.storage.upload.base.path=/dolphinscheduler
# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.access.key.id=minioadmin
# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.secret.access.key=minioadmin
# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required
resource.aws.region=cn-north-1
# The name of the bucket. You need to create them by yourself. Otherwise, the system cannot start. All buckets in Amazon S3 share a single namespace; ensure the bucket is given a unique name.
resource.aws.s3.bucket.name=dolphinscheduler
# You need to set this parameter when private cloud s3. If S3 uses public cloud, you only need to set resource.aws.region or set to the endpoint of a public cloud such as S3.cn-north-1.amazonaws.com.cn
resource.aws.s3.endpoint=http://localhost:9000
# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
resource.upload.path=/dolphinscheduler
# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
resource.hdfs.root.user=hdfs
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
resource.hdfs.fs.defaultFS=hdfs://mycluster:8020
# whether to startup kerberos
hadoop.security.authentication.startup.state=false
@ -38,16 +56,8 @@ login.user.keytab.path=/opt/hdfs.headless.keytab
# kerberos expire time, the unit is hour
kerberos.expire.time=2
# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
hdfs.root.user=hdfs
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
fs.defaultFS=hdfs://mycluster:8020
aws.access.key.id=minioadmin
aws.secret.access.key=minioadmin
aws.region=us-east-1
aws.endpoint=http://localhost:9000
# resourcemanager port, the default value is 8088 if not specified
resource.manager.httpaddress.port=8088
# if resourcemanager HA is enabled, please set the HA IPs; if resourcemanager is single, keep this value empty

dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/file-manage/common.properties (18 lines changed)

@ -22,7 +22,7 @@ data.basedir.path=/tmp/dolphinscheduler
resource.storage.type=S3
# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
resource.upload.path=/dolphinscheduler
resource.storage.upload.base.path=/dolphinscheduler
# whether to startup kerberos
hadoop.security.authentication.startup.state=false
@ -43,10 +43,10 @@ kerberos.expire.time=2
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
hdfs.root.user=hdfs
resource.hdfs.root.user=hdfs
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
fs.defaultFS=s3a://dolphinscheduler
resource.hdfs.fs.defaultFS=s3a://dolphinscheduler
# resourcemanager port, the default value is 8088 if not specified
@ -81,7 +81,11 @@ sudo.enable=true
development.state=false
# rpc port
alert.rpc.port=50052
aws.access.key.id=accessKey123
aws.secret.access.key=secretKey123
aws.region=us-east-1
aws.endpoint=http://s3:9000
resource.aws.access.key.id=accessKey123
resource.aws.secret.access.key=secretKey123
resource.aws.region=us-east-1
resource.aws.s3.bucket.name=dolphinscheduler
resource.aws.s3.endpoint=http://s3:9000
# Task resource limit state
task.resource.limit.state=false

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java (12 lines changed)

@ -361,9 +361,9 @@ public class TaskConstants {
/**
* hdfs/s3 configuration
* resource.upload.path
* resource.storage.upload.base.path
*/
public static final String RESOURCE_UPLOAD_PATH = "resource.upload.path";
public static final String RESOURCE_UPLOAD_PATH = "resource.storage.upload.base.path";
/**
* data.quality.jar.name
@ -394,14 +394,14 @@ public class TaskConstants {
/**
* aws config
*/
public static final String AWS_ACCESS_KEY_ID= "aws.access.key.id";
public static final String AWS_SECRET_ACCESS_KEY= "aws.secret.access.key";
public static final String AWS_REGION = "aws.region";
public static final String AWS_ACCESS_KEY_ID = "resource.aws.access.key.id";
public static final String AWS_SECRET_ACCESS_KEY = "resource.aws.secret.access.key";
public static final String AWS_REGION = "resource.aws.region";
/**
* zeppelin config
*/
public static final String ZEPPELIN_REST_URL= "zeppelin.rest.url";
public static final String ZEPPELIN_REST_URL = "zeppelin.rest.url";
}
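
A hedged sketch of how task-side code picks up the renamed AWS keys through these constants, assuming both `dolphinscheduler-task-api` and `dolphinscheduler-common` (for `PropertyUtils`) are on the classpath, as they are for `S3Utils` above:

```java
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;

public class AwsTaskConfigExample {
    public static void main(String[] args) {
        // After this change, the same resource.aws.* values configured for the resource
        // center are what EMR-style tasks read through TaskConstants.
        String accessKey = PropertyUtils.getString(TaskConstants.AWS_ACCESS_KEY_ID);     // resource.aws.access.key.id
        String secretKey = PropertyUtils.getString(TaskConstants.AWS_SECRET_ACCESS_KEY); // resource.aws.secret.access.key
        String region = PropertyUtils.getString(TaskConstants.AWS_REGION);               // resource.aws.region
        System.out.printf("region=%s credentialsConfigured=%s%n",
                region, accessKey != null && secretKey != null);
    }
}
```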
