diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties index e7c119197e..3107929d56 100644 --- a/dolphinscheduler-common/src/main/resources/common.properties +++ b/dolphinscheduler-common/src/main/resources/common.properties @@ -153,6 +153,8 @@ appId.collect=log # The default env list will be load by Shell task, e.g. /etc/profile,~/.bash_profile shell.env_source_list= +# The interceptor type of Shell task, e.g. bash, sh, cmd +shell.interceptor.type=bash # Whether to enable remote logging remote.logging.enable=false diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index 0967c642fc..4c8edc6dac 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -25,22 +25,20 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD import org.apache.dolphinscheduler.common.constants.TenantConstants; import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; -import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; -import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants; +import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptor; +import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.ShellUtils; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.SystemUtils; import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; import java.io.InputStreamReader; import java.lang.reflect.Field; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -114,111 +112,61 @@ public abstract class AbstractCommandExecutor { } } - public AbstractCommandExecutor(LinkedBlockingQueue logBuffer) { - this.logBuffer = logBuffer; - } - - /** - * build process - * - * @param commandFile command file - * @throws IOException IO Exception - */ - private void buildProcess(String commandFile) throws IOException { - // setting up user to run commands - List command = new LinkedList<>(); - - // init process builder - ProcessBuilder processBuilder = new ProcessBuilder(); - // setting up a working directory - processBuilder.directory(new File(taskRequest.getExecutePath())); - // merge error information to standard output stream - processBuilder.redirectErrorStream(true); - - // if sudo.enable=true,setting up user to run commands - // todo: Create a ShellExecuteClass to generate the shell and execute shell commands - if (OSUtils.isSudoEnable() && !TenantConstants.DEFAULT_TENANT_CODE.equals(taskRequest.getTenantCode())) { - if (SystemUtils.IS_OS_LINUX - && PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE)) { - generateCgroupCommand(command); - } else { - command.add("sudo"); - command.add("-u"); - command.add(taskRequest.getTenantCode()); - command.add("-E"); - } - } - command.add(commandInterpreter()); - command.add(commandFile); - - // setting commands - processBuilder.command(command); - process = processBuilder.start(); - - printCommand(command); - } - - /** - * generate systemd command. - * eg: sudo systemd-run -q --scope -p CPUQuota=100% -p MemoryLimit=200M --uid=root - * @param command command - */ - private void generateCgroupCommand(List command) { - Integer cpuQuota = taskRequest.getCpuQuota(); - Integer memoryMax = taskRequest.getMemoryMax(); - - command.add("sudo"); - command.add("systemd-run"); - command.add("-q"); - command.add("--scope"); - - if (cpuQuota == -1) { - command.add("-p"); - command.add("CPUQuota="); - } else { - command.add("-p"); - command.add(String.format("CPUQuota=%s%%", taskRequest.getCpuQuota())); - } - - // use `man systemd.resource-control` to find available parameter - if (memoryMax == -1) { - command.add("-p"); - command.add(String.format("MemoryLimit=%s", "infinity")); - } else { - command.add("-p"); - command.add(String.format("MemoryLimit=%sM", taskRequest.getMemoryMax())); - } - - command.add(String.format("--uid=%s", taskRequest.getTenantCode())); - } - - public TaskResponse run(String execCommand, TaskCallBack taskCallBack) throws Exception { + // todo: We need to build the IShellActuator in outer class, since different task may have specific logic to build + // the IShellActuator + public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder, + TaskCallBack taskCallBack) throws Exception { TaskResponse result = new TaskResponse(); int taskInstanceId = taskRequest.getTaskInstanceId(); if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) { + logger.warn( + "Cannot find the taskInstance: {} from TaskExecutionContextCacheManager, the task might already been killed", + taskInstanceId); result.setExitStatusCode(EXIT_CODE_KILL); return result; } - if (StringUtils.isEmpty(execCommand)) { - TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); - return result; + iShellInterceptorBuilder = iShellInterceptorBuilder + .shellDirectory(taskRequest.getExecutePath()) + .shellName(taskRequest.getTaskAppId()); + // Set system env + if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) { + ShellUtils.ENV_SOURCE_LIST.forEach(iShellInterceptorBuilder::appendSystemEnv); + } + // Set custom env + if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) { + iShellInterceptorBuilder.appendCustomEnvScript(taskRequest.getEnvironmentConfig()); + } + // Set k8s config (This is only work in Linux) + if (taskRequest.getK8sTaskExecutionContext() != null) { + iShellInterceptorBuilder.k8sConfigYaml(taskRequest.getK8sTaskExecutionContext().getConfigYaml()); + } + // Set sudo (This is only work in Linux) + iShellInterceptorBuilder.sudoMode(OSUtils.isSudoEnable()); + // Set tenant (This is only work in Linux) + if (TenantConstants.DEFAULT_TENANT_CODE.equals(taskRequest.getTenantCode())) { + iShellInterceptorBuilder.runUser(TenantConstants.BOOTSTRAPT_SYSTEM_USER); + } else { + iShellInterceptorBuilder.runUser(taskRequest.getTenantCode()); + } + // Set CPU Quota (This is only work in Linux) + if (taskRequest.getCpuQuota() != null) { + iShellInterceptorBuilder.cpuQuota(taskRequest.getCpuQuota()); + } + // Set memory Quota (This is only work in Linux) + if (taskRequest.getMemoryMax() != null) { + iShellInterceptorBuilder.memoryQuota(taskRequest.getMemoryMax()); } - String commandFilePath = buildCommandFilePath(); - - // create command file if not exists - createCommandFileIfNotExists(execCommand, commandFilePath); - - // build process - buildProcess(commandFilePath); + IShellInterceptor iShellInterceptor = iShellInterceptorBuilder.build(); + process = iShellInterceptor.execute(); // parse process output - parseProcessOutput(process); + parseProcessOutput(this.process); // collect pod log collectPodLogIfNeeded(); - int processId = getProcessId(process); + int processId = getProcessId(this.process); result.setProcessId(processId); @@ -243,7 +191,7 @@ public abstract class AbstractCommandExecutor { } // waiting for the run to finish - boolean status = process.waitFor(remainTime, TimeUnit.SECONDS); + boolean status = this.process.waitFor(remainTime, TimeUnit.SECONDS); TaskExecutionStatus kubernetesStatus = ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId()); @@ -272,7 +220,7 @@ public abstract class AbstractCommandExecutor { if (status && kubernetesStatus.isSuccess()) { // SHELL task state - result.setExitStatusCode(process.exitValue()); + result.setExitStatusCode(this.process.exitValue()); } else { logger.error("process has failure, the task timeout configuration value is:{}, ready to kill ...", @@ -280,7 +228,7 @@ public abstract class AbstractCommandExecutor { result.setExitStatusCode(EXIT_CODE_FAILURE); cancelApplication(); } - int exitCode = process.exitValue(); + int exitCode = this.process.exitValue(); String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited."; logger.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}", exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode); @@ -446,9 +394,4 @@ public abstract class AbstractCommandExecutor { return processId; } - protected abstract String buildCommandFilePath(); - - protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException; - - protected abstract String commandInterpreter(); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java index 406e78a80b..69f6aceb99 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java @@ -22,31 +22,17 @@ import static org.apache.dolphinscheduler.common.constants.Constants.DEFAULT_COL import org.apache.dolphinscheduler.common.utils.PropertyUtils; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; +import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder; +import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import java.util.List; -import java.util.regex.Pattern; +import java.util.Map; -/** - * abstract yarn task - */ public abstract class AbstractYarnTask extends AbstractRemoteTask { - /** - * process task - */ private ShellCommandExecutor shellCommandExecutor; - /** - * rules for extracting application ID - */ - protected static final Pattern YARN_APPLICATION_REGEX = Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX); - - /** - * Abstract Yarn Task - * - * @param taskRequest taskRequest - */ public AbstractYarnTask(TaskExecutionContext taskRequest) { super(taskRequest); this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, @@ -58,8 +44,12 @@ public abstract class AbstractYarnTask extends AbstractRemoteTask { @Override public void handle(TaskCallBack taskCallBack) throws TaskException { try { + IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder() + .properties(getProperties()) + // todo: do we need to move the replace to subclass? + .appendScript(getScript().replaceAll("\\r\\n", System.lineSeparator())); // SHELL task exit code - TaskResponse response = shellCommandExecutor.run(buildCommand(), taskCallBack); + TaskResponse response = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack); setExitStatusCode(response.getExitStatusCode()); // set appIds setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); @@ -115,10 +105,12 @@ public abstract class AbstractYarnTask extends AbstractRemoteTask { } /** - * create command - * - * @return String + * Get the script used to bootstrap the task */ - protected abstract String buildCommand(); + protected abstract String getScript(); + /** + * Get the properties of the task used to replace the placeholders in the script. + */ + protected abstract Map getProperties(); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java index 9bcc765bb3..2dbea62287 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java @@ -17,19 +17,6 @@ package org.apache.dolphinscheduler.plugin.task.api; -import org.apache.dolphinscheduler.common.utils.FileUtils; -import org.apache.dolphinscheduler.plugin.task.api.utils.ShellUtils; - -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.SystemUtils; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; -import java.util.Objects; import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; @@ -40,104 +27,10 @@ import org.slf4j.Logger; */ public class ShellCommandExecutor extends AbstractCommandExecutor { - /** - * For Unix-like, using bash - */ - private static final String SH = "bash"; - - /** - * For Windows, using cmd.exe - */ - private static final String CMD = "cmd.exe"; - - /** - * constructor - * - * @param logHandler logHandler - * @param taskRequest taskRequest - * @param logger logger - */ public ShellCommandExecutor(Consumer> logHandler, TaskExecutionContext taskRequest, Logger logger) { super(logHandler, taskRequest, logger); } - public ShellCommandExecutor(LinkedBlockingQueue logBuffer) { - super(logBuffer); - } - - @Override - protected String buildCommandFilePath() { - // command file - return String.format("%s/%s.%s", taskRequest.getExecutePath(), taskRequest.getTaskAppId(), - SystemUtils.IS_OS_WINDOWS ? "bat" : "command"); - } - - /** - * create command file if not exists - * - * @param execCommand exec command - * @param commandFile command file - * @throws IOException io exception - */ - @Override - protected void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException { - // create if non existence - logger.info("Begin to create command file:{}", commandFile); - - Path commandFilePath = Paths.get(commandFile); - if (Files.exists(commandFilePath)) { - logger.warn("The command file: {} is already exist, will not create a again", commandFile); - return; - } - - StringBuilder sb = new StringBuilder(); - if (SystemUtils.IS_OS_WINDOWS) { - sb.append("@echo off").append(System.lineSeparator()); - sb.append("cd /d %~dp0").append(System.lineSeparator()); - if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) { - for (String envSourceFile : ShellUtils.ENV_SOURCE_LIST) { - sb.append("call ").append(envSourceFile).append("\n"); - } - } - if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) { - sb.append(taskRequest.getEnvironmentConfig()).append(System.lineSeparator()); - } - } else { - sb.append("#!/bin/bash").append(System.lineSeparator()); - sb.append("BASEDIR=$(cd `dirname $0`; pwd)").append(System.lineSeparator()); - sb.append("cd $BASEDIR").append(System.lineSeparator()); - if (CollectionUtils.isNotEmpty(ShellUtils.ENV_SOURCE_LIST)) { - for (String envSourceFile : ShellUtils.ENV_SOURCE_LIST) { - sb.append("source ").append(envSourceFile).append("\n"); - } - } - if (StringUtils.isNotBlank(taskRequest.getEnvironmentConfig())) { - sb.append(taskRequest.getEnvironmentConfig()).append(System.lineSeparator()); - } - if (Objects.nonNull(taskRequest.getK8sTaskExecutionContext())) { - String configYaml = taskRequest.getK8sTaskExecutionContext().getConfigYaml(); - Path kubeConfigPath = Paths.get(org.apache.dolphinscheduler.common.utils.FileUtils - .getKubeConfigPath(taskRequest.getExecutePath())); - FileUtils.createFileWith755(kubeConfigPath); - Files.write(kubeConfigPath, configYaml.getBytes(), StandardOpenOption.APPEND); - sb.append("export KUBECONFIG=" + kubeConfigPath).append(System.lineSeparator()); - logger.info("Create kubernetes configuration file: {}.", kubeConfigPath); - } - } - sb.append(execCommand); - String commandContent = sb.toString(); - - FileUtils.createFileWith755(commandFilePath); - Files.write(commandFilePath, commandContent.getBytes(), StandardOpenOption.APPEND); - - logger.info("Success create command file, command: {}", commandContent); - } - - @Override - protected String commandInterpreter() { - return SystemUtils.IS_OS_WINDOWS ? CMD : SH; - } - } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java index b8d125e5a9..de9a727567 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java @@ -173,6 +173,7 @@ public class TaskExecutionContext implements Serializable { /** * definedParams + * // todo: we need to rename definedParams, prepareParamsMap, paramsMap, this is confusing */ private Map definedParams; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java new file mode 100644 index 0000000000..c0f03fdccd --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseLinuxShellInterceptorBuilder.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.shell; + +import org.apache.dolphinscheduler.common.utils.FileUtils; +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.AbstractCommandExecutorConstants; +import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class BaseLinuxShellInterceptorBuilder, Y extends BaseShellInterceptor> + extends + BaseShellInterceptorBuilder { + + protected void generateShellScript() throws IOException { + List finalScripts = new ArrayList<>(); + // add shell header + finalScripts.add(shellHeader()); + finalScripts.add("BASEDIR=$(cd `dirname $0`; pwd)"); + finalScripts.add("cd $BASEDIR"); + // add system env + finalScripts.addAll(systemEnvScript()); + // add custom env + finalScripts.addAll(customEnvScript()); + // add k8s config + finalScripts.addAll(k8sConfig()); + // add shell body + finalScripts.add(shellBody()); + // create shell file + String finalScript = finalScripts.stream().collect(Collectors.joining(System.lineSeparator())); + Path shellAbsolutePath = shellAbsolutePath(); + FileUtils.createFileWith755(shellAbsolutePath); + Files.write(shellAbsolutePath, finalScript.getBytes(), StandardOpenOption.APPEND); + log.info("Final Shell file is : \n{}", finalScript); + } + + protected List generateBootstrapCommand() { + if (sudoEnable) { + return bootstrapCommandInSudoMode(); + } + return bootstrapCommandInNormalMode(); + } + + protected abstract String shellHeader(); + + protected abstract String shellInterpreter(); + + protected abstract String shellExtension(); + + private List systemEnvScript() { + if (CollectionUtils.isEmpty(systemEnvs)) { + return Collections.emptyList(); + } + return systemEnvs + .stream() + .map(systemEnv -> "source " + systemEnv).collect(Collectors.toList()); + } + + private List customEnvScript() { + if (CollectionUtils.isEmpty(customEnvScripts)) { + return Collections.emptyList(); + } + return customEnvScripts; + } + + private List k8sConfig() throws IOException { + if (StringUtils.isEmpty(k8sConfigYaml)) { + return Collections.emptyList(); + } + Path kubeConfigPath = Paths.get(FileUtils.getKubeConfigPath(shellDirectory)); + FileUtils.createFileWith755(kubeConfigPath); + Files.write(kubeConfigPath, k8sConfigYaml.getBytes(), StandardOpenOption.APPEND); + log.info("Created kubernetes configuration file: {}.", kubeConfigPath); + return Collections.singletonList("export KUBECONFIG=" + kubeConfigPath); + } + + private String shellBody() { + if (CollectionUtils.isEmpty(scripts)) { + return StringUtils.EMPTY; + } + String scriptBody = scripts + .stream() + .collect(Collectors.joining(System.lineSeparator())); + scriptBody = scriptBody.replaceAll("\\r\\n", System.lineSeparator()); + return ParameterUtils.convertParameterPlaceholders(scriptBody, propertyMap); + } + + private Path shellAbsolutePath() { + return Paths.get(shellDirectory, shellName + shellExtension()); + } + + private List bootstrapCommandInSudoMode() { + if (PropertyUtils.getBoolean(AbstractCommandExecutorConstants.TASK_RESOURCE_LIMIT_STATE)) { + return bootstrapCommandInResourceLimitMode(); + } + List bootstrapCommand = new ArrayList<>(); + bootstrapCommand.add("sudo"); + if (StringUtils.isNotBlank(runUser)) { + bootstrapCommand.add("-u"); + bootstrapCommand.add(runUser); + } + bootstrapCommand.add("-E"); + bootstrapCommand.add(shellAbsolutePath().toString()); + return bootstrapCommand; + } + + private List bootstrapCommandInNormalMode() { + List bootstrapCommand = new ArrayList<>(); + bootstrapCommand.add(shellInterpreter()); + bootstrapCommand.add(shellAbsolutePath().toString()); + return bootstrapCommand; + } + + private List bootstrapCommandInResourceLimitMode() { + List bootstrapCommand = new ArrayList<>(); + bootstrapCommand.add("sudo"); + bootstrapCommand.add("systemd-run"); + bootstrapCommand.add("-q"); + bootstrapCommand.add("--scope"); + + if (cpuQuota == -1) { + bootstrapCommand.add("-p"); + bootstrapCommand.add("CPUQuota="); + } else { + bootstrapCommand.add("-p"); + bootstrapCommand.add(String.format("CPUQuota=%s%%", cpuQuota)); + } + + // use `man systemd.resource-control` to find available parameter + if (memoryQuota == -1) { + bootstrapCommand.add("-p"); + bootstrapCommand.add(String.format("MemoryLimit=%s", "infinity")); + } else { + bootstrapCommand.add("-p"); + bootstrapCommand.add(String.format("MemoryLimit=%sM", memoryQuota)); + } + + bootstrapCommand.add(String.format("--uid=%s", runUser)); + return bootstrapCommand; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptor.java new file mode 100644 index 0000000000..0f82956204 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.shell; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class BaseShellInterceptor implements IShellInterceptor { + + protected final String workingDirectory; + protected final List executeCommands; + + protected BaseShellInterceptor(List executeCommands, String workingDirectory) { + this.executeCommands = executeCommands; + this.workingDirectory = workingDirectory; + } + + @Override + public Process execute() throws IOException { + // init process builder + ProcessBuilder processBuilder = new ProcessBuilder(); + // setting up a working directory + processBuilder.directory(new File(workingDirectory)); + // merge error information to standard output stream + processBuilder.redirectErrorStream(true); + processBuilder.command(executeCommands); + log.info("Executing shell command : {}", String.join(" ", executeCommands)); + return processBuilder.start(); + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptorBuilder.java new file mode 100644 index 0000000000..329e79661c --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseShellInterceptorBuilder.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.shell; + +import org.apache.commons.collections4.MapUtils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public abstract class BaseShellInterceptorBuilder, Y extends BaseShellInterceptor> + implements + IShellInterceptorBuilder { + + protected String shellDirectory; + + protected String shellName; + + protected String runUser; + + protected Integer cpuQuota; + + protected Integer memoryQuota; + + protected List systemEnvs = new ArrayList<>(); + + protected List customEnvScripts = new ArrayList<>(); + + protected String k8sConfigYaml; + + protected Map propertyMap = new HashMap<>(); + + protected boolean sudoEnable; + + protected List scripts = new ArrayList<>(); + + protected BaseShellInterceptorBuilder() { + } + + @Override + public T newBuilder(T builder) { + T newBuilder = newBuilder(); + newBuilder.shellDirectory = builder.shellDirectory; + newBuilder.shellName = builder.shellName; + newBuilder.runUser = builder.runUser; + newBuilder.cpuQuota = builder.cpuQuota; + newBuilder.memoryQuota = builder.memoryQuota; + newBuilder.systemEnvs = builder.systemEnvs; + newBuilder.customEnvScripts = builder.customEnvScripts; + newBuilder.k8sConfigYaml = builder.k8sConfigYaml; + newBuilder.propertyMap = builder.propertyMap; + newBuilder.sudoEnable = builder.sudoEnable; + newBuilder.scripts = builder.scripts; + return newBuilder; + } + + @Override + public T shellDirectory(String shellDirectory) { + this.shellDirectory = shellDirectory; + return (T) this; + } + + @Override + public T shellName(String shellFilename) { + this.shellName = shellFilename; + return (T) this; + } + + @Override + public T runUser(String systemUser) { + this.runUser = systemUser; + return (T) this; + } + + @Override + public T cpuQuota(Integer cpuQuota) { + this.cpuQuota = cpuQuota; + return (T) this; + } + + @Override + public T memoryQuota(Integer memoryQuota) { + this.memoryQuota = memoryQuota; + return (T) this; + } + + @Override + public T appendSystemEnv(String envFiles) { + systemEnvs.add(envFiles); + return (T) this; + } + + @Override + public T appendCustomEnvScript(String customEnvScript) { + customEnvScripts.add(customEnvScript); + return (T) this; + } + + @Override + public T k8sConfigYaml(String k8sConfigYaml) { + this.k8sConfigYaml = k8sConfigYaml; + return (T) this; + } + + @Override + public T properties(Map propertyMap) { + if (MapUtils.isNotEmpty(propertyMap)) { + this.propertyMap.putAll(propertyMap); + } + return (T) this; + } + + @Override + public T sudoMode(boolean sudoEnable) { + this.sudoEnable = sudoEnable; + return (T) this; + } + + @Override + public T appendScript(String script) { + scripts.add(script); + return (T) this; + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseWindowsShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseWindowsShellInterceptorBuilder.java new file mode 100644 index 0000000000..9d31bf34c3 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/BaseWindowsShellInterceptorBuilder.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.shell; + +import org.apache.dolphinscheduler.common.utils.FileUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public abstract class BaseWindowsShellInterceptorBuilder, Y extends BaseShellInterceptor> + extends + BaseShellInterceptorBuilder { + + protected void generateShellScript() throws IOException { + List finalScripts = new ArrayList<>(); + // add shell header + finalScripts.add(shellHeader()); + finalScripts.add("cd /d %~dp0"); + // add system env + finalScripts.addAll(systemEnvScript()); + // add custom env + finalScripts.addAll(customEnvScript()); + // add k8s config + finalScripts.addAll(k8sConfig()); + // add shell body + finalScripts.add(shellBody()); + // create shell file + String finalScript = finalScripts.stream().collect(Collectors.joining(System.lineSeparator())); + Path shellAbsolutePath = shellAbsolutePath(); + FileUtils.createFileWith755(shellAbsolutePath); + Files.write(shellAbsolutePath, finalScript.getBytes(), StandardOpenOption.APPEND); + log.info("Final Shell file is : \n{}", finalScript); + } + + private String shellBody() { + if (CollectionUtils.isEmpty(scripts)) { + return StringUtils.EMPTY; + } + String scriptBody = scripts + .stream() + .collect(Collectors.joining(System.lineSeparator())); + return ParameterUtils.convertParameterPlaceholders(scriptBody, propertyMap); + } + + private Collection k8sConfig() { + log.warn("k8s config is not supported in windows"); + return Collections.emptyList(); + } + + protected List generateBootstrapCommand() { + if (sudoEnable) { + log.warn("sudo is not supported in windows"); + } + // todo: support tenant in widnows + List bootstrapCommand = new ArrayList<>(); + bootstrapCommand.add(shellInterpreter()); + bootstrapCommand.add(shellAbsolutePath().toString()); + return bootstrapCommand; + } + + protected abstract String shellHeader(); + + protected abstract String shellInterpreter(); + + protected abstract String shellExtension(); + + private List systemEnvScript() { + if (CollectionUtils.isEmpty(systemEnvs)) { + return Collections.emptyList(); + } + return systemEnvs.stream() + .map(systemEnv -> "call " + systemEnv) + .collect(Collectors.toList()); + } + + private List customEnvScript() { + if (CollectionUtils.isEmpty(customEnvScripts)) { + return Collections.emptyList(); + } + return customEnvScripts; + } + + private Path shellAbsolutePath() { + return Paths.get(shellDirectory, shellName + shellExtension()); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptor.java new file mode 100644 index 0000000000..45b9bf96f7 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.shell; + +import java.io.IOException; + +/** + * This interface is used to execute shell commands. + * It should be created by @{@link IShellInterceptorBuilder}. + */ +public interface IShellInterceptor { + + Process execute() throws IOException; + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptorBuilder.java new file mode 100644 index 0000000000..cab50e3bdf --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/IShellInterceptorBuilder.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.shell; + +import java.io.IOException; +import java.util.Map; + +public interface IShellInterceptorBuilder, Y extends IShellInterceptor> { + + T newBuilder(); + + T newBuilder(T builder); + + T shellDirectory(String directory); + + T shellName(String shellFilename); + + T runUser(String systemUser); + + T cpuQuota(Integer cpuQuota); + + T memoryQuota(Integer memoryQuota); + + T appendSystemEnv(String envFiles); + + T appendCustomEnvScript(String customEnvScript); + + T k8sConfigYaml(String k8sConfigYaml); + + T properties(Map propertyMap); + + T sudoMode(boolean sudoEnable); + + T appendScript(String script); + + Y build() throws IOException; +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/ShellInterceptorBuilderFactory.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/ShellInterceptorBuilderFactory.java new file mode 100644 index 0000000000..9654d091cd --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/ShellInterceptorBuilderFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.shell; + +import org.apache.dolphinscheduler.common.utils.PropertyUtils; +import org.apache.dolphinscheduler.plugin.task.api.shell.bash.BashShellInterceptorBuilder; +import org.apache.dolphinscheduler.plugin.task.api.shell.cmd.CmdShellInterceptorBuilder; +import org.apache.dolphinscheduler.plugin.task.api.shell.sh.ShShellInterceptorBuilder; + +public class ShellInterceptorBuilderFactory { + + private final static String INTERCEPTOR_TYPE = PropertyUtils.getString("shell.interceptor.type", "bash"); + + @SuppressWarnings("unchecked") + public static IShellInterceptorBuilder newBuilder() { + if (INTERCEPTOR_TYPE.equalsIgnoreCase("bash")) { + return new BashShellInterceptorBuilder(); + } + if (INTERCEPTOR_TYPE.equalsIgnoreCase("sh")) { + return new ShShellInterceptorBuilder(); + } + if (INTERCEPTOR_TYPE.equalsIgnoreCase("cmd")) { + return new CmdShellInterceptorBuilder(); + } + throw new IllegalArgumentException("not support shell type: " + INTERCEPTOR_TYPE); + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptor.java new file mode 100644 index 0000000000..f0a237c742 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.shell.bash; + +import org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor; + +import java.util.List; + +public class BashShellInterceptor extends BaseShellInterceptor { + + public BashShellInterceptor(List executeCommands, String workingDirectory) { + super(executeCommands, workingDirectory); + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptorBuilder.java new file mode 100644 index 0000000000..15ffd2cc88 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/bash/BashShellInterceptorBuilder.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.shell.bash; + +import org.apache.dolphinscheduler.plugin.task.api.shell.BaseLinuxShellInterceptorBuilder; + +import java.io.IOException; +import java.util.List; + +public class BashShellInterceptorBuilder + extends + BaseLinuxShellInterceptorBuilder { + + @Override + public BashShellInterceptorBuilder newBuilder() { + return new BashShellInterceptorBuilder(); + } + + @Override + public BashShellInterceptor build() throws IOException { + generateShellScript(); + List bootstrapCommand = generateBootstrapCommand(); + return new BashShellInterceptor(bootstrapCommand, shellDirectory); + } + + @Override + protected String shellInterpreter() { + return "bash"; + } + + @Override + protected String shellExtension() { + return ".sh"; + } + + @Override + protected String shellHeader() { + return "#!/bin/bash"; + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptor.java new file mode 100644 index 0000000000..cb4d8b05d6 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptor.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.shell.cmd; + +import org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor; + +import java.util.List; + +public class CmdShellInterceptor extends BaseShellInterceptor { + + protected CmdShellInterceptor(List executeCommands, String workingDirectory) { + super(executeCommands, workingDirectory); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptorBuilder.java new file mode 100644 index 0000000000..61f7725fe0 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/cmd/CmdShellInterceptorBuilder.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.shell.cmd; + +import org.apache.dolphinscheduler.plugin.task.api.shell.BaseWindowsShellInterceptorBuilder; + +import java.io.IOException; +import java.util.List; + +public class CmdShellInterceptorBuilder + extends + BaseWindowsShellInterceptorBuilder { + + @Override + protected String shellHeader() { + return "@echo off"; + } + + @Override + protected String shellInterpreter() { + return "cmd.exe"; + } + + @Override + protected String shellExtension() { + return ".bat"; + } + + @Override + public CmdShellInterceptorBuilder newBuilder() { + return new CmdShellInterceptorBuilder(); + } + + @Override + public CmdShellInterceptor build() throws IOException { + generateShellScript(); + List bootstrapCommand = generateBootstrapCommand(); + return new CmdShellInterceptor(bootstrapCommand, shellDirectory); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptor.java new file mode 100644 index 0000000000..fae2a8462c --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptor.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.shell.sh; + +import org.apache.dolphinscheduler.plugin.task.api.shell.BaseShellInterceptor; + +import java.util.List; + +public class ShShellInterceptor extends BaseShellInterceptor { + + protected ShShellInterceptor(List executeCommands, String shellDirectory) { + super(executeCommands, shellDirectory); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptorBuilder.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptorBuilder.java new file mode 100644 index 0000000000..cef1e5843a --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/shell/sh/ShShellInterceptorBuilder.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.shell.sh; + +import org.apache.dolphinscheduler.plugin.task.api.shell.BaseLinuxShellInterceptorBuilder; + +import java.io.IOException; +import java.util.List; + +public class ShShellInterceptorBuilder + extends + BaseLinuxShellInterceptorBuilder { + + @Override + public ShShellInterceptorBuilder newBuilder() { + return new ShShellInterceptorBuilder(); + } + + @Override + public ShShellInterceptorBuilder k8sConfigYaml(String k8sConfigYaml) { + return null; + } + + @Override + public ShShellInterceptor build() throws IOException { + generateShellScript(); + List bootstrapCommand = generateBootstrapCommand(); + return new ShShellInterceptor(bootstrapCommand, shellDirectory); + } + + @Override + protected String shellHeader() { + return "#!/bin/sh"; + } + + @Override + protected String shellInterpreter() { + return "sh"; + } + + @Override + protected String shellExtension() { + return ".sh"; + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java index 92bd3ea69e..2e4731d704 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.plugin.task.chunjun; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; @@ -30,25 +29,21 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder; +import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import org.apache.dolphinscheduler.spi.enums.Flag; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.SystemUtils; import java.io.File; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.nio.file.attribute.FileAttribute; -import java.nio.file.attribute.PosixFilePermission; -import java.nio.file.attribute.PosixFilePermissions; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; /** * chunjun task @@ -102,19 +97,16 @@ public class ChunJunTask extends AbstractTask { } } - /** - * run chunjun process - * - * @throws TaskException exception - */ + @SuppressWarnings("unchecked") @Override public void handle(TaskCallBack taskCallBack) throws TaskException { try { Map paramsMap = taskExecutionContext.getPrepareParamsMap(); - String jsonFilePath = buildChunJunJsonFile(paramsMap); - String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap); - TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath, taskCallBack); + IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder() + .properties(ParameterUtils.convert(paramsMap)) + .appendScript(buildCommand(buildChunJunJsonFile(paramsMap))); + TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack); setExitStatusCode(commandExecuteResult.getExitStatusCode()); @@ -167,25 +159,7 @@ public class ChunJunTask extends AbstractTask { return fileName; } - /** - * create command - * - * @return shell command file name - * @throws Exception if error throws Exception - */ - private String buildShellCommandFile(String jobConfigFilePath, Map paramsMap) throws Exception { - // generate scripts - String fileName = String.format("%s/%s_node.%s", - taskExecutionContext.getExecutePath(), - taskExecutionContext.getTaskAppId(), - SystemUtils.IS_OS_WINDOWS ? "bat" : "sh"); - - Path path = new File(fileName).toPath(); - - if (Files.exists(path)) { - return fileName; - } - + private String buildCommand(String jobConfigFilePath) { // chunjun command List args = new ArrayList<>(); @@ -215,24 +189,7 @@ public class ChunJunTask extends AbstractTask { String command = String.join(" ", args); - // replace placeholder - String chunjunCommand = ParameterUtils.convertParameterPlaceholders(command, ParameterUtils.convert(paramsMap)); - - log.info("raw script : {}", chunjunCommand); - - // create shell command file - Set perms = PosixFilePermissions.fromString(RWXR_XR_X); - FileAttribute> attr = PosixFilePermissions.asFileAttribute(perms); - - if (SystemUtils.IS_OS_WINDOWS) { - Files.createFile(path); - } else { - Files.createFile(path, attr); - } - - Files.write(path, chunjunCommand.getBytes(), StandardOpenOption.APPEND); - - return fileName; + return command; } public String getExecMode(ChunJunParameters chunJunParameters) { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java index 5fe451d916..381b8cbd37 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java @@ -38,7 +38,6 @@ import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask; import org.apache.dolphinscheduler.plugin.task.api.DataQualityTaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.parameters.dataquality.DataQualityParameters; @@ -57,6 +56,7 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * In DataQualityTask, the input parameters will be converted into DataQualityConfiguration, @@ -160,19 +160,16 @@ public class DataQualityTask extends AbstractYarnTask { } @Override - protected String buildCommand() { + protected String getScript() { List args = new ArrayList<>(); - args.add(SPARK_COMMAND); args.addAll(SparkArgsUtils.buildArgs(dataQualityParameters.getSparkParameters())); + return args.stream().collect(Collectors.joining(" ")); + } - // replace placeholder - Map paramsMap = dqTaskExecutionContext.getPrepareParamsMap(); - String command = - ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParameterUtils.convert(paramsMap)); - log.info("data quality task command: {}", command); - - return command; + @Override + protected Map getProperties() { + return ParameterUtils.convert(dqTaskExecutionContext.getPrepareParamsMap()); } protected void setMainJarName() { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java index 4477b42eb7..80de6cc7cd 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java @@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.plugin.task.datax; import static org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils.decodePassword; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; -import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X; import org.apache.dolphinscheduler.common.log.SensitiveDataConverter; import org.apache.dolphinscheduler.common.utils.JSONUtils; @@ -33,6 +32,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder; +import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; @@ -40,16 +41,11 @@ import org.apache.dolphinscheduler.spi.enums.Flag; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.SystemUtils; import java.io.File; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.nio.file.attribute.FileAttribute; -import java.nio.file.attribute.PosixFilePermission; -import java.nio.file.attribute.PosixFilePermissions; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -59,7 +55,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutionException; import com.alibaba.druid.sql.ast.SQLStatement; @@ -152,21 +147,18 @@ public class DataxTask extends AbstractTask { dataXParameters.generateExtendedContext(taskExecutionContext.getResourceParametersHelper()); } - /** - * run DataX process - * - * @throws TaskException if error throws Exception - */ + @SuppressWarnings("unchecked") @Override public void handle(TaskCallBack taskCallBack) throws TaskException { try { // replace placeholder,and combine local and global parameters Map paramsMap = taskExecutionContext.getPrepareParamsMap(); - // run datax processDataSourceService - String jsonFilePath = buildDataxJsonFile(paramsMap); - String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap); - TaskResponse commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath, taskCallBack); + IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder() + .properties(ParameterUtils.convert(paramsMap)) + .appendScript(buildCommand(buildDataxJsonFile(paramsMap), paramsMap)); + + TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack); setExitStatusCode(commandExecuteResult.getExitStatusCode()); setProcessId(commandExecuteResult.getProcessId()); @@ -385,23 +377,10 @@ public class DataxTask extends AbstractTask { * create command * * @return shell command file name - * @throws Exception if error throws Exception */ - private String buildShellCommandFile(String jobConfigFilePath, Map paramsMap) throws Exception { - // generate scripts - String fileName = String.format("%s/%s_node.%s", - taskExecutionContext.getExecutePath(), - taskExecutionContext.getTaskAppId(), - SystemUtils.IS_OS_WINDOWS ? "bat" : "sh"); - - Path path = new File(fileName).toPath(); - - if (Files.exists(path)) { - return fileName; - } - + protected String buildCommand(String jobConfigFilePath, Map paramsMap) { // datax python command - String sbr = DATAX_PYTHON + + return DATAX_PYTHON + " " + DATAX_PATH + " " + @@ -409,25 +388,6 @@ public class DataxTask extends AbstractTask { addCustomParameters(paramsMap) + " " + jobConfigFilePath; - - // replace placeholder - String dataxCommand = ParameterUtils.convertParameterPlaceholders(sbr, ParameterUtils.convert(paramsMap)); - - log.debug("raw script : {}", dataxCommand); - - // create shell command file - Set perms = PosixFilePermissions.fromString(RWXR_XR_X); - FileAttribute> attr = PosixFilePermissions.asFileAttribute(perms); - - if (SystemUtils.IS_OS_WINDOWS) { - Files.createFile(path); - } else { - Files.createFile(path, attr); - } - - Files.write(path, dataxCommand.getBytes(), StandardOpenOption.APPEND); - - return fileName; } private StringBuilder addCustomParameters(Map paramsMap) { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java index d5054e836d..e423fcb89d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/test/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTaskTest.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.plugin.task.datax; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -40,8 +41,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceP import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; -import org.apache.commons.lang3.SystemUtils; - import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -110,7 +109,7 @@ public class DataxTaskTest { taskResponse.setStatus(TaskRunStatus.SUCCESS); taskResponse.setExitStatusCode(0); taskResponse.setProcessId(1); - when(shellCommandExecutor.run(anyString(), eq(taskCallBack))).thenReturn(taskResponse); + when(shellCommandExecutor.run(any(), eq(taskCallBack))).thenReturn(taskResponse); dataxTask.handle(taskCallBack); Assertions.assertEquals(0, dataxTask.getExitStatusCode()); @@ -122,14 +121,8 @@ public class DataxTaskTest { boolean delete = jsonFile.delete(); Assertions.assertTrue(delete); - File shellCommandFile = SystemUtils.IS_OS_WINDOWS ? new File("/tmp/execution/app-id_node.bat") - : new File("/tmp/execution/app-id_node.sh"); - InputStream shellCommandInputStream = Files.newInputStream(shellCommandFile.toPath()); - String shellCommandStr = FileUtils.readFile2Str(shellCommandInputStream); - Assertions.assertEquals(shellCommandStr, "python2.7 ${DATAX_HOME}/bin/datax.py --jvm=\"-Xms1G -Xmx1G\" " + - " /tmp/execution/app-id_job.json"); - delete = shellCommandFile.delete(); - Assertions.assertTrue(delete); + Assertions.assertEquals(dataxTask.buildCommand("/tmp/execution/app-id_job.json", null), + "python2.7 ${DATAX_HOME}/bin/datax.py --jvm=\"-Xms1G -Xmx1G\" /tmp/execution/app-id_job.json"); } @Test @@ -151,7 +144,7 @@ public class DataxTaskTest { taskResponse.setStatus(TaskRunStatus.SUCCESS); taskResponse.setExitStatusCode(0); taskResponse.setProcessId(1); - when(shellCommandExecutor.run(anyString(), eq(taskCallBack))).thenReturn(taskResponse); + when(shellCommandExecutor.run(any(), eq(taskCallBack))).thenReturn(taskResponse); dataxTask.handle(taskCallBack); Assertions.assertEquals(0, dataxTask.getExitStatusCode()); @@ -163,14 +156,8 @@ public class DataxTaskTest { boolean delete = jsonFile.delete(); Assertions.assertTrue(delete); - File shellCommandFile = SystemUtils.IS_OS_WINDOWS ? new File("/tmp/execution/app-id_node.bat") - : new File("/tmp/execution/app-id_node.sh"); - InputStream shellCommandInputStream = Files.newInputStream(shellCommandFile.toPath()); - String shellCommandStr = FileUtils.readFile2Str(shellCommandInputStream); - Assertions.assertEquals(shellCommandStr, "python2.7 ${DATAX_HOME}/bin/datax.py --jvm=\"-Xms1G -Xmx1G\" " + - "-p \"-DDT='DT' -DDS='DS'\" /tmp/execution/app-id_job.json"); - delete = shellCommandFile.delete(); - Assertions.assertTrue(delete); + Assertions.assertEquals(dataxTask.buildCommand("/tmp/execution/app-id_job.json", createPrepareParamsMap()), + "python2.7 ${DATAX_HOME}/bin/datax.py --jvm=\"-Xms1G -Xmx1G\" -p \"-DDT='DT' -DDS='DS'\" /tmp/execution/app-id_job.json"); } @Test @@ -187,7 +174,7 @@ public class DataxTaskTest { shellCommandExecutorFiled.setAccessible(true); shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor); - when(shellCommandExecutor.run(anyString(), eq(taskCallBack))) + when(shellCommandExecutor.run(any(), eq(taskCallBack))) .thenThrow(new InterruptedException("Command execution failed")); Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack)); } @@ -206,7 +193,7 @@ public class DataxTaskTest { shellCommandExecutorFiled.setAccessible(true); shellCommandExecutorFiled.set(dataxTask, shellCommandExecutor); - when(shellCommandExecutor.run(anyString(), eq(taskCallBack))) + when(shellCommandExecutor.run(any(), eq(taskCallBack))) .thenThrow(new IOException("Command execution failed")); Assertions.assertThrows(TaskException.class, () -> dataxTask.handle(taskCallBack)); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java index 37d2d2f8f9..032d953858 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java @@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder; +import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory; import java.util.ArrayList; import java.util.List; @@ -78,8 +80,9 @@ public class DvcTask extends AbstractTask { public void handle(TaskCallBack taskCallBack) throws TaskException { try { // construct process - String command = buildCommand(); - TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack); + IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder() + .appendScript(buildCommand()); + TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack); setExitStatusCode(commandExecuteResult.getExitStatusCode()); setProcessId(commandExecuteResult.getProcessId()); parameters.dealOutParam(shellCommandExecutor.getVarPool()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java index bc53696077..3a71df62c7 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java @@ -23,12 +23,13 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.stream.StreamTask; -import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import org.apache.commons.collections4.CollectionUtils; import java.io.IOException; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; public class FlinkStreamTask extends FlinkTask implements StreamTask { @@ -66,15 +67,15 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask { * @return command */ @Override - protected String buildCommand() { + protected String getScript() { // flink run/run-application [OPTIONS] List args = FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters); + return args.stream().collect(Collectors.joining(" ")); + } - String command = ParameterUtils - .convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams()); - - log.info("flink task command : {}", command); - return command; + @Override + protected Map getProperties() { + return taskExecutionContext.getDefinedParams(); } @Override diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java index 33afd7fc7f..75764c3677 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java @@ -22,11 +22,12 @@ import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; -import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import java.util.List; +import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; public class FlinkTask extends AbstractYarnTask { @@ -69,15 +70,15 @@ public class FlinkTask extends AbstractYarnTask { * @return command */ @Override - protected String buildCommand() { + protected String getScript() { // flink run/run-application [OPTIONS] List args = FlinkArgsUtils.buildRunCommandLine(taskExecutionContext, flinkParameters); + return args.stream().collect(Collectors.joining(" ")); + } - String command = ParameterUtils - .convertParameterPlaceholders(String.join(" ", args), taskExecutionContext.getDefinedParams()); - - log.info("flink task command : {}", command); - return command; + @Override + protected Map getProperties() { + return taskExecutionContext.getDefinedParams(); } @Override diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java index 8ba8409567..28443423c2 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java @@ -31,6 +31,8 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder; +import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import org.apache.commons.io.FileUtils; @@ -88,7 +90,9 @@ public class HiveCliTask extends AbstractRemoteTask { @Override public void handle(TaskCallBack taskCallBack) throws TaskException { try { - final TaskResponse taskResponse = shellCommandExecutor.run(buildCommand(), taskCallBack); + IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder() + .appendScript(buildCommand()); + final TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack); setExitStatusCode(taskResponse.getExitStatusCode()); setAppIds(taskResponse.getAppIds()); setProcessId(taskResponse.getProcessId()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java index 9486ba5003..e9850b6459 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java @@ -32,6 +32,8 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder; +import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory; import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import org.apache.dolphinscheduler.plugin.task.java.exception.JavaSourceFileExistException; @@ -122,7 +124,9 @@ public class JavaTask extends AbstractTask { throw new RunTypeNotFoundException("run type is required, but it is null now."); } Preconditions.checkNotNull(command, "command not be null."); - TaskResponse taskResponse = shellCommandExecutor.run(command, taskCallBack); + IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder() + .appendScript(command); + TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack); log.info("java task run result: {}", taskResponse); setExitStatusCode(taskResponse.getExitStatusCode()); setAppIds(taskResponse.getAppIds()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java index e3058ee329..735b6237e2 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java @@ -26,9 +26,10 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder; +import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import org.apache.commons.lang3.StringUtils; @@ -38,6 +39,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import com.fasterxml.jackson.databind.ObjectMapper; @@ -82,7 +84,11 @@ public class JupyterTask extends AbstractRemoteTask { @Override public void handle(TaskCallBack taskCallBack) throws TaskException { try { - TaskResponse response = shellCommandExecutor.run(buildCommand(), taskCallBack); + IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder() + .properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap())) + .appendScript(buildCommand()); + + TaskResponse response = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack); setExitStatusCode(response.getExitStatusCode()); setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); setProcessId(response.getProcessId()); @@ -147,14 +153,7 @@ public class JupyterTask extends AbstractRemoteTask { args.add(String.format(JupyterConstants.REMOVE_ENV, timestamp)); } - // replace placeholder - Map paramsMap = taskExecutionContext.getPrepareParamsMap(); - String command = ParameterUtils - .convertParameterPlaceholders(String.join(" ", args), ParameterUtils.convert(paramsMap)); - - log.info("jupyter task command: {}", command); - - return command; + return args.stream().collect(Collectors.joining(" ")); } protected String readCondaPath() { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java index 132e6f6e56..c091ee6f64 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java @@ -26,9 +26,10 @@ import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder; +import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import org.apache.commons.lang3.BooleanUtils; @@ -37,7 +38,6 @@ import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -100,8 +100,10 @@ public class LinkisTask extends AbstractRemoteTask { public void submitApplication() throws TaskException { try { // construct process - String command = buildCommand(); - TaskResponse commandExecuteResult = shellCommandExecutor.run(command, null); + IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder() + .properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap())) + .appendScript(buildCommand()); + TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, null); setExitStatusCode(commandExecuteResult.getExitStatusCode()); setAppIds(findTaskId(commandExecuteResult.getResultString())); setProcessId(commandExecuteResult.getProcessId()); @@ -127,7 +129,9 @@ public class LinkisTask extends AbstractRemoteTask { args.add(Constants.STATUS_OPTIONS); args.add(taskId); String command = String.join(Constants.SPACE, args); - TaskResponse commandExecuteResult = shellCommandExecutor.run(command, null); + IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder() + .appendScript(command); + TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, null); String status = findStatus(commandExecuteResult.getResultString()); LinkisJobStatus jobStatus = LinkisJobStatus.convertFromJobStatusString(status); switch (jobStatus) { @@ -160,7 +164,10 @@ public class LinkisTask extends AbstractRemoteTask { args.add(Constants.KILL_OPTIONS); args.add(taskId); String command = String.join(Constants.SPACE, args); - shellCommandExecutor.run(command, null); + + IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder() + .appendScript(command); + shellCommandExecutor.run(shellActuatorBuilder, null); setExitStatusCode(EXIT_CODE_KILL); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -176,7 +183,7 @@ public class LinkisTask extends AbstractRemoteTask { List args = new ArrayList<>(); args.addAll(buildOptions()); - String command = String.join(Constants.SPACE, args); + String command = String.join(" ", args); log.info("Linkis task command: {}", command); return command; @@ -187,20 +194,13 @@ public class LinkisTask extends AbstractRemoteTask { args.add(Constants.SHELL_CLI_OPTIONS); args.add(Constants.ASYNC_OPTIONS); if (BooleanUtils.isTrue(linkisParameters.getUseCustom())) { - args.add(buildCustomConfigContent()); + args.add(linkisParameters.getRawScript()); } else { args.add(buildParamConfigContent()); } return args; } - private String buildCustomConfigContent() { - log.info("raw custom config content : {}", linkisParameters.getRawScript()); - String script = linkisParameters.getRawScript().replaceAll("\\r\\n", "\n"); - script = parseScript(script); - return script; - } - private String buildParamConfigContent() { log.info("raw param config content : {}", linkisParameters.getParamScript()); String script = ""; @@ -210,7 +210,6 @@ public class LinkisTask extends AbstractRemoteTask { .concat(Constants.SPACE) .concat(param.getValue()); } - script = parseScript(script); return script; } @@ -248,8 +247,4 @@ public class LinkisTask extends AbstractRemoteTask { return linkisParameters; } - private String parseScript(String script) { - Map paramsMap = taskExecutionContext.getPrepareParamsMap(); - return ParameterUtils.convertParameterPlaceholders(script, ParameterUtils.convert(paramsMap)); - } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java index e4fe074c05..3c1f73747c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java @@ -30,6 +30,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; +import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder; +import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import org.apache.commons.lang3.StringUtils; @@ -38,6 +40,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.regex.Pattern; +import java.util.stream.Collectors; /** * shell task @@ -110,12 +113,15 @@ public class MlflowTask extends AbstractTask { } } + @SuppressWarnings("unchecked") @Override public void handle(TaskCallBack taskCallBack) throws TaskException { try { // construct process - String command = buildCommand(); - TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack); + IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder() + .properties(ParameterUtils.convert(getParamsMap())) + .appendScript(buildCommand()); + TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack); int exitCode; if (mlflowParameters.getIsDeployDocker()) { exitCode = checkDockerHealth(); @@ -165,7 +171,6 @@ public class MlflowTask extends AbstractTask { */ private String buildCommandForMlflowProjects() { - Map paramsMap = getParamsMap(); List args = new ArrayList<>(); args.add( String.format(MlflowConstants.EXPORT_MLFLOW_TRACKING_URI_ENV, mlflowParameters.getMlflowTrackingUri())); @@ -219,8 +224,7 @@ public class MlflowTask extends AbstractTask { runCommand = runCommand + " " + versionString; } args.add(runCommand); - - return ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParameterUtils.convert(paramsMap)); + return args.stream().collect(Collectors.joining("\n")); } /** @@ -228,7 +232,6 @@ public class MlflowTask extends AbstractTask { */ protected String buildCommandForMlflowModels() { - Map paramsMap = getParamsMap(); List args = new ArrayList<>(); args.add( String.format(MlflowConstants.EXPORT_MLFLOW_TRACKING_URI_ENV, mlflowParameters.getMlflowTrackingUri())); @@ -247,8 +250,7 @@ public class MlflowTask extends AbstractTask { args.add(String.format(MlflowConstants.DOCKER_RUN, containerName, mlflowParameters.getDeployPort(), imageName)); } - - return ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParameterUtils.convert(paramsMap)); + return args.stream().collect(Collectors.joining("\n")); } private Map getParamsMap() { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java index 2ae5c6e707..dd51314dfa 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** * mapreduce task @@ -90,19 +91,19 @@ public class MapReduceTask extends AbstractYarnTask { * @return command */ @Override - protected String buildCommand() { + protected String getScript() { // hadoop jar [mainClass] [GENERIC_OPTIONS] args... List args = new ArrayList<>(); args.add(MAPREDUCE_COMMAND); // other parameters args.addAll(MapReduceArgsUtils.buildArgs(mapreduceParameters, taskExecutionContext)); + return args.stream().collect(Collectors.joining(" ")); + } - String command = ParameterUtils.convertParameterPlaceholders(String.join(" ", args), - taskExecutionContext.getDefinedParams()); - log.info("mapreduce task command: {}", command); - - return command; + @Override + protected Map getProperties() { + return taskExecutionContext.getDefinedParams(); } @Override diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java index f60d5f1107..4b4d8cbd66 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java @@ -27,6 +27,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder; +import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import org.apache.commons.io.FileUtils; @@ -86,6 +88,7 @@ public class PythonTask extends AbstractTask { } } + @SuppressWarnings("unchecked") @Override public void handle(TaskCallBack taskCallBack) throws TaskException { try { @@ -96,9 +99,11 @@ public class PythonTask extends AbstractTask { // create this file createPythonCommandFileIfNotExists(pythonScriptContent, pythonScriptFile); - String command = buildPythonExecuteCommand(pythonScriptFile); - TaskResponse taskResponse = shellCommandExecutor.run(command, taskCallBack); + IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder() + .appendScript(buildPythonExecuteCommand(pythonScriptFile)); + + TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack); setExitStatusCode(taskResponse.getExitStatusCode()); setProcessId(taskResponse.getProcessId()); setVarPool(shellCommandExecutor.getVarPool()); @@ -165,9 +170,8 @@ public class PythonTask extends AbstractTask { * build python script content * * @return raw python script - * @throws Exception exception */ - protected String buildPythonScriptContent() throws Exception { + protected String buildPythonScriptContent() { log.info("raw python script : {}", pythonParameters.getRawScript()); String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", System.lineSeparator()); Map paramsMap = mergeParamsWithContext(pythonParameters); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java index afe371628c..806750877c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java @@ -24,14 +24,15 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder; +import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import java.util.ArrayList; import java.util.List; -import java.util.Map; +import java.util.stream.Collectors; public class PytorchTask extends AbstractTask { @@ -64,11 +65,15 @@ public class PytorchTask extends AbstractTask { pythonEnvManager.setCondaPythonVersion(pytorchParameters.getCondaPythonVersion()); } + @SuppressWarnings("unchecked") @Override public void handle(TaskCallBack taskCallBack) throws TaskException { try { - String command = buildPythonExecuteCommand(); - TaskResponse taskResponse = shellCommandExecutor.run(command, taskCallBack); + IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder() + .properties(ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap())) + .appendScript(buildPythonExecuteCommand()); + + TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack); setExitStatusCode(taskResponse.getExitStatusCode()); setProcessId(taskResponse.getProcessId()); setVarPool(shellCommandExecutor.getVarPool()); @@ -116,9 +121,7 @@ public class PytorchTask extends AbstractTask { args.add(String.format("%s %s", getPythonCommand(), pytorchParameters.getScriptPath())); } - - Map paramsMap = taskExecutionContext.getPrepareParamsMap(); - return ParameterUtils.convertParameterPlaceholders(String.join("\n", args), ParameterUtils.convert(paramsMap)); + return args.stream().collect(Collectors.joining("\n")); } private String getPythonCommand() { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java index bbf42ed8fa..0b837fa568 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java @@ -30,6 +30,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder; +import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import org.apache.commons.io.FileUtils; @@ -100,7 +102,10 @@ public class SeatunnelTask extends AbstractRemoteTask { try { // construct process String command = buildCommand(); - TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack); + IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder() + .appendScript(command); + + TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack); setExitStatusCode(commandExecuteResult.getExitStatusCode()); setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); setProcessId(commandExecuteResult.getProcessId()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java index 7d8fdd2e5a..c3c2bf62a1 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java @@ -19,26 +19,18 @@ package org.apache.dolphinscheduler.plugin.task.shell; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; -import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractTask; import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; +import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder; +import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; -import org.apache.commons.lang3.SystemUtils; - -import java.io.File; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardOpenOption; -import java.util.Map; - /** * shell task */ @@ -84,12 +76,15 @@ public class ShellTask extends AbstractTask { } } + @SuppressWarnings("unchecked") @Override public void handle(TaskCallBack taskCallBack) throws TaskException { try { - // construct process - String command = buildCommand(); - TaskResponse commandExecuteResult = shellCommandExecutor.run(command, taskCallBack); + IShellInterceptorBuilder shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder() + .properties(ParameterUtils.convert(shellParameters.getLocalParametersMap())) + .appendScript(shellParameters.getRawScript()); + + TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack); setExitStatusCode(commandExecuteResult.getExitStatusCode()); setProcessId(commandExecuteResult.getProcessId()); shellParameters.dealOutParam(shellCommandExecutor.getVarPool()); @@ -115,47 +110,9 @@ public class ShellTask extends AbstractTask { } } - /** - * create command - * - * @return file name - * @throws Exception exception - */ - private String buildCommand() throws Exception { - // generate scripts - String fileName = String.format("%s/%s_node.%s", - taskExecutionContext.getExecutePath(), - taskExecutionContext.getTaskAppId(), SystemUtils.IS_OS_WINDOWS ? "bat" : "sh"); - - File file = new File(fileName); - Path path = file.toPath(); - - if (Files.exists(path)) { - // this shouldn't happen - log.warn("The command file: {} is already exist", path); - return fileName; - } - - String script = shellParameters.getRawScript().replaceAll("\\r\\n", System.lineSeparator()); - script = parseScript(script); - shellParameters.setRawScript(script); - - log.info("raw script : {}", shellParameters.getRawScript()); - log.info("task execute path : {}", taskExecutionContext.getExecutePath()); - - FileUtils.createFileWith755(path); - Files.write(path, shellParameters.getRawScript().getBytes(), StandardOpenOption.APPEND); - - return fileName; - } - @Override public AbstractParameters getParameters() { return shellParameters; } - private String parseScript(String script) { - Map paramsMap = taskExecutionContext.getPrepareParamsMap(); - return ParameterUtils.convertParameterPlaceholders(script, ParameterUtils.convert(paramsMap)); - } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java index 675320d49f..ef0b71c8fe 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java @@ -51,6 +51,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import io.fabric8.kubernetes.client.Config; @@ -88,13 +89,8 @@ public class SparkTask extends AbstractYarnTask { log.info("Initialize spark task params {}", JSONUtils.toPrettyJsonString(sparkParameters)); } - /** - * create command - * - * @return command - */ @Override - protected String buildCommand() { + protected String getScript() { /** * (1) spark-submit [options] [app arguments] * (2) spark-sql [options] -f @@ -116,14 +112,12 @@ public class SparkTask extends AbstractYarnTask { args.addAll(populateSparkOptions()); // replace placeholder - Map paramsMap = taskExecutionContext.getPrepareParamsMap(); - - String command = - ParameterUtils.convertParameterPlaceholders(String.join(" ", args), ParameterUtils.convert(paramsMap)); - - log.info("spark task command: {}", command); + return args.stream().collect(Collectors.joining(" ")); + } - return command; + @Override + protected Map getProperties() { + return ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()); } /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java index 14ccbcd56f..ba031bb304 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/test/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTaskTest.java @@ -54,7 +54,7 @@ public class SparkTaskTest { "--conf spark.executor.memory=1G " + "--name sparksql " + "-f /tmp/5536_node.sql", - sparkTask.buildCommand()); + sparkTask.getScript()); } @Test @@ -79,7 +79,7 @@ public class SparkTaskTest { "--conf spark.executor.memory=1G " + "--name spark " + "/lib/dolphinscheduler-task-spark.jar", - sparkTask.buildCommand()); + sparkTask.getScript()); } private String buildSparkParametersWithSparkSql() { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java index b8d35e4fdf..016a07a56b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java @@ -22,7 +22,6 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; import org.apache.dolphinscheduler.plugin.task.sqoop.generator.SqoopJobGenerator; @@ -72,17 +71,16 @@ public class SqoopTask extends AbstractYarnTask { } @Override - protected String buildCommand() { + protected String getScript() { // get sqoop scripts SqoopJobGenerator generator = new SqoopJobGenerator(); - String script = generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext); + return generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext); - Map paramsMap = taskExecutionContext.getPrepareParamsMap(); - - String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParameterUtils.convert(paramsMap)); - log.info("sqoop script: {}", resultScripts); - return resultScripts; + } + @Override + protected Map getProperties() { + return ParameterUtils.convert(taskExecutionContext.getPrepareParamsMap()); } @Override