diff --git a/docs/docs/en/architecture/configuration.md b/docs/docs/en/architecture/configuration.md index 7fb176219f..57cbdddaa3 100644 --- a/docs/docs/en/architecture/configuration.md +++ b/docs/docs/en/architecture/configuration.md @@ -182,27 +182,28 @@ zookeeper.retry.maxtime|10| maximum retry times ### common.properties [hadoop、s3、yarn config properties] Currently, common.properties mainly configures Hadoop,s3a related configurations. -|Parameters | Default value| Description| +| Parameters | Default value | Description | |--|--|--| -data.basedir.path|/tmp/dolphinscheduler| local directory used to store temp files -resource.storage.type|NONE| type of resource files: HDFS, S3, NONE -resource.upload.path|/dolphinscheduler| storage path of resource files -hadoop.security.authentication.startup.state|false| whether hadoop grant kerberos permission -java.security.krb5.conf.path|/opt/krb5.conf|kerberos config directory -login.user.keytab.username|hdfs-mycluster@ESZ.COM|kerberos username -login.user.keytab.path|/opt/hdfs.headless.keytab|kerberos user keytab -kerberos.expire.time|2|kerberos expire time,integer,the unit is hour -resource.view.suffixs| txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties| file types supported by resource center -hdfs.root.user|hdfs| configure users with corresponding permissions if storage type is HDFS -fs.defaultFS|hdfs://mycluster:8020|If resource.storage.type=S3, then the request url would be similar to 's3a://dolphinscheduler'. Otherwise if resource.storage.type=HDFS and hadoop supports HA, copy core-site.xml and hdfs-site.xml into 'conf' directory -fs.s3a.endpoint||s3 endpoint url -fs.s3a.access.key||s3 access key -fs.s3a.secret.key||s3 secret key -yarn.resourcemanager.ha.rm.ids||specify the yarn resourcemanager url. if resourcemanager supports HA, input HA IP addresses (separated by comma), or input null for standalone -yarn.application.status.address|http://ds1:8088/ws/v1/cluster/apps/%s|keep default if ResourceManager supports HA or not use ResourceManager, or replace ds1 with corresponding hostname if ResourceManager in standalone mode -dolphinscheduler.env.path|env/dolphinscheduler_env.sh|load environment variables configs [eg: JAVA_HOME,HADOOP_HOME, HIVE_HOME ...] -development.state|false| specify whether in development state -task.resource.limit.state|false|specify whether in resource limit state +data.basedir.path | /tmp/dolphinscheduler | local directory used to store temp files +resource.storage.type | NONE | type of resource files: HDFS, S3, NONE +resource.storage.upload.base.path | /dolphinscheduler | storage path of resource files +resource.aws.access.key.id | minioadmin | access key id of S3 +resource.aws.secret.access.key | minioadmin | secret access key of S3 +resource.aws.region |us-east-1 | region of S3 +resource.aws.s3.bucket.name | dolphinscheduler | bucket name of S3 +resource.aws.s3.endpoint | http://minio:9000 | endpoint of S3 +resource.hdfs.root.user | hdfs | configure users with corresponding permissions if storage type is HDFS +resource.hdfs.fs.defaultFS | hdfs://mycluster:8020 | If resource.storage.type=S3, then the request url would be similar to 's3a://dolphinscheduler'. 
Otherwise, if resource.storage.type=HDFS and namenode HA is enabled, copy core-site.xml and hdfs-site.xml into the 'conf' directory
+hadoop.security.authentication.startup.state | false | whether to enable Kerberos authentication for Hadoop
+java.security.krb5.conf.path | /opt/krb5.conf | kerberos config file path
+login.user.keytab.username | hdfs-mycluster@ESZ.COM | kerberos username
+login.user.keytab.path | /opt/hdfs.headless.keytab | kerberos user keytab
+kerberos.expire.time | 2 | kerberos expire time, integer, the unit is hours
+yarn.resourcemanager.ha.rm.ids | | specify the yarn resourcemanager url. If resourcemanager supports HA, input the HA IP addresses (separated by commas); keep this value empty for standalone
+yarn.application.status.address | http://ds1:8088/ws/v1/cluster/apps/%s | keep the default if ResourceManager runs in HA mode or is not used; otherwise replace ds1 with the hostname of the standalone ResourceManager
+dolphinscheduler.env.path | env/dolphinscheduler_env.sh | load environment variable configs [e.g. JAVA_HOME, HADOOP_HOME, HIVE_HOME ...]
+development.state | false | whether to run in development state
+task.resource.limit.state | false | whether to enable the task resource limit state

### application-api.properties [API-service log config]

diff --git a/docs/docs/en/faq.md b/docs/docs/en/faq.md
index 997d38779e..0fe807b0c8 100644
--- a/docs/docs/en/faq.md
+++ b/docs/docs/en/faq.md
@@ -126,7 +126,7 @@ A: 1, if the replacement variable contains special characters, **use the \ tra

​ 4, monitorServerState = "false", whether the service monitoring script is started, the default is not to start the service monitoring script. **If the service monitoring script is started, the master and worker services are monitored every 5 minutes, and if the machine is down, it will automatically restart.**

-​ 5, hdfsStartupSate="false", whether to enable HDFS resource upload function. The default is not enabled. **If it is not enabled, the resource center cannot be used.** If enabled, you need to configure the configuration of fs.defaultFS and yarn in conf/common/hadoop/hadoop.properties. If you use namenode HA, you need to copy core-site.xml and hdfs-site.xml to the conf root directory.
+​ 5, hdfsStartupSate="false", whether to enable HDFS resource upload function. The default is not enabled. **If it is not enabled, the resource center cannot be used.** If enabled, you need to configure resource.hdfs.fs.defaultFS and the yarn-related settings in conf/common/hadoop/hadoop.properties. If you use namenode HA, you need to copy core-site.xml and hdfs-site.xml to the conf root directory.

​ Note: **The 1.0.x version does not automatically create the hdfs root directory, you need to create it yourself, and you need to deploy the user with hdfs operation permission.**

diff --git a/docs/docs/en/guide/resource/configuration.md b/docs/docs/en/guide/resource/configuration.md
index b411a929f4..5677e1917a 100644
--- a/docs/docs/en/guide/resource/configuration.md
+++ b/docs/docs/en/guide/resource/configuration.md
@@ -11,7 +11,7 @@ For a single machine, you can choose to use local file directory as the upload d

Configure the file in the following paths: `api-server/conf/common.properties` and `worker-server/conf/common.properties`.

- Change `data.basedir.path` to the local directory path. Please make sure the user who deploy dolphinscheduler have read and write permissions, such as: `data.basedir.path=/tmp/dolphinscheduler`. And the directory you configured will be auto-created if it does not exists.
-- Modify the following two parameters, `resource.storage.type=HDFS` and `fs.defaultFS=file:///`. +- Modify the following two parameters, `resource.storage.type=HDFS` and `resource.hdfs.fs.defaultFS=file:///`. ## HDFS Resource Configuration @@ -47,7 +47,24 @@ resource.storage.type=HDFS # resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, # please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended -resource.upload.path=/tmp/dolphinscheduler +resource.storage.upload.base.path=/tmp/dolphinscheduler + +# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required +resource.aws.access.key.id=minioadmin +# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required +resource.aws.secret.access.key=minioadmin +# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required +resource.aws.region=cn-north-1 +# The name of the bucket. You need to create them by yourself. Otherwise, the system cannot start. All buckets in Amazon S3 share a single namespace; ensure the bucket is given a unique name. +resource.aws.s3.bucket.name=dolphinscheduler +# You need to set this parameter when private cloud s3. If S3 uses public cloud, you only need to set resource.aws.region or set to the endpoint of a public cloud such as S3.cn-north-1.amazonaws.com.cn +resource.aws.s3.endpoint=http://localhost:9000 + +# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path +resource.hdfs.root.user=root +# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; +# if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir +resource.hdfs.fs.defaultFS=hdfs://localhost:8020 # whether to startup kerberos hadoop.security.authentication.startup.state=false @@ -65,15 +82,7 @@ login.user.keytab.path=/opt/hdfs.headless.keytab kerberos.expire.time=2 # resource view suffixs #resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js -# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path -hdfs.root.user=root -# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; -# if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir -fs.defaultFS=hdfs://localhost:8020 -aws.access.key.id=minioadmin -aws.secret.access.key=minioadmin -aws.region=us-east-1 -aws.endpoint=http://localhost:9000 + # resourcemanager port, the default value is 8088 if not specified resource.manager.httpaddress.port=8088 # if resourcemanager HA is enabled, please set the HA IPs; if resourcemanager is single, keep this value empty diff --git a/docs/docs/zh/architecture/configuration.md b/docs/docs/zh/architecture/configuration.md index ec413b5404..47055a1b87 100644 --- a/docs/docs/zh/architecture/configuration.md +++ b/docs/docs/zh/architecture/configuration.md @@ -174,27 +174,28 @@ zookeeper.retry.maxtime|10|最大重试次数 ## 4.common.properties [hadoop、s3、yarn配置] common.properties配置文件目前主要是配置hadoop/s3a相关的配置. 
-|参数 |默认值| 描述| +| 参数 | 默认值 | 描述 | |--|--|--| -data.basedir.path|/tmp/dolphinscheduler|本地工作目录,用于存放临时文件 -resource.storage.type|NONE|资源文件存储类型: HDFS,S3,NONE -resource.upload.path|/dolphinscheduler|资源文件存储路径 -hadoop.security.authentication.startup.state|false|hadoop是否开启kerberos权限 -java.security.krb5.conf.path|/opt/krb5.conf|kerberos配置目录 -login.user.keytab.username|hdfs-mycluster@ESZ.COM|kerberos登录用户 -login.user.keytab.path|/opt/hdfs.headless.keytab|kerberos登录用户keytab -kerberos.expire.time|2|kerberos过期时间,整数,单位为小时 -resource.view.suffixs| txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties|资源中心支持的文件格式 -hdfs.root.user|hdfs|如果存储类型为HDFS,需要配置拥有对应操作权限的用户 -fs.defaultFS|hdfs://mycluster:8020|请求地址如果resource.storage.type=S3,该值类似为: s3a://dolphinscheduler. 如果resource.storage.type=HDFS, 如果 hadoop 配置了 HA,需要复制core-site.xml 和 hdfs-site.xml 文件到conf目录 -fs.s3a.endpoint||s3 endpoint地址 -fs.s3a.access.key||s3 access key -fs.s3a.secret.key||s3 secret key -yarn.resourcemanager.ha.rm.ids||yarn resourcemanager 地址, 如果resourcemanager开启了HA, 输入HA的IP地址(以逗号分隔),如果resourcemanager为单节点, 该值为空即可 -yarn.application.status.address|http://ds1:8088/ws/v1/cluster/apps/%s|如果resourcemanager开启了HA或者没有使用resourcemanager,保持默认值即可. 如果resourcemanager为单节点,你需要将ds1 配置为resourcemanager对应的hostname -dolphinscheduler.env.path|env/dolphinscheduler_env.sh|运行脚本加载环境变量配置文件[如: JAVA_HOME,HADOOP_HOME, HIVE_HOME ...] -development.state|false|是否处于开发模式 -task.resource.limit.state|false|是否启用资源限制模式 +data.basedir.path | /tmp/dolphinscheduler | 本地工作目录,用于存放临时文件 +resource.storage.type | NONE | 资源文件存储类型: HDFS,S3,NONE +resource.storage.upload.base.path | /dolphinscheduler | 资源文件存储路径 +resource.aws.access.key.id | minioadmin | S3 access key +resource.aws.secret.access.key | minioadmin | S3 secret access key +resource.aws.region | us-east-1 | S3 区域 +resource.aws.s3.bucket.name | dolphinscheduler | S3 存储桶名称 +resource.aws.s3.endpoint | http://minio:9000 | s3 endpoint地址 +resource.hdfs.root.user | hdfs | 如果存储类型为HDFS,需要配置拥有对应操作权限的用户 +resource.hdfs.fs.defaultFS | hdfs://mycluster:8020 | 请求地址如果resource.storage.type=S3,该值类似为: s3a://dolphinscheduler. 如果resource.storage.type=HDFS, 如果 hadoop 配置了 HA,需要复制core-site.xml 和 hdfs-site.xml 文件到conf目录 +hadoop.security.authentication.startup.state | false | hadoop是否开启kerberos权限 +java.security.krb5.conf.path | /opt/krb5.conf | kerberos配置目录 +login.user.keytab.username | hdfs-mycluster@ESZ.COM | kerberos登录用户 +login.user.keytab.path | /opt/hdfs.headless.keytab | kerberos登录用户keytab +kerberos.expire.time | 2 | kerberos过期时间,整数,单位为小时 +yarn.resourcemanager.ha.rm.ids | | yarn resourcemanager 地址, 如果resourcemanager开启了HA, 输入HA的IP地址(以逗号分隔),如果resourcemanager为单节点, 该值为空即可 +yarn.application.status.address | http://ds1:8088/ws/v1/cluster/apps/%s | 如果resourcemanager开启了HA或者没有使用resourcemanager,保持默认值即可. 如果resourcemanager为单节点,你需要将ds1 配置为resourcemanager对应的hostname +dolphinscheduler.env.path | env/dolphinscheduler_env.sh | 运行脚本加载环境变量配置文件[如: JAVA_HOME,HADOOP_HOME, HIVE_HOME ...] 
+development.state | false | 是否处于开发模式 +task.resource.limit.state | false | 是否启用资源限制模式 ## 5.application-api.properties [API服务配置] diff --git a/docs/docs/zh/faq.md b/docs/docs/zh/faq.md index 4c73ee05a7..d3745ca767 100644 --- a/docs/docs/zh/faq.md +++ b/docs/docs/zh/faq.md @@ -118,7 +118,7 @@ A: 1,如果替换变量中包含特殊字符,**请用 \ 转移符进 ​ 4,monitorServerState="false",服务监控脚本是否启动,默认是不启动服务监控脚本的。**如果启动服务监控脚本,则每 5 分钟定时来监控 master 和 worker 的服务是否 down 机,如果 down 机则会自动重启** -​ 5,hdfsStartupSate="false",是否开启 HDFS 资源上传功能。默认是不开启的,**如果不开启则资源中心是不能使用的**。如果开启,需要 conf/common/hadoop/hadoop.properties 中配置 fs.defaultFS 和 yarn 的相关配置,如果使用 namenode HA,需要将 core-site.xml 和 hdfs-site.xml 复制到conf根目录下 +​ 5,hdfsStartupSate="false",是否开启 HDFS 资源上传功能。默认是不开启的,**如果不开启则资源中心是不能使用的**。如果开启,需要 conf/common/hadoop/hadoop.properties 中配置 resource.hdfs.fs.defaultFS 和 yarn 的相关配置,如果使用 namenode HA,需要将 core-site.xml 和 hdfs-site.xml 复制到conf根目录下 ​ 注意:**1.0.x 版本是不会自动创建 hdfs 根目录的,需要自行创建,并且需要部署用户有hdfs的操作权限** diff --git a/docs/docs/zh/guide/resource/configuration.md b/docs/docs/zh/guide/resource/configuration.md index 3c6d66281b..e8edee566c 100644 --- a/docs/docs/zh/guide/resource/configuration.md +++ b/docs/docs/zh/guide/resource/configuration.md @@ -11,7 +11,7 @@ 对以下路径的文件进行配置:`api-server/conf/common.properties` 和 `worker-server/conf/common.properties` - 将 `data.basedir.path` 改为本地存储路径,请确保部署 DolphinScheduler 的用户拥有读写权限,例如:`data.basedir.path=/tmp/dolphinscheduler`。当路径不存在时会自动创建文件夹 -- 修改下列两个参数,分别是 `resource.storage.type=HDFS` 和 `fs.defaultFS=file:///`。 +- 修改下列两个参数,分别是 `resource.storage.type=HDFS` 和 `resource.hdfs.fs.defaultFS=file:///`。 ## HDFS 资源配置 @@ -47,7 +47,24 @@ resource.storage.type=HDFS # resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, # please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended -resource.upload.path=/tmp/dolphinscheduler +resource.storage.upload.base.path=/tmp/dolphinscheduler + +# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required +resource.aws.access.key.id=minioadmin +# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required +resource.aws.secret.access.key=minioadmin +# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required +resource.aws.region=cn-north-1 +# The name of the bucket. You need to create them by yourself. Otherwise, the system cannot start. All buckets in Amazon S3 share a single namespace; ensure the bucket is given a unique name. +resource.aws.s3.bucket.name=dolphinscheduler +# You need to set this parameter when private cloud s3. 
If S3 uses public cloud, you only need to set resource.aws.region or set to the endpoint of a public cloud such as S3.cn-north-1.amazonaws.com.cn +resource.aws.s3.endpoint=http://localhost:9000 + +# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path +resource.hdfs.root.user=root +# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; +# if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir +resource.hdfs.fs.defaultFS=hdfs://localhost:8020 # whether to startup kerberos hadoop.security.authentication.startup.state=false @@ -65,15 +82,7 @@ login.user.keytab.path=/opt/hdfs.headless.keytab kerberos.expire.time=2 # resource view suffixs #resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js -# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path -hdfs.root.user=root -# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; -# if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir -fs.defaultFS=hdfs://localhost:8020 -aws.access.key.id=minioadmin -aws.secret.access.key=minioadmin -aws.region=us-east-1 -aws.endpoint=http://localhost:9000 + # resourcemanager port, the default value is 8088 if not specified resource.manager.httpaddress.port=8088 # if resourcemanager HA is enabled, please set the HA IPs; if resourcemanager is single, keep this value empty diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 2c37b98ecc..060779bedc 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -61,14 +61,12 @@ public final class Constants { public static final String STORAGE_HDFS = "HDFS"; - public static final String BUCKET_NAME = "dolphinscheduler-test"; - public static final String EMPTY_STRING = ""; /** - * fs.defaultFS + * resource.hdfs.fs.defaultFS */ - public static final String FS_DEFAULT_FS = "fs.defaultFS"; + public static final String FS_DEFAULT_FS = "resource.hdfs.fs.defaultFS"; /** @@ -96,15 +94,15 @@ public final class Constants { /** * hdfs configuration - * hdfs.root.user + * resource.hdfs.root.user */ - public static final String HDFS_ROOT_USER = "hdfs.root.user"; + public static final String HDFS_ROOT_USER = "resource.hdfs.root.user"; /** * hdfs/s3 configuration - * resource.upload.path + * resource.storage.upload.base.path */ - public static final String RESOURCE_UPLOAD_PATH = "resource.upload.path"; + public static final String RESOURCE_UPLOAD_PATH = "resource.storage.upload.base.path"; /** * data basedir path @@ -148,7 +146,8 @@ public final class Constants { */ public static final String RESOURCE_STORAGE_TYPE = "resource.storage.type"; - public static final String AWS_END_POINT = "aws.endpoint"; + public static final String AWS_S3_BUCKET_NAME = "resource.aws.s3.bucket.name"; + public static final String AWS_END_POINT = "resource.aws.s3.endpoint"; /** * comma , */ diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java index 
c8a8928f28..0fc3fa13de 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java @@ -21,6 +21,25 @@ import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR; import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S; import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_FILE; import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_UDF; + +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ResUploadType; +import org.apache.dolphinscheduler.common.exception.BaseException; +import org.apache.dolphinscheduler.common.storage.StorageOperate; +import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; +import org.apache.dolphinscheduler.spi.enums.ResourceType; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.security.UserGroupInformation; + import java.io.BufferedReader; import java.io.Closeable; import java.io.File; @@ -35,28 +54,14 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.ResUploadType; -import org.apache.dolphinscheduler.common.exception.BaseException; -import org.apache.dolphinscheduler.common.storage.StorageOperate; -import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; -import org.apache.dolphinscheduler.spi.enums.ResourceType; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.security.UserGroupInformation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * hadoop utils @@ -149,10 +154,9 @@ public class HadoopUtils implements Closeable, StorageOperate { return true; }); } else { - logger.warn("hdfs.root.user is not set value!"); + logger.warn("resource.hdfs.root.user is not set value!"); fs = FileSystem.get(configuration); } -// } catch (Exception e) { logger.error(e.getMessage(), e); @@ -275,7 +279,7 @@ public class HadoopUtils implements Closeable, StorageOperate { * @throws IOException errors */ @Override - public boolean mkdir(String bucketName, String hdfsPath) throws IOException { + public boolean mkdir(String tenantCode, String hdfsPath) throws IOException { return fs.mkdirs(new Path(hdfsPath)); } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java index 2471b2e990..2fdd2586eb 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java @@ -17,28 +17,21 @@ package org.apache.dolphinscheduler.common.utils; -import com.amazonaws.AmazonServiceException; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.regions.Regions; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import com.amazonaws.services.s3.model.AmazonS3Exception; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.model.S3Object; -import com.amazonaws.services.s3.model.S3ObjectInputStream; -import com.amazonaws.services.s3.transfer.MultipleFileDownload; -import com.amazonaws.services.s3.transfer.TransferManager; -import com.amazonaws.services.s3.transfer.TransferManagerBuilder; -import org.apache.commons.lang3.StringUtils; +import static org.apache.dolphinscheduler.common.Constants.AWS_END_POINT; +import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR; +import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S; +import static org.apache.dolphinscheduler.common.Constants.RESOURCE_STORAGE_TYPE; +import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_FILE; +import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_UDF; +import static org.apache.dolphinscheduler.common.Constants.STORAGE_S3; + +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.ResUploadType; import org.apache.dolphinscheduler.common.storage.StorageOperate; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.spi.enums.ResourceType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.StringUtils; import java.io.BufferedReader; import java.io.ByteArrayInputStream; @@ -54,14 +47,25 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.dolphinscheduler.common.Constants.AWS_END_POINT; -import static org.apache.dolphinscheduler.common.Constants.BUCKET_NAME; -import static org.apache.dolphinscheduler.common.Constants.FOLDER_SEPARATOR; -import static org.apache.dolphinscheduler.common.Constants.FORMAT_S_S; -import static org.apache.dolphinscheduler.common.Constants.RESOURCE_STORAGE_TYPE; -import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_FILE; -import static org.apache.dolphinscheduler.common.Constants.RESOURCE_TYPE_UDF; -import static org.apache.dolphinscheduler.common.Constants.STORAGE_S3; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.Bucket; +import 
com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.amazonaws.services.s3.transfer.MultipleFileDownload; +import com.amazonaws.services.s3.transfer.TransferManager; +import com.amazonaws.services.s3.transfer.TransferManagerBuilder; public class S3Utils implements Closeable, StorageOperate { @@ -73,6 +77,8 @@ public class S3Utils implements Closeable, StorageOperate { public static final String REGION = PropertyUtils.getString(TaskConstants.AWS_REGION); + public static final String BUCKET_NAME = PropertyUtils.getString(Constants.AWS_S3_BUCKET_NAME); + private AmazonS3 s3Client = null; @@ -81,19 +87,19 @@ public class S3Utils implements Closeable, StorageOperate { if (!StringUtils.isEmpty(PropertyUtils.getString(AWS_END_POINT))) { s3Client = AmazonS3ClientBuilder - .standard() - .withPathStyleAccessEnabled(true) - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(PropertyUtils.getString(AWS_END_POINT), Regions.fromName(REGION).getName())) - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID))) - .build(); + .standard() + .withPathStyleAccessEnabled(true) + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(PropertyUtils.getString(AWS_END_POINT), Regions.fromName(REGION).getName())) + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID))) + .build(); } else { s3Client = AmazonS3ClientBuilder - .standard() - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID))) - .withRegion(Regions.fromName(REGION)) - .build(); + .standard() + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY_ID))) + .withRegion(Regions.fromName(REGION)) + .build(); } - checkBucketNameIfNotPresent(BUCKET_NAME); + checkBucketNameExists(BUCKET_NAME); } } @@ -125,24 +131,31 @@ public class S3Utils implements Closeable, StorageOperate { @Override public void createTenantDirIfNotExists(String tenantCode) throws Exception { - createFolder(tenantCode+ FOLDER_SEPARATOR +RESOURCE_TYPE_UDF); - createFolder(tenantCode+ FOLDER_SEPARATOR +RESOURCE_TYPE_FILE); + getInstance().mkdir(tenantCode, getS3ResDir(tenantCode)); + getInstance().mkdir(tenantCode, getS3UdfDir(tenantCode)); } @Override public String getResDir(String tenantCode) { - return tenantCode+ FOLDER_SEPARATOR +RESOURCE_TYPE_FILE+FOLDER_SEPARATOR; + return getS3ResDir(tenantCode) + FOLDER_SEPARATOR; } @Override public String getUdfDir(String tenantCode) { - return tenantCode+ FOLDER_SEPARATOR +RESOURCE_TYPE_UDF+FOLDER_SEPARATOR; + return getS3UdfDir(tenantCode) + FOLDER_SEPARATOR; } @Override public boolean mkdir(String tenantCode, String path) throws IOException { - createFolder(path); - return true; + String objectName = path + FOLDER_SEPARATOR; + if (!s3Client.doesObjectExist(BUCKET_NAME, objectName)) { + ObjectMetadata metadata = new ObjectMetadata(); + metadata.setContentLength(0); + InputStream emptyContent = new ByteArrayInputStream(new byte[0]); + PutObjectRequest putObjectRequest = new PutObjectRequest(BUCKET_NAME, objectName, emptyContent, metadata); + s3Client.putObject(putObjectRequest); + } + return true; } @Override @@ -150,21 +163,22 @@ public class S3Utils implements Closeable, StorageOperate { if (fileName.startsWith(FOLDER_SEPARATOR)) { fileName = 
fileName.replaceFirst(FOLDER_SEPARATOR, ""); } - return String.format(FORMAT_S_S, tenantCode+FOLDER_SEPARATOR+RESOURCE_TYPE_FILE, fileName); + return String.format(FORMAT_S_S, getS3ResDir(tenantCode), fileName); } + @Override public String getFileName(ResourceType resourceType, String tenantCode, String fileName) { if (fileName.startsWith(FOLDER_SEPARATOR)) { fileName = fileName.replaceFirst(FOLDER_SEPARATOR, ""); } - return getDir(resourceType, tenantCode)+fileName; + return getDir(resourceType, tenantCode) + fileName; } @Override public void download(String tenantCode, String srcFilePath, String dstFile, boolean deleteSource, boolean overwrite) throws IOException { S3Object o = s3Client.getObject(BUCKET_NAME, srcFilePath); try (S3ObjectInputStream s3is = o.getObjectContent(); - FileOutputStream fos = new FileOutputStream(new File(dstFile))) { + FileOutputStream fos = new FileOutputStream(dstFile)) { byte[] readBuf = new byte[1024]; int readLen = 0; while ((readLen = s3is.read(readBuf)) > 0) { @@ -210,7 +224,7 @@ public class S3Utils implements Closeable, StorageOperate { case FILE: return getResDir(tenantCode); default: - return tenantCode+ FOLDER_SEPARATOR ; + return ""; } } @@ -221,33 +235,21 @@ public class S3Utils implements Closeable, StorageOperate { s3Client.putObject(BUCKET_NAME, dstPath, new File(srcFile)); return true; } catch (AmazonServiceException e) { - logger.error("upload failed,the bucketName is {},the dstPath is {}", BUCKET_NAME, tenantCode+ FOLDER_SEPARATOR +dstPath); + logger.error("upload failed,the bucketName is {},the filePath is {}", BUCKET_NAME, dstPath); return false; } } - @Override - public List vimFile(String tenantCode,String filePath, int skipLineNums, int limit) throws IOException { + public List vimFile(String tenantCode, String filePath, int skipLineNums, int limit) throws IOException { if (StringUtils.isBlank(filePath)) { logger.error("file path:{} is blank", filePath); return Collections.emptyList(); } - S3Object s3Object=s3Client.getObject(BUCKET_NAME,filePath); - try(BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(s3Object.getObjectContent()))){ - Stream stream = bufferedReader.lines().skip(skipLineNums).limit(limit); - return stream.collect(Collectors.toList()); - } - } - - private void - createFolder( String folderName) { - if (!s3Client.doesObjectExist(BUCKET_NAME, folderName + FOLDER_SEPARATOR)) { - ObjectMetadata metadata = new ObjectMetadata(); - metadata.setContentLength(0); - InputStream emptyContent = new ByteArrayInputStream(new byte[0]); - PutObjectRequest putObjectRequest = new PutObjectRequest(BUCKET_NAME, folderName + FOLDER_SEPARATOR, emptyContent, metadata); - s3Client.putObject(putObjectRequest); + S3Object s3Object = s3Client.getObject(BUCKET_NAME, filePath); + try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(s3Object.getObjectContent()))) { + Stream stream = bufferedReader.lines().skip(skipLineNums).limit(limit); + return stream.collect(Collectors.toList()); } } @@ -256,6 +258,47 @@ public class S3Utils implements Closeable, StorageOperate { deleteTenantCode(tenantCode); } + /** + * S3 resource dir + * + * @param tenantCode tenant code + * @return S3 resource dir + */ + public static String getS3ResDir(String tenantCode) { + return String.format("%s/" + RESOURCE_TYPE_FILE, getS3TenantDir(tenantCode)); + } + + /** + * S3 udf dir + * + * @param tenantCode tenant code + * @return get udf dir on S3 + */ + public static String getS3UdfDir(String tenantCode) { + return 
String.format("%s/" + RESOURCE_TYPE_UDF, getS3TenantDir(tenantCode)); + } + + /** + * @param tenantCode tenant code + * @return file directory of tenants on S3 + */ + public static String getS3TenantDir(String tenantCode) { + return String.format(FORMAT_S_S, getS3DataBasePath(), tenantCode); + } + + /** + * get data S3 path + * + * @return data S3 path + */ + public static String getS3DataBasePath() { + if (FOLDER_SEPARATOR.equals(RESOURCE_UPLOAD_PATH)) { + return ""; + } else { + return RESOURCE_UPLOAD_PATH.replaceFirst(FOLDER_SEPARATOR, ""); + } + } + private void deleteTenantCode(String tenantCode) { deleteDirectory(getResDir(tenantCode)); deleteDirectory(getUdfDir(tenantCode)); @@ -264,25 +307,27 @@ public class S3Utils implements Closeable, StorageOperate { /** * xxx untest * upload local directory to S3 + * * @param tenantCode * @param keyPrefix the name of directory * @param strPath */ private void uploadDirectory(String tenantCode, String keyPrefix, String strPath) { - s3Client.putObject(BUCKET_NAME, tenantCode+ FOLDER_SEPARATOR +keyPrefix, new File(strPath)); + s3Client.putObject(BUCKET_NAME, tenantCode + FOLDER_SEPARATOR + keyPrefix, new File(strPath)); } /** * xxx untest * download S3 Directory to local + * * @param tenantCode * @param keyPrefix the name of directory * @param srcPath */ - private void downloadDirectory(String tenantCode, String keyPrefix, String srcPath){ - TransferManager tm= TransferManagerBuilder.standard().withS3Client(s3Client).build(); - try{ + private void downloadDirectory(String tenantCode, String keyPrefix, String srcPath) { + TransferManager tm = TransferManagerBuilder.standard().withS3Client(s3Client).build(); + try { MultipleFileDownload download = tm.downloadDirectory(BUCKET_NAME, tenantCode + FOLDER_SEPARATOR + keyPrefix, new File(srcPath)); download.waitForCompletion(); } catch (AmazonS3Exception | InterruptedException e) { @@ -293,15 +338,26 @@ public class S3Utils implements Closeable, StorageOperate { } } - public void checkBucketNameIfNotPresent(String bucketName) { - if (!s3Client.doesBucketExistV2(bucketName)) { - logger.info("the current regionName is {}", s3Client.getRegionName()); - s3Client.createBucket(bucketName); + public void checkBucketNameExists(String bucketName) { + if (StringUtils.isBlank(bucketName)) { + throw new IllegalArgumentException("resource.aws.s3.bucket.name is blank"); } + + Bucket existsBucket = s3Client.listBuckets() + .stream() + .filter( + bucket -> bucket.getName().equals(bucketName) + ) + .findFirst() + .orElseThrow(() -> { + return new IllegalArgumentException("bucketName: " + bucketName + " is not exists, you need to create them by yourself"); + }); + + logger.info("bucketName: {} has been found, the current regionName is {}", existsBucket.getName(), s3Client.getRegionName()); } - /* - only delete the object of directory ,it`s better to delete the files in it -r + /** + * only delete the object of directory ,it`s better to delete the files in it -r */ private void deleteDirectory(String directoryName) { if (s3Client.doesObjectExist(BUCKET_NAME, directoryName)) { diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties index 13c7d3a479..97355f8811 100644 --- a/dolphinscheduler-common/src/main/resources/common.properties +++ b/dolphinscheduler-common/src/main/resources/common.properties @@ -18,11 +18,29 @@ # user data local directory path, please make sure the directory exists and have read write permissions 
data.basedir.path=/tmp/dolphinscheduler +# resource view suffixs +#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js + # resource storage type: HDFS, S3, NONE resource.storage.type=NONE +# resource store on HDFS/S3 path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended +resource.storage.upload.base.path=/dolphinscheduler + +# The AWS access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required +resource.aws.access.key.id=minioadmin +# The AWS secret access key. if resource.storage.type=S3 or use EMR-Task, This configuration is required +resource.aws.secret.access.key=minioadmin +# The AWS Region to use. if resource.storage.type=S3 or use EMR-Task, This configuration is required +resource.aws.region=cn-north-1 +# The name of the bucket. You need to create them by yourself. Otherwise, the system cannot start. All buckets in Amazon S3 share a single namespace; ensure the bucket is given a unique name. +resource.aws.s3.bucket.name=dolphinscheduler +# You need to set this parameter when private cloud s3. If S3 uses public cloud, you only need to set resource.aws.region or set to the endpoint of a public cloud such as S3.cn-north-1.amazonaws.com.cn +resource.aws.s3.endpoint=http://localhost:9000 -# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended -resource.upload.path=/dolphinscheduler +# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path +resource.hdfs.root.user=hdfs +# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir +resource.hdfs.fs.defaultFS=hdfs://mycluster:8020 # whether to startup kerberos hadoop.security.authentication.startup.state=false @@ -38,16 +56,8 @@ login.user.keytab.path=/opt/hdfs.headless.keytab # kerberos expire time, the unit is hour kerberos.expire.time=2 -# resource view suffixs -#resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js -# if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path -hdfs.root.user=hdfs -# if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir -fs.defaultFS=hdfs://mycluster:8020 -aws.access.key.id=minioadmin -aws.secret.access.key=minioadmin -aws.region=us-east-1 -aws.endpoint=http://localhost:9000 + + # resourcemanager port, the default value is 8088 if not specified resource.manager.httpaddress.port=8088 # if resourcemanager HA is enabled, please set the HA IPs; if resourcemanager is single, keep this value empty diff --git a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/file-manage/common.properties b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/file-manage/common.properties index dcf839ff76..c8a3c32433 100644 --- a/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/file-manage/common.properties +++ 
b/dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/file-manage/common.properties @@ -22,7 +22,7 @@ data.basedir.path=/tmp/dolphinscheduler resource.storage.type=S3 # resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended -resource.upload.path=/dolphinscheduler +resource.storage.upload.base.path=/dolphinscheduler # whether to startup kerberos hadoop.security.authentication.startup.state=false @@ -43,10 +43,10 @@ kerberos.expire.time=2 #resource.view.suffixs=txt,log,sh,bat,conf,cfg,py,java,sql,xml,hql,properties,json,yml,yaml,ini,js # if resource.storage.type=HDFS, the user must have the permission to create directories under the HDFS root path -hdfs.root.user=hdfs +resource.hdfs.root.user=hdfs # if resource.storage.type=S3, the value like: s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir -fs.defaultFS=s3a://dolphinscheduler +resource.hdfs.fs.defaultFS=s3a://dolphinscheduler # resourcemanager port, the default value is 8088 if not specified @@ -81,10 +81,11 @@ sudo.enable=true development.state=false # rpc port alert.rpc.port=50052 -aws.access.key.id=accessKey123 -aws.secret.access.key=secretKey123 -aws.region=us-east-1 -aws.endpoint=http://s3:9000 +resource.aws.access.key.id=accessKey123 +resource.aws.secret.access.key=secretKey123 +resource.aws.region=us-east-1 +resource.aws.s3.bucket.name=dolphinscheduler +resource.aws.s3.endpoint=http://s3:9000 # Task resource limit state task.resource.limit.state=false \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java index c1a1ddadc1..1df63f8a27 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java @@ -365,9 +365,9 @@ public class TaskConstants { /** * hdfs/s3 configuration - * resource.upload.path + * resource.storage.upload.base.path */ - public static final String RESOURCE_UPLOAD_PATH = "resource.upload.path"; + public static final String RESOURCE_UPLOAD_PATH = "resource.storage.upload.base.path"; /** * data.quality.jar.name @@ -400,9 +400,9 @@ public class TaskConstants { /** * aws config */ - public static final String AWS_ACCESS_KEY_ID= "aws.access.key.id"; - public static final String AWS_SECRET_ACCESS_KEY= "aws.secret.access.key"; - public static final String AWS_REGION = "aws.region"; + public static final String AWS_ACCESS_KEY_ID = "resource.aws.access.key.id"; + public static final String AWS_SECRET_ACCESS_KEY = "resource.aws.secret.access.key"; + public static final String AWS_REGION = "resource.aws.region"; /** * use for k8s task @@ -424,7 +424,7 @@ public class TaskConstants { /** * zeppelin config */ - public static final String ZEPPELIN_REST_URL= "zeppelin.rest.url"; + public static final String ZEPPELIN_REST_URL = "zeppelin.rest.url"; /** * conda config used by jupyter task plugin
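Migration note: the hunks above rename several common.properties keys (resource.upload.path to resource.storage.upload.base.path, hdfs.root.user to resource.hdfs.root.user, fs.defaultFS to resource.hdfs.fs.defaultFS, and the aws.* group to resource.aws.*) and introduce resource.aws.s3.bucket.name, which must point to an existing bucket when resource.storage.type=S3. Because the renamed keys are only read under their new names after this change, existing deployments need their common.properties updated by hand. The standalone sketch below is not part of this patch; it is a hypothetical helper, assuming a plain java.util.Properties-style common.properties, that flags legacy keys so they can be renamed before upgrading.

import java.io.FileInputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical migration checker, not part of DolphinScheduler itself.
public class CommonPropertiesMigrationCheck {

    // old key -> new key, as renamed in this diff
    private static final Map<String, String> RENAMED_KEYS = new LinkedHashMap<>();

    static {
        RENAMED_KEYS.put("resource.upload.path", "resource.storage.upload.base.path");
        RENAMED_KEYS.put("hdfs.root.user", "resource.hdfs.root.user");
        RENAMED_KEYS.put("fs.defaultFS", "resource.hdfs.fs.defaultFS");
        RENAMED_KEYS.put("aws.access.key.id", "resource.aws.access.key.id");
        RENAMED_KEYS.put("aws.secret.access.key", "resource.aws.secret.access.key");
        RENAMED_KEYS.put("aws.region", "resource.aws.region");
        RENAMED_KEYS.put("aws.endpoint", "resource.aws.s3.endpoint");
    }

    public static void main(String[] args) throws IOException {
        String path = args.length > 0 ? args[0] : "common.properties";
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        // report every legacy key that is still present under its old name
        for (Map.Entry<String, String> renamed : RENAMED_KEYS.entrySet()) {
            if (props.containsKey(renamed.getKey())) {
                System.out.printf("legacy key '%s' found, rename it to '%s'%n",
                        renamed.getKey(), renamed.getValue());
            }
        }
        // resource.aws.s3.bucket.name is new and must reference an existing bucket when S3 storage is used
        if ("S3".equals(props.getProperty("resource.storage.type"))
                && !props.containsKey("resource.aws.s3.bucket.name")) {
            System.out.println("missing 'resource.aws.s3.bucket.name' (required when resource.storage.type=S3)");
        }
    }
}

Run it against api-server/conf/common.properties and worker-server/conf/common.properties; every reported key should be renamed to its new form, since the old names fall back to defaults once this change is deployed.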