diff --git a/docs/docs/en/guide/remote-logging.md b/docs/docs/en/guide/remote-logging.md index add7a58485..205c80405f 100644 --- a/docs/docs/en/guide/remote-logging.md +++ b/docs/docs/en/guide/remote-logging.md @@ -33,3 +33,20 @@ remote.logging.oss.bucket.name= remote.logging.oss.endpoint= ``` +## Writing task logs to [Amazon S3](https://aws.amazon.com/cn/s3/) + +Configure `common.properties` as follows: + +```properties +# s3 access key id, required if you set remote.logging.target=S3 +remote.logging.s3.access.key.id= +# s3 access key secret, required if you set remote.logging.target=S3 +remote.logging.s3.access.key.secret= +# s3 bucket name, required if you set remote.logging.target=S3 +remote.logging.s3.bucket.name= +# s3 endpoint, required if you set remote.logging.target=S3 +remote.logging.s3.endpoint= +# s3 region, required if you set remote.logging.target=S3 +remote.logging.s3.region= +``` + diff --git a/docs/docs/zh/guide/remote-logging.md b/docs/docs/zh/guide/remote-logging.md index 0adde98e7a..9accca4bac 100644 --- a/docs/docs/zh/guide/remote-logging.md +++ b/docs/docs/zh/guide/remote-logging.md @@ -10,7 +10,7 @@ Apache DolphinScheduler支持将任务日志传输到远端存储上。当配置 ```properties # 是否开启远程日志存储 remote.logging.enable=true -# 任务日志写入的远端存储,目前仅支持OSS +# 任务日志写入的远端存储,目前支持OSS, S3 remote.logging.target=OSS # 任务日志在远端存储上的目录 remote.logging.base.dir=logs @@ -33,3 +33,20 @@ remote.logging.oss.bucket.name= remote.logging.oss.endpoint= ``` +## 将任务日志写入[Amazon S3](https://aws.amazon.com/cn/s3/) + +配置`common.propertis`如下: + +```properties +# s3 access key id, required if you set remote.logging.target=S3 +remote.logging.s3.access.key.id= +# s3 access key secret, required if you set remote.logging.target=S3 +remote.logging.s3.access.key.secret= +# s3 bucket name, required if you set remote.logging.target=S3 +remote.logging.s3.bucket.name= +# s3 endpoint, required if you set remote.logging.target=S3 +remote.logging.s3.endpoint= +# s3 region, required if you set remote.logging.target=S3 +remote.logging.s3.region= +``` + diff --git a/dolphinscheduler-common/pom.xml b/dolphinscheduler-common/pom.xml index 652e15dc19..a67425b51c 100644 --- a/dolphinscheduler-common/pom.xml +++ b/dolphinscheduler-common/pom.xml @@ -88,6 +88,11 @@ aliyun-sdk-oss + + com.amazonaws + aws-java-sdk-s3 + + com.github.oshi oshi-core diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java index 792746bddb..8b517a5ce8 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java @@ -829,4 +829,18 @@ public final class Constants { public static final String REMOTE_LOGGING_OSS_BUCKET_NAME = "remote.logging.oss.bucket.name"; public static final String REMOTE_LOGGING_OSS_ENDPOINT = "remote.logging.oss.endpoint"; + + /** + * remote logging for S3 + */ + + public static final String REMOTE_LOGGING_S3_ACCESS_KEY_ID = "remote.logging.s3.access.key.id"; + + public static final String REMOTE_LOGGING_S3_ACCESS_KEY_SECRET = "remote.logging.s3.access.key.secret"; + + public static final String REMOTE_LOGGING_S3_BUCKET_NAME = "remote.logging.s3.bucket.name"; + + public static final String REMOTE_LOGGING_S3_ENDPOINT = "remote.logging.s3.endpoint"; + + public static final String REMOTE_LOGGING_S3_REGION = "remote.logging.s3.region"; } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/OssRemoteLogHandler.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/OssRemoteLogHandler.java index a8d19af98c..b7cd131e04 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/OssRemoteLogHandler.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/OssRemoteLogHandler.java @@ -27,8 +27,6 @@ import org.apache.commons.lang3.StringUtils; import java.io.Closeable; import java.io.File; import java.io.IOException; -import java.nio.file.Path; -import java.nio.file.Paths; import lombok.extern.slf4j.Slf4j; @@ -40,13 +38,23 @@ import com.aliyun.oss.model.PutObjectRequest; @Slf4j public class OssRemoteLogHandler implements RemoteLogHandler, Closeable { - private static final int OBJECT_NAME_COUNT = 2; - private OSS ossClient; private String bucketName; - public OssRemoteLogHandler() { + private static OssRemoteLogHandler instance; + + private OssRemoteLogHandler() { + + } + + public static synchronized OssRemoteLogHandler getInstance() { + if (instance == null) { + instance = new OssRemoteLogHandler(); + instance.init(); + } + + return instance; } public void init() { @@ -61,7 +69,7 @@ public class OssRemoteLogHandler implements RemoteLogHandler, Closeable { @Override public void sendRemoteLog(String logPath) { - String objectName = getObjectNameFromLogPath(logPath); + String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath); try { log.info("send remote log {} to OSS {}", logPath, objectName); @@ -74,7 +82,7 @@ public class OssRemoteLogHandler implements RemoteLogHandler, Closeable { @Override public void getRemoteLog(String logPath) { - String objectName = getObjectNameFromLogPath(logPath); + String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath); try { log.info("get remote log on OSS {} to {}", objectName, logPath); @@ -91,18 +99,6 @@ public class OssRemoteLogHandler implements RemoteLogHandler, Closeable { } } - private String getObjectNameFromLogPath(String logPath) { - Path path = Paths.get(logPath); - int nameCount = path.getNameCount(); - - if (nameCount < OBJECT_NAME_COUNT) { - return Paths.get(readOssBaseDir(), logPath).toString(); - } else { - return Paths.get(readOssBaseDir(), path.subpath(nameCount - OBJECT_NAME_COUNT, nameCount).toString()) - .toString(); - } - } - private void checkBucketNameExists(String bucketName) { if (StringUtils.isBlank(bucketName)) { throw new IllegalArgumentException(Constants.REMOTE_LOGGING_OSS_BUCKET_NAME + " is empty"); @@ -136,8 +132,4 @@ public class OssRemoteLogHandler implements RemoteLogHandler, Closeable { private String readOssBucketName() { return PropertyUtils.getString(Constants.REMOTE_LOGGING_OSS_BUCKET_NAME); } - - private String readOssBaseDir() { - return PropertyUtils.getString(Constants.REMOTE_LOGGING_BASE_DIR); - } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerFactory.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerFactory.java index a132db8d14..328f7768d8 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerFactory.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerFactory.java @@ -21,19 +21,25 @@ import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.utils.PropertyUtils; import lombok.experimental.UtilityClass; +import lombok.extern.slf4j.Slf4j; @UtilityClass +@Slf4j public class RemoteLogHandlerFactory { public RemoteLogHandler getRemoteLogHandler() { if (!RemoteLogUtils.isRemoteLoggingEnable()) { return null; } - if (!"OSS".equals(PropertyUtils.getUpperCaseString(Constants.REMOTE_LOGGING_TARGET))) { - return null; + + String target = PropertyUtils.getUpperCaseString(Constants.REMOTE_LOGGING_TARGET); + if ("OSS".equals(target)) { + return OssRemoteLogHandler.getInstance(); + } else if ("S3".equals(target)) { + return S3RemoteLogHandler.getInstance(); } - OssRemoteLogHandler ossRemoteLogHandler = new OssRemoteLogHandler(); - ossRemoteLogHandler.init(); - return ossRemoteLogHandler; + + log.error("No suitable remote logging target for {}", target); + return null; } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogService.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogService.java index 4097675eb1..1e7447e878 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogService.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogService.java @@ -37,11 +37,10 @@ public class RemoteLogService { RemoteLogHandler remoteLogHandler = RemoteLogHandlerFactory.getRemoteLogHandler(); if (remoteLogHandler == null) { - log.error("remote log handler is null"); return; } remoteLogHandler.sendRemoteLog(logPath); - log.info("Succeed to send log {} to remote target {}", logPath, + log.info("End send log {} to remote target {}", logPath, PropertyUtils.getString(Constants.REMOTE_LOGGING_TARGET)); } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogUtils.java index 9d81a1bfd1..7df90a9dec 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogUtils.java @@ -36,6 +36,8 @@ public class RemoteLogUtils { private static RemoteLogService remoteLogService; + private static final int OBJECT_NAME_COUNT = 2; + @Autowired private RemoteLogService autowiredRemoteLogService; @@ -59,7 +61,6 @@ public class RemoteLogUtils { mkdirOfLog(logPath); RemoteLogHandler remoteLogHandler = RemoteLogHandlerFactory.getRemoteLogHandler(); if (remoteLogHandler == null) { - log.error("remote log handler is null"); return; } remoteLogHandler.getRemoteLog(logPath); @@ -76,4 +77,18 @@ public class RemoteLogUtils { public static boolean isRemoteLoggingEnable() { return PropertyUtils.getBoolean(Constants.REMOTE_LOGGING_ENABLE, Boolean.FALSE); } + + public static String getObjectNameFromLogPath(String logPath) { + Path path = Paths.get(logPath); + int nameCount = path.getNameCount(); + + String logBaseDir = PropertyUtils.getString(Constants.REMOTE_LOGGING_BASE_DIR); + + if (nameCount < OBJECT_NAME_COUNT) { + return Paths.get(logBaseDir, logPath).toString(); + } else { + return Paths.get(logBaseDir, path.subpath(nameCount - OBJECT_NAME_COUNT, nameCount).toString()) + .toString(); + } + } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/S3RemoteLogHandler.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/S3RemoteLogHandler.java new file mode 100644 index 0000000000..00c632ecda --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/S3RemoteLogHandler.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.log.remote; + +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; + +import org.apache.commons.lang3.StringUtils; + +import java.io.Closeable; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; + +import lombok.extern.slf4j.Slf4j; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.Bucket; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; + +@Slf4j +public class S3RemoteLogHandler implements RemoteLogHandler, Closeable { + + private String accessKeyId; + + private String accessKeySecret; + + private String region; + + private String bucketName; + + private String endPoint; + + private AmazonS3 s3Client; + + private static S3RemoteLogHandler instance; + + private S3RemoteLogHandler() { + + } + + public static synchronized S3RemoteLogHandler getInstance() { + if (instance == null) { + instance = new S3RemoteLogHandler(); + instance.init(); + } + + return instance; + } + + public void init() { + accessKeyId = readAccessKeyID(); + accessKeySecret = readAccessKeySecret(); + region = readRegion(); + bucketName = readBucketName(); + endPoint = readEndPoint(); + s3Client = buildS3Client(); + checkBucketNameExists(bucketName); + } + + protected AmazonS3 buildS3Client() { + if (StringUtils.isNotEmpty(endPoint)) { + return AmazonS3ClientBuilder + .standard() + .withPathStyleAccessEnabled(true) + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( + endPoint, Regions.fromName(region).getName())) + .withCredentials( + new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, accessKeySecret))) + .build(); + } else { + return AmazonS3ClientBuilder + .standard() + .withCredentials( + new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, accessKeySecret))) + .withRegion(Regions.fromName(region)) + .build(); + } + } + + @Override + public void close() throws IOException { + s3Client.shutdown(); + } + + @Override + public void sendRemoteLog(String logPath) { + String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath); + + try { + log.info("send remote log {} to S3 {}", logPath, objectName); + s3Client.putObject(bucketName, objectName, new File(logPath)); + } catch (Exception e) { + log.error("error while sending remote log {} to S3 {}", logPath, objectName, e); + } + } + + @Override + public void getRemoteLog(String logPath) { + String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath); + + try { + log.info("get remote log on S3 {} to {}", objectName, logPath); + S3Object o = s3Client.getObject(bucketName, objectName); + try ( + S3ObjectInputStream s3is = o.getObjectContent(); + FileOutputStream fos = new FileOutputStream(logPath)) { + byte[] readBuf = new byte[1024]; + int readLen = 0; + while ((readLen = s3is.read(readBuf)) > 0) { + fos.write(readBuf, 0, readLen); + } + } + } catch (Exception e) { + log.error("error while getting remote log on S3 {} to {}", objectName, logPath, e); + } + } + + protected String readAccessKeyID() { + return PropertyUtils.getString(Constants.REMOTE_LOGGING_S3_ACCESS_KEY_ID); + } + + protected String readAccessKeySecret() { + return PropertyUtils.getString(Constants.REMOTE_LOGGING_S3_ACCESS_KEY_SECRET); + } + + protected String readRegion() { + return PropertyUtils.getString(Constants.REMOTE_LOGGING_S3_REGION); + } + + protected String readBucketName() { + return PropertyUtils.getString(Constants.REMOTE_LOGGING_S3_BUCKET_NAME); + } + + protected String readEndPoint() { + return PropertyUtils.getString(Constants.REMOTE_LOGGING_S3_ENDPOINT); + } + + public void checkBucketNameExists(String bucketName) { + if (StringUtils.isBlank(bucketName)) { + throw new IllegalArgumentException("remote.logging.s3.bucket.name is blank"); + } + + Bucket existsBucket = s3Client.listBuckets() + .stream() + .filter( + bucket -> bucket.getName().equals(bucketName)) + .findFirst() + .orElseThrow(() -> { + return new IllegalArgumentException( + "bucketName: " + bucketName + " is not exists, you need to create them by yourself"); + }); + + log.info("bucketName: {} has been found, the current regionName is {}", existsBucket.getName(), + s3Client.getRegionName()); + } +} diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties index 513fe53559..23ce6b94df 100644 --- a/dolphinscheduler-common/src/main/resources/common.properties +++ b/dolphinscheduler-common/src/main/resources/common.properties @@ -158,4 +158,14 @@ remote.logging.oss.access.key.secret= remote.logging.oss.bucket.name= # oss endpoint, required if you set remote.logging.target=OSS remote.logging.oss.endpoint= +# s3 access key id, required if you set remote.logging.target=S3 +remote.logging.s3.access.key.id= +# s3 access key secret, required if you set remote.logging.target=S3 +remote.logging.s3.access.key.secret= +# s3 bucket name, required if you set remote.logging.target=S3 +remote.logging.s3.bucket.name= +# s3 endpoint, required if you set remote.logging.target=S3 +remote.logging.s3.endpoint= +# s3 region, required if you set remote.logging.target=S3 +remote.logging.s3.region= diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/remote/OssRemoteLogHandlerTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerTest.java similarity index 79% rename from dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/remote/OssRemoteLogHandlerTest.java rename to dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerTest.java index 726a262938..6513f0c190 100644 --- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/remote/OssRemoteLogHandlerTest.java +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerTest.java @@ -20,8 +20,6 @@ package org.apache.dolphinscheduler.common.log.remote; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.common.utils.PropertyUtils; -import java.lang.reflect.Method; - import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -30,22 +28,18 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) -public class OssRemoteLogHandlerTest { +public class RemoteLogHandlerTest { @Test - public void testGetObjectNameFromLogPath() throws Exception { + public void testGetObjectNameFromLogPath() { final String logPath = "/path/to/dolphinscheduler/logs/20230116/8245922982496_1-1-3.log"; final String expectedObjectName = "logs/20230116/8245922982496_1-1-3.log"; - OssRemoteLogHandler ossRemoteLogHandler = new OssRemoteLogHandler(); - try (MockedStatic propertyUtilsMockedStatic = Mockito.mockStatic(PropertyUtils.class)) { propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_BASE_DIR)) .thenReturn("logs"); - Method method = OssRemoteLogHandler.class.getDeclaredMethod("getObjectNameFromLogPath", String.class); - method.setAccessible(true); - String objectName = (String) method.invoke(ossRemoteLogHandler, logPath); + String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath); Assertions.assertEquals(expectedObjectName, objectName); }