From 9964c4c1e18d7abf9b116fdb8b3deef49156c0fd Mon Sep 17 00:00:00 2001 From: WangJPLeo <103574007+WangJPLeo@users.noreply.github.com> Date: Wed, 20 Apr 2022 09:58:37 +0800 Subject: [PATCH] [Fix-9593] Storage Management StorageOperate No Instance (#9594) * Storage Management StorageOperate No Instance * Add StorageOperateManager unit test * Add license header * Fix issues in SonarCloud code analysis Co-authored-by: WangJPLeo --- .../common/storage/StorageOperateManager.java | 48 ++++++++++++++++++ .../common/utils/HadoopUtils.java | 2 + .../common/utils/S3Utils.java | 2 + .../storage/StorageOperateManagerTest.java | 50 +++++++++++++++++++ .../processor/TaskExecuteProcessor.java | 4 +- .../worker/runner/TaskExecuteThread.java | 4 +- 6 files changed, 108 insertions(+), 2 deletions(-) create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperateManager.java create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/storage/StorageOperateManagerTest.java diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperateManager.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperateManager.java new file mode 100644 index 0000000000..184a264899 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperateManager.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.storage; + +import org.apache.dolphinscheduler.common.enums.ResUploadType; + +import java.util.EnumMap; +import java.util.Objects; +import java.util.ServiceLoader; +/** + * @author Storage Operate Manager + */ +public class StorageOperateManager { + + public static final EnumMap OPERATE_MAP = new EnumMap<>(ResUploadType.class); + + static { + ServiceLoader load = ServiceLoader.load(StorageOperate.class); + for (StorageOperate storageOperate : load) { + OPERATE_MAP.put(storageOperate.returnStorageType(), storageOperate); + } + } + + public static StorageOperate getStorageOperate(ResUploadType resUploadType) { + if (Objects.isNull(resUploadType)){ + resUploadType = ResUploadType.HDFS; + } + StorageOperate storageOperate = OPERATE_MAP.get(resUploadType); + if (Objects.isNull(storageOperate)){ + storageOperate = OPERATE_MAP.get(ResUploadType.HDFS); + } + return storageOperate; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java index 41fa66933a..8ac51c67ba 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java @@ -37,6 +37,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.client.cli.RMAdminCLI; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; import java.io.*; import java.nio.charset.StandardCharsets; @@ -55,6 +56,7 @@ import static org.apache.dolphinscheduler.common.Constants.*; * hadoop utils * single instance */ +@Component public class HadoopUtils implements Closeable, StorageOperate { private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class); diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java index ad0f6ce3a1..80fb464661 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java @@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType; import org.jets3t.service.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; import java.io.*; import java.util.Collections; @@ -45,6 +46,7 @@ import java.util.stream.Stream; import static org.apache.dolphinscheduler.common.Constants.*; +@Component public class S3Utils implements Closeable, StorageOperate { private static final Logger logger = LoggerFactory.getLogger(S3Utils.class); diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/storage/StorageOperateManagerTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/storage/StorageOperateManagerTest.java new file mode 100644 index 0000000000..05f2e45a73 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/storage/StorageOperateManagerTest.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.storage; + +import org.apache.dolphinscheduler.common.enums.ResUploadType; +import org.apache.dolphinscheduler.common.utils.HadoopUtils; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.EnumMap; + +/** + * @author StorageOperateManagerTest + */ +@RunWith(MockitoJUnitRunner.class) +public class StorageOperateManagerTest { + + @Mock + private HadoopUtils hadoopUtils; + + @Test + public void testManager() { + StorageOperateManager mock = Mockito.mock(StorageOperateManager.class); + Assert.assertNotNull(mock); + + EnumMap storageOperateMap = StorageOperateManager.OPERATE_MAP; + storageOperateMap.put(ResUploadType.HDFS, hadoopUtils); + + StorageOperate storageOperate = StorageOperateManager.getStorageOperate(ResUploadType.HDFS); + Assert.assertNotNull(storageOperate); + } +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java index a376587aad..0d2e4880aa 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java @@ -18,6 +18,8 @@ package org.apache.dolphinscheduler.server.worker.processor; import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.enums.ResUploadType; +import org.apache.dolphinscheduler.common.storage.StorageOperateManager; import org.apache.dolphinscheduler.common.utils.CommonUtils; import org.apache.dolphinscheduler.common.utils.DateUtils; import org.apache.dolphinscheduler.common.utils.FileUtils; @@ -161,7 +163,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { } // submit task to manager - boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager)); + boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager, StorageOperateManager.getStorageOperate(ResUploadType.HDFS))); if (!offer) { logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}", workerManager.getDelayQueueSize(), taskExecutionContext.getTaskInstanceId()); diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 7670d49b0a..9fffec7662 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -115,11 +115,13 @@ public class TaskExecuteThread implements Runnable, Delayed { public TaskExecuteThread(TaskExecutionContext taskExecutionContext, TaskCallbackService taskCallbackService, AlertClientService alertClientService, - TaskPluginManager taskPluginManager) { + TaskPluginManager taskPluginManager, + StorageOperate storageOperate) { this.taskExecutionContext = taskExecutionContext; this.taskCallbackService = taskCallbackService; this.alertClientService = alertClientService; this.taskPluginManager = taskPluginManager; + this.storageOperate = storageOperate; } @Override