Browse Source

Support execute shell in different interceptor (#14582)

3.2.1-prepare
Wenjun Ruan 1 year ago committed by GitHub
parent
commit
c30cca9d9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      dolphinscheduler-common/src/main/resources/common.properties
  2. 153
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
  3. 36
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
  4. 107
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
  5. 1
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
  6. 171
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java
  7. 50
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptor.java
  8. 141
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptorBuilder.java
  9. 116
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseWindowsShellInterceptorBuilder.java
  10. 30
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptor.java
  11. 52
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptorBuilder.java
  12. 43
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/ShellInterceptorBuilderFactory.java
  13. 30
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptor.java
  14. 56
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptorBuilder.java
  15. 29
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptor.java
  16. 55
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptorBuilder.java
  17. 29
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptor.java
  18. 60
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptorBuilder.java
  19. 61
      dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
  20. 17
      dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
  21. 60
      dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
  22. 31
      dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java
  23. 7
      dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
  24. 15
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
  25. 15
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
  26. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
  27. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
  28. 19
      dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
  29. 35
      dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
  30. 18
      dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
  31. 13
      dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
  32. 12
      dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
  33. 17
      dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
  34. 7
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
  35. 59
      dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
  36. 20
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
  37. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java
  38. 14
      dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java

2
dolphinscheduler-common/src/main/resources/common.properties

@ -153,6 +153,8 @@ appId.collect=log
# The default env list will be load by Shell task, e.g. /etc/profile,~/.bash_profile
shell.env_source_list=
# The interceptor type of Shell task, e.g. bash, sh, cmd
shell.interceptor.type=bash
# Whether to enable remote logging
remote.logging.enable=false

153
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

@ -25,22 +25,20 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD
import org.apache.dolphinscheduler.common.constants.TenantConstants;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptor;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ShellUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -114,111 +112,61 @@ public abstract class AbstractCommandExecutor {
}
}
public AbstractCommandExecutor(LinkedBlockingQueue<String> logBuffer) {
this.logBuffer = logBuffer;
}
/**
* build process
*
* @param commandFile command file
* @throws IOException IO Exception
*/
private void buildProcess(String commandFile) throws IOException {
// setting up user to run commands
List<String> command = new LinkedList<>();
// init process builder
ProcessBuilder processBuilder = new ProcessBuilder();
// setting up a working directory
processBuilder.directory(new File(taskRequest.getExecutePath()));
// merge error information to standard output stream
processBuilder.redirectErrorStream(true);
// if sudo.enable=true,setting up user to run commands
// todo: Create a ShellExecuteClass to generate the shell and execute shell commands
if (OSUtils.isSudoEnable() && !TenantConstants.DEFAULT_TENANT_CODE.equals(taskRequest.getTenantCode())) {
if (SystemUtils.IS_OS_LINUX
&& PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE)) {
generateCgroupCommand(command);
} else {
command.add("sudo");
command.add("-u");
command.add(taskRequest.getTenantCode());
command.add("-E");
}
}
command.add(commandInterpreter());
command.add(commandFile);
// setting commands
processBuilder.command(command);
process = processBuilder.start();
printCommand(command);
}
/**
* generate systemd command.
* eg: sudo systemd-run -q --scope -p CPUQuota=100% -p MemoryLimit=200M --uid=root
* @param command command
*/
private void generateCgroupCommand(List<String> command) {
Integer cpuQuota = taskRequest.getCpuQuota();
Integer memoryMax = taskRequest.getMemoryMax();
command.add("sudo");
command.add("systemd-run");
command.add("-q");
command.add("--scope");
if (cpuQuota == -1) {
command.add("-p");
command.add("CPUQuota=");
} else {
command.add("-p");
command.add(String.format("CPUQuota=%s%%", taskRequest.getCpuQuota()));
}
// use `man systemd.resource-control` to find available parameter
if (memoryMax == -1) {
command.add("-p");
command.add(String.format("MemoryLimit=%s", "infinity"));
} else {
command.add("-p");
command.add(String.format("MemoryLimit=%sM", taskRequest.getMemoryMax()));
}
command.add(String.format("--uid=%s", taskRequest.getTenantCode()));
}
public TaskResponse run(String execCommand, TaskCallBack taskCallBack) throws Exception {
// todo: We need to build the IShellActuator in outer class, since different task may have specific logic to build
// the IShellActuator
public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
TaskCallBack taskCallBack) throws Exception {
TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId();
if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
logger.warn(
"Cannot find the taskInstance: {} from TaskExecutionContextCacheManager, the task might already been killed",
taskInstanceId);
result.setExitStatusCode(EXIT_CODE_KILL);
return result;
}
if (StringUtils.isEmpty(execCommand)) {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
return result;
iShellInterceptorBuilder = iShellInterceptorBuilder
.shellDirectory(taskRequest.getExecutePath())
.shellName(taskRequest.getTaskAppId());
// Set system env
if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv);
}
// Set custom env
if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
iShellInterceptorBuilder.appendCustomEnvScript(taskRequest.getEnvironmentConfig());
}
// Set k8s config (This is only work in Linux)
if (taskRequest.getK8sTaskExecutionContext() != null) {
iShellInterceptorBuilder.k8sConfigYaml(taskRequest.getK8sTaskExecutionContext().getConfigYaml());
}
// Set sudo (This is only work in Linux)
iShellInterceptorBuilder.sudoMode(OSUtils.isSudoEnable());
// Set tenant (This is only work in Linux)
if (TenantConstants.DEFAULT_TENANT_CODE.equals(taskRequest.getTenantCode())) {
iShellInterceptorBuilder.runUser(TenantConstants.BOOTSTRAPT_SYSTEM_USER);
} else {
iShellInterceptorBuilder.runUser(taskRequest.getTenantCode());
}
// Set CPU Quota (This is only work in Linux)
if (taskRequest.getCpuQuota() != null) {
iShellInterceptorBuilder.cpuQuota(taskRequest.getCpuQuota());
}
// Set memory Quota (This is only work in Linux)
if (taskRequest.getMemoryMax() != null) {
iShellInterceptorBuilder.memoryQuota(taskRequest.getMemoryMax());
}
String commandFilePath = buildCommandFilePath();
// create command file if not exists
createCommandFileIfNotExists(execCommand, commandFilePath);
// build process
buildProcess(commandFilePath);
IShellInterceptor iShellInterceptor = iShellInterceptorBuilder.build();
process = iShellInterceptor.execute();
// parse process output
parseProcessOutput(process);
parseProcessOutput(this.process);
// collect pod log
collectPodLogIfNeeded();
int processId = getProcessId(process);
int processId = getProcessId(this.process);
result.setProcessId(processId);
@ -243,7 +191,7 @@ public abstract class AbstractCommandExecutor {
}
// waiting for the run to finish
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS);
TaskExecutionStatus kubernetesStatus =
ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());
@ -272,7 +220,7 @@ public abstract class AbstractCommandExecutor {
if (status && kubernetesStatus.isSuccess()) {
// SHELL task state
result.setExitStatusCode(process.exitValue());
result.setExitStatusCode(this.process.exitValue());
} else {
logger.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
@ -280,7 +228,7 @@ public abstract class AbstractCommandExecutor {
result.setExitStatusCode(EXIT_CODE_FAILURE);
cancelApplication();
}
int exitCode = process.exitValue();
int exitCode = this.process.exitValue();
String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
logger.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
@ -446,9 +394,4 @@ public abstract class AbstractCommandExecutor {
return processId;
}
protected abstract String buildCommandFilePath();
protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
protected abstract String commandInterpreter();
}

36
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java

@ -22,31 +22,17 @@ import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COL
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import java.util.List;
import java.util.regex.Pattern;
import java.util.Map;
/**
* abstract yarn task
*/
public abstract class AbstractYarnTask extends AbstractRemoteTask {
/**
* process task
*/
private ShellCommandExecutor shellCommandExecutor;
/**
* rules for extracting application ID
*/
protected static final Pattern YARN_APPLICATION_REGEX = Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX);
/**
* Abstract Yarn Task
*
* @param taskRequest taskRequest
*/
public AbstractYarnTask(TaskExecutionContext taskRequest) {
super(taskRequest);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
@ -58,8 +44,12 @@ public abstract class AbstractYarnTask extends AbstractRemoteTask {
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
.properties(getProperties())
// todo: do we need to move the replace to subclass?
.appendScript(getScript().replaceAll("\\r\\n", System.lineSeparator()));
// SHELL task exit code
TaskResponse response = shellCommandExecutor.run(buildCommand(), taskCallBack);
TaskResponse response = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(response.getExitStatusCode());
// set appIds
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
@ -115,10 +105,12 @@ public abstract class AbstractYarnTask extends AbstractRemoteTask {
}
/**
* create command
*
* @return String
* Get the script used to bootstrap the task
*/
protected abstract String buildCommand();
protected abstract String getScript();
/**
* Get the properties of the task used to replace the placeholders in the script.
*/
protected abstract Map<String, String> getProperties();
}

107
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java

@ -17,19 +17,6 @@
package org.apache.dolphinscheduler.plugin.task.api;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ShellUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
@ -40,104 +27,10 @@ import org.slf4j.Logger;
*/
public class ShellCommandExecutor extends AbstractCommandExecutor {
/**
* For Unix-like, using bash
*/
private static final String SH = "bash";
/**
* For Windows, using cmd.exe
*/
private static final String CMD = "cmd.exe";
/**
* constructor
*
* @param logHandler logHandler
* @param taskRequest taskRequest
* @param logger logger
*/
public ShellCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
TaskExecutionContext taskRequest,
Logger logger) {
super(logHandler, taskRequest, logger);
}
public ShellCommandExecutor(LinkedBlockingQueue<String> logBuffer) {
super(logBuffer);
}
@Override
protected String buildCommandFilePath() {
// command file
return String.format("%s/%s.%s", taskRequest.getExecutePath(), taskRequest.getTaskAppId(),
SystemUtils.IS_OS_WINDOWS ? "bat" : "command");
}
/**
* create command file if not exists
*
* @param execCommand exec command
* @param commandFile command file
* @throws IOException io exception
*/
@Override
protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException {
// create if non existence
logger.info("Begin to create command file:{}", commandFile);
Path commandFilePath = Paths.get(commandFile);
if (Files.exists(commandFilePath)) {
logger.warn("The command file: {} is already exist, will not create a again", commandFile);
return;
}
StringBuilder sb = new StringBuilder();
if (SystemUtils.IS_OS_WINDOWS) {
sb.append("@echo off").append(System.lineSeparator());
sb.append("cd /d %~dp0").append(System.lineSeparator());
if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
for (String envSourceFile : ShellUtils.ENV_SOURCE_LIST) {
sb.append("call ").append(envSourceFile).append("\n");
}
}
if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
sb.append(taskRequest.getEnvironmentConfig()).append(System.lineSeparator());
}
} else {
sb.append("#!/bin/bash").append(System.lineSeparator());
sb.append("BASEDIR=$(cd `dirname $0`; pwd)").append(System.lineSeparator());
sb.append("cd $BASEDIR").append(System.lineSeparator());
if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) {
for (String envSourceFile : ShellUtils.ENV_SOURCE_LIST) {
sb.append("source ").append(envSourceFile).append("\n");
}
}
if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) {
sb.append(taskRequest.getEnvironmentConfig()).append(System.lineSeparator());
}
if (Objects.nonNull(taskRequest.getK8sTaskExecutionContext())) {
String configYaml = taskRequest.getK8sTaskExecutionContext().getConfigYaml();
Path kubeConfigPath = Paths.get(org.apache.dolphinscheduler.common.utils.FileUtils
.getKubeConfigPath(taskRequest.getExecutePath()));
FileUtils.createFileWith755(kubeConfigPath);
Files.write(kubeConfigPath, configYaml.getBytes(), StandardOpenOption.APPEND);
sb.append("export KUBECONFIG=" + kubeConfigPath).append(System.lineSeparator());
logger.info("Create kubernetes configuration file: {}.", kubeConfigPath);
}
}
sb.append(execCommand);
String commandContent = sb.toString();
FileUtils.createFileWith755(commandFilePath);
Files.write(commandFilePath, commandContent.getBytes(), StandardOpenOption.APPEND);
logger.info("Success create command file, command: {}", commandContent);
}
@Override
protected String commandInterpreter() {
return SystemUtils.IS_OS_WINDOWS ? CMD : SH;
}
}

1
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java

@ -173,6 +173,7 @@ public class TaskExecutionContext implements Serializable {
/**
* definedParams
* // todo: we need to rename definedParams, prepareParamsMap, paramsMap, this is confusing
*/
private Map<String, String> definedParams;

171
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java

@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.shell;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class BaseLinuxShellInterceptorBuilder<T extends BaseLinuxShellInterceptorBuilder<T, Y>, Y extends BaseShellInterceptor>
extends
BaseShellInterceptorBuilder<T, Y> {
protected void generateShellScript() throws IOException {
List<String> finalScripts = new ArrayList<>();
// add shell header
finalScripts.add(shellHeader());
finalScripts.add("BASEDIR=$(cd `dirname $0`; pwd)");
finalScripts.add("cd $BASEDIR");
// add system env
finalScripts.addAll(systemEnvScript());
// add custom env
finalScripts.addAll(customEnvScript());
// add k8s config
finalScripts.addAll(k8sConfig());
// add shell body
finalScripts.add(shellBody());
// create shell file
String finalScript = finalScripts.stream().collect(Collectors.joining(System.lineSeparator()));
Path shellAbsolutePath = shellAbsolutePath();
FileUtils.createFileWith755(shellAbsolutePath);
Files.write(shellAbsolutePath, finalScript.getBytes(), StandardOpenOption.APPEND);
log.info("Final Shell file is : \n{}", finalScript);
}
protected List<String> generateBootstrapCommand() {
if (sudoEnable) {
return bootstrapCommandInSudoMode();
}
return bootstrapCommandInNormalMode();
}
protected abstract String shellHeader();
protected abstract String shellInterpreter();
protected abstract String shellExtension();
private List<String> systemEnvScript() {
if (CollectionUtils.isEmpty(systemEnvs)) {
return Collections.emptyList();
}
return systemEnvs
.stream()
.map(systemEnv -> "source " + systemEnv).collect(Collectors.toList());
}
private List<String> customEnvScript() {
if (CollectionUtils.isEmpty(customEnvScripts)) {
return Collections.emptyList();
}
return customEnvScripts;
}
private List<String> k8sConfig() throws IOException {
if (StringUtils.isEmpty(k8sConfigYaml)) {
return Collections.emptyList();
}
Path kubeConfigPath = Paths.get(FileUtils.getKubeConfigPath(shellDirectory));
FileUtils.createFileWith755(kubeConfigPath);
Files.write(kubeConfigPath, k8sConfigYaml.getBytes(), StandardOpenOption.APPEND);
log.info("Created kubernetes configuration file: {}.", kubeConfigPath);
return Collections.singletonList("export KUBECONFIG=" + kubeConfigPath);
}
private String shellBody() {
if (CollectionUtils.isEmpty(scripts)) {
return StringUtils.EMPTY;
}
String scriptBody = scripts
.stream()
.collect(Collectors.joining(System.lineSeparator()));
scriptBody = scriptBody.replaceAll("\\r\\n", System.lineSeparator());
return ParameterUtils.convertParameterPlaceholders(scriptBody, propertyMap);
}
private Path shellAbsolutePath() {
return Paths.get(shellDirectory, shellName + shellExtension());
}
private List<String> bootstrapCommandInSudoMode() {
if (PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE)) {
return bootstrapCommandInResourceLimitMode();
}
List<String> bootstrapCommand = new ArrayList<>();
bootstrapCommand.add("sudo");
if (StringUtils.isNotBlank(runUser)) {
bootstrapCommand.add("-u");
bootstrapCommand.add(runUser);
}
bootstrapCommand.add("-E");
bootstrapCommand.add(shellAbsolutePath().toString());
return bootstrapCommand;
}
private List<String> bootstrapCommandInNormalMode() {
List<String> bootstrapCommand = new ArrayList<>();
bootstrapCommand.add(shellInterpreter());
bootstrapCommand.add(shellAbsolutePath().toString());
return bootstrapCommand;
}
private List<String> bootstrapCommandInResourceLimitMode() {
List<String> bootstrapCommand = new ArrayList<>();
bootstrapCommand.add("sudo");
bootstrapCommand.add("systemd-run");
bootstrapCommand.add("-q");
bootstrapCommand.add("--scope");
if (cpuQuota == -1) {
bootstrapCommand.add("-p");
bootstrapCommand.add("CPUQuota=");
} else {
bootstrapCommand.add("-p");
bootstrapCommand.add(String.format("CPUQuota=%s%%", cpuQuota));
}
// use `man systemd.resource-control` to find available parameter
if (memoryQuota == -1) {
bootstrapCommand.add("-p");
bootstrapCommand.add(String.format("MemoryLimit=%s", "infinity"));
} else {
bootstrapCommand.add("-p");
bootstrapCommand.add(String.format("MemoryLimit=%sM", memoryQuota));
}
bootstrapCommand.add(String.format("--uid=%s", runUser));
return bootstrapCommand;
}
}

50
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptor.java

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.shell;
import java.io.File;
import java.io.IOException;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class BaseShellInterceptor implements IShellInterceptor {
protected final String workingDirectory;
protected final List<String> executeCommands;
protected BaseShellInterceptor(List<String> executeCommands, String workingDirectory) {
this.executeCommands = executeCommands;
this.workingDirectory = workingDirectory;
}
@Override
public Process execute() throws IOException {
// init process builder
ProcessBuilder processBuilder = new ProcessBuilder();
// setting up a working directory
processBuilder.directory(new File(workingDirectory));
// merge error information to standard output stream
processBuilder.redirectErrorStream(true);
processBuilder.command(executeCommands);
log.info("Executing shell command : {}", String.join(" ", executeCommands));
return processBuilder.start();
}
}

141
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptorBuilder.java

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.shell;
import org.apache.commons.collections4.MapUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public abstract class BaseShellInterceptorBuilder<T extends BaseShellInterceptorBuilder<T, Y>, Y extends BaseShellInterceptor>
implements
IShellInterceptorBuilder<T, Y> {
protected String shellDirectory;
protected String shellName;
protected String runUser;
protected Integer cpuQuota;
protected Integer memoryQuota;
protected List<String> systemEnvs = new ArrayList<>();
protected List<String> customEnvScripts = new ArrayList<>();
protected String k8sConfigYaml;
protected Map<String, String> propertyMap = new HashMap<>();
protected boolean sudoEnable;
protected List<String> scripts = new ArrayList<>();
protected BaseShellInterceptorBuilder() {
}
@Override
public T newBuilder(T builder) {
T newBuilder = newBuilder();
newBuilder.shellDirectory = builder.shellDirectory;
newBuilder.shellName = builder.shellName;
newBuilder.runUser = builder.runUser;
newBuilder.cpuQuota = builder.cpuQuota;
newBuilder.memoryQuota = builder.memoryQuota;
newBuilder.systemEnvs = builder.systemEnvs;
newBuilder.customEnvScripts = builder.customEnvScripts;
newBuilder.k8sConfigYaml = builder.k8sConfigYaml;
newBuilder.propertyMap = builder.propertyMap;
newBuilder.sudoEnable = builder.sudoEnable;
newBuilder.scripts = builder.scripts;
return newBuilder;
}
@Override
public T shellDirectory(String shellDirectory) {
this.shellDirectory = shellDirectory;
return (T) this;
}
@Override
public T shellName(String shellFilename) {
this.shellName = shellFilename;
return (T) this;
}
@Override
public T runUser(String systemUser) {
this.runUser = systemUser;
return (T) this;
}
@Override
public T cpuQuota(Integer cpuQuota) {
this.cpuQuota = cpuQuota;
return (T) this;
}
@Override
public T memoryQuota(Integer memoryQuota) {
this.memoryQuota = memoryQuota;
return (T) this;
}
@Override
public T appendSystemEnv(String envFiles) {
systemEnvs.add(envFiles);
return (T) this;
}
@Override
public T appendCustomEnvScript(String customEnvScript) {
customEnvScripts.add(customEnvScript);
return (T) this;
}
@Override
public T k8sConfigYaml(String k8sConfigYaml) {
this.k8sConfigYaml = k8sConfigYaml;
return (T) this;
}
@Override
public T properties(Map<String, String> propertyMap) {
if (MapUtils.isNotEmpty(propertyMap)) {
this.propertyMap.putAll(propertyMap);
}
return (T) this;
}
@Override
public T sudoMode(boolean sudoEnable) {
this.sudoEnable = sudoEnable;
return (T) this;
}
@Override
public T appendScript(String script) {
scripts.add(script);
return (T) this;
}
}

116
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseWindowsShellInterceptorBuilder.java

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.shell;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class BaseWindowsShellInterceptorBuilder<T extends BaseWindowsShellInterceptorBuilder<T, Y>, Y extends BaseShellInterceptor>
extends
BaseShellInterceptorBuilder<T, Y> {
protected void generateShellScript() throws IOException {
List<String> finalScripts = new ArrayList<>();
// add shell header
finalScripts.add(shellHeader());
finalScripts.add("cd /d %~dp0");
// add system env
finalScripts.addAll(systemEnvScript());
// add custom env
finalScripts.addAll(customEnvScript());
// add k8s config
finalScripts.addAll(k8sConfig());
// add shell body
finalScripts.add(shellBody());
// create shell file
String finalScript = finalScripts.stream().collect(Collectors.joining(System.lineSeparator()));
Path shellAbsolutePath = shellAbsolutePath();
FileUtils.createFileWith755(shellAbsolutePath);
Files.write(shellAbsolutePath, finalScript.getBytes(), StandardOpenOption.APPEND);
log.info("Final Shell file is : \n{}", finalScript);
}
private String shellBody() {
if (CollectionUtils.isEmpty(scripts)) {
return StringUtils.EMPTY;
}
String scriptBody = scripts
.stream()
.collect(Collectors.joining(System.lineSeparator()));
return ParameterUtils.convertParameterPlaceholders(scriptBody, propertyMap);
}
private Collection<String> k8sConfig() {
log.warn("k8s config is not supported in windows");
return Collections.emptyList();
}
protected List<String> generateBootstrapCommand() {
if (sudoEnable) {
log.warn("sudo is not supported in windows");
}
// todo: support tenant in widnows
List<String> bootstrapCommand = new ArrayList<>();
bootstrapCommand.add(shellInterpreter());
bootstrapCommand.add(shellAbsolutePath().toString());
return bootstrapCommand;
}
protected abstract String shellHeader();
protected abstract String shellInterpreter();
protected abstract String shellExtension();
private List<String> systemEnvScript() {
if (CollectionUtils.isEmpty(systemEnvs)) {
return Collections.emptyList();
}
return systemEnvs.stream()
.map(systemEnv -> "call " + systemEnv)
.collect(Collectors.toList());
}
private List<String> customEnvScript() {
if (CollectionUtils.isEmpty(customEnvScripts)) {
return Collections.emptyList();
}
return customEnvScripts;
}
private Path shellAbsolutePath() {
return Paths.get(shellDirectory, shellName + shellExtension());
}
}

30
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptor.java

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.shell;
import java.io.IOException;
/**
* This interface is used to execute shell commands.
* It should be created by @{@link IShellInterceptorBuilder}.
*/
public interface IShellInterceptor {
Process execute() throws IOException;
}

52
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptorBuilder.java

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.shell;
import java.io.IOException;
import java.util.Map;
public interface IShellInterceptorBuilder<T extends IShellInterceptorBuilder<T, Y>, Y extends IShellInterceptor> {
T newBuilder();
T newBuilder(T builder);
T shellDirectory(String directory);
T shellName(String shellFilename);
T runUser(String systemUser);
T cpuQuota(Integer cpuQuota);
T memoryQuota(Integer memoryQuota);
T appendSystemEnv(String envFiles);
T appendCustomEnvScript(String customEnvScript);
T k8sConfigYaml(String k8sConfigYaml);
T properties(Map<String, String> propertyMap);
T sudoMode(boolean sudoEnable);
T appendScript(String script);
Y build() throws IOException;
}

43
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/ShellInterceptorBuilderFactory.java

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.shell;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.cmd.CmdShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.sh.ShShellInterceptorBuilder;
public class ShellInterceptorBuilderFactory {
private final static String INTERCEPTOR_TYPE = PropertyUtils.getString("shell.interceptor.type", "bash");
@SuppressWarnings("unchecked")
public static IShellInterceptorBuilder newBuilder() {
if (INTERCEPTOR_TYPE.equalsIgnoreCase("bash")) {
return new BashShellInterceptorBuilder();
}
if (INTERCEPTOR_TYPE.equalsIgnoreCase("sh")) {
return new ShShellInterceptorBuilder();
}
if (INTERCEPTOR_TYPE.equalsIgnoreCase("cmd")) {
return new CmdShellInterceptorBuilder();
}
throw new IllegalArgumentException("not support shell type: " + INTERCEPTOR_TYPE);
}
}

30
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptor.java

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.shell.bash;
import org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor;
import java.util.List;
public class BashShellInterceptor extends BaseShellInterceptor {
public BashShellInterceptor(List<String> executeCommands, String workingDirectory) {
super(executeCommands, workingDirectory);
}
}

56
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptorBuilder.java

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.shell.bash;
import org.apache.dolphinscheduler.plugin.task.api.shell.BaseLinuxShellInterceptorBuilder;
import java.io.IOException;
import java.util.List;
public class BashShellInterceptorBuilder
extends
BaseLinuxShellInterceptorBuilder<BashShellInterceptorBuilder, BashShellInterceptor> {
@Override
public BashShellInterceptorBuilder newBuilder() {
return new BashShellInterceptorBuilder();
}
@Override
public BashShellInterceptor build() throws IOException {
generateShellScript();
List<String> bootstrapCommand = generateBootstrapCommand();
return new BashShellInterceptor(bootstrapCommand, shellDirectory);
}
@Override
protected String shellInterpreter() {
return "bash";
}
@Override
protected String shellExtension() {
return ".sh";
}
@Override
protected String shellHeader() {
return "#!/bin/bash";
}
}

29
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptor.java

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.shell.cmd;
import org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor;
import java.util.List;
public class CmdShellInterceptor extends BaseShellInterceptor {
protected CmdShellInterceptor(List<String> executeCommands, String workingDirectory) {
super(executeCommands, workingDirectory);
}
}

55
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptorBuilder.java

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.shell.cmd;
import org.apache.dolphinscheduler.plugin.task.api.shell.BaseWindowsShellInterceptorBuilder;
import java.io.IOException;
import java.util.List;
public class CmdShellInterceptorBuilder
extends
BaseWindowsShellInterceptorBuilder<CmdShellInterceptorBuilder, CmdShellInterceptor> {
@Override
protected String shellHeader() {
return "@echo off";
}
@Override
protected String shellInterpreter() {
return "cmd.exe";
}
@Override
protected String shellExtension() {
return ".bat";
}
@Override
public CmdShellInterceptorBuilder newBuilder() {
return new CmdShellInterceptorBuilder();
}
@Override
public CmdShellInterceptor build() throws IOException {
generateShellScript();
List<String> bootstrapCommand = generateBootstrapCommand();
return new CmdShellInterceptor(bootstrapCommand, shellDirectory);
}
}

29
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptor.java

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.shell.sh;
import org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor;
import java.util.List;
public class ShShellInterceptor extends BaseShellInterceptor {
protected ShShellInterceptor(List<String> executeCommands, String shellDirectory) {
super(executeCommands, shellDirectory);
}
}

60
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptorBuilder.java

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.shell.sh;
import org.apache.dolphinscheduler.plugin.task.api.shell.BaseLinuxShellInterceptorBuilder;
import java.io.IOException;
import java.util.List;
public class ShShellInterceptorBuilder
extends
BaseLinuxShellInterceptorBuilder<ShShellInterceptorBuilder, ShShellInterceptor> {
@Override
public ShShellInterceptorBuilder newBuilder() {
return new ShShellInterceptorBuilder();
}
@Override
public ShShellInterceptorBuilder k8sConfigYaml(String k8sConfigYaml) {
return null;
}
@Override
public ShShellInterceptor build() throws IOException {
generateShellScript();
List<String> bootstrapCommand = generateBootstrapCommand();
return new ShShellInterceptor(bootstrapCommand, shellDirectory);
}
@Override
protected String shellHeader() {
return "#!/bin/sh";
}
@Override
protected String shellInterpreter() {
return "sh";
}
@Override
protected String shellExtension() {
return ".sh";
}
}

61
dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.plugin.task.chunjun;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
@ -30,25 +29,21 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.spi.enums.Flag;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.SystemUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* chunjun task
@ -102,19 +97,16 @@ public class ChunJunTask extends AbstractTask {
}
}
/**
* run chunjun process
*
* @throws TaskException exception
*/
@SuppressWarnings("unchecked")
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
String jsonFilePath = buildChunJunJsonFile(paramsMap);
String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap);
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath, taskCallBack);
IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
.properties(ParameterUtils.convert(paramsMap))
.appendScript(buildCommand(buildChunJunJsonFile(paramsMap)));
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
@ -167,25 +159,7 @@ public class ChunJunTask extends AbstractTask {
return fileName;
}
/**
* create command
*
* @return shell command file name
* @throws Exception if error throws Exception
*/
private String buildShellCommandFile(String jobConfigFilePath, Map<String, Property> paramsMap) throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.%s",
taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId(),
SystemUtils.IS_OS_WINDOWS ? "bat" : "sh");
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
return fileName;
}
private String buildCommand(String jobConfigFilePath) {
// chunjun command
List<String> args = new ArrayList<>();
@ -215,24 +189,7 @@ public class ChunJunTask extends AbstractTask {
String command = String.join(" ", args);
// replace placeholder
String chunjunCommand = ParameterUtils.convertParameterPlaceholders(command, ParameterUtils.convert(paramsMap));
log.info("raw script : {}", chunjunCommand);
// create shell command file
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
if (SystemUtils.IS_OS_WINDOWS) {
Files.createFile(path);
} else {
Files.createFile(path, attr);
}
Files.write(path, chunjunCommand.getBytes(), StandardOpenOption.APPEND);
return fileName;
return command;
}
public String getExecMode(ChunJunParameters chunJunParameters) {

17
dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java

@ -38,7 +38,6 @@ import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.dataquality.DataQualityParameters;
@ -57,6 +56,7 @@ import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* In DataQualityTask, the input parameters will be converted into DataQualityConfiguration,
@ -160,19 +160,16 @@ public class DataQualityTask extends AbstractYarnTask {
}
@Override
protected String buildCommand() {
protected String getScript() {
List<String> args = new ArrayList<>();
args.add(SPARK_COMMAND);
args.addAll(SparkArgsUtils.buildArgs(dataQualityParameters.getSparkParameters()));
return args.stream().collect(Collectors.joining(" "));
}
// replace placeholder
Map<String, Property> paramsMap = dqTaskExecutionContext.getPrepareParamsMap();
String command =
ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParameterUtils.convert(paramsMap));
log.info("data quality task command: {}", command);
return command;
@Override
protected Map<String, String> getProperties() {
return ParameterUtils.convert(dqTaskExecutionContext.getPrepareParamsMap());
}
protected void setMainJarName() {

60
dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java

@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.datax;
import static org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils.decodePassword;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
import org.apache.dolphinscheduler.common.log.SensitiveDataConverter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
@ -33,6 +32,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
@ -40,16 +41,11 @@ import org.apache.dolphinscheduler.spi.enums.Flag;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@ -59,7 +55,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import com.alibaba.druid.sql.ast.SQLStatement;
@ -152,21 +147,18 @@ public class DataxTask extends AbstractTask {
dataXParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper());
}
/**
* run DataX process
*
* @throws TaskException if error throws Exception
*/
@SuppressWarnings("unchecked")
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
// run datax processDataSourceService
String jsonFilePath = buildDataxJsonFile(paramsMap);
String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap);
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath, taskCallBack);
IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
.properties(ParameterUtils.convert(paramsMap))
.appendScript(buildCommand(buildDataxJsonFile(paramsMap), paramsMap));
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setProcessId(commandExecuteResult.getProcessId());
@ -385,23 +377,10 @@ public class DataxTask extends AbstractTask {
* create command
*
* @return shell command file name
* @throws Exception if error throws Exception
*/
private String buildShellCommandFile(String jobConfigFilePath, Map<String, Property> paramsMap) throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.%s",
taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId(),
SystemUtils.IS_OS_WINDOWS ? "bat" : "sh");
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
return fileName;
}
protected String buildCommand(String jobConfigFilePath, Map<String, Property> paramsMap) {
// datax python command
String sbr = DATAX_PYTHON +
return DATAX_PYTHON +
" " +
DATAX_PATH +
" " +
@ -409,25 +388,6 @@ public class DataxTask extends AbstractTask {
addCustomParameters(paramsMap) +
" " +
jobConfigFilePath;
// replace placeholder
String dataxCommand = ParameterUtils.convertParameterPlaceholders(sbr, ParameterUtils.convert(paramsMap));
log.debug("raw script : {}", dataxCommand);
// create shell command file
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(RWXR_XR_X);
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
if (SystemUtils.IS_OS_WINDOWS) {
Files.createFile(path);
} else {
Files.createFile(path, attr);
}
Files.write(path, dataxCommand.getBytes(), StandardOpenOption.APPEND);
return fileName;
}
private StringBuilder addCustomParameters(Map<String, Property> paramsMap) {

31
dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java

@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.datax;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@ -40,8 +41,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceP
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
import org.apache.commons.lang3.SystemUtils;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@ -110,7 +109,7 @@ public class DataxTaskTest {
taskResponse.setStatus(TaskRunStatus.SUCCESS);
taskResponse.setExitStatusCode(0);
taskResponse.setProcessId(1);
when(shellCommandExecutor.run(anyString(), eq(taskCallBack))).thenReturn(taskResponse);
when(shellCommandExecutor.run(any(), eq(taskCallBack))).thenReturn(taskResponse);
dataxTask.handle(taskCallBack);
Assertions.assertEquals(0, dataxTask.getExitStatusCode());
@ -122,14 +121,8 @@ public class DataxTaskTest {
boolean delete = jsonFile.delete();
Assertions.assertTrue(delete);
File shellCommandFile = SystemUtils.IS_OS_WINDOWS ? new File("/tmp/execution/app-id_node.bat")
: new File("/tmp/execution/app-id_node.sh");
InputStream shellCommandInputStream = Files.newInputStream(shellCommandFile.toPath());
String shellCommandStr = FileUtils.readFile2Str(shellCommandInputStream);
Assertions.assertEquals(shellCommandStr, "python2.7 ${DATAX_HOME}/bin/datax.py --jvm=\"-Xms1G -Xmx1G\" " +
" /tmp/execution/app-id_job.json");
delete = shellCommandFile.delete();
Assertions.assertTrue(delete);
Assertions.assertEquals(dataxTask.buildCommand("/tmp/execution/app-id_job.json", null),
"python2.7 ${DATAX_HOME}/bin/datax.py --jvm=\"-Xms1G -Xmx1G\" /tmp/execution/app-id_job.json");
}
@Test
@ -151,7 +144,7 @@ public class DataxTaskTest {
taskResponse.setStatus(TaskRunStatus.SUCCESS);
taskResponse.setExitStatusCode(0);
taskResponse.setProcessId(1);
when(shellCommandExecutor.run(anyString(), eq(taskCallBack))).thenReturn(taskResponse);
when(shellCommandExecutor.run(any(), eq(taskCallBack))).thenReturn(taskResponse);
dataxTask.handle(taskCallBack);
Assertions.assertEquals(0, dataxTask.getExitStatusCode());
@ -163,14 +156,8 @@ public class DataxTaskTest {
boolean delete = jsonFile.delete();
Assertions.assertTrue(delete);
File shellCommandFile = SystemUtils.IS_OS_WINDOWS ? new File("/tmp/execution/app-id_node.bat")
: new File("/tmp/execution/app-id_node.sh");
InputStream shellCommandInputStream = Files.newInputStream(shellCommandFile.toPath());
String shellCommandStr = FileUtils.readFile2Str(shellCommandInputStream);
Assertions.assertEquals(shellCommandStr, "python2.7 ${DATAX_HOME}/bin/datax.py --jvm=\"-Xms1G -Xmx1G\" " +
"-p \"-DDT='DT' -DDS='DS'\" /tmp/execution/app-id_job.json");
delete = shellCommandFile.delete();
Assertions.assertTrue(delete);
Assertions.assertEquals(dataxTask.buildCommand("/tmp/execution/app-id_job.json", createPrepareParamsMap()),
"python2.7 ${DATAX_HOME}/bin/datax.py --jvm=\"-Xms1G -Xmx1G\" -p \"-DDT='DT' -DDS='DS'\" /tmp/execution/app-id_job.json");
}
@Test
@ -187,7 +174,7 @@ public class DataxTaskTest {
shellCommandExecutorFiled.setAccessible(true);
shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
when(shellCommandExecutor.run(anyString(), eq(taskCallBack)))
when(shellCommandExecutor.run(any(), eq(taskCallBack)))
.thenThrow(new InterruptedException("Command execution failed"));
Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack));
}
@ -206,7 +193,7 @@ public class DataxTaskTest {
shellCommandExecutorFiled.setAccessible(true);
shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor);
when(shellCommandExecutor.run(anyString(), eq(taskCallBack)))
when(shellCommandExecutor.run(any(), eq(taskCallBack)))
.thenThrow(new IOException("Command execution failed"));
Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack));
}

7
dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java

@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import java.util.ArrayList;
import java.util.List;
@ -78,8 +80,9 @@ public class DvcTask extends AbstractTask {
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// construct process
String command = buildCommand();
TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack);
IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
.appendScript(buildCommand());
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setProcessId(commandExecuteResult.getProcessId());
parameters.dealOutParam(shellCommandExecutor.getVarPool());

15
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java

@ -23,12 +23,13 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.stream.StreamTask;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.commons.collections4.CollectionUtils;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class FlinkStreamTask extends FlinkTask implements StreamTask {
@ -66,15 +67,15 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask {
* @return command
*/
@Override
protected String buildCommand() {
protected String getScript() {
// flink run/run-application [OPTIONS] <jar-file> <arguments>
List<String> args = FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters);
return args.stream().collect(Collectors.joining(" "));
}
String command = ParameterUtils
.convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
log.info("flink task command : {}", command);
return command;
@Override
protected Map<String, String> getProperties() {
return taskExecutionContext.getDefinedParams();
}
@Override

15
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java

@ -22,11 +22,12 @@ import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class FlinkTask extends AbstractYarnTask {
@ -69,15 +70,15 @@ public class FlinkTask extends AbstractYarnTask {
* @return command
*/
@Override
protected String buildCommand() {
protected String getScript() {
// flink run/run-application [OPTIONS] <jar-file> <arguments>
List<String> args = FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters);
return args.stream().collect(Collectors.joining(" "));
}
String command = ParameterUtils
.convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams());
log.info("flink task command : {}", command);
return command;
@Override
protected Map<String, String> getProperties() {
return taskExecutionContext.getDefinedParams();
}
@Override

6
dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java

@ -31,6 +31,8 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.commons.io.FileUtils;
@ -88,7 +90,9 @@ public class HiveCliTask extends AbstractRemoteTask {
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
final TaskResponse taskResponse = shellCommandExecutor.run(buildCommand(), taskCallBack);
IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
.appendScript(buildCommand());
final TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(taskResponse.getExitStatusCode());
setAppIds(taskResponse.getAppIds());
setProcessId(taskResponse.getProcessId());

6
dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java

@ -32,6 +32,8 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.java.exception.JavaSourceFileExistException;
@ -122,7 +124,9 @@ public class JavaTask extends AbstractTask {
throw new RunTypeNotFoundException("run type is required, but it is null now.");
}
Preconditions.checkNotNull(command, "command not be null.");
TaskResponse taskResponse = shellCommandExecutor.run(command, taskCallBack);
IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
.appendScript(command);
TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
log.info("java task run result: {}", taskResponse);
setExitStatusCode(taskResponse.getExitStatusCode());
setAppIds(taskResponse.getAppIds());

19
dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java

@ -26,9 +26,10 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.commons.lang3.StringUtils;
@ -38,6 +39,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -82,7 +84,11 @@ public class JupyterTask extends AbstractRemoteTask {
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
TaskResponse response = shellCommandExecutor.run(buildCommand(), taskCallBack);
IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
.properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
.appendScript(buildCommand());
TaskResponse response = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(response.getExitStatusCode());
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(response.getProcessId());
@ -147,14 +153,7 @@ public class JupyterTask extends AbstractRemoteTask {
args.add(String.format(JupyterConstants.REMOVE_ENV, timestamp));
}
// replace placeholder
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
String command = ParameterUtils
.convertParameterPlaceholders(String.join(" ", args), ParameterUtils.convert(paramsMap));
log.info("jupyter task command: {}", command);
return command;
return args.stream().collect(Collectors.joining(" "));
}
protected String readCondaPath() {

35
dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java

@ -26,9 +26,10 @@ import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.commons.lang3.BooleanUtils;
@ -37,7 +38,6 @@ import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -100,8 +100,10 @@ public class LinkisTask extends AbstractRemoteTask {
public void submitApplication() throws TaskException {
try {
// construct process
String command = buildCommand();
TaskResponse commandExecuteResult = shellCommandExecutor.run(command, null);
IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
.properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
.appendScript(buildCommand());
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, null);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(findTaskId(commandExecuteResult.getResultString()));
setProcessId(commandExecuteResult.getProcessId());
@ -127,7 +129,9 @@ public class LinkisTask extends AbstractRemoteTask {
args.add(Constants.STATUS_OPTIONS);
args.add(taskId);
String command = String.join(Constants.SPACE, args);
TaskResponse commandExecuteResult = shellCommandExecutor.run(command, null);
IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
.appendScript(command);
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, null);
String status = findStatus(commandExecuteResult.getResultString());
LinkisJobStatus jobStatus = LinkisJobStatus.convertFromJobStatusString(status);
switch (jobStatus) {
@ -160,7 +164,10 @@ public class LinkisTask extends AbstractRemoteTask {
args.add(Constants.KILL_OPTIONS);
args.add(taskId);
String command = String.join(Constants.SPACE, args);
shellCommandExecutor.run(command, null);
IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
.appendScript(command);
shellCommandExecutor.run(shellActuatorBuilder, null);
setExitStatusCode(EXIT_CODE_KILL);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@ -176,7 +183,7 @@ public class LinkisTask extends AbstractRemoteTask {
List<String> args = new ArrayList<>();
args.addAll(buildOptions());
String command = String.join(Constants.SPACE, args);
String command = String.join(" ", args);
log.info("Linkis task command: {}", command);
return command;
@ -187,20 +194,13 @@ public class LinkisTask extends AbstractRemoteTask {
args.add(Constants.SHELL_CLI_OPTIONS);
args.add(Constants.ASYNC_OPTIONS);
if (BooleanUtils.isTrue(linkisParameters.getUseCustom())) {
args.add(buildCustomConfigContent());
args.add(linkisParameters.getRawScript());
} else {
args.add(buildParamConfigContent());
}
return args;
}
private String buildCustomConfigContent() {
log.info("raw custom config content : {}", linkisParameters.getRawScript());
String script = linkisParameters.getRawScript().replaceAll("\\r\\n", "\n");
script = parseScript(script);
return script;
}
private String buildParamConfigContent() {
log.info("raw param config content : {}", linkisParameters.getParamScript());
String script = "";
@ -210,7 +210,6 @@ public class LinkisTask extends AbstractRemoteTask {
.concat(Constants.SPACE)
.concat(param.getValue());
}
script = parseScript(script);
return script;
}
@ -248,8 +247,4 @@ public class LinkisTask extends AbstractRemoteTask {
return linkisParameters;
}
private String parseScript(String script) {
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
return ParameterUtils.convertParameterPlaceholders(script, ParameterUtils.convert(paramsMap));
}
}

18
dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java

@ -30,6 +30,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.commons.lang3.StringUtils;
@ -38,6 +40,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* shell task
@ -110,12 +113,15 @@ public class MlflowTask extends AbstractTask {
}
}
@SuppressWarnings("unchecked")
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// construct process
String command = buildCommand();
TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack);
IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
.properties(ParameterUtils.convert(getParamsMap()))
.appendScript(buildCommand());
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
int exitCode;
if (mlflowParameters.getIsDeployDocker()) {
exitCode = checkDockerHealth();
@ -165,7 +171,6 @@ public class MlflowTask extends AbstractTask {
*/
private String buildCommandForMlflowProjects() {
Map<String, Property> paramsMap = getParamsMap();
List<String> args = new ArrayList<>();
args.add(
String.format(MlflowConstants.EXPORT_MLFLOW_TRACKING_URI_ENV, mlflowParameters.getMlflowTrackingUri()));
@ -219,8 +224,7 @@ public class MlflowTask extends AbstractTask {
runCommand = runCommand + " " + versionString;
}
args.add(runCommand);
return ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParameterUtils.convert(paramsMap));
return args.stream().collect(Collectors.joining("\n"));
}
/**
@ -228,7 +232,6 @@ public class MlflowTask extends AbstractTask {
*/
protected String buildCommandForMlflowModels() {
Map<String, Property> paramsMap = getParamsMap();
List<String> args = new ArrayList<>();
args.add(
String.format(MlflowConstants.EXPORT_MLFLOW_TRACKING_URI_ENV, mlflowParameters.getMlflowTrackingUri()));
@ -247,8 +250,7 @@ public class MlflowTask extends AbstractTask {
args.add(String.format(MlflowConstants.DOCKER_RUN, containerName, mlflowParameters.getDeployPort(),
imageName));
}
return ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParameterUtils.convert(paramsMap));
return args.stream().collect(Collectors.joining("\n"));
}
private Map<String, Property> getParamsMap() {

13
dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java

@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* mapreduce task
@ -90,19 +91,19 @@ public class MapReduceTask extends AbstractYarnTask {
* @return command
*/
@Override
protected String buildCommand() {
protected String getScript() {
// hadoop jar <jar> [mainClass] [GENERIC_OPTIONS] args...
List<String> args = new ArrayList<>();
args.add(MAPREDUCE_COMMAND);
// other parameters
args.addAll(MapReduceArgsUtils.buildArgs(mapreduceParameters, taskExecutionContext));
return args.stream().collect(Collectors.joining(" "));
}
String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args),
taskExecutionContext.getDefinedParams());
log.info("mapreduce task command: {}", command);
return command;
@Override
protected Map<String, String> getProperties() {
return taskExecutionContext.getDefinedParams();
}
@Override

12
dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java

@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.commons.io.FileUtils;
@ -86,6 +88,7 @@ public class PythonTask extends AbstractTask {
}
}
@SuppressWarnings("unchecked")
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
@ -96,9 +99,11 @@ public class PythonTask extends AbstractTask {
// create this file
createPythonCommandFileIfNotExists(pythonScriptContent, pythonScriptFile);
String command = buildPythonExecuteCommand(pythonScriptFile);
TaskResponse taskResponse = shellCommandExecutor.run(command, taskCallBack);
IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
.appendScript(buildPythonExecuteCommand(pythonScriptFile));
TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(taskResponse.getExitStatusCode());
setProcessId(taskResponse.getProcessId());
setVarPool(shellCommandExecutor.getVarPool());
@ -165,9 +170,8 @@ public class PythonTask extends AbstractTask {
* build python script content
*
* @return raw python script
* @throws Exception exception
*/
protected String buildPythonScriptContent() throws Exception {
protected String buildPythonScriptContent() {
log.info("raw python script : {}", pythonParameters.getRawScript());
String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", System.lineSeparator());
Map<String, Property> paramsMap = mergeParamsWithContext(pythonParameters);

17
dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java

@ -24,14 +24,15 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class PytorchTask extends AbstractTask {
@ -64,11 +65,15 @@ public class PytorchTask extends AbstractTask {
pythonEnvManager.setCondaPythonVersion(pytorchParameters.getCondaPythonVersion());
}
@SuppressWarnings("unchecked")
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
String command = buildPythonExecuteCommand();
TaskResponse taskResponse = shellCommandExecutor.run(command, taskCallBack);
IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
.properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()))
.appendScript(buildPythonExecuteCommand());
TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(taskResponse.getExitStatusCode());
setProcessId(taskResponse.getProcessId());
setVarPool(shellCommandExecutor.getVarPool());
@ -116,9 +121,7 @@ public class PytorchTask extends AbstractTask {
args.add(String.format("%s %s", getPythonCommand(), pytorchParameters.getScriptPath()));
}
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
return ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParameterUtils.convert(paramsMap));
return args.stream().collect(Collectors.joining("\n"));
}
private String getPythonCommand() {

7
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java

@ -30,6 +30,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.commons.io.FileUtils;
@ -100,7 +102,10 @@ public class SeatunnelTask extends AbstractRemoteTask {
try {
// construct process
String command = buildCommand();
TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack);
IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
.appendScript(command);
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());

59
dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java

@ -19,26 +19,18 @@ package org.apache.dolphinscheduler.plugin.task.shell;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.commons.lang3.SystemUtils;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Map;
/**
* shell task
*/
@ -84,12 +76,15 @@ public class ShellTask extends AbstractTask {
}
}
@SuppressWarnings("unchecked")
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// construct process
String command = buildCommand();
TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack);
IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
.properties(ParameterUtils.convert(shellParameters.getLocalParametersMap()))
.appendScript(shellParameters.getRawScript());
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setProcessId(commandExecuteResult.getProcessId());
shellParameters.dealOutParam(shellCommandExecutor.getVarPool());
@ -115,47 +110,9 @@ public class ShellTask extends AbstractTask {
}
}
/**
* create command
*
* @return file name
* @throws Exception exception
*/
private String buildCommand() throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.%s",
taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId(), SystemUtils.IS_OS_WINDOWS ? "bat" : "sh");
File file = new File(fileName);
Path path = file.toPath();
if (Files.exists(path)) {
// this shouldn't happen
log.warn("The command file: {} is already exist", path);
return fileName;
}
String script = shellParameters.getRawScript().replaceAll("\\r\\n", System.lineSeparator());
script = parseScript(script);
shellParameters.setRawScript(script);
log.info("raw script : {}", shellParameters.getRawScript());
log.info("task execute path : {}", taskExecutionContext.getExecutePath());
FileUtils.createFileWith755(path);
Files.write(path, shellParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
return fileName;
}
@Override
public AbstractParameters getParameters() {
return shellParameters;
}
private String parseScript(String script) {
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
return ParameterUtils.convertParameterPlaceholders(script, ParameterUtils.convert(paramsMap));
}
}

20
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java

@ -51,6 +51,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import io.fabric8.kubernetes.client.Config;
@ -88,13 +89,8 @@ public class SparkTask extends AbstractYarnTask {
log.info("Initialize spark task params {}", JSONUtils.toPrettyJsonString(sparkParameters));
}
/**
* create command
*
* @return command
*/
@Override
protected String buildCommand() {
protected String getScript() {
/**
* (1) spark-submit [options] <app jar | python file> [app arguments]
* (2) spark-sql [options] -f <filename>
@ -116,14 +112,12 @@ public class SparkTask extends AbstractYarnTask {
args.addAll(populateSparkOptions());
// replace placeholder
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
String command =
ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParameterUtils.convert(paramsMap));
log.info("spark task command: {}", command);
return args.stream().collect(Collectors.joining(" "));
}
return command;
@Override
protected Map<String, String> getProperties() {
return ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap());
}
/**

4
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java

@ -54,7 +54,7 @@ public class SparkTaskTest {
"--conf spark.executor.memory=1G " +
"--name sparksql " +
"-f /tmp/5536_node.sql",
sparkTask.buildCommand());
sparkTask.getScript());
}
@Test
@ -79,7 +79,7 @@ public class SparkTaskTest {
"--conf spark.executor.memory=1G " +
"--name spark " +
"/lib/dolphinscheduler-task-spark.jar",
sparkTask.buildCommand());
sparkTask.getScript());
}
private String buildSparkParametersWithSparkSql() {

14
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java

@ -22,7 +22,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.sqoop.generator.SqoopJobGenerator;
@ -72,17 +71,16 @@ public class SqoopTask extends AbstractYarnTask {
}
@Override
protected String buildCommand() {
protected String getScript() {
// get sqoop scripts
SqoopJobGenerator generator = new SqoopJobGenerator();
String script = generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext);
return generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext);
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParameterUtils.convert(paramsMap));
log.info("sqoop script: {}", resultScripts);
return resultScripts;
}
@Override
protected Map<String, String> getProperties() {
return ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap());
}
@Override

Loading…
Cancel
Save