Browse Source

Format task parameter as pretty json (#13173)

3.2.0-release
Wenjun Ruan 2 years ago committed by GitHub
parent
commit
70ccffeee2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 27
      dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
  2. 6
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  3. 8
      dolphinscheduler-standalone-server/src/main/resources/application.yaml
  4. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java
  5. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java
  6. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java
  7. 7
      dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java
  8. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java
  9. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java
  10. 9
      dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java
  11. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java
  12. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java
  13. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java
  14. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java
  15. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java
  16. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java
  17. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java
  18. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java
  19. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java
  20. 9
      dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java
  21. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java
  22. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java
  23. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java
  24. 8
      dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java
  25. 7
      dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java
  26. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java
  27. 4
      dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java
  28. 16
      dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java
  29. 7
      dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java
  30. 6
      dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java
  31. 3
      dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java
  32. 11
      dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java
  33. 8
      dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java
  34. 2
      dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java
  35. 2
      dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java

27
dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java

@ -54,6 +54,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
@ -72,21 +73,17 @@ public class JSONUtils {
logger.info("init timezone: {}", TimeZone.getDefault());
}
private static final SimpleModule LOCAL_DATE_TIME_MODULE = new SimpleModule()
.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer())
.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer());
/**
* can use static singleton, inject: just make sure to reuse!
*/
private static final ObjectMapper objectMapper = new ObjectMapper()
private static final ObjectMapper objectMapper = JsonMapper.builder()
.configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
.configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
.configure(REQUIRE_SETTERS_FOR_GETTERS, true)
.registerModule(LOCAL_DATE_TIME_MODULE)
.setTimeZone(TimeZone.getDefault())
.setDateFormat(new SimpleDateFormat(YYYY_MM_DD_HH_MM_SS));
.addModule(new SimpleModule()
.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer())
.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer()))
.defaultTimeZone(TimeZone.getDefault())
.defaultDateFormat(new SimpleDateFormat(YYYY_MM_DD_HH_MM_SS))
.build();
private JSONUtils() {
throw new UnsupportedOperationException("Construct JSONUtils");
@ -325,6 +322,14 @@ public class JSONUtils {
}
}
public static String toPrettyJsonString(Object object) {
try {
return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(object);
} catch (Exception e) {
throw new RuntimeException("Object json deserialization exception.", e);
}
}
/**
* serialize to json byte
*

6
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -320,12 +320,14 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
this.setTaskResourceInfo(resources);
// TODO to be optimized
DataQualityTaskExecutionContext dataQualityTaskExecutionContext = new DataQualityTaskExecutionContext();
DataQualityTaskExecutionContext dataQualityTaskExecutionContext = null;
if (TASK_TYPE_DATA_QUALITY.equalsIgnoreCase(taskInstance.getTaskType())) {
dataQualityTaskExecutionContext = new DataQualityTaskExecutionContext();
setDataQualityTaskRelation(dataQualityTaskExecutionContext, taskInstance, tenant.getTenantCode());
}
K8sTaskExecutionContext k8sTaskExecutionContext = new K8sTaskExecutionContext();
K8sTaskExecutionContext k8sTaskExecutionContext = null;
if (TASK_TYPE_SET_K8S.contains(taskInstance.getTaskType())) {
k8sTaskExecutionContext = new K8sTaskExecutionContext();
setK8sTaskRelation(k8sTaskExecutionContext, taskInstance);
}

8
dolphinscheduler-standalone-server/src/main/resources/application.yaml

@ -135,17 +135,15 @@ master:
host-selector: lower_weight
# master heartbeat interval
heartbeat-interval: 10s
# Master heart beat task error threshold, if the continuous error count exceed this count, the master will close.
heartbeat-error-threshold: 5
# master commit task retry times
task-commit-retry-times: 5
# master commit task interval
task-commit-interval: 1s
state-wheel-interval: 5s
# master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2
max-cpu-load-avg: -1
max-cpu-load-avg: 50
# master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G
reserved-memory: 0.3
reserved-memory: 0.03
# failover interval
failover-interval: 10m
# kill yarn jon when failover taskInstance, default true
@ -159,8 +157,6 @@ worker:
exec-threads: 10
# worker heartbeat interval
heartbeat-interval: 10s
# Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close.
heartbeat-error-threshold: 5
# worker host weight to dispatch tasks, default value 100
host-weight: 100
# tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true.

3
dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java

@ -30,6 +30,8 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import com.fasterxml.jackson.annotation.JsonInclude;
/**
* to master/worker task transport
*/
@ -37,6 +39,7 @@ import lombok.NoArgsConstructor;
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class TaskExecutionContext implements Serializable {
private static final long serialVersionUID = -1L;

3
dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java

@ -94,8 +94,9 @@ public class ChunJunTask extends AbstractTask {
*/
@Override
public void init() {
logger.info("chunjun task params {}", taskExecutionContext.getTaskParams());
chunJunParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ChunJunParameters.class);
logger.info("Initialize chunjun task params {}",
JSONUtils.toPrettyJsonString(taskExecutionContext.getTaskParams()));
if (!chunJunParameters.checkParameters()) {
throw new RuntimeException("chunjun task params is not valid");

2
dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java

@ -82,10 +82,10 @@ public class DataQualityTask extends AbstractYarnTask {
@Override
public void init() {
logger.info("data quality task params {}", dqTaskExecutionContext.getTaskParams());
dataQualityParameters =
JSONUtils.parseObject(dqTaskExecutionContext.getTaskParams(), DataQualityParameters.class);
logger.info("Initialize data quality task params {}", JSONUtils.toPrettyJsonString(dataQualityParameters));
if (null == dataQualityParameters) {
logger.error("data quality params is null");

7
dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java

@ -68,9 +68,9 @@ public class DatasyncTask extends AbstractRemoteTask {
@Override
public void init() {
logger.info("Datasync task params {}", taskExecutionContext.getTaskParams());
parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DatasyncParameters.class);
logger.info("Initialize Datasync task params {}", JSONUtils.toPrettyJsonString(parameters));
initParams();
hook = new DatasyncHook();
@ -84,10 +84,9 @@ public class DatasyncTask extends AbstractRemoteTask {
try {
parameters = objectMapper.readValue(parameters.getJson(), DatasyncParameters.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
throw new TaskException("Convert json to task params failed", e);
}
// parameters = JSONUtils.parseObject(parameters.getJson(), DatasyncParameters.class);
logger.info("Datasync convert task params {}", parameters);
logger.info("Success convert json to task params {}", JSONUtils.toPrettyJsonString(parameters));
}
}

2
dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java

@ -134,8 +134,8 @@ public class DataxTask extends AbstractTask {
*/
@Override
public void init() {
logger.info("datax task params {}", taskExecutionContext.getTaskParams());
dataXParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DataxParameters.class);
logger.info("Initialize datax task params {}", JSONUtils.toPrettyJsonString(dataXParameters));
if (dataXParameters == null || !dataXParameters.checkParameters()) {
throw new RuntimeException("datax task params is not valid");

2
dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java

@ -80,8 +80,8 @@ public class DinkyTask extends AbstractRemoteTask {
@Override
public void init() {
final String taskParams = taskExecutionContext.getTaskParams();
logger.info("dinky task params:{}", taskParams);
this.dinkyParameters = JSONUtils.parseObject(taskParams, DinkyParameters.class);
logger.info("Initialize dinky task params: {}", JSONUtils.toPrettyJsonString(dinkyParameters));
if (this.dinkyParameters == null || !this.dinkyParameters.checkParameters()) {
throw new DinkyTaskException("dinky task params is not valid");
}

9
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java

@ -40,15 +40,18 @@ import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStat
import com.amazonaws.services.databasemigrationservice.model.ReplicationTask;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.json.JsonMapper;
public class DmsTask extends AbstractRemoteTask {
private static final ObjectMapper objectMapper =
new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
JsonMapper.builder()
.configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
.configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
.configure(REQUIRE_SETTERS_FOR_GETTERS, true)
.setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
.propertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy())
.build();
/**
* taskExecutionContext
*/
@ -68,8 +71,8 @@ public class DmsTask extends AbstractRemoteTask {
@Override
public void init() throws TaskException {
logger.info("Dms task params {}", taskExecutionContext.getTaskParams());
parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DmsParameters.class);
logger.info("Initialize Dms task params {}", JSONUtils.toPrettyJsonString(parameters));
initDmsHook();
}

6
dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java

@ -65,12 +65,12 @@ public class DvcTask extends AbstractTask {
@Override
public void init() {
logger.info("dvc task params {}", taskExecutionContext.getTaskParams());
parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DvcParameters.class);
logger.info("Initialize dvc task params {}", JSONUtils.toPrettyJsonString(parameters));
if (!parameters.checkParameters()) {
throw new RuntimeException("dvc task params is not valid");
if (parameters == null || !parameters.checkParameters()) {
throw new TaskException("dvc task params is not valid");
}
}

2
dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java

@ -78,8 +78,8 @@ public abstract class AbstractEmrTask extends AbstractRemoteTask {
@Override
public void init() {
final String taskParams = taskExecutionContext.getTaskParams();
logger.info("emr task params:{}", taskParams);
emrParameters = JSONUtils.parseObject(taskParams, EmrParameters.class);
logger.info("Initialize emr task params:{}", JSONUtils.toPrettyJsonString(taskParams));
if (emrParameters == null || !emrParameters.checkParameters()) {
throw new EmrTaskException("emr task params is not valid");
}

2
dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java

@ -50,9 +50,9 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask {
@Override
public void init() {
logger.info("flink task params {}", taskExecutionContext.getTaskParams());
flinkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), FlinkStreamParameters.class);
logger.info("Initialize Flink task params {}", JSONUtils.toPrettyJsonString(flinkParameters));
if (flinkParameters == null || !flinkParameters.checkParameters()) {
throw new RuntimeException("flink task params is not valid");

2
dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java

@ -53,9 +53,9 @@ public class FlinkTask extends AbstractYarnTask {
@Override
public void init() {
logger.info("flink task params {}", taskExecutionContext.getTaskParams());
flinkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), FlinkParameters.class);
logger.info("Initialize flink task params {}", JSONUtils.toPrettyJsonString(flinkParameters));
if (flinkParameters == null || !flinkParameters.checkParameters()) {
throw new RuntimeException("flink task params is not valid");

4
dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java

@ -80,10 +80,10 @@ public class HttpTask extends AbstractTask {
@Override
public void init() {
logger.info("http task params {}", taskExecutionContext.getTaskParams());
this.httpParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), HttpParameters.class);
logger.info("Initialize http task params {}", JSONUtils.toPrettyJsonString(httpParameters));
if (!httpParameters.checkParameters()) {
if (httpParameters == null || !httpParameters.checkParameters()) {
throw new RuntimeException("http task params is not valid");
}
}

2
dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java

@ -89,7 +89,6 @@ public class JavaTask extends AbstractTask {
**/
@Override
public void init() {
logger.info("java task params {}", taskRequest.getTaskParams());
javaParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), JavaParameters.class);
if (javaParameters == null || !javaParameters.checkParameters()) {
throw new TaskException("java task params is not valid");
@ -97,6 +96,7 @@ public class JavaTask extends AbstractTask {
if (javaParameters.getRunType().equals(JavaConstants.RUN_TYPE_JAR)) {
setMainJarName();
}
logger.info("Initialize java task params {}", JSONUtils.toPrettyJsonString(javaParameters));
}
/**

2
dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java

@ -65,9 +65,9 @@ public class JupyterTask extends AbstractRemoteTask {
@Override
public void init() {
logger.info("jupyter task params {}", taskExecutionContext.getTaskParams());
jupyterParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), JupyterParameters.class);
logger.info("Initialize jupyter task params {}", JSONUtils.toPrettyJsonString(jupyterParameters));
if (null == jupyterParameters) {
logger.error("jupyter params is null");

3
dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java

@ -53,7 +53,8 @@ public class K8sTask extends AbstractK8sTask {
super(taskRequest);
this.taskExecutionContext = taskRequest;
this.k8sTaskParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), K8sTaskParameters.class);
if (!k8sTaskParameters.checkParameters()) {
logger.info("Initialize k8s task parameters {}", JSONUtils.toPrettyJsonString(k8sTaskParameters));
if (k8sTaskParameters == null || !k8sTaskParameters.checkParameters()) {
throw new TaskException("K8S task params is not valid");
}
}

2
dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java

@ -53,8 +53,8 @@ public class KubeflowTask extends AbstractRemoteTask {
@Override
public void init() throws TaskException {
logger.info("Kubeflow task params {}", taskExecutionContext.getTaskParams());
kubeflowParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), KubeflowParameters.class);
logger.info("Initialize Kubeflow task params {}", taskExecutionContext.getTaskParams());
kubeflowParameters.setClusterYAML(taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml());
if (!kubeflowParameters.checkParameters()) {

9
dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java

@ -21,6 +21,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
@ -88,7 +89,9 @@ public class LinkisTask extends AbstractRemoteTask {
@Override
public void init() {
logger.info("Linkis task params {}", taskExecutionContext.getTaskParams());
linkisParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), LinkisParameters.class);
logger.info("Initialize Linkis task params {}", JSONUtils.toPrettyJsonString(linkisParameters));
if (!linkisParameters.checkParameters()) {
throw new RuntimeException("Linkis task params is not valid");
}
@ -251,8 +254,4 @@ public class LinkisTask extends AbstractRemoteTask {
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
}
public void setLinkisParameters(LinkisParameters linkisParameters) {
this.linkisParameters = linkisParameters;
}
}

6
dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java

@ -102,12 +102,12 @@ public class MlflowTask extends AbstractTask {
@Override
public void init() {
logger.info("shell task params {}", taskExecutionContext.getTaskParams());
mlflowParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), MlflowParameters.class);
if (!mlflowParameters.checkParameters()) {
throw new RuntimeException("shell task params is not valid");
logger.info("Initialize MLFlow task params {}", JSONUtils.toPrettyJsonString(mlflowParameters));
if (mlflowParameters == null || !mlflowParameters.checkParameters()) {
throw new RuntimeException("MLFlow task params is not valid");
}
}

3
dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java

@ -64,8 +64,6 @@ public class MapReduceTask extends AbstractYarnTask {
@Override
public void init() {
logger.info("mapreduce task params {}", taskExecutionContext.getTaskParams());
this.mapreduceParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), MapReduceParameters.class);
@ -89,6 +87,7 @@ public class MapReduceTask extends AbstractYarnTask {
ParamUtils.convert(paramsMap));
mapreduceParameters.setOthers(others);
}
logger.info("Initialize mapreduce task params {}", JSONUtils.toPrettyJsonString(mapreduceParameters));
}
/**

2
dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java

@ -63,10 +63,10 @@ public class OpenmldbTask extends PythonTask {
@Override
public void init() {
logger.info("openmldb task params {}", taskRequest.getTaskParams());
openmldbParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), OpenmldbParameters.class);
logger.info("Initialize openmldb task params {}", JSONUtils.toPrettyJsonString(openmldbParameters));
if (openmldbParameters == null || !openmldbParameters.checkParameters()) {
throw new TaskException("openmldb task params is not valid");
}

8
dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java

@ -73,12 +73,12 @@ public class PigeonTask extends AbstractRemoteTask {
}
@Override
public void init() {
public void init() throws TaskException {
super.init();
logger.info("PIGEON task params {}", taskExecutionContext.getTaskParams());
parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), PigeonParameters.class);
if (!parameters.checkParameters()) {
throw new RuntimeException("datax task params is not valid");
logger.info("Initialize PIGEON task params {}", JSONUtils.toPrettyJsonString(parameters));
if (parameters == null || !parameters.checkParameters()) {
throw new TaskException("datax task params is not valid");
}
}

7
dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java

@ -75,14 +75,13 @@ public class ProcedureTask extends AbstractTask {
this.taskExecutionContext = taskExecutionContext;
logger.info("procedure task params {}", taskExecutionContext.getTaskParams());
this.procedureParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ProcedureParameters.class);
logger.info("Initialize procedure task params {}", JSONUtils.toPrettyJsonString(procedureParameters));
// check parameters
if (!procedureParameters.checkParameters()) {
throw new RuntimeException("procedure task params is not valid");
if (procedureParameters == null || !procedureParameters.checkParameters()) {
throw new TaskException("procedure task params is not valid");
}
procedureTaskExecutionContext =

4
dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java

@ -78,11 +78,11 @@ public class PythonTask extends AbstractTask {
@Override
public void init() {
logger.info("python task params {}", taskRequest.getTaskParams());
pythonParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), PythonParameters.class);
if (!pythonParameters.checkParameters()) {
logger.info("Initialize python task params {}", JSONUtils.toPrettyJsonString(pythonParameters));
if (pythonParameters == null || !pythonParameters.checkParameters()) {
throw new TaskException("python task params is not valid");
}
}

4
dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java

@ -52,11 +52,11 @@ public class PytorchTask extends AbstractTask {
@Override
public void init() {
logger.info("python task params {}", taskExecutionContext.getTaskParams());
pytorchParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), PytorchParameters.class);
logger.info("Initialize pytorch task params {}", JSONUtils.toPrettyJsonString(taskExecutionContext));
if (!pytorchParameters.checkParameters()) {
if (pytorchParameters == null || !pytorchParameters.checkParameters()) {
throw new TaskException("python task params is not valid");
}

16
dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java

@ -46,18 +46,20 @@ import com.amazonaws.services.sagemaker.AmazonSageMakerClientBuilder;
import com.amazonaws.services.sagemaker.model.StartPipelineExecutionRequest;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.json.JsonMapper;
/**
* SagemakerTask task, Used to start Sagemaker pipeline
*/
public class SagemakerTask extends AbstractRemoteTask {
private static final ObjectMapper objectMapper =
new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
.configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
.configure(REQUIRE_SETTERS_FOR_GETTERS, true)
.setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy());
private static final ObjectMapper objectMapper = JsonMapper.builder()
.configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true)
.configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true)
.configure(REQUIRE_SETTERS_FOR_GETTERS, true)
.propertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy())
.build();
/**
* SageMaker parameters
*/
@ -80,10 +82,10 @@ public class SagemakerTask extends AbstractRemoteTask {
@Override
public void init() {
logger.info("Sagemaker task params {}", taskRequest.getTaskParams());
parameters = JSONUtils.parseObject(taskRequest.getTaskParams(), SagemakerParameters.class);
logger.info("Initialize Sagemaker task params {}", JSONUtils.toPrettyJsonString(parameters));
if (parameters == null) {
throw new SagemakerTaskException("Sagemaker task params is empty");
}

7
dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.seatunnel;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
@ -86,9 +87,9 @@ public class SeatunnelTask extends AbstractRemoteTask {
@Override
public void init() {
logger.info("SeaTunnel task params {}", taskExecutionContext.getTaskParams());
if (!seatunnelParameters.checkParameters()) {
throw new RuntimeException("SeaTunnel task params is not valid");
logger.info("Intialize SeaTunnel task params {}", JSONUtils.toPrettyJsonString(seatunnelParameters));
if (seatunnelParameters == null || !seatunnelParameters.checkParameters()) {
throw new TaskException("SeaTunnel task params is not valid");
}
}

6
dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java

@ -76,12 +76,12 @@ public class ShellTask extends AbstractTask {
@Override
public void init() {
logger.info("shell task params {}", taskExecutionContext.getTaskParams());
shellParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ShellParameters.class);
logger.info("Initialize shell task params {}", JSONUtils.toPrettyJsonString(shellParameters));
if (!shellParameters.checkParameters()) {
throw new RuntimeException("shell task params is not valid");
if (shellParameters == null || !shellParameters.checkParameters()) {
throw new TaskException("shell task params is not valid");
}
}

3
dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java

@ -65,8 +65,6 @@ public class SparkTask extends AbstractYarnTask {
@Override
public void init() {
logger.info("spark task params {}", taskExecutionContext.getTaskParams());
sparkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SparkParameters.class);
if (null == sparkParameters) {
@ -82,6 +80,7 @@ public class SparkTask extends AbstractYarnTask {
if (sparkParameters.getProgramType() != ProgramType.SQL) {
setMainJarName();
}
logger.info("Initialize spark task params {}", JSONUtils.toPrettyJsonString(sparkParameters));
}
/**

11
dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java

@ -110,13 +110,12 @@ public class SqlTask extends AbstractTask {
super(taskRequest);
this.taskExecutionContext = taskRequest;
this.sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class);
assert sqlParameters != null;
if (taskExecutionContext.getTestFlag() == TEST_FLAG_YES && this.sqlParameters.getDatasource() == 0) {
throw new RuntimeException("unbound test data source");
logger.info("Initialize sql task parameter {}", JSONUtils.toPrettyJsonString(sqlParameters));
if (sqlParameters == null || !sqlParameters.checkParameters()) {
throw new TaskException("sql task params is not valid");
}
if (!sqlParameters.checkParameters()) {
throw new RuntimeException("sql task params is not valid");
if (taskExecutionContext.getTestFlag() == TEST_FLAG_YES && this.sqlParameters.getDatasource() == 0) {
throw new TaskException("unbound test data source");
}
sqlTaskExecutionContext =

8
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java

@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.sqoop;
import org.apache.dolphinscheduler.common.log.SensitiveDataConverter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
@ -54,16 +55,15 @@ public class SqoopTask extends AbstractYarnTask {
@Override
public void init() {
logger.info("sqoop task params {}", taskExecutionContext.getTaskParams());
sqoopParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqoopParameters.class);
// check sqoop task params
logger.info("Initialize sqoop task params {}", JSONUtils.toPrettyJsonString(sqoopParameters));
if (null == sqoopParameters) {
throw new IllegalArgumentException("Sqoop Task params is null");
throw new TaskException("Sqoop Task params is null");
}
if (!sqoopParameters.checkParameters()) {
throw new IllegalArgumentException("Sqoop Task params check fail");
throw new TaskException("Sqoop Task params check fail");
}
sqoopTaskExecutionContext =

2
dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java

@ -71,11 +71,11 @@ public class ZeppelinTask extends AbstractRemoteTask {
@Override
public void init() {
final String taskParams = taskExecutionContext.getTaskParams();
logger.info("zeppelin task params:{}", taskParams);
this.zeppelinParameters = JSONUtils.parseObject(taskParams, ZeppelinParameters.class);
if (this.zeppelinParameters == null || !this.zeppelinParameters.checkParameters()) {
throw new ZeppelinTaskException("zeppelin task params is not valid");
}
logger.info("Initialize zeppelin task params:{}", JSONUtils.toPrettyJsonString(taskParams));
this.zClient = getZeppelinClient();
}

2
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java

@ -201,7 +201,7 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable {
taskExecutionContext.setTaskAppId(taskAppId);
logger.info("Set task appId: {}", taskAppId);
logger.info("End initialize task");
logger.info("End initialize task {}", JSONUtils.toPrettyJsonString(taskExecutionContext));
}
protected void beforeExecute() {

Loading…
Cancel
Save