diff --git a/docs/docs/en/guide/remote-logging.md b/docs/docs/en/guide/remote-logging.md index a29dc06582..7753fe4116 100644 --- a/docs/docs/en/guide/remote-logging.md +++ b/docs/docs/en/guide/remote-logging.md @@ -10,7 +10,7 @@ If you deploy DolphinScheduler in `Standalone` mode, you only need to configure ```properties # Whether to enable remote logging remote.logging.enable=false -# if remote.logging.enable = true, set the target of remote logging +# if remote.logging.enable = true, set the target of remote logging, currently support OSS, S3, GCS, ABS remote.logging.target=OSS # if remote.logging.enable = true, set the log base directory remote.logging.base.dir=logs @@ -66,12 +66,12 @@ remote.logging.google.cloud.storage.bucket.name= Configure `common.properties` as follows: ```properties -# abs container name, required if you set resource.storage.type=ABS -resource.azure.blob.storage.container.name= # abs account name, required if you set resource.storage.type=ABS -resource.azure.blob.storage.account.name= -# abs connection string, required if you set resource.storage.type=ABS -resource.azure.blob.storage.connection.string= +remote.logging.abs.account.name= +# abs account key, required if you set resource.storage.type=ABS +remote.logging.abs.account.key= +# abs container name, required if you set resource.storage.type=ABS +remote.logging.abs.container.name= ``` ### Notice diff --git a/docs/docs/zh/guide/remote-logging.md b/docs/docs/zh/guide/remote-logging.md index 7321badb1a..0e45353636 100644 --- a/docs/docs/zh/guide/remote-logging.md +++ b/docs/docs/zh/guide/remote-logging.md @@ -10,7 +10,7 @@ Apache DolphinScheduler支持将任务日志传输到远端存储上。当配置 ```properties # 是否开启远程日志存储 remote.logging.enable=true -# 任务日志写入的远端存储,目前支持OSS, S3, GCS +# 任务日志写入的远端存储,目前支持OSS, S3, GCS, ABS remote.logging.target=OSS # 任务日志在远端存储上的目录 remote.logging.base.dir=logs @@ -66,12 +66,12 @@ remote.logging.google.cloud.storage.bucket.name= 配置`common.propertis`如下: ```properties -# abs container name, required if you set resource.storage.type=ABS -resource.azure.blob.storage.container.name= # abs account name, required if you set resource.storage.type=ABS -resource.azure.blob.storage.account.name= -# abs connection string, required if you set resource.storage.type=ABS -resource.azure.blob.storage.connection.string= +remote.logging.abs.account.name= +# abs account key, required if you set resource.storage.type=ABS +remote.logging.abs.account.key= +# abs container name, required if you set resource.storage.type=ABS +remote.logging.abs.container.name= ``` ### 注意事项 diff --git a/dolphinscheduler-common/pom.xml b/dolphinscheduler-common/pom.xml index b03fdd7483..eda9a72e30 100644 --- a/dolphinscheduler-common/pom.xml +++ b/dolphinscheduler-common/pom.xml @@ -98,6 +98,11 @@ esdk-obs-java-bundle + + com.azure + azure-storage-blob + + com.github.oshi oshi-core diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java index 7556636216..054a9410d5 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java @@ -725,6 +725,13 @@ public final class Constants { public static final String REMOTE_LOGGING_GCS_BUCKET_NAME = "remote.logging.google.cloud.storage.bucket.name"; + /** + * remote logging for ABS + */ + public static final String REMOTE_LOGGING_ABS_ACCOUNT_NAME = "remote.logging.abs.account.name"; + public static final String REMOTE_LOGGING_ABS_ACCOUNT_KEY = "remote.logging.abs.account.key"; + public static final String REMOTE_LOGGING_ABS_CONTAINER_NAME = "remote.logging.abs.container.name"; + /** * data quality */ diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/AbsRemoteLogHandler.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/AbsRemoteLogHandler.java new file mode 100644 index 0000000000..c0df3f6287 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/AbsRemoteLogHandler.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.log.remote; + +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; + +import org.apache.commons.lang3.StringUtils; + +import java.io.Closeable; +import java.io.FileOutputStream; +import java.io.IOException; + +import lombok.extern.slf4j.Slf4j; + +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.specialized.BlobInputStream; +import com.azure.storage.common.StorageSharedKeyCredential; + +@Slf4j +public class AbsRemoteLogHandler implements RemoteLogHandler, Closeable { + + private String accountName; + + private String accountKey; + + private String containerName; + + private BlobContainerClient blobContainerClient; + + private static AbsRemoteLogHandler instance; + + private AbsRemoteLogHandler() { + accountName = readAccountName(); + accountKey = readAccountKey(); + containerName = readContainerName(); + blobContainerClient = buildBlobContainerClient(); + } + + public static synchronized AbsRemoteLogHandler getInstance() { + if (instance == null) { + instance = new AbsRemoteLogHandler(); + } + + return instance; + } + + protected BlobContainerClient buildBlobContainerClient() { + + BlobServiceClient serviceClient = new BlobServiceClientBuilder() + .endpoint(String.format("https://%s.blob.core.windows.net/", accountName)) + .credential(new StorageSharedKeyCredential(accountName, accountKey)) + .buildClient(); + + if (StringUtils.isBlank(containerName)) { + throw new IllegalArgumentException("remote.logging.abs.container.name is blank"); + } + + try { + this.blobContainerClient = serviceClient.getBlobContainerClient(containerName); + } catch (Exception ex) { + throw new IllegalArgumentException( + "containerName: " + containerName + " is not exists, you need to create them by yourself"); + } + + log.info("containerName: {} has been found.", containerName); + + return blobContainerClient; + } + + @Override + public void close() throws IOException { + // no need to close blobContainerClient + } + + @Override + public void sendRemoteLog(String logPath) { + String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath); + + try { + log.info("send remote log {} to Azure Blob {}", logPath, objectName); + blobContainerClient.getBlobClient(objectName).uploadFromFile(logPath); + } catch (Exception e) { + log.error("error while sending remote log {} to Azure Blob {}", logPath, objectName, e); + } + } + + @Override + public void getRemoteLog(String logPath) { + String objectName = RemoteLogUtils.getObjectNameFromLogPath(logPath); + + try { + log.info("get remote log on Azure Blob {} to {}", objectName, logPath); + + try ( + BlobInputStream bis = blobContainerClient.getBlobClient(objectName).openInputStream(); + FileOutputStream fos = new FileOutputStream(logPath)) { + byte[] readBuf = new byte[1024]; + int readLen = 0; + while ((readLen = bis.read(readBuf)) > 0) { + fos.write(readBuf, 0, readLen); + } + } + } catch (Exception e) { + log.error("error while getting remote log on Azure Blob {} to {}", objectName, logPath, e); + } + } + + protected String readAccountName() { + return PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_NAME); + } + + protected String readAccountKey() { + return PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_KEY); + } + + protected String readContainerName() { + return PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_CONTAINER_NAME); + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerFactory.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerFactory.java index 73ab41a134..ac75a23f2d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerFactory.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/log/remote/RemoteLogHandlerFactory.java @@ -39,6 +39,8 @@ public class RemoteLogHandlerFactory { return S3RemoteLogHandler.getInstance(); } else if ("GCS".equals(target)) { return GcsRemoteLogHandler.getInstance(); + } else if ("ABS".equals(target)) { + return AbsRemoteLogHandler.getInstance(); } log.error("No suitable remote logging target for {}", target); diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties index 669d3dfef3..fdb553b4bc 100644 --- a/dolphinscheduler-common/src/main/resources/common.properties +++ b/dolphinscheduler-common/src/main/resources/common.properties @@ -202,4 +202,9 @@ remote.logging.s3.region= remote.logging.google.cloud.storage.credential=/path/to/credential # gcs bucket name, required if you set remote.logging.target=GCS remote.logging.google.cloud.storage.bucket.name= - +# abs account name, required if you set resource.storage.type=ABS +remote.logging.abs.account.name= +# abs account key, required if you set resource.storage.type=ABS +remote.logging.abs.account.key= +# abs container name, required if you set resource.storage.type=ABS +remote.logging.abs.container.name= diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/remote/AbsRemoteLogHandlerTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/remote/AbsRemoteLogHandlerTest.java new file mode 100644 index 0000000000..bc18f952a9 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/log/remote/AbsRemoteLogHandlerTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.common.log.remote; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + +import org.apache.dolphinscheduler.common.constants.Constants; +import org.apache.dolphinscheduler.common.utils.LogUtils; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; + +import lombok.extern.slf4j.Slf4j; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedConstruction; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.common.StorageSharedKeyCredential; + +@Slf4j +@ExtendWith(MockitoExtension.class) +public class AbsRemoteLogHandlerTest { + + @Mock + BlobServiceClient blobServiceClient; + + @Mock + BlobContainerClient blobContainerClient; + + @Mock + BlobClient blobClient; + + @Test + public void testAbsRemoteLogHandlerContainerNameBlack() { + try ( + MockedStatic propertyUtilsMockedStatic = Mockito.mockStatic(PropertyUtils.class); + MockedStatic remoteLogUtilsMockedStatic = Mockito.mockStatic(LogUtils.class)) { + propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_NAME)) + .thenReturn("account_name"); + propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_KEY)) + .thenReturn("account_key"); + propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_CONTAINER_NAME)) + .thenReturn(""); + remoteLogUtilsMockedStatic.when(LogUtils::getLocalLogBaseDir).thenReturn("logs"); + + IllegalArgumentException thrown = Assertions.assertThrows(IllegalArgumentException.class, () -> { + AbsRemoteLogHandler.getInstance(); + }); + Assertions.assertEquals("remote.logging.abs.container.name is blank", thrown.getMessage()); + } + } + + @Test + public void testAbsRemoteLogHandlerContainerNotExists() { + try ( + MockedStatic propertyUtilsMockedStatic = Mockito.mockStatic(PropertyUtils.class); + MockedStatic remoteLogUtilsMockedStatic = Mockito.mockStatic(LogUtils.class); + MockedConstruction k8sClientWrapperMockedConstruction = + Mockito.mockConstruction(BlobServiceClientBuilder.class, (mock, context) -> { + when(mock.endpoint(any(String.class))).thenReturn(mock); + when(mock.credential(any(StorageSharedKeyCredential.class))).thenReturn(mock); + when(mock.buildClient()) + .thenReturn(blobServiceClient); + })) { + propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_NAME)) + .thenReturn("account_name"); + propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_KEY)) + .thenReturn("account_key"); + propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_CONTAINER_NAME)) + .thenReturn("container_name"); + remoteLogUtilsMockedStatic.when(LogUtils::getLocalLogBaseDir).thenReturn("logs"); + + when(blobServiceClient.getBlobContainerClient(any(String.class))).thenThrow( + new NullPointerException("container not exists")); + IllegalArgumentException thrown = Assertions.assertThrows(IllegalArgumentException.class, () -> { + AbsRemoteLogHandler.getInstance(); + }); + Assertions.assertEquals("containerName: container_name is not exists, you need to create them by yourself", + thrown.getMessage()); + } + } + + @Test + public void testAbsRemoteLogHandler() { + + try ( + MockedStatic propertyUtilsMockedStatic = Mockito.mockStatic(PropertyUtils.class); + MockedStatic remoteLogUtilsMockedStatic = Mockito.mockStatic(LogUtils.class); + MockedConstruction blobServiceClientBuilderMockedConstruction = + Mockito.mockConstruction(BlobServiceClientBuilder.class, (mock, context) -> { + when(mock.endpoint(any(String.class))).thenReturn(mock); + when(mock.credential(any(StorageSharedKeyCredential.class))).thenReturn(mock); + when(mock.buildClient()) + .thenReturn(blobServiceClient); + }); + MockedStatic remoteLogUtilsMockedStatic1 = Mockito.mockStatic(RemoteLogUtils.class)) { + propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_NAME)) + .thenReturn("account_name"); + propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_ACCOUNT_KEY)) + .thenReturn("account_key"); + propertyUtilsMockedStatic.when(() -> PropertyUtils.getString(Constants.REMOTE_LOGGING_ABS_CONTAINER_NAME)) + .thenReturn("container_name"); + remoteLogUtilsMockedStatic.when(LogUtils::getLocalLogBaseDir).thenReturn("logs"); + String logPath = "logpath"; + String objectName = "objectname"; + remoteLogUtilsMockedStatic1.when(() -> RemoteLogUtils.getObjectNameFromLogPath(logPath)) + .thenReturn(objectName); + + when(blobServiceClient.getBlobContainerClient(any(String.class))).thenReturn(blobContainerClient); + when(blobContainerClient.getBlobClient(objectName)).thenReturn(blobClient); + + AbsRemoteLogHandler absRemoteLogHandler = AbsRemoteLogHandler.getInstance(); + Assertions.assertNotNull(absRemoteLogHandler); + + absRemoteLogHandler.sendRemoteLog(logPath); + Mockito.verify(blobClient, times(1)).uploadFromFile(logPath); + } + } +}