diff --git a/docs/docs/en/guide/parameter/context.md b/docs/docs/en/guide/parameter/context.md index 15cf0f4aed..9d6131d92e 100644 --- a/docs/docs/en/guide/parameter/context.md +++ b/docs/docs/en/guide/parameter/context.md @@ -122,7 +122,7 @@ Although the two parameters var1 and var2 are output in the A task, only the `OU #### Pass parameter from Kubernetes task to downstream -Different programming languages may use different logging frameworks in Kubernetes tasks. To be compatible with these frameworks, DolphinScheduler provides a universal logging data format `${(key=value)dsVal}`. Users can output log data in the format `${(key=value)dsVal}` in the terminal logs of their applications, where `key` is the corresponding parameter prop and `value` is the value of that parameter. DolphinScheduler will capture the `${(key=value)dsVal}` in the output logs to capture the parameters and pass them downstream. +Different programming languages may use different logging frameworks in Kubernetes tasks. To be compatible with these frameworks, DolphinScheduler provides a universal logging data format `${(key=value)}` or `#{(key=value)}`. Users can output log data in the format in the terminal logs of their applications, where `key` is the corresponding parameter prop and `value` is the value of that parameter. DolphinScheduler will capture the `${(key=value)}` or `#{(key=value)}` in the output logs to capture the parameters and pass them downstream. For example diff --git a/docs/docs/zh/guide/parameter/context.md b/docs/docs/zh/guide/parameter/context.md index f02f774142..7f5870458d 100644 --- a/docs/docs/zh/guide/parameter/context.md +++ b/docs/docs/zh/guide/parameter/context.md @@ -121,7 +121,7 @@ Node_mysql 运行结果如下: #### Kubernetes 任务传递参数 -在Kubernetes任务中不同的程序开发语言可能会采用不同的日志框架,DolphinScheduler为了兼容不同的日志框架,提供了一种通用的日志数据格式,即 `${(key=value)dsVal}`,用户可以在应用程序的终端日志中输出以格式为 `${(key=value)dsVal}` 结束的日志数据,key 为对应参数的 prop,value 为该参数的值。DolphinScheduler会捕捉输出日志中的 `${(key=value)dsVal}`来进行参数捕捉,从而传递到下游。 +在Kubernetes任务中不同的程序开发语言可能会采用不同的日志框架,DolphinScheduler为了兼容不同的日志框架,提供了一种通用的日志数据格式,即 `${(key=value)}` 或 `#{(key=value)}`,用户可以在应用程序的终端日志中输出以这种格式的日志数据,key 为对应参数的 prop,value 为该参数的值。DolphinScheduler会捕捉输出日志中的 `${(key=value)}` 和 `#{(key=value)}` 来进行参数捕捉,从而传递到下游。 如下图所示: diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index eef4c98e69..e54df42944 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; +import org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser; import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptor; import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; @@ -39,31 +40,25 @@ import org.apache.commons.lang3.StringUtils; import java.io.BufferedReader; import java.io.InputStreamReader; import java.lang.reflect.Field; -import java.util.List; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.slf4j.Logger; +import lombok.extern.slf4j.Slf4j; import io.fabric8.kubernetes.client.dsl.LogWatch; /** * abstract command executor */ +@Slf4j public abstract class AbstractCommandExecutor { - /** - * rules for extracting Var Pool - */ - protected static final Pattern SETVALUE_REGEX = Pattern.compile(TaskConstants.SETVALUE_REGEX); - - protected StringBuilder varPool = new StringBuilder(); + protected volatile Map taskOutputParams = new HashMap<>(); /** * process */ @@ -74,11 +69,6 @@ public abstract class AbstractCommandExecutor { */ protected Consumer> logHandler; - /** - * logger - */ - protected Logger logger; - /** * log list */ @@ -98,11 +88,9 @@ public abstract class AbstractCommandExecutor { protected Future podLogOutputFuture; public AbstractCommandExecutor(Consumer> logHandler, - TaskExecutionContext taskRequest, - Logger logger) { + TaskExecutionContext taskRequest) { this.logHandler = logHandler; this.taskRequest = taskRequest; - this.logger = logger; this.logBuffer = new LinkedBlockingQueue<>(); this.logBuffer.add(EMPTY_STRING); @@ -119,7 +107,7 @@ public abstract class AbstractCommandExecutor { TaskResponse result = new TaskResponse(); int taskInstanceId = taskRequest.getTaskInstanceId(); if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) { - logger.warn( + log.warn( "Cannot find the taskInstance: {} from TaskExecutionContextCacheManager, the task might already been killed", taskInstanceId); result.setExitStatusCode(EXIT_CODE_KILL); @@ -180,7 +168,7 @@ public abstract class AbstractCommandExecutor { return result; } // print process id - logger.info("process start, process id is: {}", processId); + log.info("process start, process id is: {}", processId); // if timeout occurs, exit directly long remainTime = getRemainTime(); @@ -201,7 +189,7 @@ public abstract class AbstractCommandExecutor { // Wait the task log process finished. taskOutputFuture.get(); } catch (ExecutionException e) { - logger.error("Handle task log error", e); + log.error("Handle task log error", e); } } @@ -212,7 +200,7 @@ public abstract class AbstractCommandExecutor { // delete pod after successful execution and log collection ProcessUtils.cancelApplication(taskRequest); } catch (ExecutionException e) { - logger.error("Handle pod log error", e); + log.error("Handle pod log error", e); } } @@ -223,21 +211,21 @@ public abstract class AbstractCommandExecutor { result.setExitStatusCode(this.process.exitValue()); } else { - logger.error("process has failure, the task timeout configuration value is:{}, ready to kill ...", + log.error("process has failure, the task timeout configuration value is:{}, ready to kill ...", taskRequest.getTaskTimeout()); result.setExitStatusCode(EXIT_CODE_FAILURE); cancelApplication(); } int exitCode = this.process.exitValue(); String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited."; - logger.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}", + log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}", exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode); return result; } - public String getVarPool() { - return varPool.toString(); + public Map getTaskOutputParams() { + return taskOutputParams; } public void cancelApplication() throws InterruptedException { @@ -246,16 +234,12 @@ public abstract class AbstractCommandExecutor { } // soft kill - logger.info("Begin to kill process process, pid is : {}", taskRequest.getProcessId()); + log.info("Begin to kill process process, pid is : {}", taskRequest.getProcessId()); process.destroy(); if (!process.waitFor(5, TimeUnit.SECONDS)) { process.destroyForcibly(); } - logger.info("Success kill task: {}, pid: {}", taskRequest.getTaskAppId(), taskRequest.getProcessId()); - } - - private void printCommand(List commands) { - logger.info("task run command: {}", String.join(" ", commands)); + log.info("Success kill task: {}, pid: {}", taskRequest.getTaskAppId(), taskRequest.getProcessId()); } private void collectPodLogIfNeeded() { @@ -299,24 +283,22 @@ public abstract class AbstractCommandExecutor { ExecutorService getOutputLogService = ThreadUtils .newSingleDaemonScheduledExecutorService("ResolveOutputLog-thread-" + taskRequest.getTaskName()); getOutputLogService.submit(() -> { + TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(); try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); String line; while ((line = inReader.readLine()) != null) { - if (line.startsWith("${setValue(") || line.startsWith("#{setValue(")) { - varPool.append(findVarPool(line)); - varPool.append("$VarPool$"); - } else { - logBuffer.add(line); - } + logBuffer.add(line); + taskOutputParameterParser.appendParseLog(line); } processLogOutputIsSuccess = true; } catch (Exception e) { - logger.error("Parse var pool error", e); + log.error("Parse var pool error", e); processLogOutputIsSuccess = true; } finally { LogUtils.removeTaskInstanceLogFullPathMDC(); } + taskOutputParams = taskOutputParameterParser.getTaskOutputParams(); }); getOutputLogService.shutdown(); @@ -336,7 +318,7 @@ public abstract class AbstractCommandExecutor { } } } catch (Exception e) { - logger.error("Output task log error", e); + log.error("Output task log error", e); } finally { LogUtils.removeTaskInstanceLogFullPathMDC(); } @@ -344,20 +326,6 @@ public abstract class AbstractCommandExecutor { parseProcessOutputExecutorService.shutdown(); } - /** - * find var pool - * - * @param line - * @return - */ - private String findVarPool(String line) { - Matcher matcher = SETVALUE_REGEX.matcher(line); - if (matcher.find()) { - return matcher.group(1); - } - return null; - } - /** * get remain time(s) * @@ -389,7 +357,7 @@ public abstract class AbstractCommandExecutor { processId = f.getInt(process); } catch (Exception e) { - logger.error("Get task pid failed", e); + log.error("Get task pid failed", e); } return processId; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java index 95ae22f272..4437df763b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java @@ -28,24 +28,20 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; -/** - * executive task - */ +@Slf4j public abstract class AbstractTask { - protected final Logger log = LoggerFactory.getLogger(AbstractTask.class); - private static String groupName1 = "paramName1"; private static String groupName2 = "paramName2"; public String rgex = String.format("['\"]\\$\\{(?<%s>.*?)}['\"]|\\$\\{(?<%s>.*?)}", groupName1, groupName2); - /** - * varPool string - */ - protected String varPool; + @Getter + @Setter + protected Map taskOutputParams; /** * taskExecutionContext @@ -91,14 +87,6 @@ public abstract class AbstractTask { public abstract void cancel() throws TaskException; - public void setVarPool(String varPool) { - this.varPool = varPool; - } - - public String getVarPool() { - return varPool; - } - /** * get exit status code * diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java index 69f6aceb99..716947faa2 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java @@ -29,15 +29,16 @@ import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import java.util.List; import java.util.Map; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public abstract class AbstractYarnTask extends AbstractRemoteTask { private ShellCommandExecutor shellCommandExecutor; public AbstractYarnTask(TaskExecutionContext taskRequest) { super(taskRequest); - this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, - taskRequest, - log); + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskRequest); } // todo split handle to submit and track diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java index 2dbea62287..c9834adb86 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java @@ -20,17 +20,11 @@ package org.apache.dolphinscheduler.plugin.task.api; import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Consumer; -import org.slf4j.Logger; - -/** - * shell command executor - */ public class ShellCommandExecutor extends AbstractCommandExecutor { public ShellCommandExecutor(Consumer> logHandler, - TaskExecutionContext taskRequest, - Logger logger) { - super(logHandler, taskRequest, logger); + TaskExecutionContext taskRequest) { + super(logHandler, taskRequest); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java index 97f2943950..83f4b3a678 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java @@ -35,10 +35,6 @@ public class TaskConstants { public static final String FLINK_APPLICATION_REGEX = "JobID \\w+"; - public static final String SETVALUE_REGEX = "[\\$#]\\{setValue\\((.*?)\\)}"; - - public static final String DSVALUE_REGEX = "[\\$#]\\{\\((.*?)\\)dsVal}$"; - /** * string false */ diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java index 1d9784ace4..f77170f482 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java @@ -24,6 +24,11 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; +import java.util.Map; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j public abstract class AbstractK8sTask extends AbstractRemoteTask { /** @@ -37,7 +42,7 @@ public abstract class AbstractK8sTask extends AbstractRemoteTask { */ protected AbstractK8sTask(TaskExecutionContext taskRequest) { super(taskRequest); - this.abstractK8sTaskExecutor = new K8sTaskExecutor(log, taskRequest); + this.abstractK8sTaskExecutor = new K8sTaskExecutor(taskRequest); } // todo split handle to submit and track @@ -47,7 +52,7 @@ public abstract class AbstractK8sTask extends AbstractRemoteTask { TaskResponse response = abstractK8sTaskExecutor.run(buildCommand()); setExitStatusCode(response.getExitStatusCode()); setAppIds(response.getAppIds()); - dealOutParam(abstractK8sTaskExecutor.getVarPool()); + dealOutParam(abstractK8sTaskExecutor.getTaskOutputParams()); } catch (Exception e) { log.error("k8s task submit failed with error"); exitStatusCode = -1; @@ -86,5 +91,5 @@ public abstract class AbstractK8sTask extends AbstractRemoteTask { */ protected abstract String buildCommand(); - protected abstract void dealOutParam(String result); + protected abstract void dealOutParam(Map taskOutputParams); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java index 6ad8869d33..1313dc23a6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java @@ -22,25 +22,25 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils; -import org.slf4j.Logger; +import java.util.HashMap; +import java.util.Map; + import org.yaml.snakeyaml.Yaml; public abstract class AbstractK8sTaskExecutor { - protected Logger log; protected TaskExecutionContext taskRequest; protected K8sUtils k8sUtils; protected Yaml yaml; - protected StringBuilder varPool; - protected AbstractK8sTaskExecutor(Logger log, TaskExecutionContext taskRequest) { - this.log = log; + protected volatile Map taskOutputParams; + protected AbstractK8sTaskExecutor(TaskExecutionContext taskRequest) { this.taskRequest = taskRequest; this.k8sUtils = new K8sUtils(); this.yaml = new Yaml(); - this.varPool = new StringBuilder(); + this.taskOutputParams = new HashMap<>(); } - public String getVarPool() { - return varPool.toString(); + public Map getTaskOutputParams() { + return taskOutputParams; } public abstract TaskResponse run(String k8sParameterStr) throws Exception; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index 66665512b6..476ed85be9 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -44,10 +44,10 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy; import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor; import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters; import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse; +import org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils; -import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils; import org.apache.commons.lang3.StringUtils; @@ -64,8 +64,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; - +import lombok.extern.slf4j.Slf4j; import io.fabric8.kubernetes.api.model.Affinity; import io.fabric8.kubernetes.api.model.AffinityBuilder; import io.fabric8.kubernetes.api.model.EnvVar; @@ -84,14 +83,15 @@ import io.fabric8.kubernetes.client.dsl.LogWatch; /** * K8sTaskExecutor used to submit k8s task to K8S */ +@Slf4j public class K8sTaskExecutor extends AbstractK8sTaskExecutor { private Job job; protected boolean podLogOutputIsFinished = false; protected Future podLogOutputFuture; - public K8sTaskExecutor(Logger logger, TaskExecutionContext taskRequest) { - super(logger, taskRequest); + public K8sTaskExecutor(TaskExecutionContext taskRequest) { + super(taskRequest); } public void buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) { @@ -255,6 +255,7 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT); String containerName = String.format("%s-%s", taskName, taskInstanceId); podLogOutputFuture = collectPodLogExecutorService.submit(() -> { + TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(); try ( LogWatch watcher = ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId(), containerName)) { @@ -263,11 +264,7 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { try (BufferedReader reader = new BufferedReader(new InputStreamReader(watcher.getOutput()))) { while ((line = reader.readLine()) != null) { log.info("[K8S-pod-log] {}", line); - - if (line.endsWith(VarPoolUtils.VAR_SUFFIX)) { - varPool.append(VarPoolUtils.findVarPool(line)); - varPool.append(VarPoolUtils.VAR_DELIMITER); - } + taskOutputParameterParser.appendParseLog(line); } } } catch (Exception e) { @@ -276,6 +273,7 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor { LogUtils.removeTaskInstanceLogFullPathMDC(); podLogOutputIsFinished = true; } + taskOutputParams = taskOutputParameterParser.getTaskOutputParams(); }); collectPodLogExecutorService.shutdown(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/loop/BaseLoopTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/loop/BaseLoopTaskExecutor.java index 7b75def685..4d52a38f73 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/loop/BaseLoopTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/loop/BaseLoopTaskExecutor.java @@ -30,12 +30,14 @@ import java.time.Duration; import javax.annotation.Nullable; import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; /** * This class is the base class for all loop task type. *

* The loop task type means, we will submit a task, and loop the task status until the task is finished. */ +@Slf4j public abstract class BaseLoopTaskExecutor extends AbstractRemoteTask { /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java index 78812fcb24..a57eececf5 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java @@ -24,21 +24,21 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import lombok.extern.slf4j.Slf4j; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; -/** - * job params related class - */ +@Slf4j public abstract class AbstractParameters implements IParameters { @Override @@ -130,7 +130,7 @@ public abstract class AbstractParameters implements IParameters { } } - public void dealOutParam(String result) { + public void dealOutParam(Map taskOutputParams) { if (CollectionUtils.isEmpty(localParams)) { return; } @@ -138,19 +138,18 @@ public abstract class AbstractParameters implements IParameters { if (CollectionUtils.isEmpty(outProperty)) { return; } - if (StringUtils.isEmpty(result)) { + if (MapUtils.isEmpty(taskOutputParams)) { outProperty.forEach(this::addPropertyToValPool); return; } - Map taskResult = getMapByString(result); - if (taskResult.size() == 0) { - return; - } + for (Property info : outProperty) { - String propValue = taskResult.get(info.getProp()); + String propValue = taskOutputParams.get(info.getProp()); if (StringUtils.isNotEmpty(propValue)) { info.setValue(propValue); addPropertyToValPool(info); + } else { + log.warn("Cannot find the output parameter {} in the task output parameters", info.getProp()); } } } @@ -178,23 +177,6 @@ public abstract class AbstractParameters implements IParameters { return allParams; } - /** - * shell's result format is key=value$VarPool$key=value$VarPool$ - * @param result - * @return - */ - public static Map getMapByString(String result) { - String[] formatResult = result.split("\\$VarPool\\$"); - Map format = new HashMap<>(); - for (String info : formatResult) { - if (StringUtils.isNotEmpty(info) && info.contains("=")) { - String[] keyValue = info.split("="); - format.put(keyValue[0], keyValue[1]); - } - } - return format; - } - public ResourceParametersHelper getResources() { return new ResourceParametersHelper(); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java index bff2fe20d7..0f1a893a30 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java @@ -236,7 +236,6 @@ public class SqlParameters extends AbstractParameters { return new ArrayList<>(); } - @Override public void dealOutParam(String result) { if (CollectionUtils.isEmpty(localParams)) { return; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java new file mode 100644 index 0000000000..e79d44e4da --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.plugin.task.api.parser; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.annotation.concurrent.NotThreadSafe; + +import lombok.extern.slf4j.Slf4j; + +/** + * Used to parse ${setValue()} and #{setValue()} from given lines. + */ +@Slf4j +@NotThreadSafe +public class TaskOutputParameterParser { + + private final Map taskOutputParams = new HashMap<>(); + + private List currentTaskOutputParam; + + public void appendParseLog(String log) { + if (log == null) { + return; + } + + if (currentTaskOutputParam != null) { + // continue to parse the rest of line + int i = log.indexOf(")}"); + if (i == -1) { + // the end of var pool not found + currentTaskOutputParam.add(log); + } else { + // the end of var pool found + currentTaskOutputParam.add(log.substring(0, i + 2)); + Pair keyValue = parseOutputParam(String.join("\n", currentTaskOutputParam)); + if (keyValue.getKey() != null && keyValue.getValue() != null) { + taskOutputParams.put(keyValue.getKey(), keyValue.getValue()); + } + currentTaskOutputParam = null; + // continue to parse the rest of line + if (i + 2 != log.length()) { + appendParseLog(log.substring(i + 2)); + } + } + return; + } + + int indexOfVarPoolBegin = log.indexOf("${setValue("); + if (indexOfVarPoolBegin == -1) { + indexOfVarPoolBegin = log.indexOf("#{setValue("); + } + if (indexOfVarPoolBegin == -1) { + return; + } + currentTaskOutputParam = new ArrayList<>(); + appendParseLog(log.substring(indexOfVarPoolBegin)); + } + + public Map getTaskOutputParams() { + return taskOutputParams; + } + + // #{setValue(xx=xx)} + protected Pair parseOutputParam(String outputParam) { + if (StringUtils.isEmpty(outputParam)) { + log.info("The task output param is empty"); + return ImmutablePair.nullPair(); + } + if ((!outputParam.startsWith("${setValue(") && !outputParam.startsWith("#{setValue(")) + || !outputParam.endsWith(")}")) { + log.info("The task output param {} should start with '${setValue(' or '#{setValue(' and end with ')}'", + outputParam); + return ImmutablePair.nullPair(); + } + String keyValueExpression = outputParam.substring(11, outputParam.length() - 2); + if (!keyValueExpression.contains("=")) { + log.warn("The task output param {} should composite with key=value", outputParam); + return ImmutablePair.nullPair(); + } + + String[] keyValue = keyValueExpression.split("=", 2); + return ImmutablePair.of(keyValue[0], keyValue[1]); + } + +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java deleted file mode 100644 index bb777e7126..0000000000 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.plugin.task.api.utils; - -import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import lombok.experimental.UtilityClass; - -@UtilityClass -public class VarPoolUtils { - - static final Pattern DSVALUE_REGEX = Pattern.compile(TaskConstants.DSVALUE_REGEX); - public static final String VAR_SUFFIX = ")dsVal}"; - - public static final String VAR_DELIMITER = "$VarPool$"; - /** - * find var pool - * - * @param line - * @return - */ - public static String findVarPool(String line) { - Matcher matcher = DSVALUE_REGEX.matcher(line); - if (matcher.find()) { - return matcher.group(1); - } - return null; - } -} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskTest.java index 9cf407e00f..a7439aef67 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskTest.java @@ -38,24 +38,4 @@ public class AbstractTaskTest { Assertions.assertEquals(jobId, str.substring(6)); } - @Test - public void testSetValue() { - Pattern SETVALUE_REGEX = Pattern.compile(TaskConstants.SETVALUE_REGEX); - String line1 = "${setValue(sql=\"INSERT INTO a VALUES (1, 2);\")}"; - String line2 = "${setValue(a=2))}"; - Matcher matcher1 = SETVALUE_REGEX.matcher(line1); - String str1 = null; - if (matcher1.find()) { - str1 = matcher1.group(); - } - String str2 = null; - Matcher matcher2 = SETVALUE_REGEX.matcher(line2); - if (matcher2.find()) { - str2 = matcher2.group(); - } - Assertions.assertNotNull(str1); - Assertions.assertNotNull(str2); - Assertions.assertEquals(str1.length(), line1.length()); - Assertions.assertEquals(str2.length(), line2.length()); - } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java index 2a793dc80a..4bbd29d18e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.plugin.task.api.k8s; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL; -import static org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils.VAR_DELIMITER; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; @@ -65,7 +64,7 @@ public class K8sTaskExecutorTest { requirement.setKey("node-label"); requirement.setOperator("In"); requirement.setValues(Arrays.asList("1234", "123456")); - k8sTaskExecutor = new K8sTaskExecutor(logger, taskRequest); + k8sTaskExecutor = new K8sTaskExecutor(taskRequest); k8sTaskMainParameters = new K8sTaskMainParameters(); k8sTaskMainParameters.setImage(image); k8sTaskMainParameters.setImagePullPolicy(imagePullPolicy); @@ -102,10 +101,4 @@ public class K8sTaskExecutorTest { } } - @Test - public void testValpool() { - String result = "key=value" + VAR_DELIMITER; - k8sTaskExecutor.varPool.append(result); - Assertions.assertEquals(result, k8sTaskExecutor.getVarPool()); - } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java new file mode 100644 index 0000000000..950ed822c0 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.parser; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; +import java.util.stream.Collectors; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.google.common.collect.ImmutableMap; + +class TaskOutputParameterParserTest { + + @Test + void testEmptyLog() throws IOException, URISyntaxException { + List varPools = getLogs("/outputParam/emptyVarPoolLog.txt"); + TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(); + varPools.forEach(taskOutputParameterParser::appendParseLog); + Assertions.assertTrue(taskOutputParameterParser.getTaskOutputParams().isEmpty()); + } + + @Test + void testOneLineLog() throws IOException, URISyntaxException { + List varPools = getLogs("/outputParam/onelineVarPoolLog.txt"); + TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(); + varPools.forEach(taskOutputParameterParser::appendParseLog); + assertEquals(ImmutableMap.of("name", "name=tom"), taskOutputParameterParser.getTaskOutputParams()); + } + + @Test + void testOneVarPollInMultiLineLog() throws IOException, URISyntaxException { + List varPools = getLogs("/outputParam/oneVarPollInMultiLineLog.txt"); + TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(); + varPools.forEach(taskOutputParameterParser::appendParseLog); + assertEquals(ImmutableMap.of("sql", + "select * from table\n" + + "where\n" + + "id = 1\n"), + taskOutputParameterParser.getTaskOutputParams()); + } + + @Test + void testVarPollInMultiLineLog() throws IOException, URISyntaxException { + List varPools = getLogs("/outputParam/multipleVarPoll.txt"); + TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(); + varPools.forEach(taskOutputParameterParser::appendParseLog); + assertEquals(ImmutableMap.of("name", "tom", "age", "1"), taskOutputParameterParser.getTaskOutputParams()); + } + + private List getLogs(String file) throws IOException, URISyntaxException { + URI uri = TaskOutputParameterParserTest.class.getResource(file).toURI(); + return Files.lines(Paths.get(uri)).collect(Collectors.toList()); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtilsTest.java deleted file mode 100644 index e083554bde..0000000000 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtilsTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.plugin.task.api.utils; - -import java.util.HashMap; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -class VarPoolUtilsTest { - - @Test - void findVar() { - HashMap tcs = new HashMap<>(); - tcs.put("${(set_val=123)dsVal}", "set_val=123"); - tcs.put("1970-01-01 ${(set_val=123)dsVal}", "set_val=123"); - tcs.put("1970-01-01 ${(set_val=123)dsVal}123", null); - tcs.put("${(set_val=123}dsVal", null); - tcs.put("#{(set_val=123)dsVal}", "set_val=123"); - tcs.put("1970-01-01 #{(set_val=123)dsVal}", "set_val=123"); - tcs.put("1970-01-01 #{(set_val=123)dsVal}123", null); - tcs.put("#{(set_val=123)dsVal}123", null); - tcs.put("#{(set_val=123dsVal}", null); - - tcs.put("${(set_val=123)dsVal}${(set_val=456)dsVal}", "set_val=123)dsVal}${(set_val=456"); - tcs.put("1970-01-01$#{(set_val=123)dsVal}", "set_val=123"); - tcs.put("1970-01-01{(set_val=123)dsVal}123", null); - tcs.put("1970-01-01$#{(${(set_val=123)})dsVal}", "${(set_val=123)}"); - tcs.put("1970-01-01$#{(${(set_val=123\\)})dsVal}", "${(set_val=123\\)}"); - - for (String tc : tcs.keySet()) { - Assertions.assertEquals(tcs.get(tc), VarPoolUtils.findVarPool(tc)); - } - } -} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/emptyVarPoolLog.txt b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/emptyVarPoolLog.txt new file mode 100644 index 0000000000..a9fd83fea0 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/emptyVarPoolLog.txt @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt new file mode 100644 index 0000000000..994cee503e --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +INFO: ${setValue(name=tom)} +INFO: ${setValue(age=1)} \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/oneVarPollInMultiLineLog.txt b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/oneVarPollInMultiLineLog.txt new file mode 100644 index 0000000000..b26467cb27 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/oneVarPollInMultiLineLog.txt @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +${setValue(sql=select * from table +where +id = 1 +)} \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/onelineVarPoolLog.txt b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/onelineVarPoolLog.txt new file mode 100644 index 0000000000..6d80b15695 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/onelineVarPoolLog.txt @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +${setValue(name=name=tom)} \ No newline at end of file diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java index 2e4731d704..e3814a076d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java @@ -45,9 +45,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; -/** - * chunjun task - */ +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class ChunJunTask extends AbstractTask { /** @@ -79,8 +79,7 @@ public class ChunJunTask extends AbstractTask { super(taskExecutionContext); this.taskExecutionContext = taskExecutionContext; - this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, - taskExecutionContext, log); + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext); } /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datafactory/src/main/java/org/apache/dolphinscheduler/plugin/task/datafactory/DatafactoryTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datafactory/src/main/java/org/apache/dolphinscheduler/plugin/task/datafactory/DatafactoryTask.java index e36f0659d3..394a58f417 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datafactory/src/main/java/org/apache/dolphinscheduler/plugin/task/datafactory/DatafactoryTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datafactory/src/main/java/org/apache/dolphinscheduler/plugin/task/datafactory/DatafactoryTask.java @@ -30,9 +30,11 @@ import java.util.List; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; @Setter @Getter +@Slf4j public class DatafactoryTask extends AbstractRemoteTask { private final TaskExecutionContext taskExecutionContext; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java index 381b8cbd37..1bf1a6454c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java @@ -58,11 +58,14 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; + /** * In DataQualityTask, the input parameters will be converted into DataQualityConfiguration, * which will be converted into a string as the parameter of DataQualityApplication, * and DataQualityApplication is spark application */ +@Slf4j public class DataQualityTask extends AbstractYarnTask { /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java index c9e3afd29b..2353dc5c4a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java @@ -36,6 +36,7 @@ import java.util.Collections; import java.util.List; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -43,6 +44,7 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategies; import com.fasterxml.jackson.databind.json.JsonMapper; @Setter +@Slf4j public class DatasyncTask extends AbstractRemoteTask { private static final ObjectMapper objectMapper = diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java index 9b55f00fc9..6490b8b0b0 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java @@ -56,6 +56,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import lombok.extern.slf4j.Slf4j; + import com.alibaba.druid.sql.ast.SQLStatement; import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr; import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr; @@ -68,6 +70,7 @@ import com.alibaba.druid.sql.parser.SQLStatementParser; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; +@Slf4j public class DataxTask extends AbstractTask { /** @@ -125,8 +128,7 @@ public class DataxTask extends AbstractTask { super(taskExecutionContext); this.taskExecutionContext = taskExecutionContext; - this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, - taskExecutionContext, log); + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext); } /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java index fdaf70add4..694317122e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java @@ -45,28 +45,20 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import lombok.extern.slf4j.Slf4j; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.MissingNode; +@Slf4j public class DinkyTask extends AbstractRemoteTask { - /** - * taskExecutionContext - */ private final TaskExecutionContext taskExecutionContext; - /** - * dinky parameters - */ private DinkyParameters dinkyParameters; - /** - * constructor - * - * @param taskExecutionContext taskExecutionContext - */ protected DinkyTask(TaskExecutionContext taskExecutionContext) { super(taskExecutionContext); this.taskExecutionContext = taskExecutionContext; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java index 74cf5b9b29..29424a5b76 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java @@ -35,12 +35,15 @@ import org.apache.commons.lang3.StringUtils; import java.util.Collections; import java.util.List; +import lombok.extern.slf4j.Slf4j; + import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException; import com.amazonaws.services.databasemigrationservice.model.ReplicationTask; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.PropertyNamingStrategy; import com.fasterxml.jackson.databind.json.JsonMapper; +@Slf4j public class DmsTask extends AbstractRemoteTask { private static final ObjectMapper objectMapper = diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java index 032d953858..d1a8bf8b1b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java @@ -33,9 +33,9 @@ import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilder import java.util.ArrayList; import java.util.List; -/** - * shell task - */ +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class DvcTask extends AbstractTask { /** @@ -62,7 +62,7 @@ public class DvcTask extends AbstractTask { super(taskExecutionContext); this.taskExecutionContext = taskExecutionContext; - this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, log); + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext); } @Override @@ -85,7 +85,7 @@ public class DvcTask extends AbstractTask { TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack); setExitStatusCode(commandExecuteResult.getExitStatusCode()); setProcessId(commandExecuteResult.getProcessId()); - parameters.dealOutParam(shellCommandExecutor.getVarPool()); + parameters.dealOutParam(shellCommandExecutor.getTaskOutputParams()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("The current DvcTask has been interrupted", e); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java index b4603c4605..6f6ec63a29 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java @@ -31,6 +31,8 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters import java.util.TimeZone; +import lombok.extern.slf4j.Slf4j; + import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; @@ -44,6 +46,7 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategy; * * @since v3.1.0 */ +@Slf4j public abstract class AbstractEmrTask extends AbstractRemoteTask { final TaskExecutionContext taskExecutionContext; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java index ff3331d530..753b206e21 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java @@ -26,6 +26,8 @@ import java.util.HashSet; import java.util.List; import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; + import com.amazonaws.SdkBaseException; import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest; import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult; @@ -45,6 +47,7 @@ import com.google.common.collect.Sets; * * @since v3.1.0 */ +@Slf4j public class EmrAddStepsTask extends AbstractEmrTask { private String stepId; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java index 9abff26818..f4b0534065 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java @@ -26,6 +26,8 @@ import java.util.HashSet; import java.util.List; import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; + import com.amazonaws.SdkBaseException; import com.amazonaws.services.elasticmapreduce.model.ClusterState; import com.amazonaws.services.elasticmapreduce.model.ClusterStateChangeReason; @@ -40,6 +42,7 @@ import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsResult; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.Sets; +@Slf4j public class EmrJobFlowTask extends AbstractEmrTask { private final HashSet waitingStateSet = Sets.newHashSet( diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java index 3a71df62c7..9ec10889bb 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java @@ -31,17 +31,14 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class FlinkStreamTask extends FlinkTask implements StreamTask { - /** - * flink parameters - */ private FlinkStreamParameters flinkParameters; - /** - * taskExecutionContext - */ - private TaskExecutionContext taskExecutionContext; + private final TaskExecutionContext taskExecutionContext; public FlinkStreamTask(TaskExecutionContext taskExecutionContext) { super(taskExecutionContext); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java index 75764c3677..4f2963116c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java @@ -29,6 +29,9 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class FlinkTask extends AbstractYarnTask { /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java index 28443423c2..0ca6a2c39a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java @@ -53,6 +53,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class HiveCliTask extends AbstractRemoteTask { private HiveCliParameters hiveCliParameters; @@ -65,9 +68,7 @@ public class HiveCliTask extends AbstractRemoteTask { super(taskExecutionContext); this.taskExecutionContext = taskExecutionContext; - this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, - taskExecutionContext, - log); + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext); } @Override @@ -96,7 +97,7 @@ public class HiveCliTask extends AbstractRemoteTask { setExitStatusCode(taskResponse.getExitStatusCode()); setAppIds(taskResponse.getAppIds()); setProcessId(taskResponse.getProcessId()); - setVarPool(shellCommandExecutor.getVarPool()); + setTaskOutputParams(shellCommandExecutor.getTaskOutputParams()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("The current HiveCLI Task has been interrupted", e); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java index 8d696a19b1..d51c99b2a7 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java @@ -52,8 +52,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import lombok.extern.slf4j.Slf4j; + import com.fasterxml.jackson.databind.node.ObjectNode; +@Slf4j public class HttpTask extends AbstractTask { /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java index e9850b6459..5b4811bf8b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java @@ -52,8 +52,11 @@ import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import lombok.extern.slf4j.Slf4j; + import com.google.common.base.Preconditions; +@Slf4j public class JavaTask extends AbstractTask { /** @@ -79,9 +82,7 @@ public class JavaTask extends AbstractTask { public JavaTask(TaskExecutionContext taskRequest) { super(taskRequest); this.taskRequest = taskRequest; - this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, - taskRequest, - log); + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskRequest); } /** @@ -131,7 +132,7 @@ public class JavaTask extends AbstractTask { setExitStatusCode(taskResponse.getExitStatusCode()); setAppIds(taskResponse.getAppIds()); setProcessId(taskResponse.getProcessId()); - setVarPool(shellCommandExecutor.getVarPool()); + setTaskOutputParams(shellCommandExecutor.getTaskOutputParams()); } catch (InterruptedException e) { log.error("java task interrupted ", e); setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java index 735b6237e2..5f79b139a9 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java @@ -41,8 +41,11 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; + import com.fasterxml.jackson.databind.ObjectMapper; +@Slf4j public class JupyterTask extends AbstractRemoteTask { private JupyterParameters jupyterParameters; @@ -54,9 +57,7 @@ public class JupyterTask extends AbstractRemoteTask { public JupyterTask(TaskExecutionContext taskExecutionContext) { super(taskExecutionContext); this.taskExecutionContext = taskExecutionContext; - this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, - taskExecutionContext, - log); + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext); } @Override diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java index c02adbfa76..fceb29e163 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java @@ -43,26 +43,19 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; import io.fabric8.kubernetes.api.model.NodeSelectorRequirement; +@Slf4j public class K8sTask extends AbstractK8sTask { - /** - * taskExecutionContext - */ private final TaskExecutionContext taskExecutionContext; - /** - * task parameters - */ private K8sTaskParameters k8sTaskParameters; private K8sTaskExecutionContext k8sTaskExecutionContext; private K8sConnectionParam k8sConnectionParam; - /** - * @param taskRequest taskRequest - */ public K8sTask(TaskExecutionContext taskRequest) { super(taskRequest); this.taskExecutionContext = taskRequest; @@ -119,8 +112,8 @@ public class K8sTask extends AbstractK8sTask { } @Override - protected void dealOutParam(String result) { - this.k8sTaskParameters.dealOutParam(result); + protected void dealOutParam(Map taskOutputParams) { + this.k8sTaskParameters.dealOutParam(taskOutputParams); } public List convertToNodeSelectorRequirements(List expressions) { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java index ef21d34e8e..3895190cf2 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.plugin.task.k8s; -import static org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils.VAR_DELIMITER; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -47,6 +46,7 @@ import org.junit.jupiter.api.Test; import org.mockito.MockedStatic; import org.mockito.Mockito; +import com.google.common.collect.ImmutableMap; import io.fabric8.kubernetes.api.model.NodeSelectorRequirement; public class K8sTaskTest { @@ -167,7 +167,7 @@ public class K8sTaskTest { @Test public void testDealOutParam() { - String result = "key=123" + VAR_DELIMITER; + Map result = ImmutableMap.of("key", "123"); k8sTask.getParameters().localParams.add(new Property("key", Direct.OUT, DataType.VARCHAR, "value")); k8sTask.dealOutParam(result); k8sTask.getParameters().getVarPool().forEach(property -> { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java index 4ae9a95a5d..0a65cc160c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java @@ -36,6 +36,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class KubeflowTask extends AbstractRemoteTask { private final TaskExecutionContext taskExecutionContext; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java index c091ee6f64..a4ab0b96a9 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java @@ -41,9 +41,9 @@ import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; -/** - * linkis task - */ +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class LinkisTask extends AbstractRemoteTask { /** @@ -76,9 +76,7 @@ public class LinkisTask extends AbstractRemoteTask { super(taskExecutionContext); this.taskExecutionContext = taskExecutionContext; - this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, - taskExecutionContext, - log); + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext); } @Override @@ -107,7 +105,7 @@ public class LinkisTask extends AbstractRemoteTask { setExitStatusCode(commandExecuteResult.getExitStatusCode()); setAppIds(findTaskId(commandExecuteResult.getResultString())); setProcessId(commandExecuteResult.getProcessId()); - linkisParameters.dealOutParam(shellCommandExecutor.getVarPool()); + linkisParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("The current Linkis task has been interrupted", e); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java index 3c1f73747c..bc97be04e6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java @@ -42,9 +42,9 @@ import java.util.Map; import java.util.regex.Pattern; import java.util.stream.Collectors; -/** - * shell task - */ +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class MlflowTask extends AbstractTask { private static final Pattern GIT_CHECK_PATTERN = Pattern.compile("^(git@|https?://)"); @@ -71,7 +71,7 @@ public class MlflowTask extends AbstractTask { super(taskExecutionContext); this.taskExecutionContext = taskExecutionContext; - this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, log); + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext); } static public String getPresetRepository() { @@ -130,7 +130,7 @@ public class MlflowTask extends AbstractTask { } setExitStatusCode(exitCode); setProcessId(commandExecuteResult.getProcessId()); - mlflowParameters.dealOutParam(shellCommandExecutor.getVarPool()); + mlflowParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("The current Mlflow task has been interrupted", e); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java index dd51314dfa..0979dc73cd 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java @@ -30,9 +30,9 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -/** - * mapreduce task - */ +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class MapReduceTask extends AbstractYarnTask { /** @@ -41,20 +41,10 @@ public class MapReduceTask extends AbstractYarnTask { */ private static final String MAPREDUCE_COMMAND = TaskConstants.HADOOP; - /** - * mapreduce parameters - */ private MapReduceParameters mapreduceParameters; - /** - * taskExecutionContext - */ - private TaskExecutionContext taskExecutionContext; + private final TaskExecutionContext taskExecutionContext; - /** - * constructor - * @param taskExecutionContext taskExecutionContext - */ public MapReduceTask(TaskExecutionContext taskExecutionContext) { super(taskExecutionContext); this.taskExecutionContext = taskExecutionContext; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java index 9142a364a1..6f97d0867c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java @@ -32,11 +32,11 @@ import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import lombok.extern.slf4j.Slf4j; + import com.google.common.base.Preconditions; -/** - * openmldb task - */ +@Slf4j public class OpenmldbTask extends PythonTask { /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java index 8bab12133e..55af378e83 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java @@ -46,12 +46,15 @@ import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; + import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; /** * TIS DataX Task **/ +@Slf4j public class PigeonTask extends AbstractRemoteTask { public static final String KEY_POOL_VAR_PIGEON_HOST = "p_host"; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java index 2bdbcd5e23..6a66ba1965 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java @@ -46,24 +46,18 @@ import java.sql.Types; import java.util.HashMap; import java.util.Map; +import lombok.extern.slf4j.Slf4j; + import com.google.common.collect.Maps; -/** - * procedure task - */ +@Slf4j public class ProcedureTask extends AbstractTask { - /** - * procedure parameters - */ - private ProcedureParameters procedureParameters; + private final ProcedureParameters procedureParameters; - /** - * taskExecutionContext - */ - private TaskExecutionContext taskExecutionContext; + private final TaskExecutionContext taskExecutionContext; - private ProcedureTaskExecutionContext procedureTaskExecutionContext; + private final ProcedureTaskExecutionContext procedureTaskExecutionContext; /** * constructor diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java index 3fd938d3a5..539ecd7240 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java @@ -40,21 +40,15 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.Map; +import lombok.extern.slf4j.Slf4j; + import com.google.common.base.Preconditions; -/** - * python task - */ +@Slf4j public class PythonTask extends AbstractTask { - /** - * python parameters - */ protected PythonParameters pythonParameters; - /** - * shell command executor - */ private ShellCommandExecutor shellCommandExecutor; protected TaskExecutionContext taskRequest; @@ -70,9 +64,7 @@ public class PythonTask extends AbstractTask { super(taskRequest); this.taskRequest = taskRequest; - this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, - taskRequest, - log); + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskRequest); } @Override @@ -104,8 +96,8 @@ public class PythonTask extends AbstractTask { TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack); setExitStatusCode(taskResponse.getExitStatusCode()); setProcessId(taskResponse.getProcessId()); - setVarPool(shellCommandExecutor.getVarPool()); - pythonParameters.dealOutParam(shellCommandExecutor.getVarPool()); + setTaskOutputParams(shellCommandExecutor.getTaskOutputParams()); + pythonParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams()); } catch (Exception e) { log.error("python task failure", e); setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java index ffefc83a8e..1d7a874dfc 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java @@ -34,6 +34,9 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class PytorchTask extends AbstractTask { private final ShellCommandExecutor shellCommandExecutor; @@ -45,9 +48,7 @@ public class PytorchTask extends AbstractTask { super(taskExecutionContext); this.taskExecutionContext = taskExecutionContext; - this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, - taskExecutionContext, - log); + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext); } @Override @@ -76,7 +77,7 @@ public class PytorchTask extends AbstractTask { TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack); setExitStatusCode(taskResponse.getExitStatusCode()); setProcessId(taskResponse.getProcessId()); - setVarPool(shellCommandExecutor.getVarPool()); + setTaskOutputParams(shellCommandExecutor.getTaskOutputParams()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("The current Pytorch task has been interrupted", e); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java index 650bf84a69..4de28b7fb5 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java @@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.plugin.task.remoteshell; import org.apache.dolphinscheduler.plugin.datasource.ssh.SSHUtils; import org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHConnectionParam; -import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskException; +import org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser; import org.apache.commons.lang3.StringUtils; import org.apache.sshd.client.SshClient; @@ -36,24 +36,19 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.EnumSet; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import java.util.HashMap; +import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class RemoteExecutor { - protected final Logger logger = - LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOGGER_THREAD_NAME, getClass())); - - protected static final Pattern SETVALUE_REGEX = Pattern.compile(TaskConstants.SETVALUE_REGEX); - static final String REMOTE_SHELL_HOME = "/tmp/dolphinscheduler-remote-shell-%s/"; static final String STATUS_TAG_MESSAGE = "DOLPHINSCHEDULER-REMOTE-SHELL-TASK-STATUS-"; static final int TRACK_INTERVAL = 5000; - protected StringBuilder varPool = new StringBuilder(); + protected Map taskOutputParams = new HashMap<>(); SshClient sshClient; ClientSession session; @@ -105,60 +100,44 @@ public class RemoteExecutor { public void track(String taskId) throws Exception { int logN = 0; String pid; - logger.info("Remote shell task log:"); + log.info("Remote shell task log:"); + TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(); do { pid = getTaskPid(taskId); String trackCommand = String.format(COMMAND.TRACK_COMMAND, logN + 1, getRemoteShellHome(), taskId); - String log = runRemote(trackCommand); - if (StringUtils.isEmpty(log)) { + String logLine = runRemote(trackCommand); + if (StringUtils.isEmpty(logLine)) { Thread.sleep(TRACK_INTERVAL); } else { - logN += log.split("\n").length; - setVarPool(log); - logger.info(log); + logN += logLine.split("\n").length; + log.info(logLine); + taskOutputParameterParser.appendParseLog(logLine); } } while (StringUtils.isNotEmpty(pid)); + taskOutputParams.putAll(taskOutputParameterParser.getTaskOutputParams()); } - public String getVarPool() { - return varPool.toString(); - } - - private void setVarPool(String log) { - String[] lines = log.split("\n"); - for (String line : lines) { - if (line.startsWith("${setValue(") || line.startsWith("#{setValue(")) { - varPool.append(findVarPool(line)); - varPool.append("$VarPool$"); - } - } - } - - private String findVarPool(String line) { - Matcher matcher = SETVALUE_REGEX.matcher(line); - if (matcher.find()) { - return matcher.group(1); - } - return null; + public Map getTaskOutputParams() { + return taskOutputParams; } public Integer getTaskExitCode(String taskId) throws IOException { String trackCommand = String.format(COMMAND.LOG_TAIL_COMMAND, getRemoteShellHome(), taskId); - String log = runRemote(trackCommand); + String logLine = runRemote(trackCommand); int exitCode = -1; - logger.info("Remote shell task run status: {}", log); - if (log.contains(STATUS_TAG_MESSAGE)) { - String status = log.replace(STATUS_TAG_MESSAGE, "").trim(); + log.info("Remote shell task run status: {}", logLine); + if (logLine.contains(STATUS_TAG_MESSAGE)) { + String status = logLine.replace(STATUS_TAG_MESSAGE, "").trim(); if (status.equals("0")) { - logger.info("Remote shell task success"); + log.info("Remote shell task success"); exitCode = 0; } else { - logger.error("Remote shell task failed"); + log.error("Remote shell task failed"); exitCode = Integer.parseInt(status); } } cleanData(taskId); - logger.error("Remote shell task failed"); + log.error("Remote shell task failed"); return exitCode; } @@ -168,7 +147,7 @@ public class RemoteExecutor { try { runRemote(cleanCommand); } catch (Exception e) { - logger.error("Remote shell task clean data failed, but will not affect the task execution", e); + log.error("Remote shell task clean data failed, but will not affect the task execution", e); } } @@ -189,14 +168,14 @@ public class RemoteExecutor { runRemote(checkDirCommand); uploadScript(taskId, localFile); - logger.info("The final script is: \n{}", + log.info("The final script is: \n{}", runRemote(String.format(COMMAND.CAT_FINAL_SCRIPT, getRemoteShellHome(), taskId))); } public void uploadScript(String taskId, String localFile) throws IOException { String remotePath = getRemoteShellHome() + taskId + ".sh"; - logger.info("upload script from local:{} to remote: {}", localFile, remotePath); + log.info("upload script from local:{} to remote: {}", localFile, remotePath); try (SftpFileSystem fs = SftpClientFactory.instance().createSftpFileSystem(getSession())) { Path path = fs.getPath(remotePath); Files.copy(Paths.get(localFile), path); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteShellTask.java index 216203bfea..561392896c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteShellTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteShellTask.java @@ -42,9 +42,9 @@ import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Map; -/** - * shell task - */ +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class RemoteShellTask extends AbstractTask { static final String TASK_ID_PREFIX = "dolphinscheduler-remoteshell-"; @@ -102,7 +102,7 @@ public class RemoteShellTask extends AbstractTask { String localFile = buildCommand(); int exitCode = remoteExecutor.run(taskId, localFile); setExitStatusCode(exitCode); - remoteShellParameters.dealOutParam(remoteExecutor.getVarPool()); + remoteShellParameters.dealOutParam(remoteExecutor.getTaskOutputParams()); } catch (Exception e) { log.error("shell task error", e); setExitStatusCode(EXIT_CODE_FAILURE); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java index d04f5a3fac..b1b2cc811f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java @@ -39,6 +39,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import lombok.extern.slf4j.Slf4j; + import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; @@ -52,6 +54,7 @@ import com.fasterxml.jackson.databind.json.JsonMapper; /** * SagemakerTask task, Used to start Sagemaker pipeline */ +@Slf4j public class SagemakerTask extends AbstractRemoteTask { private static final ObjectMapper objectMapper = JsonMapper.builder() diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java index 0b837fa568..b6d0b6136e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java @@ -47,9 +47,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; -/** - * seatunnel task - */ +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class SeatunnelTask extends AbstractRemoteTask { private static final String SEATUNNEL_BIN_DIR = "${SEATUNNEL_HOME}/bin/"; @@ -78,9 +78,7 @@ public class SeatunnelTask extends AbstractRemoteTask { super(taskExecutionContext); this.taskExecutionContext = taskExecutionContext; - this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, - taskExecutionContext, - log); + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext); } @Override @@ -109,7 +107,7 @@ public class SeatunnelTask extends AbstractRemoteTask { setExitStatusCode(commandExecuteResult.getExitStatusCode()); setAppIds(String.join(TaskConstants.COMMA, getApplicationIds())); setProcessId(commandExecuteResult.getProcessId()); - seatunnelParameters.dealOutParam(shellCommandExecutor.getVarPool()); + seatunnelParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("The current SeaTunnel task has been interrupted", e); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java index b657f9a9ef..71cfb9522c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java @@ -31,9 +31,9 @@ import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilde import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; -/** - * shell task - */ +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class ShellTask extends AbstractTask { /** @@ -60,9 +60,7 @@ public class ShellTask extends AbstractTask { super(taskExecutionContext); this.taskExecutionContext = taskExecutionContext; - this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, - taskExecutionContext, - log); + this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext); } @Override @@ -87,7 +85,7 @@ public class ShellTask extends AbstractTask { TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack); setExitStatusCode(commandExecuteResult.getExitStatusCode()); setProcessId(commandExecuteResult.getProcessId()); - shellParameters.dealOutParam(shellCommandExecutor.getVarPool()); + shellParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("The current Shell task has been interrupted", e); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java index ef0b71c8fe..99a9d9e61b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java @@ -53,19 +53,15 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; import io.fabric8.kubernetes.client.Config; +@Slf4j public class SparkTask extends AbstractYarnTask { - /** - * spark parameters - */ private SparkParameters sparkParameters; - /** - * taskExecutionContext - */ - private TaskExecutionContext taskExecutionContext; + private final TaskExecutionContext taskExecutionContext; public SparkTask(TaskExecutionContext taskExecutionContext) { super(taskExecutionContext); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java index 091dfc5794..77886db59c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -61,24 +61,18 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; + import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; +@Slf4j public class SqlTask extends AbstractTask { - /** - * taskExecutionContext - */ - private TaskExecutionContext taskExecutionContext; + private final TaskExecutionContext taskExecutionContext; - /** - * sql parameters - */ - private SqlParameters sqlParameters; + private final SqlParameters sqlParameters; - /** - * base datasource - */ private BaseConnectionParam baseConnectionParam; /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java index 016a07a56b..86aea61f0c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java @@ -29,19 +29,16 @@ import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters; import java.util.Map; +import lombok.extern.slf4j.Slf4j; + /** * sqoop task extends the shell task */ +@Slf4j public class SqoopTask extends AbstractYarnTask { - /** - * sqoop task params - */ private SqoopParameters sqoopParameters; - /** - * taskExecutionContext - */ private final TaskExecutionContext taskExecutionContext; private SqoopTaskExecutionContext sqoopTaskExecutionContext; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java index 01459111b5..c3b580de35 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java @@ -42,19 +42,15 @@ import java.util.List; import java.util.Map; import kong.unirest.Unirest; +import lombok.extern.slf4j.Slf4j; import com.fasterxml.jackson.databind.ObjectMapper; +@Slf4j public class ZeppelinTask extends AbstractRemoteTask { - /** - * taskExecutionContext - */ private final TaskExecutionContext taskExecutionContext; - /** - * zeppelin parameters - */ private ZeppelinParameters zeppelinParameters; /** @@ -66,11 +62,6 @@ public class ZeppelinTask extends AbstractRemoteTask { private ZeppelinTaskExecutionContext zeppelinTaskExecutionContext; - /** - * constructor - * - * @param taskExecutionContext taskExecutionContext - */ protected ZeppelinTask(TaskExecutionContext taskExecutionContext) { super(taskExecutionContext); this.taskExecutionContext = taskExecutionContext;