
[Feature-10363][server] S3 Resource center supports bucket customization (#10433)

* [Feature-10363][server] S3 Resource center supports bucket customization

* Reorganize the configuration, separating the HDFS and S3 settings to distinguish them (the key renames are summarized below)
* Revise the configuration comments to clarify what each setting means
* Update the official docs to add the missing configuration descriptions and clarify meaning and usage
* Correct the e2e configuration file dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/file-manage/common.properties to use the new definitions
* Modify the code to support bucket customization
* Check at startup that the configured bucket exists and is accessible to the configured credentials; otherwise, startup fails
* Add S3 support for the resource.storage.upload.base.path base path
* Align the S3Utils implementation style with HadoopUtils

* [Feature-10363][server] S3 Resource center supports bucket customization

* Address code review comments
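
For quick reference, these are the configuration-key renames introduced by this change, summarized in the same properties style as the diffs below (defaults are unchanged unless the diff says otherwise):

# old key                 ->  new key
# resource.upload.path    ->  resource.storage.upload.base.path
# hdfs.root.user          ->  resource.hdfs.root.user
# fs.defaultFS            ->  resource.hdfs.fs.defaultFS
# aws.access.key.id       ->  resource.aws.access.key.id
# aws.secret.access.key   ->  resource.aws.secret.access.key
# aws.region              ->  resource.aws.region
# aws.endpoint            ->  resource.aws.s3.endpoint
# added with no predecessor:  resource.aws.s3.bucket.name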
3.1.0-release
guodong committed 2c227ab384
Changed files:

 41  docs/docs/en/architecture/configuration.md
  2  docs/docs/en/faq.md
 31  docs/docs/en/guide/resource/configuration.md
 41  docs/docs/zh/architecture/configuration.md
  2  docs/docs/zh/faq.md
 31  docs/docs/zh/guide/resource/configuration.md
 17  dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
 46  dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
204  dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java
 34  dolphinscheduler-common/src/main/resources/common.properties
 15  dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/file-manage/common.properties
 12  dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java

41
docs/docs/en/architecture/configuration.md

@@ -182,27 +182,28 @@ zookeeper.retry.maxtime|10| maximum retry times
### common.properties [hadoop、s3、yarn config properties]
Currently, common.properties mainly contains the Hadoop and S3A related configurations.
|Parameters | Default value| Description|
| Parameters | Default value | Description |
|--|--|--|
data.basedir.path|/tmp/dolphinscheduler| local directory used to store temp files
resource.storage.type|NONE| type of resource files: HDFS, S3, NONE
resource.upload.path|/dolphinscheduler| storage path of resource files
hadoop.security.authentication.startup.state|false| whether hadoop grant kerberos permission
java.security.krb5.conf.path|/opt/krb5.conf|kerberos config directory
login.user.keytab.username|hdfs-mycluster@ESZ.COM|kerberos username
login.user.keytab.path|/opt/hdfs.headless.keytab|kerberos user keytab
kerberos.expire.time|2|kerberos expire time,integer,the unit is hour
resource.view.suffixs| txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties| file types supported by resource center
hdfs.root.user|hdfs| configure users with corresponding permissions if storage type is HDFS
fs.defaultFS|hdfs://mycluster:8020|If resource.storage.type=S3, then the request url would be similar to 's3a://dolphinscheduler'. Otherwise if resource.storage.type=HDFS and hadoop supports HA, copy core-site.xml and hdfs-site.xml into 'conf' directory
fs.s3a.endpoint||s3 endpoint url
fs.s3a.access.key||s3 access key
fs.s3a.secret.key||s3 secret key
yarn.resourcemanager.ha.rm.ids||specify the yarn resourcemanager url. if resourcemanager supports HA, input HA IP addresses (separated by comma), or input null for standalone
yarn.application.status.address|http://ds1:8088/ws/v1/cluster/apps/%s|keep default if ResourceManager supports HA or not use ResourceManager, or replace ds1 with corresponding hostname if ResourceManager in standalone mode
dolphinscheduler.env.path|env/dolphinscheduler_env.sh|load environment variables configs [eg: JAVA_HOME,HADOOP_HOME, HIVE_HOME ...]
development.state|false| specify whether in development state
task.resource.limit.state|false|specify whether in resource limit state
data.basedir.path | /tmp/dolphinscheduler | local directory used to store temp files
resource.storage.type | NONE | type of resource files: HDFS, S3, NONE
resource.storage.upload.base.path | /dolphinscheduler | storage path of resource files
resource.aws.access.key.id | minioadmin | access key id of S3
resource.aws.secret.access.key | minioadmin | secret access key of S3
resource.aws.region |us-east-1 | region of S3
resource.aws.s3.bucket.name | dolphinscheduler | bucket name of S3
resource.aws.s3.endpoint | http://minio:9000 | endpoint of S3
resource.hdfs.root.user | hdfs | configure users with corresponding permissions if storage type is HDFS
resource.hdfs.fs.defaultFS | hdfs://mycluster:8020 | If resource.storage.type=S3, then the request url would be similar to 's3a://dolphinscheduler'. Otherwise if resource.storage.type=HDFS and hadoop supports HA, copy core-site.xml and hdfs-site.xml into 'conf' directory
hadoop.security.authentication.startup.state | false | whether hadoop grant kerberos permission
java.security.krb5.conf.path | /opt/krb5.conf | kerberos config directory
login.user.keytab.username | hdfs-mycluster@ESZ.COM | kerberos username
login.user.keytab.path | /opt/hdfs.headless.keytab | kerberos user keytab
kerberos.expire.time | 2 | kerberos expire time,integer,the unit is hour
yarn.resourcemanager.ha.rm.ids | | specify the yarn resourcemanager url. if resourcemanager supports HA, input HA IP addresses (separated by comma), or input null for standalone
yarn.application.status.address | http://ds1:8088/ws/v1/cluster/apps/%s | keep default if ResourceManager supports HA or not use ResourceManager, or replace ds1 with corresponding hostname if ResourceManager in standalone mode
dolphinscheduler.env.path | env/dolphinscheduler_env.sh | load environment variables configs [eg: JAVA_HOME,HADOOP_HOME, HIVE_HOME ...]
development.state | false | specify whether in development state
task.resource.limit.state | false | specify whether in resource limit state
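
Putting the S3 rows together, a minimal common.properties for an S3-backed resource center would look like the sketch below; the values are the defaults from the table above (MinIO-style placeholders), not recommendations:

resource.storage.type=S3
resource.storage.upload.base.path=/dolphinscheduler
resource.aws.access.key.id=minioadmin
resource.aws.secret.access.key=minioadmin
resource.aws.region=us-east-1
resource.aws.s3.bucket.name=dolphinscheduler
resource.aws.s3.endpoint=http://minio:9000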
### application-api.properties [API-service log config]

2
docs/docs/en/faq.md

@@ -126,7 +126,7 @@ A: 1, if the replacement variable contains special characters, **use the \ tra
4, monitorServerState = "false", whether the service monitoring script is started, the default is not to start the service monitoring script. **If the service monitoring script is started, the master and worker services are monitored every 5 minutes, and if the machine is down, it will automatically restart.**
5, hdfsStartupSate="false", whether to enable HDFS resource upload function. The default is not enabled. **If it is not enabled, the resource center cannot be used.** If enabled, you need to configure the configuration of fs.defaultFS and yarn in conf/common/hadoop/hadoop.properties. If you use namenode HA, you need to copy core-site.xml and hdfs-site.xml to the conf root directory.
5, hdfsStartupSate="false", whether to enable HDFS resource upload function. The default is not enabled. **If it is not enabled, the resource center cannot be used.** If enabled, you need to configure the configuration of resource.hdfs.fs.defaultFS and yarn in conf/common/hadoop/hadoop.properties. If you use namenode HA, you need to copy core-site.xml and hdfs-site.xml to the conf root directory.
Note: **The 1.0.x version does not automatically create the hdfs root directory, you need to create it yourself, and you need to deploy the user with hdfs operation permission.**

31
docs/docs/en/guide/resource/configuration.md

@@ -11,7 +11,7 @@ For a single machine, you can choose to use local file directory as the upload d
Configure the file in the following paths: `api-server/conf/common.properties` and `worker-server/conf/common.properties`.
- Change `data.basedir.path` to the local directory path. Please make sure the user who deploys dolphinscheduler has read and write permissions, such as: `data.basedir.path=/tmp/dolphinscheduler`. The directory you configure will be auto-created if it does not exist.
- Modify the following two parameters, `resource.storage.type=HDFS` and `fs.defaultFS=file:///`.
- Modify the following two parameters, `resource.storage.type=HDFS` and `resource.hdfs.fs.defaultFS=file:///`.
## HDFS Resource Configuration
@@ -47,7 +47,24 @@ resource.storage.type=HDFS
# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration,
# please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
resource.upload.path=/tmp/dolphinscheduler
resource.storage.upload.base.path=/tmp/dolphinscheduler
# The AWS access key. This configuration is required if resource.storage.type=S3 or the EMR task is used
resource.aws.access.key.id=minioadmin
# The AWS secret access key. This configuration is required if resource.storage.type=S3 or the EMR task is used
resource.aws.secret.access.key=minioadmin
# The AWS region to use. This configuration is required if resource.storage.type=S3 or the EMR task is used
resource.aws.region=cn-north-1
# The name of the bucket. You need to create it yourself; otherwise the system cannot start. All buckets in Amazon S3 share a single namespace, so give the bucket a unique name.
resource.aws.s3.bucket.name=dolphinscheduler
# Set this parameter when using a private-cloud S3. For a public-cloud S3 you only need to set resource.aws.region, or set this to the public-cloud endpoint, such as s3.cn-north-1.amazonaws.com.cn
resource.aws.s3.endpoint=http://localhost:9000
# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
resource.hdfs.root.user=root
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler;
# if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
resource.hdfs.fs.defaultFS=hdfs://localhost:8020
# whether to startup kerberos
hadoop.security.authentication.startup.state=false
@@ -65,15 +82,7 @@ login.user.keytab.path=/opt/hdfs.headless.keytab
kerberos.expire.time=2
# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
hdfs.root.user=root
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler;
# if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
fs.defaultFS=hdfs://localhost:8020
aws.access.key.id=minioadmin
aws.secret.access.key=minioadmin
aws.region=us-east-1
aws.endpoint=http://localhost:9000
# resourcemanager port, the default value is 8088 if not specified
resource.manager.httpaddress.port=8088
# if resourcemanager HA is enabled, please set the HA IPs; if resourcemanager is single, keep this value empty

41
docs/docs/zh/architecture/configuration.md

@@ -174,27 +174,28 @@ zookeeper.retry.maxtime|10|maximum retry times
## 4. common.properties [hadoop, s3, yarn configuration]
The common.properties file currently mainly contains the hadoop/s3a related configurations.
|Parameters |Default value| Description|
| Parameters | Default value | Description |
|--|--|--|
data.basedir.path|/tmp/dolphinscheduler|local working directory, used to store temp files
resource.storage.type|NONE|type of resource files: HDFS, S3, NONE
resource.upload.path|/dolphinscheduler|storage path of resource files
hadoop.security.authentication.startup.state|false|whether hadoop grants kerberos permission
java.security.krb5.conf.path|/opt/krb5.conf|kerberos config directory
login.user.keytab.username|hdfs-mycluster@ESZ.COM|kerberos login user
login.user.keytab.path|/opt/hdfs.headless.keytab|kerberos login user keytab
kerberos.expire.time|2|kerberos expire time, integer, the unit is hour
resource.view.suffixs| txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties|file types supported by the resource center
hdfs.root.user|hdfs|if the storage type is HDFS, configure a user with the corresponding operation permissions
fs.defaultFS|hdfs://mycluster:8020|if resource.storage.type=S3, the request url is similar to: s3a://dolphinscheduler. If resource.storage.type=HDFS and hadoop has HA configured, copy core-site.xml and hdfs-site.xml to the conf directory
fs.s3a.endpoint||s3 endpoint address
fs.s3a.access.key||s3 access key
fs.s3a.secret.key||s3 secret key
yarn.resourcemanager.ha.rm.ids||yarn resourcemanager address; if resourcemanager has HA enabled, input the HA IP addresses (separated by commas); if resourcemanager is a single node, leave this value empty
yarn.application.status.address|http://ds1:8088/ws/v1/cluster/apps/%s|keep the default if resourcemanager has HA enabled or resourcemanager is not used; if resourcemanager is a single node, replace ds1 with the hostname of the resourcemanager
dolphinscheduler.env.path|env/dolphinscheduler_env.sh|environment variable config file loaded when running scripts [e.g. JAVA_HOME, HADOOP_HOME, HIVE_HOME ...]
development.state|false|whether in development mode
task.resource.limit.state|false|whether resource limit mode is enabled
data.basedir.path | /tmp/dolphinscheduler | local working directory, used to store temp files
resource.storage.type | NONE | type of resource files: HDFS, S3, NONE
resource.storage.upload.base.path | /dolphinscheduler | storage path of resource files
resource.aws.access.key.id | minioadmin | S3 access key
resource.aws.secret.access.key | minioadmin | S3 secret access key
resource.aws.region | us-east-1 | S3 region
resource.aws.s3.bucket.name | dolphinscheduler | S3 bucket name
resource.aws.s3.endpoint | http://minio:9000 | s3 endpoint address
resource.hdfs.root.user | hdfs | if the storage type is HDFS, configure a user with the corresponding operation permissions
resource.hdfs.fs.defaultFS | hdfs://mycluster:8020 | if resource.storage.type=S3, the request url is similar to: s3a://dolphinscheduler. If resource.storage.type=HDFS and hadoop has HA configured, copy core-site.xml and hdfs-site.xml to the conf directory
hadoop.security.authentication.startup.state | false | whether hadoop grants kerberos permission
java.security.krb5.conf.path | /opt/krb5.conf | kerberos config directory
login.user.keytab.username | hdfs-mycluster@ESZ.COM | kerberos login user
login.user.keytab.path | /opt/hdfs.headless.keytab | kerberos login user keytab
kerberos.expire.time | 2 | kerberos expire time, integer, the unit is hour
yarn.resourcemanager.ha.rm.ids | | yarn resourcemanager address; if resourcemanager has HA enabled, input the HA IP addresses (separated by commas); if resourcemanager is a single node, leave this value empty
yarn.application.status.address | http://ds1:8088/ws/v1/cluster/apps/%s | keep the default if resourcemanager has HA enabled or resourcemanager is not used; if resourcemanager is a single node, replace ds1 with the hostname of the resourcemanager
dolphinscheduler.env.path | env/dolphinscheduler_env.sh | environment variable config file loaded when running scripts [e.g. JAVA_HOME, HADOOP_HOME, HIVE_HOME ...]
development.state | false | whether in development mode
task.resource.limit.state | false | whether resource limit mode is enabled
## 5. application-api.properties [API service configuration]

2
docs/docs/zh/faq.md

@@ -118,7 +118,7 @@ A: 1, if the replacement variable contains special characters, **use the \ esc
4, monitorServerState="false", whether the service monitoring script is started; by default it is not started. **If the service monitoring script is started, the master and worker services are checked every 5 minutes, and if a machine is down it will be restarted automatically**
5, hdfsStartupSate="false", whether to enable the HDFS resource upload function. It is disabled by default. **If it is not enabled, the resource center cannot be used.** If enabled, you need to configure fs.defaultFS and the related yarn settings in conf/common/hadoop/hadoop.properties. If you use namenode HA, you need to copy core-site.xml and hdfs-site.xml to the conf root directory
5, hdfsStartupSate="false", whether to enable the HDFS resource upload function. It is disabled by default. **If it is not enabled, the resource center cannot be used.** If enabled, you need to configure resource.hdfs.fs.defaultFS and the related yarn settings in conf/common/hadoop/hadoop.properties. If you use namenode HA, you need to copy core-site.xml and hdfs-site.xml to the conf root directory
Note: **the 1.0.x version does not automatically create the hdfs root directory; you need to create it yourself, and the deployment user needs hdfs operation permissions**

31
docs/docs/zh/guide/resource/configuration.md

@@ -11,7 +11,7 @@
Configure the files at the following paths: `api-server/conf/common.properties` and `worker-server/conf/common.properties`
- Change `data.basedir.path` to the local storage path. Please make sure the user deploying DolphinScheduler has read and write permissions, for example: `data.basedir.path=/tmp/dolphinscheduler`. The directory will be created automatically when the path does not exist
- Modify the following two parameters: `resource.storage.type=HDFS` and `fs.defaultFS=file:///`
- Modify the following two parameters: `resource.storage.type=HDFS` and `resource.hdfs.fs.defaultFS=file:///`
## HDFS Resource Configuration
@@ -47,7 +47,24 @@ resource.storage.type=HDFS
# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration,
# please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
resource.upload.path=/tmp/dolphinscheduler
resource.storage.upload.base.path=/tmp/dolphinscheduler
# The AWS access key. This configuration is required if resource.storage.type=S3 or the EMR task is used
resource.aws.access.key.id=minioadmin
# The AWS secret access key. This configuration is required if resource.storage.type=S3 or the EMR task is used
resource.aws.secret.access.key=minioadmin
# The AWS region to use. This configuration is required if resource.storage.type=S3 or the EMR task is used
resource.aws.region=cn-north-1
# The name of the bucket. You need to create it yourself; otherwise the system cannot start. All buckets in Amazon S3 share a single namespace, so give the bucket a unique name.
resource.aws.s3.bucket.name=dolphinscheduler
# Set this parameter when using a private-cloud S3. For a public-cloud S3 you only need to set resource.aws.region, or set this to the public-cloud endpoint, such as s3.cn-north-1.amazonaws.com.cn
resource.aws.s3.endpoint=http://localhost:9000
# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
resource.hdfs.root.user=root
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler;
# if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
resource.hdfs.fs.defaultFS=hdfs://localhost:8020
# whether to startup kerberos
hadoop.security.authentication.startup.state=false
@@ -65,15 +82,7 @@ login.user.keytab.path=/opt/hdfs.headless.keytab
kerberos.expire.time=2
# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
hdfs.root.user=root
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler;
# if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
fs.defaultFS=hdfs://localhost:8020
aws.access.key.id=minioadmin
aws.secret.access.key=minioadmin
aws.region=us-east-1
aws.endpoint=http://localhost:9000
# resourcemanager port, the default value is 8088 if not specified
resource.manager.httpaddress.port=8088
# if resourcemanager HA is enabled, please set the HA IPs; if resourcemanager is single, keep this value empty

17
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java

@@ -61,14 +61,12 @@ public final class Constants {
public static final String STORAGE_HDFS = "HDFS";
public static final String BUCKET_NAME = "dolphinscheduler-test";
public static final String EMPTY_STRING = "";
/**
* fs.defaultFS
* resource.hdfs.fs.defaultFS
*/
public static final String FS_DEFAULT_FS = "fs.defaultFS";
public static final String FS_DEFAULT_FS = "resource.hdfs.fs.defaultFS";
/**
@@ -96,15 +94,15 @@ public final class Constants {
/**
* hdfs configuration
* hdfs.root.user
* resource.hdfs.root.user
*/
public static final String HDFS_ROOT_USER = "hdfs.root.user";
public static final String HDFS_ROOT_USER = "resource.hdfs.root.user";
/**
* hdfs/s3 configuration
* resource.upload.path
* resource.storage.upload.base.path
*/
public static final String RESOURCE_UPLOAD_PATH = "resource.upload.path";
public static final String RESOURCE_UPLOAD_PATH = "resource.storage.upload.base.path";
/**
* data basedir path
@@ -148,7 +146,8 @@ public final class Constants {
*/
public static final String RESOURCE_STORAGE_TYPE = "resource.storage.type";
public static final String AWS_END_POINT = "aws.endpoint";
public static final String AWS_S3_BUCKET_NAME = "resource.aws.s3.bucket.name";
public static final String AWS_END_POINT = "resource.aws.s3.endpoint";
/**
* comma ,
*/
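
As a rough illustration of how the renamed constants are consumed (the call site below is hypothetical, but PropertyUtils.getString is the lookup S3Utils uses later in this diff):

// sketch: resolving the renamed keys at runtime
String basePath = PropertyUtils.getString(Constants.RESOURCE_UPLOAD_PATH); // resource.storage.upload.base.path
String bucket = PropertyUtils.getString(Constants.AWS_S3_BUCKET_NAME); // resource.aws.s3.bucket.name
String endpoint = PropertyUtils.getString(Constants.AWS_END_POINT); // resource.aws.s3.endpoint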

46
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java

@@ -21,6 +21,25 @@ import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_FILE;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_UDF;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.dolphinscheduler.common.exception.BaseException;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
@@ -35,28 +54,14 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.dolphinscheduler.common.exception.BaseException;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* hadoop utils
@@ -149,10 +154,9 @@ public class HadoopUtils implements Closeable, StorageOperate {
return true;
});
} else {
logger.warn("hdfs.root.user is not set value!");
logger.warn("resource.hdfs.root.user is not set value!");
fs = FileSystem.get(configuration);
}
//
} catch (Exception e) {
logger.error(e.getMessage(), e);
@@ -275,7 +279,7 @@ public class HadoopUtils implements Closeable, StorageOperate {
* @throws IOException errors
*/
@Override
public boolean mkdir(String bucketName, String hdfsPath) throws IOException {
public boolean mkdir(String tenantCode, String hdfsPath) throws IOException {
return fs.mkdirs(new Path(hdfsPath));
}
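
With the parameter renamed from bucketName to tenantCode, mkdir now has the same shape on both StorageOperate implementations; the HDFS version simply ignores the tenant code. A minimal sketch of the shared call (tenant and path values are illustrative):

// sketch: the same StorageOperate call works for either backend; throws IOException
StorageOperate storage = HadoopUtils.getInstance(); // or S3Utils.getInstance(), per resource.storage.type
storage.mkdir("tenant1", "/dolphinscheduler/tenant1/resources"); // HDFS path, or S3 key prefix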

204
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java

@@ -17,28 +17,21 @@
package org.apache.dolphinscheduler.common.utils;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.transfer.MultipleFileDownload;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import org.apache.commons.lang3.StringUtils;
import static org.apache.dolphinscheduler.common.Constants.AWS_END_POINT;
import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_STORAGE_TYPE;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_FILE;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_UDF;
import static org.apache.dolphinscheduler.common.Constants.STORAGE_S3;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.StringUtils;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
@@ -54,14 +47,25 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.dolphinscheduler.common.Constants.AWS_END_POINT;
import static org.apache.dolphinscheduler.common.Constants.BUCKET_NAME;
import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR;
import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_STORAGE_TYPE;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_FILE;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_UDF;
import static org.apache.dolphinscheduler.common.Constants.STORAGE_S3;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.Bucket;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.transfer.MultipleFileDownload;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
public class S3Utils implements Closeable, StorageOperate {
@@ -73,6 +77,8 @@ public class S3Utils implements Closeable, StorageOperate {
public static final String REGION = PropertyUtils.getString(TaskConstants.AWS_REGION);
public static final String BUCKET_NAME = PropertyUtils.getString(Constants.AWS_S3_BUCKET_NAME);
private AmazonS3 s3Client = null;
@@ -81,19 +87,19 @@ public class S3Utils implements Closeable, StorageOperate {
if (!StringUtils.isEmpty(PropertyUtils.getString(AWS_END_POINT))) {
s3Client = AmazonS3ClientBuilder
.standard()
.withPathStyleAccessEnabled(true)
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(PropertyUtils.getString(AWS_END_POINT), Regions.fromName(REGION).getName()))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID)))
.build();
.standard()
.withPathStyleAccessEnabled(true)
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(PropertyUtils.getString(AWS_END_POINT), Regions.fromName(REGION).getName()))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID)))
.build();
} else {
s3Client = AmazonS3ClientBuilder
.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID)))
.withRegion(Regions.fromName(REGION))
.build();
.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID)))
.withRegion(Regions.fromName(REGION))
.build();
}
checkBucketNameIfNotPresent(BUCKET_NAME);
checkBucketNameExists(BUCKET_NAME);
}
}
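
Two details in the builder above are easy to miss: withPathStyleAccessEnabled(true) keeps bucket addressing working against private-cloud endpoints such as MinIO, which generally lack virtual-hosted-style DNS, and Regions.fromName(REGION) throws if resource.aws.region is not a recognized region name. A standalone smoke test of the same builder, using the SDK imports shown above and hypothetical MinIO-style values, might look like:

// sketch: verify the endpoint and credentials before starting the server
AmazonS3 s3 = AmazonS3ClientBuilder.standard()
    .withPathStyleAccessEnabled(true)
    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:9000", Regions.US_EAST_1.getName()))
    .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("minioadmin", "minioadmin")))
    .build();
s3.listBuckets().forEach(b -> System.out.println(b.getName())); // should list the configured bucket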
@@ -125,24 +131,31 @@ public class S3Utils implements Closeable, StorageOperate {
@Override
public void createTenantDirIfNotExists(String tenantCode) throws Exception {
createFolder(tenantCode+ FOLDER_SEPARATOR +RESOURCE_TYPE_UDF);
createFolder(tenantCode+ FOLDER_SEPARATOR +RESOURCE_TYPE_FILE);
getInstance().mkdir(tenantCode, getS3ResDir(tenantCode));
getInstance().mkdir(tenantCode, getS3UdfDir(tenantCode));
}
@Override
public String getResDir(String tenantCode) {
return tenantCode+ FOLDER_SEPARATOR +RESOURCE_TYPE_FILE+FOLDER_SEPARATOR;
return getS3ResDir(tenantCode) + FOLDER_SEPARATOR;
}
@Override
public String getUdfDir(String tenantCode) {
return tenantCode+ FOLDER_SEPARATOR +RESOURCE_TYPE_UDF+FOLDER_SEPARATOR;
return getS3UdfDir(tenantCode) + FOLDER_SEPARATOR;
}
@Override
public boolean mkdir(String tenantCode, String path) throws IOException {
createFolder(path);
return true;
String objectName = path + FOLDER_SEPARATOR;
if (!s3Client.doesObjectExist(BUCKET_NAME, objectName)) {
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(0);
InputStream emptyContent = new ByteArrayInputStream(new byte[0]);
PutObjectRequest putObjectRequest = new PutObjectRequest(BUCKET_NAME, objectName, emptyContent, metadata);
s3Client.putObject(putObjectRequest);
}
return true;
}
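
S3 has no real directories, so mkdir above emulates one by writing a zero-byte object whose key ends with the folder separator. A quick way to observe the effect (bucket and tenant names are illustrative):

// sketch: after mkdir("tenant1", "dolphinscheduler/tenant1/resources"),
// a zero-byte marker object exists under the trailing-slash key
boolean marker = s3Client.doesObjectExist("dolphinscheduler", "dolphinscheduler/tenant1/resources/");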
@Override
@@ -150,21 +163,22 @@ public class S3Utils implements Closeable, StorageOperate {
if (fileName.startsWith(FOLDER_SEPARATOR)) {
fileName = fileName.replaceFirst(FOLDER_SEPARATOR, "");
}
return String.format(FORMAT_S_S, tenantCode+FOLDER_SEPARATOR+RESOURCE_TYPE_FILE, fileName);
return String.format(FORMAT_S_S, getS3ResDir(tenantCode), fileName);
}
@Override
public String getFileName(ResourceType resourceType, String tenantCode, String fileName) {
if (fileName.startsWith(FOLDER_SEPARATOR)) {
fileName = fileName.replaceFirst(FOLDER_SEPARATOR, "");
}
return getDir(resourceType, tenantCode)+fileName;
return getDir(resourceType, tenantCode) + fileName;
}
@Override
public void download(String tenantCode, String srcFilePath, String dstFile, boolean deleteSource, boolean overwrite) throws IOException {
S3Object o = s3Client.getObject(BUCKET_NAME, srcFilePath);
try (S3ObjectInputStream s3is = o.getObjectContent();
FileOutputStream fos = new FileOutputStream(new File(dstFile))) {
FileOutputStream fos = new FileOutputStream(dstFile)) {
byte[] readBuf = new byte[1024];
int readLen = 0;
while ((readLen = s3is.read(readBuf)) > 0) {
@@ -210,7 +224,7 @@ public class S3Utils implements Closeable, StorageOperate {
case FILE:
return getResDir(tenantCode);
default:
return tenantCode+ FOLDER_SEPARATOR ;
return "";
}
}
@@ -221,33 +235,21 @@ public class S3Utils implements Closeable, StorageOperate {
s3Client.putObject(BUCKET_NAME, dstPath, new File(srcFile));
return true;
} catch (AmazonServiceException e) {
logger.error("upload failed,the bucketName is {},the dstPath is {}", BUCKET_NAME, tenantCode+ FOLDER_SEPARATOR +dstPath);
logger.error("upload failed,the bucketName is {},the filePath is {}", BUCKET_NAME, dstPath);
return false;
}
}
@Override
public List<String> vimFile(String tenantCode,String filePath, int skipLineNums, int limit) throws IOException {
public List<String> vimFile(String tenantCode, String filePath, int skipLineNums, int limit) throws IOException {
if (StringUtils.isBlank(filePath)) {
logger.error("file path:{} is blank", filePath);
return Collections.emptyList();
}
S3Object s3Object=s3Client.getObject(BUCKET_NAME,filePath);
try(BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(s3Object.getObjectContent()))){
Stream<String> stream = bufferedReader.lines().skip(skipLineNums).limit(limit);
return stream.collect(Collectors.toList());
}
}
private void
createFolder( String folderName) {
if (!s3Client.doesObjectExist(BUCKET_NAME, folderName + FOLDER_SEPARATOR)) {
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(0);
InputStream emptyContent = new ByteArrayInputStream(new byte[0]);
PutObjectRequest putObjectRequest = new PutObjectRequest(BUCKET_NAME, folderName + FOLDER_SEPARATOR, emptyContent, metadata);
s3Client.putObject(putObjectRequest);
S3Object s3Object = s3Client.getObject(BUCKET_NAME, filePath);
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(s3Object.getObjectContent()))) {
Stream<String> stream = bufferedReader.lines().skip(skipLineNums).limit(limit);
return stream.collect(Collectors.toList());
}
}
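
vimFile streams the object and keeps only the requested window of lines, so previewing a large file never materializes the whole object in memory. A hypothetical call that previews ten lines starting after line 10 (the path is illustrative):

// sketch: 'storage' is the S3Utils instance; throws IOException
List<String> preview = storage.vimFile("tenant1", "dolphinscheduler/tenant1/resources/demo.sh", 10, 10);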
@@ -256,6 +258,47 @@ public class S3Utils implements Closeable, StorageOperate {
deleteTenantCode(tenantCode);
}
/**
* S3 resource dir
*
* @param tenantCode tenant code
* @return S3 resource dir
*/
public static String getS3ResDir(String tenantCode) {
return String.format("%s/" + RESOURCE_TYPE_FILE, getS3TenantDir(tenantCode));
}
/**
* S3 udf dir
*
* @param tenantCode tenant code
* @return get udf dir on S3
*/
public static String getS3UdfDir(String tenantCode) {
return String.format("%s/" + RESOURCE_TYPE_UDF, getS3TenantDir(tenantCode));
}
/**
* @param tenantCode tenant code
* @return file directory of tenants on S3
*/
public static String getS3TenantDir(String tenantCode) {
return String.format(FORMAT_S_S, getS3DataBasePath(), tenantCode);
}
/**
* get data S3 path
*
* @return data S3 path
*/
public static String getS3DataBasePath() {
if (FOLDER_SEPARATOR.equals(RESOURCE_UPLOAD_PATH)) {
return "";
} else {
return RESOURCE_UPLOAD_PATH.replaceFirst(FOLDER_SEPARATOR, "");
}
}
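
A worked example of the resulting key layout, assuming resource.storage.upload.base.path=/dolphinscheduler and that elsewhere in Constants RESOURCE_TYPE_FILE and RESOURCE_TYPE_UDF resolve to "resources" and "udfs" and FORMAT_S_S is "%s/%s":

// getS3DataBasePath()       -> "dolphinscheduler"             (the leading "/" is stripped)
// getS3TenantDir("tenant1") -> "dolphinscheduler/tenant1"
// getS3ResDir("tenant1")    -> "dolphinscheduler/tenant1/resources"
// getS3UdfDir("tenant1")    -> "dolphinscheduler/tenant1/udfs"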
private void deleteTenantCode(String tenantCode) {
deleteDirectory(getResDir(tenantCode));
deleteDirectory(getUdfDir(tenantCode));
@@ -264,25 +307,27 @@ public class S3Utils implements Closeable, StorageOperate {
/**
* xxx untested
* upload local directory to S3
*
* @param tenantCode
* @param keyPrefix the name of directory
* @param strPath
*/
private void uploadDirectory(String tenantCode, String keyPrefix, String strPath) {
s3Client.putObject(BUCKET_NAME, tenantCode+ FOLDER_SEPARATOR +keyPrefix, new File(strPath));
s3Client.putObject(BUCKET_NAME, tenantCode + FOLDER_SEPARATOR + keyPrefix, new File(strPath));
}
/**
* xxx untested
* download S3 directory to local
*
* @param tenantCode
* @param keyPrefix the name of directory
* @param srcPath
*/
private void downloadDirectory(String tenantCode, String keyPrefix, String srcPath){
TransferManager tm= TransferManagerBuilder.standard().withS3Client(s3Client).build();
try{
private void downloadDirectory(String tenantCode, String keyPrefix, String srcPath) {
TransferManager tm = TransferManagerBuilder.standard().withS3Client(s3Client).build();
try {
MultipleFileDownload download = tm.downloadDirectory(BUCKET_NAME, tenantCode + FOLDER_SEPARATOR + keyPrefix, new File(srcPath));
download.waitForCompletion();
} catch (AmazonS3Exception | InterruptedException e) {
@@ -293,15 +338,26 @@ public class S3Utils implements Closeable, StorageOperate {
}
}
public void checkBucketNameIfNotPresent(String bucketName) {
if (!s3Client.doesBucketExistV2(bucketName)) {
logger.info("the current regionName is {}", s3Client.getRegionName());
s3Client.createBucket(bucketName);
public void checkBucketNameExists(String bucketName) {
if (StringUtils.isBlank(bucketName)) {
throw new IllegalArgumentException("resource.aws.s3.bucket.name is blank");
}
Bucket existsBucket = s3Client.listBuckets()
.stream()
.filter(
bucket -> bucket.getName().equals(bucketName)
)
.findFirst()
.orElseThrow(() -> {
return new IllegalArgumentException("bucketName: " + bucketName + " does not exist, you need to create it yourself");
});
logger.info("bucketName: {} has been found, the current regionName is {}", existsBucket.getName(), s3Client.getRegionName());
}
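
Note the behavioral change: the old checkBucketNameIfNotPresent silently created a missing bucket, while checkBucketNameExists fails fast. Because listBuckets() only returns buckets owned by the configured credentials, the check also enforces ownership, which is what the commit message means by checking the bucket against personal permissions. A sketch of how this surfaces at startup:

// sketch: client initialization now surfaces misconfiguration immediately
try {
    S3Utils.getInstance(); // builds the client, then runs checkBucketNameExists(BUCKET_NAME)
} catch (IllegalArgumentException e) {
    // e.g. "resource.aws.s3.bucket.name is blank" or
    // "bucketName: ... does not exist, you need to create it yourself"
}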
/*
only delete the object of directory ,it`s better to delete the files in it -r
/**
* only deletes the directory object; it is better to delete the files in it recursively (-r)
*/
private void deleteDirectory(String directoryName) {
if (s3Client.doesObjectExist(BUCKET_NAME, directoryName)) {

34
dolphinscheduler-common/src/main/resources/common.properties

@@ -18,11 +18,29 @@
# user data local directory path, please make sure the directory exists and have read write permissions
data.basedir.path=/tmp/dolphinscheduler
# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
# resource storage type: HDFS, S3, NONE
resource.storage.type=NONE
# resource store on HDFS/S3 path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
resource.storage.upload.base.path=/dolphinscheduler
# The AWS access key. This configuration is required if resource.storage.type=S3 or the EMR task is used
resource.aws.access.key.id=minioadmin
# The AWS secret access key. This configuration is required if resource.storage.type=S3 or the EMR task is used
resource.aws.secret.access.key=minioadmin
# The AWS region to use. This configuration is required if resource.storage.type=S3 or the EMR task is used
resource.aws.region=cn-north-1
# The name of the bucket. You need to create it yourself; otherwise the system cannot start. All buckets in Amazon S3 share a single namespace, so give the bucket a unique name.
resource.aws.s3.bucket.name=dolphinscheduler
# Set this parameter when using a private-cloud S3. For a public-cloud S3 you only need to set resource.aws.region, or set this to the public-cloud endpoint, such as s3.cn-north-1.amazonaws.com.cn
resource.aws.s3.endpoint=http://localhost:9000
# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
resource.upload.path=/dolphinscheduler
# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
resource.hdfs.root.user=hdfs
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
resource.hdfs.fs.defaultFS=hdfs://mycluster:8020
# whether to startup kerberos
hadoop.security.authentication.startup.state=false
@@ -38,16 +56,8 @@ login.user.keytab.path=/opt/hdfs.headless.keytab
# kerberos expire time, the unit is hour
kerberos.expire.time=2
# resource view suffixs
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
hdfs.root.user=hdfs
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
fs.defaultFS=hdfs://mycluster:8020
aws.access.key.id=minioadmin
aws.secret.access.key=minioadmin
aws.region=us-east-1
aws.endpoint=http://localhost:9000
# resourcemanager port, the default value is 8088 if not specified
resource.manager.httpaddress.port=8088
# if resourcemanager HA is enabled, please set the HA IPs; if resourcemanager is single, keep this value empty

15
dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/file-manage/common.properties

@@ -22,7 +22,7 @@ data.basedir.path=/tmp/dolphinscheduler
resource.storage.type=S3
# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended
resource.upload.path=/dolphinscheduler
resource.storage.upload.base.path=/dolphinscheduler
# whether to startup kerberos
hadoop.security.authentication.startup.state=false
@@ -43,10 +43,10 @@ kerberos.expire.time=2
#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js
# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path
hdfs.root.user=hdfs
resource.hdfs.root.user=hdfs
# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
fs.defaultFS=s3a://dolphinscheduler
resource.hdfs.fs.defaultFS=s3a://dolphinscheduler
# resourcemanager port, the default value is 8088 if not specified
@@ -81,10 +81,11 @@ sudo.enable=true
development.state=false
# rpc port
alert.rpc.port=50052
aws.access.key.id=accessKey123
aws.secret.access.key=secretKey123
aws.region=us-east-1
aws.endpoint=http://s3:9000
resource.aws.access.key.id=accessKey123
resource.aws.secret.access.key=secretKey123
resource.aws.region=us-east-1
resource.aws.s3.bucket.name=dolphinscheduler
resource.aws.s3.endpoint=http://s3:9000
# Task resource limit state
task.resource.limit.state=false

12
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java

@@ -365,9 +365,9 @@ public class TaskConstants {
/**
* hdfs/s3 configuration
* resource.upload.path
* resource.storage.upload.base.path
*/
public static final String RESOURCE_UPLOAD_PATH = "resource.upload.path";
public static final String RESOURCE_UPLOAD_PATH = "resource.storage.upload.base.path";
/**
* data.quality.jar.name
@@ -400,9 +400,9 @@ public class TaskConstants {
/**
* aws config
*/
public static final String AWS_ACCESS_KEY_ID= "aws.access.key.id";
public static final String AWS_SECRET_ACCESS_KEY= "aws.secret.access.key";
public static final String AWS_REGION = "aws.region";
public static final String AWS_ACCESS_KEY_ID = "resource.aws.access.key.id";
public static final String AWS_SECRET_ACCESS_KEY = "resource.aws.secret.access.key";
public static final String AWS_REGION = "resource.aws.region";
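
TaskConstants mirrors the renamed keys so that task plugins (for example the EMR task referenced in the configuration comments) read exactly the same properties; a hypothetical plugin-side lookup:

// sketch: a task plugin resolving the shared AWS settings
String region = PropertyUtils.getString(TaskConstants.AWS_REGION); // resource.aws.region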
/**
* use for k8s task
@@ -424,7 +424,7 @@ public class TaskConstants {
/**
* zeppelin config
*/
public static final String ZEPPELIN_REST_URL= "zeppelin.rest.url";
public static final String ZEPPELIN_REST_URL = "zeppelin.rest.url";
/**
* conda config used by jupyter task plugin
