Browse Source

[Fix-9593] Storage Management StorageOperate No Instance (#9594)

* Storage Management StorageOperate No Instance

* Add StorageOperateManager unit test

* Add license header

* Fix issues in SonarCloud code analysis

Co-authored-by: WangJPLeo <wangjipeng@whaleops.com>
3.0.0/version-upgrade
WangJPLeo 2 years ago committed by GitHub
parent
commit
9964c4c1e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 48
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperateManager.java
  2. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
  3. 2
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java
  4. 50
      dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/storage/StorageOperateManagerTest.java
  5. 4
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
  6. 4
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

48
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/storage/StorageOperateManager.java

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.storage;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
import java.util.EnumMap;
import java.util.Objects;
import java.util.ServiceLoader;
/**
* @author Storage Operate Manager
*/
public class StorageOperateManager {
public static final EnumMap<ResUploadType, StorageOperate> OPERATE_MAP = new EnumMap<>(ResUploadType.class);
static {
ServiceLoader<StorageOperate> load = ServiceLoader.load(StorageOperate.class);
for (StorageOperate storageOperate : load) {
OPERATE_MAP.put(storageOperate.returnStorageType(), storageOperate);
}
}
public static StorageOperate getStorageOperate(ResUploadType resUploadType) {
if (Objects.isNull(resUploadType)){
resUploadType = ResUploadType.HDFS;
}
StorageOperate storageOperate = OPERATE_MAP.get(resUploadType);
if (Objects.isNull(storageOperate)){
storageOperate = OPERATE_MAP.get(ResUploadType.HDFS);
}
return storageOperate;
}
}

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java

@ -37,6 +37,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.client.cli.RMAdminCLI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.*;
import java.nio.charset.StandardCharsets;
@ -55,6 +56,7 @@ import static org.apache.dolphinscheduler.common.Constants.*;
* hadoop utils
* single instance
*/
@Component
public class HadoopUtils implements Closeable, StorageOperate {
private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class);

2
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/S3Utils.java

@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.jets3t.service.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.io.*;
import java.util.Collections;
@ -45,6 +46,7 @@ import java.util.stream.Stream;
import static org.apache.dolphinscheduler.common.Constants.*;
@Component
public class S3Utils implements Closeable, StorageOperate {
private static final Logger logger = LoggerFactory.getLogger(S3Utils.class);

50
dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/storage/StorageOperateManagerTest.java

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.storage;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.EnumMap;
/**
* @author StorageOperateManagerTest
*/
@RunWith(MockitoJUnitRunner.class)
public class StorageOperateManagerTest {
@Mock
private HadoopUtils hadoopUtils;
@Test
public void testManager() {
StorageOperateManager mock = Mockito.mock(StorageOperateManager.class);
Assert.assertNotNull(mock);
EnumMap<ResUploadType, StorageOperate> storageOperateMap = StorageOperateManager.OPERATE_MAP;
storageOperateMap.put(ResUploadType.HDFS, hadoopUtils);
StorageOperate storageOperate = StorageOperateManager.getStorageOperate(ResUploadType.HDFS);
Assert.assertNotNull(storageOperate);
}
}

4
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java

@ -18,6 +18,8 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.dolphinscheduler.common.storage.StorageOperateManager;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
@ -161,7 +163,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
}
// submit task to manager
boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager));
boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager, StorageOperateManager.getStorageOperate(ResUploadType.HDFS)));
if (!offer) {
logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}",
workerManager.getDelayQueueSize(), taskExecutionContext.getTaskInstanceId());

4
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java

@ -115,11 +115,13 @@ public class TaskExecuteThread implements Runnable, Delayed {
public TaskExecuteThread(TaskExecutionContext taskExecutionContext,
TaskCallbackService taskCallbackService,
AlertClientService alertClientService,
TaskPluginManager taskPluginManager) {
TaskPluginManager taskPluginManager,
StorageOperate storageOperate) {
this.taskExecutionContext = taskExecutionContext;
this.taskCallbackService = taskCallbackService;
this.alertClientService = alertClientService;
this.taskPluginManager = taskPluginManager;
this.storageOperate = storageOperate;
}
@Override

Loading…
Cancel
Save