Browse Source

Support parse task output params under multiple log (#15244)

augit-log
Wenjun Ruan 6 months ago committed by GitHub
parent
commit
71ee1f0c3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      docs/docs/en/guide/parameter/context.md
  2. 2
      docs/docs/zh/guide/parameter/context.md
  3. 80
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java
  4. 26
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java
  5. 7
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java
  6. 10
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java
  7. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java
  8. 11
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java
  9. 16
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java
  10. 18
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java
  11. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/loop/BaseLoopTaskExecutor.java
  12. 38
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java
  13. 1
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java
  14. 107
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java
  15. 47
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java
  16. 20
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskTest.java
  17. 9
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java
  18. 77
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java
  19. 50
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtilsTest.java
  20. 16
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/emptyVarPoolLog.txt
  21. 19
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt
  22. 21
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/oneVarPollInMultiLineLog.txt
  23. 18
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/onelineVarPoolLog.txt
  24. 9
      dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
  25. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-datafactory/src/main/java/org/apache/dolphinscheduler/plugin/task/datafactory/DatafactoryTask.java
  26. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
  27. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java
  28. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
  29. 14
      dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
  30. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java
  31. 10
      dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
  32. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java
  33. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java
  34. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java
  35. 11
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
  36. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
  37. 9
      dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java
  38. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
  39. 9
      dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
  40. 7
      dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
  41. 15
      dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
  42. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java
  43. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java
  44. 12
      dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
  45. 10
      dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
  46. 18
      dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
  47. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
  48. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
  49. 18
      dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
  50. 20
      dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
  51. 9
      dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
  52. 73
      dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java
  53. 8
      dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteShellTask.java
  54. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
  55. 12
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
  56. 12
      dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
  57. 10
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
  58. 16
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
  59. 9
      dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
  60. 13
      dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java

2
docs/docs/en/guide/parameter/context.md

@ -122,7 +122,7 @@ Although the two parameters var1 and var2 are output in the A task, only the `OU
#### Pass parameter from Kubernetes task to downstream
Different programming languages may use different logging frameworks in Kubernetes tasks. To be compatible with these frameworks, DolphinScheduler provides a universal logging data format `${(key=value)dsVal}`. Users can output log data in the format `${(key=value)dsVal}` in the terminal logs of their applications, where `key` is the corresponding parameter prop and `value` is the value of that parameter. DolphinScheduler will capture the `${(key=value)dsVal}` in the output logs to capture the parameters and pass them downstream.
Different programming languages may use different logging frameworks in Kubernetes tasks. To be compatible with these frameworks, DolphinScheduler provides a universal logging data format `${(key=value)}` or `#{(key=value)}`. Users can output log data in the format in the terminal logs of their applications, where `key` is the corresponding parameter prop and `value` is the value of that parameter. DolphinScheduler will capture the `${(key=value)}` or `#{(key=value)}` in the output logs to capture the parameters and pass them downstream.
For example

2
docs/docs/zh/guide/parameter/context.md

@ -121,7 +121,7 @@ Node_mysql 运行结果如下:
#### Kubernetes 任务传递参数
在Kubernetes任务中不同的程序开发语言可能会采用不同的日志框架,DolphinScheduler为了兼容不同的日志框架,提供了一种通用的日志数据格式,即 `${(key=value)dsVal}`,用户可以在应用程序的终端日志中输出以格式为 `${(key=value)dsVal}` 结束的日志数据,key 为对应参数的 prop,value 为该参数的值。DolphinScheduler会捕捉输出日志中的 `${(key=value)dsVal}`来进行参数捕捉,从而传递到下游。
在Kubernetes任务中不同的程序开发语言可能会采用不同的日志框架,DolphinScheduler为了兼容不同的日志框架,提供了一种通用的日志数据格式,即 `${(key=value)}``#{(key=value)}`,用户可以在应用程序的终端日志中输出以这种格式的日志数据,key 为对应参数的 prop,value 为该参数的值。DolphinScheduler会捕捉输出日志中的 `${(key=value)}``#{(key=value)}` 来进行参数捕捉,从而传递到下游。
如下图所示:

80
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptor;
import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilder;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
@ -39,31 +40,25 @@ import org.apache.commons.lang3.StringUtils;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import lombok.extern.slf4j.Slf4j;
import io.fabric8.kubernetes.client.dsl.LogWatch;
/**
* abstract command executor
*/
@Slf4j
public abstract class AbstractCommandExecutor {
/**
* rules for extracting Var Pool
*/
protected static final Pattern SETVALUE_REGEX = Pattern.compile(TaskConstants.SETVALUE_REGEX);
protected StringBuilder varPool = new StringBuilder();
protected volatile Map<String, String> taskOutputParams = new HashMap<>();
/**
* process
*/
@ -74,11 +69,6 @@ public abstract class AbstractCommandExecutor {
*/
protected Consumer<LinkedBlockingQueue<String>> logHandler;
/**
* logger
*/
protected Logger logger;
/**
* log list
*/
@ -98,11 +88,9 @@ public abstract class AbstractCommandExecutor {
protected Future<?> podLogOutputFuture;
public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
TaskExecutionContext taskRequest,
Logger logger) {
TaskExecutionContext taskRequest) {
this.logHandler = logHandler;
this.taskRequest = taskRequest;
this.logger = logger;
this.logBuffer = new LinkedBlockingQueue<>();
this.logBuffer.add(EMPTY_STRING);
@ -119,7 +107,7 @@ public abstract class AbstractCommandExecutor {
TaskResponse result = new TaskResponse();
int taskInstanceId = taskRequest.getTaskInstanceId();
if (null == TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId)) {
logger.warn(
log.warn(
"Cannot find the taskInstance: {} from TaskExecutionContextCacheManager, the task might already been killed",
taskInstanceId);
result.setExitStatusCode(EXIT_CODE_KILL);
@ -180,7 +168,7 @@ public abstract class AbstractCommandExecutor {
return result;
}
// print process id
logger.info("process start, process id is: {}", processId);
log.info("process start, process id is: {}", processId);
// if timeout occurs, exit directly
long remainTime = getRemainTime();
@ -201,7 +189,7 @@ public abstract class AbstractCommandExecutor {
// Wait the task log process finished.
taskOutputFuture.get();
} catch (ExecutionException e) {
logger.error("Handle task log error", e);
log.error("Handle task log error", e);
}
}
@ -212,7 +200,7 @@ public abstract class AbstractCommandExecutor {
// delete pod after successful execution and log collection
ProcessUtils.cancelApplication(taskRequest);
} catch (ExecutionException e) {
logger.error("Handle pod log error", e);
log.error("Handle pod log error", e);
}
}
@ -223,21 +211,21 @@ public abstract class AbstractCommandExecutor {
result.setExitStatusCode(this.process.exitValue());
} else {
logger.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
log.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
taskRequest.getTaskTimeout());
result.setExitStatusCode(EXIT_CODE_FAILURE);
cancelApplication();
}
int exitCode = this.process.exitValue();
String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
logger.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
return result;
}
public String getVarPool() {
return varPool.toString();
public Map<String, String> getTaskOutputParams() {
return taskOutputParams;
}
public void cancelApplication() throws InterruptedException {
@ -246,16 +234,12 @@ public abstract class AbstractCommandExecutor {
}
// soft kill
logger.info("Begin to kill process process, pid is : {}", taskRequest.getProcessId());
log.info("Begin to kill process process, pid is : {}", taskRequest.getProcessId());
process.destroy();
if (!process.waitFor(5, TimeUnit.SECONDS)) {
process.destroyForcibly();
}
logger.info("Success kill task: {}, pid: {}", taskRequest.getTaskAppId(), taskRequest.getProcessId());
}
private void printCommand(List<String> commands) {
logger.info("task run command: {}", String.join(" ", commands));
log.info("Success kill task: {}, pid: {}", taskRequest.getTaskAppId(), taskRequest.getProcessId());
}
private void collectPodLogIfNeeded() {
@ -299,24 +283,22 @@ public abstract class AbstractCommandExecutor {
ExecutorService getOutputLogService = ThreadUtils
.newSingleDaemonScheduledExecutorService("ResolveOutputLog-thread-" + taskRequest.getTaskName());
getOutputLogService.submit(() -> {
TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
String line;
while ((line = inReader.readLine()) != null) {
if (line.startsWith("${setValue(") || line.startsWith("#{setValue(")) {
varPool.append(findVarPool(line));
varPool.append("$VarPool$");
} else {
logBuffer.add(line);
}
logBuffer.add(line);
taskOutputParameterParser.appendParseLog(line);
}
processLogOutputIsSuccess = true;
} catch (Exception e) {
logger.error("Parse var pool error", e);
log.error("Parse var pool error", e);
processLogOutputIsSuccess = true;
} finally {
LogUtils.removeTaskInstanceLogFullPathMDC();
}
taskOutputParams = taskOutputParameterParser.getTaskOutputParams();
});
getOutputLogService.shutdown();
@ -336,7 +318,7 @@ public abstract class AbstractCommandExecutor {
}
}
} catch (Exception e) {
logger.error("Output task log error", e);
log.error("Output task log error", e);
} finally {
LogUtils.removeTaskInstanceLogFullPathMDC();
}
@ -344,20 +326,6 @@ public abstract class AbstractCommandExecutor {
parseProcessOutputExecutorService.shutdown();
}
/**
* find var pool
*
* @param line
* @return
*/
private String findVarPool(String line) {
Matcher matcher = SETVALUE_REGEX.matcher(line);
if (matcher.find()) {
return matcher.group(1);
}
return null;
}
/**
* get remain times
*
@ -389,7 +357,7 @@ public abstract class AbstractCommandExecutor {
processId = f.getInt(process);
} catch (Exception e) {
logger.error("Get task pid failed", e);
log.error("Get task pid failed", e);
}
return processId;

26
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java

@ -28,24 +28,20 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* executive task
*/
@Slf4j
public abstract class AbstractTask {
protected final Logger log = LoggerFactory.getLogger(AbstractTask.class);
private static String groupName1 = "paramName1";
private static String groupName2 = "paramName2";
public String rgex = String.format("['\"]\\$\\{(?<%s>.*?)}['\"]|\\$\\{(?<%s>.*?)}", groupName1, groupName2);
/**
* varPool string
*/
protected String varPool;
@Getter
@Setter
protected Map<String, String> taskOutputParams;
/**
* taskExecutionContext
@ -91,14 +87,6 @@ public abstract class AbstractTask {
public abstract void cancel() throws TaskException;
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public String getVarPool() {
return varPool;
}
/**
* get exit status code
*

7
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java

@ -29,15 +29,16 @@ import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class AbstractYarnTask extends AbstractRemoteTask {
private ShellCommandExecutor shellCommandExecutor;
public AbstractYarnTask(TaskExecutionContext taskRequest) {
super(taskRequest);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskRequest,
log);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskRequest);
}
// todo split handle to submit and track

10
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java

@ -20,17 +20,11 @@ package org.apache.dolphinscheduler.plugin.task.api;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import org.slf4j.Logger;
/**
* shell command executor
*/
public class ShellCommandExecutor extends AbstractCommandExecutor {
public ShellCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
TaskExecutionContext taskRequest,
Logger logger) {
super(logHandler, taskRequest, logger);
TaskExecutionContext taskRequest) {
super(logHandler, taskRequest);
}
}

4
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskConstants.java

@ -35,10 +35,6 @@ public class TaskConstants {
public static final String FLINK_APPLICATION_REGEX = "JobID \\w+";
public static final String SETVALUE_REGEX = "[\\$#]\\{setValue\\((.*?)\\)}";
public static final String DSVALUE_REGEX = "[\\$#]\\{\\((.*?)\\)dsVal}$";
/**
* string false
*/

11
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTask.java

@ -24,6 +24,11 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.k8s.impl.K8sTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class AbstractK8sTask extends AbstractRemoteTask {
/**
@ -37,7 +42,7 @@ public abstract class AbstractK8sTask extends AbstractRemoteTask {
*/
protected AbstractK8sTask(TaskExecutionContext taskRequest) {
super(taskRequest);
this.abstractK8sTaskExecutor = new K8sTaskExecutor(log, taskRequest);
this.abstractK8sTaskExecutor = new K8sTaskExecutor(taskRequest);
}
// todo split handle to submit and track
@ -47,7 +52,7 @@ public abstract class AbstractK8sTask extends AbstractRemoteTask {
TaskResponse response = abstractK8sTaskExecutor.run(buildCommand());
setExitStatusCode(response.getExitStatusCode());
setAppIds(response.getAppIds());
dealOutParam(abstractK8sTaskExecutor.getVarPool());
dealOutParam(abstractK8sTaskExecutor.getTaskOutputParams());
} catch (Exception e) {
log.error("k8s task submit failed with error");
exitStatusCode = -1;
@ -86,5 +91,5 @@ public abstract class AbstractK8sTask extends AbstractRemoteTask {
*/
protected abstract String buildCommand();
protected abstract void dealOutParam(String result);
protected abstract void dealOutParam(Map<String, String> taskOutputParams);
}

16
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/AbstractK8sTaskExecutor.java

@ -22,25 +22,25 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.utils.K8sUtils;
import org.slf4j.Logger;
import java.util.HashMap;
import java.util.Map;
import org.yaml.snakeyaml.Yaml;
public abstract class AbstractK8sTaskExecutor {
protected Logger log;
protected TaskExecutionContext taskRequest;
protected K8sUtils k8sUtils;
protected Yaml yaml;
protected StringBuilder varPool;
protected AbstractK8sTaskExecutor(Logger log, TaskExecutionContext taskRequest) {
this.log = log;
protected volatile Map<String, String> taskOutputParams;
protected AbstractK8sTaskExecutor(TaskExecutionContext taskRequest) {
this.taskRequest = taskRequest;
this.k8sUtils = new K8sUtils();
this.yaml = new Yaml();
this.varPool = new StringBuilder();
this.taskOutputParams = new HashMap<>();
}
public String getVarPool() {
return varPool.toString();
public Map<String, String> getTaskOutputParams() {
return taskOutputParams;
}
public abstract TaskResponse run(String k8sParameterStr) throws Exception;

18
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java

@ -44,10 +44,10 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.k8s.AbstractK8sTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.k8s.K8sTaskMainParameters;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
import org.apache.commons.lang3.StringUtils;
@ -64,8 +64,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import lombok.extern.slf4j.Slf4j;
import io.fabric8.kubernetes.api.model.Affinity;
import io.fabric8.kubernetes.api.model.AffinityBuilder;
import io.fabric8.kubernetes.api.model.EnvVar;
@ -84,14 +83,15 @@ import io.fabric8.kubernetes.client.dsl.LogWatch;
/**
* K8sTaskExecutor used to submit k8s task to K8S
*/
@Slf4j
public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
private Job job;
protected boolean podLogOutputIsFinished = false;
protected Future<?> podLogOutputFuture;
public K8sTaskExecutor(Logger logger, TaskExecutionContext taskRequest) {
super(logger, taskRequest);
public K8sTaskExecutor(TaskExecutionContext taskRequest) {
super(taskRequest);
}
public void buildK8sJob(K8sTaskMainParameters k8STaskMainParameters) {
@ -255,6 +255,7 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
String taskName = taskRequest.getTaskName().toLowerCase(Locale.ROOT);
String containerName = String.format("%s-%s", taskName, taskInstanceId);
podLogOutputFuture = collectPodLogExecutorService.submit(() -> {
TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
try (
LogWatch watcher = ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(),
taskRequest.getTaskAppId(), containerName)) {
@ -263,11 +264,7 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(watcher.getOutput()))) {
while ((line = reader.readLine()) != null) {
log.info("[K8S-pod-log] {}", line);
if (line.endsWith(VarPoolUtils.VAR_SUFFIX)) {
varPool.append(VarPoolUtils.findVarPool(line));
varPool.append(VarPoolUtils.VAR_DELIMITER);
}
taskOutputParameterParser.appendParseLog(line);
}
}
} catch (Exception e) {
@ -276,6 +273,7 @@ public class K8sTaskExecutor extends AbstractK8sTaskExecutor {
LogUtils.removeTaskInstanceLogFullPathMDC();
podLogOutputIsFinished = true;
}
taskOutputParams = taskOutputParameterParser.getTaskOutputParams();
});
collectPodLogExecutorService.shutdown();

2
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/loop/BaseLoopTaskExecutor.java

@ -30,12 +30,14 @@ import java.time.Duration;
import javax.annotation.Nullable;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
/**
* This class is the base class for all loop task type.
* <p>
* The loop task type means, we will submit a task, and loop the task status until the task is finished.
*/
@Slf4j
public abstract class BaseLoopTaskExecutor extends AbstractRemoteTask {
/**

38
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/AbstractParameters.java

@ -24,21 +24,21 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
/**
* job params related class
*/
@Slf4j
public abstract class AbstractParameters implements IParameters {
@Override
@ -130,7 +130,7 @@ public abstract class AbstractParameters implements IParameters {
}
}
public void dealOutParam(String result) {
public void dealOutParam(Map<String, String> taskOutputParams) {
if (CollectionUtils.isEmpty(localParams)) {
return;
}
@ -138,19 +138,18 @@ public abstract class AbstractParameters implements IParameters {
if (CollectionUtils.isEmpty(outProperty)) {
return;
}
if (StringUtils.isEmpty(result)) {
if (MapUtils.isEmpty(taskOutputParams)) {
outProperty.forEach(this::addPropertyToValPool);
return;
}
Map<String, String> taskResult = getMapByString(result);
if (taskResult.size() == 0) {
return;
}
for (Property info : outProperty) {
String propValue = taskResult.get(info.getProp());
String propValue = taskOutputParams.get(info.getProp());
if (StringUtils.isNotEmpty(propValue)) {
info.setValue(propValue);
addPropertyToValPool(info);
} else {
log.warn("Cannot find the output parameter {} in the task output parameters", info.getProp());
}
}
}
@ -178,23 +177,6 @@ public abstract class AbstractParameters implements IParameters {
return allParams;
}
/**
* shell's result format is key=value$VarPool$key=value$VarPool$
* @param result
* @return
*/
public static Map<String, String> getMapByString(String result) {
String[] formatResult = result.split("\\$VarPool\\$");
Map<String, String> format = new HashMap<>();
for (String info : formatResult) {
if (StringUtils.isNotEmpty(info) && info.contains("=")) {
String[] keyValue = info.split("=");
format.put(keyValue[0], keyValue[1]);
}
}
return format;
}
public ResourceParametersHelper getResources() {
return new ResourceParametersHelper();
}

1
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parameters/SqlParameters.java

@ -236,7 +236,6 @@ public class SqlParameters extends AbstractParameters {
return new ArrayList<>();
}
@Override
public void dealOutParam(String result) {
if (CollectionUtils.isEmpty(localParams)) {
return;

107
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParser.java

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.parser;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.concurrent.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
/**
* Used to parse ${setValue()} and #{setValue()} from given lines.
*/
@Slf4j
@NotThreadSafe
public class TaskOutputParameterParser {
private final Map<String, String> taskOutputParams = new HashMap<>();
private List<String> currentTaskOutputParam;
public void appendParseLog(String log) {
if (log == null) {
return;
}
if (currentTaskOutputParam != null) {
// continue to parse the rest of line
int i = log.indexOf(")}");
if (i == -1) {
// the end of var pool not found
currentTaskOutputParam.add(log);
} else {
// the end of var pool found
currentTaskOutputParam.add(log.substring(0, i + 2));
Pair<String, String> keyValue = parseOutputParam(String.join("\n", currentTaskOutputParam));
if (keyValue.getKey() != null && keyValue.getValue() != null) {
taskOutputParams.put(keyValue.getKey(), keyValue.getValue());
}
currentTaskOutputParam = null;
// continue to parse the rest of line
if (i + 2 != log.length()) {
appendParseLog(log.substring(i + 2));
}
}
return;
}
int indexOfVarPoolBegin = log.indexOf("${setValue(");
if (indexOfVarPoolBegin == -1) {
indexOfVarPoolBegin = log.indexOf("#{setValue(");
}
if (indexOfVarPoolBegin == -1) {
return;
}
currentTaskOutputParam = new ArrayList<>();
appendParseLog(log.substring(indexOfVarPoolBegin));
}
public Map<String, String> getTaskOutputParams() {
return taskOutputParams;
}
// #{setValue(xx=xx)}
protected Pair<String, String> parseOutputParam(String outputParam) {
if (StringUtils.isEmpty(outputParam)) {
log.info("The task output param is empty");
return ImmutablePair.nullPair();
}
if ((!outputParam.startsWith("${setValue(") && !outputParam.startsWith("#{setValue("))
|| !outputParam.endsWith(")}")) {
log.info("The task output param {} should start with '${setValue(' or '#{setValue(' and end with ')}'",
outputParam);
return ImmutablePair.nullPair();
}
String keyValueExpression = outputParam.substring(11, outputParam.length() - 2);
if (!keyValueExpression.contains("=")) {
log.warn("The task output param {} should composite with key=value", outputParam);
return ImmutablePair.nullPair();
}
String[] keyValue = keyValueExpression.split("=", 2);
return ImmutablePair.of(keyValue[0], keyValue[1]);
}
}

47
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtils.java

@ -1,47 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.utils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.experimental.UtilityClass;
@UtilityClass
public class VarPoolUtils {
static final Pattern DSVALUE_REGEX = Pattern.compile(TaskConstants.DSVALUE_REGEX);
public static final String VAR_SUFFIX = ")dsVal}";
public static final String VAR_DELIMITER = "$VarPool$";
/**
* find var pool
*
* @param line
* @return
*/
public static String findVarPool(String line) {
Matcher matcher = DSVALUE_REGEX.matcher(line);
if (matcher.find()) {
return matcher.group(1);
}
return null;
}
}

20
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTaskTest.java

@ -38,24 +38,4 @@ public class AbstractTaskTest {
Assertions.assertEquals(jobId, str.substring(6));
}
@Test
public void testSetValue() {
Pattern SETVALUE_REGEX = Pattern.compile(TaskConstants.SETVALUE_REGEX);
String line1 = "${setValue(sql=\"INSERT INTO a VALUES (1, 2);\")}";
String line2 = "${setValue(a=2))}";
Matcher matcher1 = SETVALUE_REGEX.matcher(line1);
String str1 = null;
if (matcher1.find()) {
str1 = matcher1.group();
}
String str2 = null;
Matcher matcher2 = SETVALUE_REGEX.matcher(line2);
if (matcher2.find()) {
str2 = matcher2.group();
}
Assertions.assertNotNull(str1);
Assertions.assertNotNull(str2);
Assertions.assertEquals(str1.length(), line1.length());
Assertions.assertEquals(str2.length(), line2.length());
}
}

9
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/k8s/K8sTaskExecutorTest.java

@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.plugin.task.api.k8s;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
import static org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils.VAR_DELIMITER;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
@ -65,7 +64,7 @@ public class K8sTaskExecutorTest {
requirement.setKey("node-label");
requirement.setOperator("In");
requirement.setValues(Arrays.asList("1234", "123456"));
k8sTaskExecutor = new K8sTaskExecutor(logger, taskRequest);
k8sTaskExecutor = new K8sTaskExecutor(taskRequest);
k8sTaskMainParameters = new K8sTaskMainParameters();
k8sTaskMainParameters.setImage(image);
k8sTaskMainParameters.setImagePullPolicy(imagePullPolicy);
@ -102,10 +101,4 @@ public class K8sTaskExecutorTest {
}
}
@Test
public void testValpool() {
String result = "key=value" + VAR_DELIMITER;
k8sTaskExecutor.varPool.append(result);
Assertions.assertEquals(result, k8sTaskExecutor.getVarPool());
}
}

77
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/parser/TaskOutputParameterParserTest.java

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.parser;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import com.google.common.collect.ImmutableMap;
class TaskOutputParameterParserTest {
@Test
void testEmptyLog() throws IOException, URISyntaxException {
List<String> varPools = getLogs("/outputParam/emptyVarPoolLog.txt");
TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
varPools.forEach(taskOutputParameterParser::appendParseLog);
Assertions.assertTrue(taskOutputParameterParser.getTaskOutputParams().isEmpty());
}
@Test
void testOneLineLog() throws IOException, URISyntaxException {
List<String> varPools = getLogs("/outputParam/onelineVarPoolLog.txt");
TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
varPools.forEach(taskOutputParameterParser::appendParseLog);
assertEquals(ImmutableMap.of("name", "name=tom"), taskOutputParameterParser.getTaskOutputParams());
}
@Test
void testOneVarPollInMultiLineLog() throws IOException, URISyntaxException {
List<String> varPools = getLogs("/outputParam/oneVarPollInMultiLineLog.txt");
TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
varPools.forEach(taskOutputParameterParser::appendParseLog);
assertEquals(ImmutableMap.of("sql",
"select * from table\n" +
"where\n" +
"id = 1\n"),
taskOutputParameterParser.getTaskOutputParams());
}
@Test
void testVarPollInMultiLineLog() throws IOException, URISyntaxException {
List<String> varPools = getLogs("/outputParam/multipleVarPoll.txt");
TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
varPools.forEach(taskOutputParameterParser::appendParseLog);
assertEquals(ImmutableMap.of("name", "tom", "age", "1"), taskOutputParameterParser.getTaskOutputParams());
}
private List<String> getLogs(String file) throws IOException, URISyntaxException {
URI uri = TaskOutputParameterParserTest.class.getResource(file).toURI();
return Files.lines(Paths.get(uri)).collect(Collectors.toList());
}
}

50
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/VarPoolUtilsTest.java

@ -1,50 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.utils;
import java.util.HashMap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
class VarPoolUtilsTest {
@Test
void findVar() {
HashMap<String, String> tcs = new HashMap<>();
tcs.put("${(set_val=123)dsVal}", "set_val=123");
tcs.put("1970-01-01 ${(set_val=123)dsVal}", "set_val=123");
tcs.put("1970-01-01 ${(set_val=123)dsVal}123", null);
tcs.put("${(set_val=123}dsVal", null);
tcs.put("#{(set_val=123)dsVal}", "set_val=123");
tcs.put("1970-01-01 #{(set_val=123)dsVal}", "set_val=123");
tcs.put("1970-01-01 #{(set_val=123)dsVal}123", null);
tcs.put("#{(set_val=123)dsVal}123", null);
tcs.put("#{(set_val=123dsVal}", null);
tcs.put("${(set_val=123)dsVal}${(set_val=456)dsVal}", "set_val=123)dsVal}${(set_val=456");
tcs.put("1970-01-01$#{(set_val=123)dsVal}", "set_val=123");
tcs.put("1970-01-01{(set_val=123)dsVal}123", null);
tcs.put("1970-01-01$#{(${(set_val=123)})dsVal}", "${(set_val=123)}");
tcs.put("1970-01-01$#{(${(set_val=123\\)})dsVal}", "${(set_val=123\\)}");
for (String tc : tcs.keySet()) {
Assertions.assertEquals(tcs.get(tc), VarPoolUtils.findVarPool(tc));
}
}
}

16
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/emptyVarPoolLog.txt

@ -0,0 +1,16 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

19
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/multipleVarPoll.txt

@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
INFO: ${setValue(name=tom)}
INFO: ${setValue(age=1)}

21
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/oneVarPollInMultiLineLog.txt

@ -0,0 +1,21 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
${setValue(sql=select * from table
where
id = 1
)}

18
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/resources/outputParam/onelineVarPoolLog.txt

@ -0,0 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
${setValue(name=name=tom)}

9
dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java

@ -45,9 +45,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* chunjun task
*/
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ChunJunTask extends AbstractTask {
/**
@ -79,8 +79,7 @@ public class ChunJunTask extends AbstractTask {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext, log);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext);
}
/**

2
dolphinscheduler-task-plugin/dolphinscheduler-task-datafactory/src/main/java/org/apache/dolphinscheduler/plugin/task/datafactory/DatafactoryTask.java

@ -30,9 +30,11 @@ import java.util.List;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@Setter
@Getter
@Slf4j
public class DatafactoryTask extends AbstractRemoteTask {
private final TaskExecutionContext taskExecutionContext;

3
dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java

@ -58,11 +58,14 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
/**
* In DataQualityTask, the input parameters will be converted into DataQualityConfiguration,
* which will be converted into a string as the parameter of DataQualityApplication,
* and DataQualityApplication is spark application
*/
@Slf4j
public class DataQualityTask extends AbstractYarnTask {
/**

2
dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java

@ -36,6 +36,7 @@ import java.util.Collections;
import java.util.List;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -43,6 +44,7 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.json.JsonMapper;
@Setter
@Slf4j
public class DatasyncTask extends AbstractRemoteTask {
private static final ObjectMapper objectMapper =

6
dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java

@ -56,6 +56,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
@ -68,6 +70,7 @@ import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@Slf4j
public class DataxTask extends AbstractTask {
/**
@ -125,8 +128,7 @@ public class DataxTask extends AbstractTask {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext, log);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext);
}
/**

14
dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java

@ -45,28 +45,20 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.MissingNode;
@Slf4j
public class DinkyTask extends AbstractRemoteTask {
/**
* taskExecutionContext
*/
private final TaskExecutionContext taskExecutionContext;
/**
* dinky parameters
*/
private DinkyParameters dinkyParameters;
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
*/
protected DinkyTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;

3
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java

@ -35,12 +35,15 @@ import org.apache.commons.lang3.StringUtils;
import java.util.Collections;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStateException;
import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.json.JsonMapper;
@Slf4j
public class DmsTask extends AbstractRemoteTask {
private static final ObjectMapper objectMapper =

10
dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java

@ -33,9 +33,9 @@ import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilder
import java.util.ArrayList;
import java.util.List;
/**
* shell task
*/
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DvcTask extends AbstractTask {
/**
@ -62,7 +62,7 @@ public class DvcTask extends AbstractTask {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, log);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext);
}
@Override
@ -85,7 +85,7 @@ public class DvcTask extends AbstractTask {
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setProcessId(commandExecuteResult.getProcessId());
parameters.dealOutParam(shellCommandExecutor.getVarPool());
parameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("The current DvcTask has been interrupted", e);

3
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java

@ -31,6 +31,8 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters
import java.util.TimeZone;
import lombok.extern.slf4j.Slf4j;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
@ -44,6 +46,7 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategy;
*
* @since v3.1.0
*/
@Slf4j
public abstract class AbstractEmrTask extends AbstractRemoteTask {
final TaskExecutionContext taskExecutionContext;

3
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrAddStepsTask.java

@ -26,6 +26,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import com.amazonaws.SdkBaseException;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
@ -45,6 +47,7 @@ import com.google.common.collect.Sets;
*
* @since v3.1.0
*/
@Slf4j
public class EmrAddStepsTask extends AbstractEmrTask {
private String stepId;

3
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/EmrJobFlowTask.java

@ -26,6 +26,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import com.amazonaws.SdkBaseException;
import com.amazonaws.services.elasticmapreduce.model.ClusterState;
import com.amazonaws.services.elasticmapreduce.model.ClusterStateChangeReason;
@ -40,6 +42,7 @@ import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsResult;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Sets;
@Slf4j
public class EmrJobFlowTask extends AbstractEmrTask {
private final HashSet<String> waitingStateSet = Sets.newHashSet(

11
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java

@ -31,17 +31,14 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class FlinkStreamTask extends FlinkTask implements StreamTask {
/**
* flink parameters
*/
private FlinkStreamParameters flinkParameters;
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
private final TaskExecutionContext taskExecutionContext;
public FlinkStreamTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);

3
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java

@ -29,6 +29,9 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class FlinkTask extends AbstractYarnTask {
/**

9
dolphinscheduler-task-plugin/dolphinscheduler-task-hivecli/src/main/java/org/apache/dolphinscheduler/plugin/task/hivecli/HiveCliTask.java

@ -53,6 +53,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class HiveCliTask extends AbstractRemoteTask {
private HiveCliParameters hiveCliParameters;
@ -65,9 +68,7 @@ public class HiveCliTask extends AbstractRemoteTask {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext,
log);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext);
}
@Override
@ -96,7 +97,7 @@ public class HiveCliTask extends AbstractRemoteTask {
setExitStatusCode(taskResponse.getExitStatusCode());
setAppIds(taskResponse.getAppIds());
setProcessId(taskResponse.getProcessId());
setVarPool(shellCommandExecutor.getVarPool());
setTaskOutputParams(shellCommandExecutor.getTaskOutputParams());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("The current HiveCLI Task has been interrupted", e);

3
dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java

@ -52,8 +52,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.databind.node.ObjectNode;
@Slf4j
public class HttpTask extends AbstractTask {
/**

9
dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java

@ -52,8 +52,11 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import com.google.common.base.Preconditions;
@Slf4j
public class JavaTask extends AbstractTask {
/**
@ -79,9 +82,7 @@ public class JavaTask extends AbstractTask {
public JavaTask(TaskExecutionContext taskRequest) {
super(taskRequest);
this.taskRequest = taskRequest;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskRequest,
log);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskRequest);
}
/**
@ -131,7 +132,7 @@ public class JavaTask extends AbstractTask {
setExitStatusCode(taskResponse.getExitStatusCode());
setAppIds(taskResponse.getAppIds());
setProcessId(taskResponse.getProcessId());
setVarPool(shellCommandExecutor.getVarPool());
setTaskOutputParams(shellCommandExecutor.getTaskOutputParams());
} catch (InterruptedException e) {
log.error("java task interrupted ", e);
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);

7
dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java

@ -41,8 +41,11 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.databind.ObjectMapper;
@Slf4j
public class JupyterTask extends AbstractRemoteTask {
private JupyterParameters jupyterParameters;
@ -54,9 +57,7 @@ public class JupyterTask extends AbstractRemoteTask {
public JupyterTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext,
log);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext);
}
@Override

15
dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java

@ -43,26 +43,19 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
@Slf4j
public class K8sTask extends AbstractK8sTask {
/**
* taskExecutionContext
*/
private final TaskExecutionContext taskExecutionContext;
/**
* task parameters
*/
private K8sTaskParameters k8sTaskParameters;
private K8sTaskExecutionContext k8sTaskExecutionContext;
private K8sConnectionParam k8sConnectionParam;
/**
* @param taskRequest taskRequest
*/
public K8sTask(TaskExecutionContext taskRequest) {
super(taskRequest);
this.taskExecutionContext = taskRequest;
@ -119,8 +112,8 @@ public class K8sTask extends AbstractK8sTask {
}
@Override
protected void dealOutParam(String result) {
this.k8sTaskParameters.dealOutParam(result);
protected void dealOutParam(Map<String, String> taskOutputParams) {
this.k8sTaskParameters.dealOutParam(taskOutputParams);
}
public List<NodeSelectorRequirement> convertToNodeSelectorRequirements(List<NodeSelectorExpression> expressions) {

4
dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/test/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTaskTest.java

@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.plugin.task.k8s;
import static org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils.VAR_DELIMITER;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@ -47,6 +46,7 @@ import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import com.google.common.collect.ImmutableMap;
import io.fabric8.kubernetes.api.model.NodeSelectorRequirement;
public class K8sTaskTest {
@ -167,7 +167,7 @@ public class K8sTaskTest {
@Test
public void testDealOutParam() {
String result = "key=123" + VAR_DELIMITER;
Map<String, String> result = ImmutableMap.of("key", "123");
k8sTask.getParameters().localParams.add(new Property("key", Direct.OUT, DataType.VARCHAR, "value"));
k8sTask.dealOutParam(result);
k8sTask.getParameters().getVarPool().forEach(property -> {

3
dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java

@ -36,6 +36,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class KubeflowTask extends AbstractRemoteTask {
private final TaskExecutionContext taskExecutionContext;

12
dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java

@ -41,9 +41,9 @@ import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* linkis task
*/
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class LinkisTask extends AbstractRemoteTask {
/**
@ -76,9 +76,7 @@ public class LinkisTask extends AbstractRemoteTask {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext,
log);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext);
}
@Override
@ -107,7 +105,7 @@ public class LinkisTask extends AbstractRemoteTask {
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(findTaskId(commandExecuteResult.getResultString()));
setProcessId(commandExecuteResult.getProcessId());
linkisParameters.dealOutParam(shellCommandExecutor.getVarPool());
linkisParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("The current Linkis task has been interrupted", e);

10
dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java

@ -42,9 +42,9 @@ import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* shell task
*/
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MlflowTask extends AbstractTask {
private static final Pattern GIT_CHECK_PATTERN = Pattern.compile("^(git@|https?://)");
@ -71,7 +71,7 @@ public class MlflowTask extends AbstractTask {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext, log);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext);
}
static public String getPresetRepository() {
@ -130,7 +130,7 @@ public class MlflowTask extends AbstractTask {
}
setExitStatusCode(exitCode);
setProcessId(commandExecuteResult.getProcessId());
mlflowParameters.dealOutParam(shellCommandExecutor.getVarPool());
mlflowParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("The current Mlflow task has been interrupted", e);

18
dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java

@ -30,9 +30,9 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* mapreduce task
*/
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MapReduceTask extends AbstractYarnTask {
/**
@ -41,20 +41,10 @@ public class MapReduceTask extends AbstractYarnTask {
*/
private static final String MAPREDUCE_COMMAND = TaskConstants.HADOOP;
/**
* mapreduce parameters
*/
private MapReduceParameters mapreduceParameters;
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
private final TaskExecutionContext taskExecutionContext;
/**
* constructor
* @param taskExecutionContext taskExecutionContext
*/
public MapReduceTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;

6
dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java

@ -32,11 +32,11 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import com.google.common.base.Preconditions;
/**
* openmldb task
*/
@Slf4j
public class OpenmldbTask extends PythonTask {
/**

3
dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java

@ -46,12 +46,15 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
/**
* TIS DataX Task
**/
@Slf4j
public class PigeonTask extends AbstractRemoteTask {
public static final String KEY_POOL_VAR_PIGEON_HOST = "p_host";

18
dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java

@ -46,24 +46,18 @@ import java.sql.Types;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import com.google.common.collect.Maps;
/**
* procedure task
*/
@Slf4j
public class ProcedureTask extends AbstractTask {
/**
* procedure parameters
*/
private ProcedureParameters procedureParameters;
private final ProcedureParameters procedureParameters;
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
private final TaskExecutionContext taskExecutionContext;
private ProcedureTaskExecutionContext procedureTaskExecutionContext;
private final ProcedureTaskExecutionContext procedureTaskExecutionContext;
/**
* constructor

20
dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java

@ -40,21 +40,15 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import com.google.common.base.Preconditions;
/**
* python task
*/
@Slf4j
public class PythonTask extends AbstractTask {
/**
* python parameters
*/
protected PythonParameters pythonParameters;
/**
* shell command executor
*/
private ShellCommandExecutor shellCommandExecutor;
protected TaskExecutionContext taskRequest;
@ -70,9 +64,7 @@ public class PythonTask extends AbstractTask {
super(taskRequest);
this.taskRequest = taskRequest;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskRequest,
log);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskRequest);
}
@Override
@ -104,8 +96,8 @@ public class PythonTask extends AbstractTask {
TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(taskResponse.getExitStatusCode());
setProcessId(taskResponse.getProcessId());
setVarPool(shellCommandExecutor.getVarPool());
pythonParameters.dealOutParam(shellCommandExecutor.getVarPool());
setTaskOutputParams(shellCommandExecutor.getTaskOutputParams());
pythonParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
} catch (Exception e) {
log.error("python task failure", e);
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);

9
dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java

@ -34,6 +34,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class PytorchTask extends AbstractTask {
private final ShellCommandExecutor shellCommandExecutor;
@ -45,9 +48,7 @@ public class PytorchTask extends AbstractTask {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext,
log);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext);
}
@Override
@ -76,7 +77,7 @@ public class PytorchTask extends AbstractTask {
TaskResponse taskResponse = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(taskResponse.getExitStatusCode());
setProcessId(taskResponse.getProcessId());
setVarPool(shellCommandExecutor.getVarPool());
setTaskOutputParams(shellCommandExecutor.getTaskOutputParams());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("The current Pytorch task has been interrupted", e);

73
dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java

@ -19,8 +19,8 @@ package org.apache.dolphinscheduler.plugin.task.remoteshell;
import org.apache.dolphinscheduler.plugin.datasource.ssh.SSHUtils;
import org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHConnectionParam;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.sshd.client.SshClient;
@ -36,24 +36,19 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.EnumSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RemoteExecutor {
protected final Logger logger =
LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOGGER_THREAD_NAME, getClass()));
protected static final Pattern SETVALUE_REGEX = Pattern.compile(TaskConstants.SETVALUE_REGEX);
static final String REMOTE_SHELL_HOME = "/tmp/dolphinscheduler-remote-shell-%s/";
static final String STATUS_TAG_MESSAGE = "DOLPHINSCHEDULER-REMOTE-SHELL-TASK-STATUS-";
static final int TRACK_INTERVAL = 5000;
protected StringBuilder varPool = new StringBuilder();
protected Map<String, String> taskOutputParams = new HashMap<>();
SshClient sshClient;
ClientSession session;
@ -105,60 +100,44 @@ public class RemoteExecutor {
public void track(String taskId) throws Exception {
int logN = 0;
String pid;
logger.info("Remote shell task log:");
log.info("Remote shell task log:");
TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
do {
pid = getTaskPid(taskId);
String trackCommand = String.format(COMMAND.TRACK_COMMAND, logN + 1, getRemoteShellHome(), taskId);
String log = runRemote(trackCommand);
if (StringUtils.isEmpty(log)) {
String logLine = runRemote(trackCommand);
if (StringUtils.isEmpty(logLine)) {
Thread.sleep(TRACK_INTERVAL);
} else {
logN += log.split("\n").length;
setVarPool(log);
logger.info(log);
logN += logLine.split("\n").length;
log.info(logLine);
taskOutputParameterParser.appendParseLog(logLine);
}
} while (StringUtils.isNotEmpty(pid));
taskOutputParams.putAll(taskOutputParameterParser.getTaskOutputParams());
}
public String getVarPool() {
return varPool.toString();
}
private void setVarPool(String log) {
String[] lines = log.split("\n");
for (String line : lines) {
if (line.startsWith("${setValue(") || line.startsWith("#{setValue(")) {
varPool.append(findVarPool(line));
varPool.append("$VarPool$");
}
}
}
private String findVarPool(String line) {
Matcher matcher = SETVALUE_REGEX.matcher(line);
if (matcher.find()) {
return matcher.group(1);
}
return null;
public Map<String, String> getTaskOutputParams() {
return taskOutputParams;
}
public Integer getTaskExitCode(String taskId) throws IOException {
String trackCommand = String.format(COMMAND.LOG_TAIL_COMMAND, getRemoteShellHome(), taskId);
String log = runRemote(trackCommand);
String logLine = runRemote(trackCommand);
int exitCode = -1;
logger.info("Remote shell task run status: {}", log);
if (log.contains(STATUS_TAG_MESSAGE)) {
String status = log.replace(STATUS_TAG_MESSAGE, "").trim();
log.info("Remote shell task run status: {}", logLine);
if (logLine.contains(STATUS_TAG_MESSAGE)) {
String status = logLine.replace(STATUS_TAG_MESSAGE, "").trim();
if (status.equals("0")) {
logger.info("Remote shell task success");
log.info("Remote shell task success");
exitCode = 0;
} else {
logger.error("Remote shell task failed");
log.error("Remote shell task failed");
exitCode = Integer.parseInt(status);
}
}
cleanData(taskId);
logger.error("Remote shell task failed");
log.error("Remote shell task failed");
return exitCode;
}
@ -168,7 +147,7 @@ public class RemoteExecutor {
try {
runRemote(cleanCommand);
} catch (Exception e) {
logger.error("Remote shell task clean data failed, but will not affect the task execution", e);
log.error("Remote shell task clean data failed, but will not affect the task execution", e);
}
}
@ -189,14 +168,14 @@ public class RemoteExecutor {
runRemote(checkDirCommand);
uploadScript(taskId, localFile);
logger.info("The final script is: \n{}",
log.info("The final script is: \n{}",
runRemote(String.format(COMMAND.CAT_FINAL_SCRIPT, getRemoteShellHome(), taskId)));
}
public void uploadScript(String taskId, String localFile) throws IOException {
String remotePath = getRemoteShellHome() + taskId + ".sh";
logger.info("upload script from local:{} to remote: {}", localFile, remotePath);
log.info("upload script from local:{} to remote: {}", localFile, remotePath);
try (SftpFileSystem fs = SftpClientFactory.instance().createSftpFileSystem(getSession())) {
Path path = fs.getPath(remotePath);
Files.copy(Paths.get(localFile), path);

8
dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteShellTask.java

@ -42,9 +42,9 @@ import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Map;
/**
* shell task
*/
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RemoteShellTask extends AbstractTask {
static final String TASK_ID_PREFIX = "dolphinscheduler-remoteshell-";
@ -102,7 +102,7 @@ public class RemoteShellTask extends AbstractTask {
String localFile = buildCommand();
int exitCode = remoteExecutor.run(taskId, localFile);
setExitStatusCode(exitCode);
remoteShellParameters.dealOutParam(remoteExecutor.getVarPool());
remoteShellParameters.dealOutParam(remoteExecutor.getTaskOutputParams());
} catch (Exception e) {
log.error("shell task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);

3
dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java

@ -39,6 +39,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
@ -52,6 +54,7 @@ import com.fasterxml.jackson.databind.json.JsonMapper;
/**
* SagemakerTask task, Used to start Sagemaker pipeline
*/
@Slf4j
public class SagemakerTask extends AbstractRemoteTask {
private static final ObjectMapper objectMapper = JsonMapper.builder()

12
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java

@ -47,9 +47,9 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* seatunnel task
*/
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SeatunnelTask extends AbstractRemoteTask {
private static final String SEATUNNEL_BIN_DIR = "${SEATUNNEL_HOME}/bin/";
@ -78,9 +78,7 @@ public class SeatunnelTask extends AbstractRemoteTask {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext,
log);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext);
}
@Override
@ -109,7 +107,7 @@ public class SeatunnelTask extends AbstractRemoteTask {
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(String.join(TaskConstants.COMMA, getApplicationIds()));
setProcessId(commandExecuteResult.getProcessId());
seatunnelParameters.dealOutParam(shellCommandExecutor.getVarPool());
seatunnelParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("The current SeaTunnel task has been interrupted", e);

12
dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java

@ -31,9 +31,9 @@ import org.apache.dolphinscheduler.plugin.task.api.shell.IShellInterceptorBuilde
import org.apache.dolphinscheduler.plugin.task.api.shell.ShellInterceptorBuilderFactory;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
/**
* shell task
*/
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ShellTask extends AbstractTask {
/**
@ -60,9 +60,7 @@ public class ShellTask extends AbstractTask {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext,
log);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext);
}
@Override
@ -87,7 +85,7 @@ public class ShellTask extends AbstractTask {
TaskResponse commandExecuteResult = shellCommandExecutor.run(shellActuatorBuilder, taskCallBack);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setProcessId(commandExecuteResult.getProcessId());
shellParameters.dealOutParam(shellCommandExecutor.getVarPool());
shellParameters.dealOutParam(shellCommandExecutor.getTaskOutputParams());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("The current Shell task has been interrupted", e);

10
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java

@ -53,19 +53,15 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import io.fabric8.kubernetes.client.Config;
@Slf4j
public class SparkTask extends AbstractYarnTask {
/**
* spark parameters
*/
private SparkParameters sparkParameters;
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
private final TaskExecutionContext taskExecutionContext;
public SparkTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);

16
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

@ -61,24 +61,18 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@Slf4j
public class SqlTask extends AbstractTask {
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
private final TaskExecutionContext taskExecutionContext;
/**
* sql parameters
*/
private SqlParameters sqlParameters;
private final SqlParameters sqlParameters;
/**
* base datasource
*/
private BaseConnectionParam baseConnectionParam;
/**

9
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java

@ -29,19 +29,16 @@ import org.apache.dolphinscheduler.plugin.task.sqoop.parameter.SqoopParameters;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
/**
* sqoop task extends the shell task
*/
@Slf4j
public class SqoopTask extends AbstractYarnTask {
/**
* sqoop task params
*/
private SqoopParameters sqoopParameters;
/**
* taskExecutionContext
*/
private final TaskExecutionContext taskExecutionContext;
private SqoopTaskExecutionContext sqoopTaskExecutionContext;

13
dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java

@ -42,19 +42,15 @@ import java.util.List;
import java.util.Map;
import kong.unirest.Unirest;
import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.databind.ObjectMapper;
@Slf4j
public class ZeppelinTask extends AbstractRemoteTask {
/**
* taskExecutionContext
*/
private final TaskExecutionContext taskExecutionContext;
/**
* zeppelin parameters
*/
private ZeppelinParameters zeppelinParameters;
/**
@ -66,11 +62,6 @@ public class ZeppelinTask extends AbstractRemoteTask {
private ZeppelinTaskExecutionContext zeppelinTaskExecutionContext;
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
*/
protected ZeppelinTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;

Loading…
Cancel
Save