Browse Source

[Improvement-14292][Common] Remove the duplicate OSUtils (#14293)

3.2.1-prepare
Rick Cheng 1 year ago committed by GitHub
parent
commit
2c057d6a8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 29
      dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/OSUtils.java
  2. 1
      dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSender.java
  3. 4
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java
  4. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
  5. 361
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractShell.java
  6. 177
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellExecutor.java
  7. 84
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/OSUtils.java
  8. 1
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java
  9. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java
  10. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
  11. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/GitProjectManager.java

29
dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/OSUtils.java

@ -1,29 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.alert.script;
public final class OSUtils {
private OSUtils() {
throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
}
static Boolean isWindows() {
return System.getProperty("os.name").startsWith("Windows");
}
}

1
dolphinscheduler-alert/dolphinscheduler-alert-plugins/dolphinscheduler-alert-script/src/main/java/org/apache/dolphinscheduler/plugin/alert/script/ScriptSender.java

@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.plugin.alert.script;
import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.commons.lang3.StringUtils;

4
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/OSUtils.java

@ -460,4 +460,8 @@ public class OSUtils {
return false;
}
public static Boolean isWindows() {
return System.getProperty("os.name").startsWith("Windows");
}
}

2
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

@ -24,12 +24,12 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD
import org.apache.dolphinscheduler.common.constants.TenantConstants;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
import org.apache.commons.lang3.StringUtils;

361
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractShell.java

@ -1,361 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
/**
* A base class for running a Unix command.
*
* <code>AbstractShell</code> can be used to run unix commands like <code>du</code> or
* <code>df</code>. It also offers facilities to gate commands by
* time-intervals.
*/
@Slf4j
public abstract class AbstractShell {
/**
* Time after which the executing script would be timedout
*/
protected long timeOutInterval = 0L;
/**
* If or not script timed out
*/
private AtomicBoolean timedOut;
/**
* refresh interval in msec
*/
private long interval;
/**
* last time the command was performed
*/
private long lastTime;
/**
* env for the command execution
*/
private Map<String, String> environment;
private File dir;
/**
* sub process used to execute the command
*/
private Process process;
private int exitCode;
/**
* If or not script finished executing
*/
private AtomicBoolean completed;
AbstractShell() {
this(0L);
}
/**
* @param interval the minimum duration to wait before re-executing the
* command.
*/
AbstractShell(long interval) {
this.interval = interval;
this.lastTime = (interval < 0) ? 0 : -interval;
}
/**
* set the environment for the command
*
* @param env Mapping of environment variables
*/
protected void setEnvironment(Map<String, String> env) {
this.environment = env;
}
/**
* set the working directory
*
* @param dir The directory where the command would be executed
*/
protected void setWorkingDirectory(File dir) {
this.dir = dir;
}
/**
* check to see if a command needs to be executed and execute if needed
*
* @throws IOException errors
*/
protected void run() throws IOException {
if (lastTime + interval > System.currentTimeMillis()) {
return;
}
// reset for next run
exitCode = 0;
runCommand();
}
/**
* Run a command actual work
*/
private void runCommand() throws IOException {
ProcessBuilder builder = new ProcessBuilder(getExecString());
Timer timeOutTimer = null;
ShellTimeoutTimerTask timeoutTimerTask;
timedOut = new AtomicBoolean(false);
completed = new AtomicBoolean(false);
if (environment != null) {
builder.environment().putAll(this.environment);
}
if (dir != null) {
builder.directory(this.dir);
}
process = builder.start();
ProcessContainer.putProcess(process);
if (timeOutInterval > 0) {
timeOutTimer = new Timer();
timeoutTimerTask = new ShellTimeoutTimerTask(
this);
// One time scheduling.
timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
}
final BufferedReader errReader =
new BufferedReader(
new InputStreamReader(process.getErrorStream()));
BufferedReader inReader =
new BufferedReader(
new InputStreamReader(process.getInputStream()));
final StringBuilder errMsg = new StringBuilder();
// read error and input streams as this would free up the buffers
// free the error stream buffer
Thread errThread = new Thread() {
@Override
public void run() {
try {
String line = errReader.readLine();
while ((line != null) && !isInterrupted()) {
errMsg.append(line);
errMsg.append(System.getProperty("line.separator"));
line = errReader.readLine();
}
} catch (IOException ioe) {
log.warn("Error reading the error stream", ioe);
}
}
};
Thread inThread = new Thread() {
@Override
public void run() {
try {
parseExecResult(inReader);
} catch (IOException ioe) {
log.warn("Error reading the in stream", ioe);
}
super.run();
}
};
try {
errThread.start();
inThread.start();
} catch (IllegalStateException e) {
log.error(" read error and input streams start error", e);
}
try {
// parse the output
exitCode = process.waitFor();
try {
// make sure that the error and in thread exits
errThread.join();
inThread.join();
} catch (InterruptedException ie) {
log.warn("Interrupted while reading the error and in stream", ie);
}
completed.compareAndSet(false, true);
// the timeout thread handling
// taken care in finally block
if (exitCode != 0 || errMsg.length() > 0) {
throw new ExitCodeException(exitCode, errMsg.toString());
}
} catch (InterruptedException ie) {
throw new IOException(ie.toString());
} finally {
if ((timeOutTimer != null) && !timedOut.get()) {
timeOutTimer.cancel();
}
// close the input stream
try {
inReader.close();
} catch (IOException ioe) {
log.warn("Error while closing the input stream", ioe);
}
if (!completed.get()) {
errThread.interrupt();
}
try {
errReader.close();
} catch (IOException ioe) {
log.warn("Error while closing the error stream", ioe);
}
ProcessContainer.removeProcess(process);
process.destroy();
lastTime = System.currentTimeMillis();
}
}
/**
* @return an array containing the command name and its parameters
*/
protected abstract String[] getExecString();
/**
* Parse the execution result
*
* @param lines lines
* @throws IOException errors
*/
protected abstract void parseExecResult(BufferedReader lines) throws IOException;
/**
* get the current sub-process executing the given command
*
* @return process executing the command
*/
public Process getProcess() {
return process;
}
/**
* get the exit code
*
* @return the exit code of the process
*/
public int getExitCode() {
return exitCode;
}
/**
* Set if the command has timed out.
*/
private void setTimedOut() {
this.timedOut.set(true);
}
/**
* Timer which is used to timeout scripts spawned off by shell.
*/
private static class ShellTimeoutTimerTask extends TimerTask {
private AbstractShell shell;
public ShellTimeoutTimerTask(AbstractShell shell) {
this.shell = shell;
}
@Override
public void run() {
Process p = shell.getProcess();
try {
p.exitValue();
} catch (Exception e) {
// Process has not terminated.
// So check if it has completed
// if not just destroy it.
if (p != null && !shell.completed.get()) {
shell.setTimedOut();
p.destroy();
}
}
}
}
/**
* This is an IOException with exit code added.
*/
public static class ExitCodeException extends IOException {
private final int exitCode;
public ExitCodeException(int exitCode, String message) {
super(message);
this.exitCode = exitCode;
}
public int getExitCode() {
return exitCode;
}
}
/**
* process manage container
*/
public static class ProcessContainer extends ConcurrentHashMap<Integer, Process> {
private static final ProcessContainer container = new ProcessContainer();
private ProcessContainer() {
super();
}
public static final ProcessContainer getInstance() {
return container;
}
public static void putProcess(Process process) {
getInstance().put(process.hashCode(), process);
}
public static int processSize() {
return getInstance().size();
}
public static void removeProcess(Process process) {
getInstance().remove(process.hashCode());
}
public static void destroyAllProcess() {
Set<Entry<Integer, Process>> set = getInstance().entrySet();
for (Entry<Integer, Process> entry : set) {
try {
entry.getValue().destroy();
} catch (Exception e) {
log.error("Destroy All Processes error", e);
}
}
log.info("close " + set.size() + " executing process tasks");
}
}
}

177
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellExecutor.java

@ -1,177 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.util.Map;
/**
* shell command executor.
*
* <code>ShellExecutor</code> should be used in cases where the output
* of the command needs no explicit parsing and where the command, working
* directory and the environment remains unchanged. The output of the command
* is stored as-is and is expected to be small.
*/
public class ShellExecutor extends AbstractShell {
private String[] command;
private StringBuilder output;
public ShellExecutor(String... execString) {
this(execString, null);
}
public ShellExecutor(String[] execString, File dir) {
this(execString, dir, null);
}
public ShellExecutor(String[] execString, File dir,
Map<String, String> env) {
this(execString, dir, env, 0L);
}
/**
* Create a new instance of the ShellExecutor to execute a command.
*
* @param execString The command to execute with arguments
* @param dir If not-null, specifies the directory which should be set
* as the current working directory for the command.
* If null, the current working directory is not modified.
* @param env If not-null, environment of the command will include the
* key-value pairs specified in the map. If null, the current
* environment is not modified.
* @param timeout Specifies the time in milliseconds, after which the
* command will be killed and the status marked as timedout.
* If 0, the command will not be timed out.
*/
public ShellExecutor(String[] execString, File dir,
Map<String, String> env, long timeout) {
command = execString.clone();
if (dir != null) {
setWorkingDirectory(dir);
}
if (env != null) {
setEnvironment(env);
}
timeOutInterval = timeout;
}
/**
* Static method to execute a shell command.
* Covers most of the simple cases without requiring the user to implement
* the <code>AbstractShell</code> interface.
*
* @param cmd shell command to execute.
* @return the output of the executed command.
* @throws IOException errors
*/
public static String execCommand(String... cmd) throws IOException {
return execCommand(null, cmd, 0L);
}
/**
* Static method to execute a shell command.
* Covers most of the simple cases without requiring the user to implement
* the <code>AbstractShell</code> interface.
*
* @param env the map of environment key=value
* @param cmd shell command to execute.
* @param timeout time in milliseconds after which script should be marked timeout
* @return the output of the executed command.
* @throws IOException errors
*/
public static String execCommand(Map<String, String> env, String[] cmd,
long timeout) throws IOException {
ShellExecutor exec = new ShellExecutor(cmd, null, env,
timeout);
exec.execute();
return exec.getOutput();
}
/**
* Static method to execute a shell command.
* Covers most of the simple cases without requiring the user to implement
* the <code>AbstractShell</code> interface.
*
* @param env the map of environment key=value
* @param cmd shell command to execute.
* @return the output of the executed command.
* @throws IOException errors
*/
public static String execCommand(Map<String, String> env, String... cmd) throws IOException {
return execCommand(env, cmd, 0L);
}
/**
* Execute the shell command
*
* @throws IOException errors
*/
public void execute() throws IOException {
this.run();
}
@Override
protected String[] getExecString() {
return command;
}
@Override
protected void parseExecResult(BufferedReader lines) throws IOException {
output = new StringBuilder();
char[] buf = new char[1024];
int nRead;
String line = "";
while ((nRead = lines.read(buf, 0, buf.length)) > 0) {
line = new String(buf, 0, nRead);
output.append(line);
}
}
/**
* @return the output of the shell command
*/
public String getOutput() {
return (output == null) ? "" : output.toString();
}
/**
* Returns the commands of this instance.
* Arguments with spaces in are presented with quotes round; other
* arguments are presented raw
*
* @return a string representation of the object
*/
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
String[] args = getExecString();
for (String s : args) {
if (s.indexOf(' ') >= 0) {
builder.append('"').append(s).append('"');
} else {
builder.append(s);
}
builder.append(' ');
}
return builder.toString();
}
}

84
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/OSUtils.java

@ -1,84 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.utils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.ShellExecutor;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.util.StringTokenizer;
public class OSUtils {
private OSUtils() {
throw new IllegalStateException("Utility class");
}
/**
* get sudo command
*
* @param tenantCode tenantCode
* @param command command
* @return result of sudo execute command
*/
public static String getSudoCmd(String tenantCode, String command) {
return StringUtils.isEmpty(tenantCode) || !isSudoEnable() ? command : "sudo -u " + tenantCode + " " + command;
}
/**
* use sudo or not
*
* @return true is use sudo
*/
public static boolean isSudoEnable() {
return PropertyUtils.getBoolean("sudo.enable", Boolean.TRUE);
}
/**
* Execute the corresponding command of Linux or Windows
*
* @param command command
* @return result of execute command
* @throws IOException errors
*/
public static String exeCmd(String command) throws IOException {
StringTokenizer st = new StringTokenizer(command);
String[] cmdArray = new String[st.countTokens()];
for (int i = 0; st.hasMoreTokens(); i++) {
cmdArray[i] = st.nextToken();
}
return exeShell(cmdArray);
}
/**
* Execute the shell
*
* @param command command
* @return result of execute the shell
* @throws IOException errors
*/
public static String exeShell(String[] command) throws IOException {
return ShellExecutor.execCommand(command);
}
public static String getOSName() {
return System.getProperty("os.name");
}
}

1
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java

@ -23,6 +23,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.COMMA;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SET_K8S;
import org.apache.dolphinscheduler.common.enums.ResourceManagerType;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;

2
dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java

@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.plugin.kubeflow;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@ -26,7 +27,6 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
import java.io.IOException;
import java.nio.file.Files;

2
dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java

@ -21,6 +21,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
@ -31,7 +32,6 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
import org.apache.commons.lang3.StringUtils;

4
dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/GitProjectManager.java

@ -17,9 +17,9 @@
package org.apache.dolphinscheduler.plugin.task.pytorch;
import static org.apache.dolphinscheduler.plugin.task.api.AbstractShell.ExitCodeException;
import static org.apache.dolphinscheduler.common.shell.AbstractShell.ExitCodeException;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import java.io.File;
import java.nio.file.Paths;

Loading…
Cancel
Save