Browse Source

[Feature-13419][Remote Logging] Add support for writing task logs to S3 (#13649)

* [Feature-13419][Remote Logging] Add support for writing task logs to S3

* Update dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/S3RemoteLogHandler.java

Co-authored-by: caishunfeng <caishunfeng2021@gmail.com>

* use try with resource

* remove unnecessary log

---------

Co-authored-by: caishunfeng <caishunfeng2021@gmail.com>
3.2.0-release
Rick Cheng 2 years ago committed by GitHub
parent
commit
aa79c01ce9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 17
      docs/docs/en/guide/remote-logging.md
  2. 19
      docs/docs/zh/guide/remote-logging.md
  3. 5
      dolphinscheduler-common/pom.xml
  4. 14
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
  5. 38
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/OssRemoteLogHandler.java
  6. 16
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerFactory.java
  7. 3
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogService.java
  8. 17
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogUtils.java
  9. 178
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/S3RemoteLogHandler.java
  10. 10
      dolphinscheduler-common/src/main/resources/common.properties
  11. 12
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerTest.java

17
docs/docs/en/guide/remote-logging.md

@ -33,3 +33,20 @@ remote.logging.oss.bucket.name=<bucket.name>
remote.logging.oss.endpoint=<endpoint>
```
## Writing task logs to [Amazon S3](https://aws.amazon.com/cn/s3/)
Configure `common.properties` as follows:
```properties
# s3 access key id, required if you set remote.logging.target=S3
remote.logging.s3.access.key.id=<access.key.id>
# s3 access key secret, required if you set remote.logging.target=S3
remote.logging.s3.access.key.secret=<access.key.secret>
# s3 bucket name, required if you set remote.logging.target=S3
remote.logging.s3.bucket.name=<bucket.name>
# s3 endpoint, required if you set remote.logging.target=S3
remote.logging.s3.endpoint=<endpoint>
# s3 region, required if you set remote.logging.target=S3
remote.logging.s3.region=<region>
```

19
docs/docs/zh/guide/remote-logging.md

@ -10,7 +10,7 @@ Apache DolphinScheduler支持将任务日志传输到远端存储上。当配置
```properties
# 是否开启远程日志存储
remote.logging.enable=true
# 任务日志写入的远端存储,目前支持OSS
# 任务日志写入的远端存储,目前支持OSS, S3
remote.logging.target=OSS
# 任务日志在远端存储上的目录
remote.logging.base.dir=logs
@ -33,3 +33,20 @@ remote.logging.oss.bucket.name=<bucket.name>
remote.logging.oss.endpoint=<endpoint>
```
## 将任务日志写入[Amazon S3](https://aws.amazon.com/cn/s3/)
配置`common.propertis`如下:
```properties
# s3 access key id, required if you set remote.logging.target=S3
remote.logging.s3.access.key.id=<access.key.id>
# s3 access key secret, required if you set remote.logging.target=S3
remote.logging.s3.access.key.secret=<access.key.secret>
# s3 bucket name, required if you set remote.logging.target=S3
remote.logging.s3.bucket.name=<bucket.name>
# s3 endpoint, required if you set remote.logging.target=S3
remote.logging.s3.endpoint=<endpoint>
# s3 region, required if you set remote.logging.target=S3
remote.logging.s3.region=<region>
```

5
dolphinscheduler-common/pom.xml

@ -88,6 +88,11 @@
<artifactId>aliyun-sdk-oss</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
</dependency>
<dependency>
<groupId>com.github.oshi</groupId>
<artifactId>oshi-core</artifactId>

14
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java

@ -829,4 +829,18 @@ public final class Constants {
public static final String REMOTE_LOGGING_OSS_BUCKET_NAME = "remote.logging.oss.bucket.name";
public static final String REMOTE_LOGGING_OSS_ENDPOINT = "remote.logging.oss.endpoint";
/**
* remote logging for S3
*/
public static final String REMOTE_LOGGING_S3_ACCESS_KEY_ID = "remote.logging.s3.access.key.id";
public static final String REMOTE_LOGGING_S3_ACCESS_KEY_SECRET = "remote.logging.s3.access.key.secret";
public static final String REMOTE_LOGGING_S3_BUCKET_NAME = "remote.logging.s3.bucket.name";
public static final String REMOTE_LOGGING_S3_ENDPOINT = "remote.logging.s3.endpoint";
public static final String REMOTE_LOGGING_S3_REGION = "remote.logging.s3.region";
}

38
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/OssRemoteLogHandler.java

@ -27,8 +27,6 @@ import org.apache.commons.lang3.StringUtils;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import lombok.extern.slf4j.Slf4j;
@ -40,13 +38,23 @@ import com.aliyun.oss.model.PutObjectRequest;
@Slf4j
public class OssRemoteLogHandler implements RemoteLogHandler, Closeable {
private static final int OBJECT_NAME_COUNT = 2;
private OSS ossClient;
private String bucketName;
public OssRemoteLogHandler() {
private static OssRemoteLogHandler instance;
private OssRemoteLogHandler() {
}
public static synchronized OssRemoteLogHandler getInstance() {
if (instance == null) {
instance = new OssRemoteLogHandler();
instance.init();
}
return instance;
}
public void init() {
@ -61,7 +69,7 @@ public class OssRemoteLogHandler implements RemoteLogHandler, Closeable {
@Override
public void sendRemoteLog(String logPath) {
String objectName = getObjectNameFromLogPath(logPath);
String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath);
try {
log.info("send remote log {} to OSS {}", logPath, objectName);
@ -74,7 +82,7 @@ public class OssRemoteLogHandler implements RemoteLogHandler, Closeable {
@Override
public void getRemoteLog(String logPath) {
String objectName = getObjectNameFromLogPath(logPath);
String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath);
try {
log.info("get remote log on OSS {} to {}", objectName, logPath);
@ -91,18 +99,6 @@ public class OssRemoteLogHandler implements RemoteLogHandler, Closeable {
}
}
private String getObjectNameFromLogPath(String logPath) {
Path path = Paths.get(logPath);
int nameCount = path.getNameCount();
if (nameCount < OBJECT_NAME_COUNT) {
return Paths.get(readOssBaseDir(), logPath).toString();
} else {
return Paths.get(readOssBaseDir(), path.subpath(nameCount - OBJECT_NAME_COUNT, nameCount).toString())
.toString();
}
}
private void checkBucketNameExists(String bucketName) {
if (StringUtils.isBlank(bucketName)) {
throw new IllegalArgumentException(Constants.REMOTE_LOGGING_OSS_BUCKET_NAME + " is empty");
@ -136,8 +132,4 @@ public class OssRemoteLogHandler implements RemoteLogHandler, Closeable {
private String readOssBucketName() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_OSS_BUCKET_NAME);
}
private String readOssBaseDir() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_BASE_DIR);
}
}

16
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerFactory.java

@ -21,19 +21,25 @@ import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
@UtilityClass
@Slf4j
public class RemoteLogHandlerFactory {
public RemoteLogHandler getRemoteLogHandler() {
if (!RemoteLogUtils.isRemoteLoggingEnable()) {
return null;
}
if (!"OSS".equals(PropertyUtils.getUpperCaseString(Constants.REMOTE_LOGGING_TARGET))) {
return null;
String target = PropertyUtils.getUpperCaseString(Constants.REMOTE_LOGGING_TARGET);
if ("OSS".equals(target)) {
return OssRemoteLogHandler.getInstance();
} else if ("S3".equals(target)) {
return S3RemoteLogHandler.getInstance();
}
OssRemoteLogHandler ossRemoteLogHandler = new OssRemoteLogHandler();
ossRemoteLogHandler.init();
return ossRemoteLogHandler;
log.error("No suitable remote logging target for {}", target);
return null;
}
}

3
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogService.java

@ -37,11 +37,10 @@ public class RemoteLogService {
RemoteLogHandler remoteLogHandler = RemoteLogHandlerFactory.getRemoteLogHandler();
if (remoteLogHandler == null) {
log.error("remote log handler is null");
return;
}
remoteLogHandler.sendRemoteLog(logPath);
log.info("Succeed to send log {} to remote target {}", logPath,
log.info("End send log {} to remote target {}", logPath,
PropertyUtils.getString(Constants.REMOTE_LOGGING_TARGET));
}
}

17
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogUtils.java

@ -36,6 +36,8 @@ public class RemoteLogUtils {
private static RemoteLogService remoteLogService;
private static final int OBJECT_NAME_COUNT = 2;
@Autowired
private RemoteLogService autowiredRemoteLogService;
@ -59,7 +61,6 @@ public class RemoteLogUtils {
mkdirOfLog(logPath);
RemoteLogHandler remoteLogHandler = RemoteLogHandlerFactory.getRemoteLogHandler();
if (remoteLogHandler == null) {
log.error("remote log handler is null");
return;
}
remoteLogHandler.getRemoteLog(logPath);
@ -76,4 +77,18 @@ public class RemoteLogUtils {
public static boolean isRemoteLoggingEnable() {
return PropertyUtils.getBoolean(Constants.REMOTE_LOGGING_ENABLE, Boolean.FALSE);
}
public static String getObjectNameFromLogPath(String logPath) {
Path path = Paths.get(logPath);
int nameCount = path.getNameCount();
String logBaseDir = PropertyUtils.getString(Constants.REMOTE_LOGGING_BASE_DIR);
if (nameCount < OBJECT_NAME_COUNT) {
return Paths.get(logBaseDir, logPath).toString();
} else {
return Paths.get(logBaseDir, path.subpath(nameCount - OBJECT_NAME_COUNT, nameCount).toString())
.toString();
}
}
}

178
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/S3RemoteLogHandler.java

@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.log.remote;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import lombok.extern.slf4j.Slf4j;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.Bucket;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
@Slf4j
public class S3RemoteLogHandler implements RemoteLogHandler, Closeable {
private String accessKeyId;
private String accessKeySecret;
private String region;
private String bucketName;
private String endPoint;
private AmazonS3 s3Client;
private static S3RemoteLogHandler instance;
private S3RemoteLogHandler() {
}
public static synchronized S3RemoteLogHandler getInstance() {
if (instance == null) {
instance = new S3RemoteLogHandler();
instance.init();
}
return instance;
}
public void init() {
accessKeyId = readAccessKeyID();
accessKeySecret = readAccessKeySecret();
region = readRegion();
bucketName = readBucketName();
endPoint = readEndPoint();
s3Client = buildS3Client();
checkBucketNameExists(bucketName);
}
protected AmazonS3 buildS3Client() {
if (StringUtils.isNotEmpty(endPoint)) {
return AmazonS3ClientBuilder
.standard()
.withPathStyleAccessEnabled(true)
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
endPoint, Regions.fromName(region).getName()))
.withCredentials(
new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, accessKeySecret)))
.build();
} else {
return AmazonS3ClientBuilder
.standard()
.withCredentials(
new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKeyId, accessKeySecret)))
.withRegion(Regions.fromName(region))
.build();
}
}
@Override
public void close() throws IOException {
s3Client.shutdown();
}
@Override
public void sendRemoteLog(String logPath) {
String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath);
try {
log.info("send remote log {} to S3 {}", logPath, objectName);
s3Client.putObject(bucketName, objectName, new File(logPath));
} catch (Exception e) {
log.error("error while sending remote log {} to S3 {}", logPath, objectName, e);
}
}
@Override
public void getRemoteLog(String logPath) {
String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath);
try {
log.info("get remote log on S3 {} to {}", objectName, logPath);
S3Object o = s3Client.getObject(bucketName, objectName);
try (
S3ObjectInputStream s3is = o.getObjectContent();
FileOutputStream fos = new FileOutputStream(logPath)) {
byte[] readBuf = new byte[1024];
int readLen = 0;
while ((readLen = s3is.read(readBuf)) > 0) {
fos.write(readBuf, 0, readLen);
}
}
} catch (Exception e) {
log.error("error while getting remote log on S3 {} to {}", objectName, logPath, e);
}
}
protected String readAccessKeyID() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_S3_ACCESS_KEY_ID);
}
protected String readAccessKeySecret() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_S3_ACCESS_KEY_SECRET);
}
protected String readRegion() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_S3_REGION);
}
protected String readBucketName() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_S3_BUCKET_NAME);
}
protected String readEndPoint() {
return PropertyUtils.getString(Constants.REMOTE_LOGGING_S3_ENDPOINT);
}
public void checkBucketNameExists(String bucketName) {
if (StringUtils.isBlank(bucketName)) {
throw new IllegalArgumentException("remote.logging.s3.bucket.name is blank");
}
Bucket existsBucket = s3Client.listBuckets()
.stream()
.filter(
bucket -> bucket.getName().equals(bucketName))
.findFirst()
.orElseThrow(() -> {
return new IllegalArgumentException(
"bucketName: " + bucketName + " is not exists, you need to create them by yourself");
});
log.info("bucketName: {} has been found, the current regionName is {}", existsBucket.getName(),
s3Client.getRegionName());
}
}

10
dolphinscheduler-common/src/main/resources/common.properties

@ -158,4 +158,14 @@ remote.logging.oss.access.key.secret=<access.key.secret>
remote.logging.oss.bucket.name=<bucket.name>
# oss endpoint, required if you set remote.logging.target=OSS
remote.logging.oss.endpoint=<endpoint>
# s3 access key id, required if you set remote.logging.target=S3
remote.logging.s3.access.key.id=<access.key.id>
# s3 access key secret, required if you set remote.logging.target=S3
remote.logging.s3.access.key.secret=<access.key.secret>
# s3 bucket name, required if you set remote.logging.target=S3
remote.logging.s3.bucket.name=<bucket.name>
# s3 endpoint, required if you set remote.logging.target=S3
remote.logging.s3.endpoint=<endpoint>
# s3 region, required if you set remote.logging.target=S3
remote.logging.s3.region=<region>

12
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/remote/OssRemoteLogHandlerTest.java → dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerTest.java

@ -20,8 +20,6 @@ package org.apache.dolphinscheduler.common.log.remote;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import java.lang.reflect.Method;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ -30,22 +28,18 @@ import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class OssRemoteLogHandlerTest {
public class RemoteLogHandlerTest {
@Test
public void testGetObjectNameFromLogPath() throws Exception {
public void testGetObjectNameFromLogPath() {
final String logPath = "/path/to/dolphinscheduler/logs/20230116/8245922982496_1-1-3.log";
final String expectedObjectName = "logs/20230116/8245922982496_1-1-3.log";
OssRemoteLogHandler ossRemoteLogHandler = new OssRemoteLogHandler();
try (MockedStatic<PropertyUtils> propertyUtilsMockedStatic = Mockito.mockStatic(PropertyUtils.class)) {
propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_BASE_DIR))
.thenReturn("logs");
Method method = OssRemoteLogHandler.class.getDeclaredMethod("getObjectNameFromLogPath", String.class);
method.setAccessible(true);
String objectName = (String) method.invoke(ossRemoteLogHandler, logPath);
String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath);
Assertions.assertEquals(expectedObjectName, objectName);
}
Loading…
Cancel
Save