diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java index 8c0edcb696..55d548ca23 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java @@ -54,6 +54,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.json.JsonMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -72,21 +73,17 @@ public class JSONUtils { logger.info("init timezone: {}", TimeZone.getDefault()); } - private static final SimpleModule LOCAL_DATE_TIME_MODULE = new SimpleModule() - .addSerializer(LocalDateTime.class, new LocalDateTimeSerializer()) - .addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer()); - - /** - * can use static singleton, inject: just make sure to reuse! - */ - private static final ObjectMapper objectMapper = new ObjectMapper() + private static final ObjectMapper objectMapper = JsonMapper.builder() .configure(FAIL_ON_UNKNOWN_PROPERTIES, false) .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true) .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true) .configure(REQUIRE_SETTERS_FOR_GETTERS, true) - .registerModule(LOCAL_DATE_TIME_MODULE) - .setTimeZone(TimeZone.getDefault()) - .setDateFormat(new SimpleDateFormat(YYYY_MM_DD_HH_MM_SS)); + .addModule(new SimpleModule() + .addSerializer(LocalDateTime.class, new LocalDateTimeSerializer()) + .addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer())) + .defaultTimeZone(TimeZone.getDefault()) + .defaultDateFormat(new SimpleDateFormat(YYYY_MM_DD_HH_MM_SS)) + .build(); private JSONUtils() { throw new UnsupportedOperationException("Construct JSONUtils"); @@ -325,6 +322,14 @@ public class JSONUtils { } } + public static String toPrettyJsonString(Object object) { + try { + return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(object); + } catch (Exception e) { + throw new RuntimeException("Object json deserialization exception.", e); + } + } + /** * serialize to json byte * diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 1036aad83a..cc51b96d4d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -320,12 +320,14 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { this.setTaskResourceInfo(resources); // TODO to be optimized - DataQualityTaskExecutionContext dataQualityTaskExecutionContext = new DataQualityTaskExecutionContext(); + DataQualityTaskExecutionContext dataQualityTaskExecutionContext = null; if (TASK_TYPE_DATA_QUALITY.equalsIgnoreCase(taskInstance.getTaskType())) { + dataQualityTaskExecutionContext = new DataQualityTaskExecutionContext(); setDataQualityTaskRelation(dataQualityTaskExecutionContext, taskInstance, tenant.getTenantCode()); } - K8sTaskExecutionContext k8sTaskExecutionContext = new K8sTaskExecutionContext(); + K8sTaskExecutionContext k8sTaskExecutionContext = null; if (TASK_TYPE_SET_K8S.contains(taskInstance.getTaskType())) { + k8sTaskExecutionContext = new K8sTaskExecutionContext(); setK8sTaskRelation(k8sTaskExecutionContext, taskInstance); } diff --git a/dolphinscheduler-standalone-server/src/main/resources/application.yaml b/dolphinscheduler-standalone-server/src/main/resources/application.yaml index fdd47afcfe..5a995a6dcc 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/application.yaml +++ b/dolphinscheduler-standalone-server/src/main/resources/application.yaml @@ -135,17 +135,15 @@ master: host-selector: lower_weight # master heartbeat interval heartbeat-interval: 10s - # Master heart beat task error threshold, if the continuous error count exceed this count, the master will close. - heartbeat-error-threshold: 5 # master commit task retry times task-commit-retry-times: 5 # master commit task interval task-commit-interval: 1s state-wheel-interval: 5s # master max cpuload avg, only higher than the system cpu load average, master server can schedule. default value -1: the number of cpu cores * 2 - max-cpu-load-avg: -1 + max-cpu-load-avg: 50 # master reserved memory, only lower than system available memory, master server can schedule. default value 0.3, the unit is G - reserved-memory: 0.3 + reserved-memory: 0.03 # failover interval failover-interval: 10m # kill yarn jon when failover taskInstance, default true @@ -159,8 +157,6 @@ worker: exec-threads: 10 # worker heartbeat interval heartbeat-interval: 10s - # Worker heart beat task error threshold, if the continuous error count exceed this count, the worker will close. - heartbeat-error-threshold: 5 # worker host weight to dispatch tasks, default value 100 host-weight: 100 # tenant corresponds to the user of the system, which is used by the worker to submit the job. If system does not have this user, it will be automatically created after the parameter worker.tenant.auto.create is true. diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java index e33ebcd59f..4e02f954ed 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java @@ -30,6 +30,8 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import com.fasterxml.jackson.annotation.JsonInclude; + /** * to master/worker task transport */ @@ -37,6 +39,7 @@ import lombok.NoArgsConstructor; @Builder @NoArgsConstructor @AllArgsConstructor +@JsonInclude(JsonInclude.Include.NON_NULL) public class TaskExecutionContext implements Serializable { private static final long serialVersionUID = -1L; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java index a4ccda4b37..2e4e8792bc 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java @@ -94,8 +94,9 @@ public class ChunJunTask extends AbstractTask { */ @Override public void init() { - logger.info("chunjun task params {}", taskExecutionContext.getTaskParams()); chunJunParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ChunJunParameters.class); + logger.info("Initialize chunjun task params {}", + JSONUtils.toPrettyJsonString(taskExecutionContext.getTaskParams())); if (!chunJunParameters.checkParameters()) { throw new RuntimeException("chunjun task params is not valid"); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java index 2d5bcf92c7..c1360eca94 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dataquality/src/main/java/org/apache/dolphinscheduler/plugin/task/dq/DataQualityTask.java @@ -82,10 +82,10 @@ public class DataQualityTask extends AbstractYarnTask { @Override public void init() { - logger.info("data quality task params {}", dqTaskExecutionContext.getTaskParams()); dataQualityParameters = JSONUtils.parseObject(dqTaskExecutionContext.getTaskParams(), DataQualityParameters.class); + logger.info("Initialize data quality task params {}", JSONUtils.toPrettyJsonString(dataQualityParameters)); if (null == dataQualityParameters) { logger.error("data quality params is null"); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java index f27b9aa335..16a6827879 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datasync/src/main/java/org/apache/dolphinscheduler/plugin/task/datasync/DatasyncTask.java @@ -68,9 +68,9 @@ public class DatasyncTask extends AbstractRemoteTask { @Override public void init() { - logger.info("Datasync task params {}", taskExecutionContext.getTaskParams()); parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DatasyncParameters.class); + logger.info("Initialize Datasync task params {}", JSONUtils.toPrettyJsonString(parameters)); initParams(); hook = new DatasyncHook(); @@ -84,10 +84,9 @@ public class DatasyncTask extends AbstractRemoteTask { try { parameters = objectMapper.readValue(parameters.getJson(), DatasyncParameters.class); } catch (JsonProcessingException e) { - throw new RuntimeException(e); + throw new TaskException("Convert json to task params failed", e); } - // parameters = JSONUtils.parseObject(parameters.getJson(), DatasyncParameters.class); - logger.info("Datasync convert task params {}", parameters); + logger.info("Success convert json to task params {}", JSONUtils.toPrettyJsonString(parameters)); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java index 0dc3086ec5..1caf9cfa4f 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxTask.java @@ -134,8 +134,8 @@ public class DataxTask extends AbstractTask { */ @Override public void init() { - logger.info("datax task params {}", taskExecutionContext.getTaskParams()); dataXParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DataxParameters.class); + logger.info("Initialize datax task params {}", JSONUtils.toPrettyJsonString(dataXParameters)); if (dataXParameters == null || !dataXParameters.checkParameters()) { throw new RuntimeException("datax task params is not valid"); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java index 270a983c34..f4b4b1eb0d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dinky/src/main/java/org/apache/dolphinscheduler/plugin/task/dinky/DinkyTask.java @@ -80,8 +80,8 @@ public class DinkyTask extends AbstractRemoteTask { @Override public void init() { final String taskParams = taskExecutionContext.getTaskParams(); - logger.info("dinky task params:{}", taskParams); this.dinkyParameters = JSONUtils.parseObject(taskParams, DinkyParameters.class); + logger.info("Initialize dinky task params: {}", JSONUtils.toPrettyJsonString(dinkyParameters)); if (this.dinkyParameters == null || !this.dinkyParameters.checkParameters()) { throw new DinkyTaskException("dinky task params is not valid"); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java index f64ae05164..ef60e67f58 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java @@ -40,15 +40,18 @@ import com.amazonaws.services.databasemigrationservice.model.InvalidResourceStat import com.amazonaws.services.databasemigrationservice.model.ReplicationTask; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.PropertyNamingStrategy; +import com.fasterxml.jackson.databind.json.JsonMapper; public class DmsTask extends AbstractRemoteTask { private static final ObjectMapper objectMapper = - new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false) + JsonMapper.builder() + .configure(FAIL_ON_UNKNOWN_PROPERTIES, false) .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true) .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true) .configure(REQUIRE_SETTERS_FOR_GETTERS, true) - .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy()); + .propertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy()) + .build(); /** * taskExecutionContext */ @@ -68,8 +71,8 @@ public class DmsTask extends AbstractRemoteTask { @Override public void init() throws TaskException { - logger.info("Dms task params {}", taskExecutionContext.getTaskParams()); parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DmsParameters.class); + logger.info("Initialize Dms task params {}", JSONUtils.toPrettyJsonString(parameters)); initDmsHook(); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java index bf971e25ce..af87f87e27 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-dvc/src/main/java/org/apache/dolphinscheduler/plugin/task/dvc/DvcTask.java @@ -65,12 +65,12 @@ public class DvcTask extends AbstractTask { @Override public void init() { - logger.info("dvc task params {}", taskExecutionContext.getTaskParams()); parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), DvcParameters.class); + logger.info("Initialize dvc task params {}", JSONUtils.toPrettyJsonString(parameters)); - if (!parameters.checkParameters()) { - throw new RuntimeException("dvc task params is not valid"); + if (parameters == null || !parameters.checkParameters()) { + throw new TaskException("dvc task params is not valid"); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java index da88fa6de5..e3f8a8d73d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-emr/src/main/java/org/apache/dolphinscheduler/plugin/task/emr/AbstractEmrTask.java @@ -78,8 +78,8 @@ public abstract class AbstractEmrTask extends AbstractRemoteTask { @Override public void init() { final String taskParams = taskExecutionContext.getTaskParams(); - logger.info("emr task params:{}", taskParams); emrParameters = JSONUtils.parseObject(taskParams, EmrParameters.class); + logger.info("Initialize emr task params:{}", JSONUtils.toPrettyJsonString(taskParams)); if (emrParameters == null || !emrParameters.checkParameters()) { throw new EmrTaskException("emr task params is not valid"); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java index 6d895d8834..6608d701f3 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink-stream/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkStreamTask.java @@ -50,9 +50,9 @@ public class FlinkStreamTask extends FlinkTask implements StreamTask { @Override public void init() { - logger.info("flink task params {}", taskExecutionContext.getTaskParams()); flinkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), FlinkStreamParameters.class); + logger.info("Initialize Flink task params {}", JSONUtils.toPrettyJsonString(flinkParameters)); if (flinkParameters == null || !flinkParameters.checkParameters()) { throw new RuntimeException("flink task params is not valid"); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java index df454a4f33..115c5954a5 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-flink/src/main/java/org/apache/dolphinscheduler/plugin/task/flink/FlinkTask.java @@ -53,9 +53,9 @@ public class FlinkTask extends AbstractYarnTask { @Override public void init() { - logger.info("flink task params {}", taskExecutionContext.getTaskParams()); flinkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), FlinkParameters.class); + logger.info("Initialize flink task params {}", JSONUtils.toPrettyJsonString(flinkParameters)); if (flinkParameters == null || !flinkParameters.checkParameters()) { throw new RuntimeException("flink task params is not valid"); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java index 1d65f185a7..735c4a4830 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java @@ -80,10 +80,10 @@ public class HttpTask extends AbstractTask { @Override public void init() { - logger.info("http task params {}", taskExecutionContext.getTaskParams()); this.httpParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), HttpParameters.class); + logger.info("Initialize http task params {}", JSONUtils.toPrettyJsonString(httpParameters)); - if (!httpParameters.checkParameters()) { + if (httpParameters == null || !httpParameters.checkParameters()) { throw new RuntimeException("http task params is not valid"); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java index 950854845d..8ac5cf8ae1 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-java/src/main/java/org/apache/dolphinscheduler/plugin/task/java/JavaTask.java @@ -89,7 +89,6 @@ public class JavaTask extends AbstractTask { **/ @Override public void init() { - logger.info("java task params {}", taskRequest.getTaskParams()); javaParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), JavaParameters.class); if (javaParameters == null || !javaParameters.checkParameters()) { throw new TaskException("java task params is not valid"); @@ -97,6 +96,7 @@ public class JavaTask extends AbstractTask { if (javaParameters.getRunType().equals(JavaConstants.RUN_TYPE_JAR)) { setMainJarName(); } + logger.info("Initialize java task params {}", JSONUtils.toPrettyJsonString(javaParameters)); } /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java index 9c32ab2a38..f3155a933a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-jupyter/src/main/java/org/apache/dolphinscheduler/plugin/task/jupyter/JupyterTask.java @@ -65,9 +65,9 @@ public class JupyterTask extends AbstractRemoteTask { @Override public void init() { - logger.info("jupyter task params {}", taskExecutionContext.getTaskParams()); jupyterParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), JupyterParameters.class); + logger.info("Initialize jupyter task params {}", JSONUtils.toPrettyJsonString(jupyterParameters)); if (null == jupyterParameters) { logger.error("jupyter params is null"); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java index 88faf9cf81..cd756c76b3 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-k8s/src/main/java/org/apache/dolphinscheduler/plugin/task/k8s/K8sTask.java @@ -53,7 +53,8 @@ public class K8sTask extends AbstractK8sTask { super(taskRequest); this.taskExecutionContext = taskRequest; this.k8sTaskParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), K8sTaskParameters.class); - if (!k8sTaskParameters.checkParameters()) { + logger.info("Initialize k8s task parameters {}", JSONUtils.toPrettyJsonString(k8sTaskParameters)); + if (k8sTaskParameters == null || !k8sTaskParameters.checkParameters()) { throw new TaskException("K8S task params is not valid"); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java index 39adf04bf3..5355ae1d24 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-kubeflow/src/main/java/org/apache/dolphinscheduler/plugin/kubeflow/KubeflowTask.java @@ -53,8 +53,8 @@ public class KubeflowTask extends AbstractRemoteTask { @Override public void init() throws TaskException { - logger.info("Kubeflow task params {}", taskExecutionContext.getTaskParams()); kubeflowParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), KubeflowParameters.class); + logger.info("Initialize Kubeflow task params {}", taskExecutionContext.getTaskParams()); kubeflowParameters.setClusterYAML(taskExecutionContext.getK8sTaskExecutionContext().getConfigYaml()); if (!kubeflowParameters.checkParameters()) { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java index c11888e4a8..9e4a2944f3 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-linkis/src/main/java/org/apache/dolphinscheduler/plugin/task/linkis/LinkisTask.java @@ -21,6 +21,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_COD import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskException; @@ -88,7 +89,9 @@ public class LinkisTask extends AbstractRemoteTask { @Override public void init() { - logger.info("Linkis task params {}", taskExecutionContext.getTaskParams()); + linkisParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), LinkisParameters.class); + logger.info("Initialize Linkis task params {}", JSONUtils.toPrettyJsonString(linkisParameters)); + if (!linkisParameters.checkParameters()) { throw new RuntimeException("Linkis task params is not valid"); } @@ -251,8 +254,4 @@ public class LinkisTask extends AbstractRemoteTask { Map paramsMap = taskExecutionContext.getPrepareParamsMap(); return ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); } - - public void setLinkisParameters(LinkisParameters linkisParameters) { - this.linkisParameters = linkisParameters; - } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java index a17a23caf0..3800cd846c 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mlflow/src/main/java/org/apache/dolphinscheduler/plugin/task/mlflow/MlflowTask.java @@ -102,12 +102,12 @@ public class MlflowTask extends AbstractTask { @Override public void init() { - logger.info("shell task params {}", taskExecutionContext.getTaskParams()); mlflowParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), MlflowParameters.class); - if (!mlflowParameters.checkParameters()) { - throw new RuntimeException("shell task params is not valid"); + logger.info("Initialize MLFlow task params {}", JSONUtils.toPrettyJsonString(mlflowParameters)); + if (mlflowParameters == null || !mlflowParameters.checkParameters()) { + throw new RuntimeException("MLFlow task params is not valid"); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java index 062884b07b..563d55346d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-mr/src/main/java/org/apache/dolphinscheduler/plugin/task/mr/MapReduceTask.java @@ -64,8 +64,6 @@ public class MapReduceTask extends AbstractYarnTask { @Override public void init() { - logger.info("mapreduce task params {}", taskExecutionContext.getTaskParams()); - this.mapreduceParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), MapReduceParameters.class); @@ -89,6 +87,7 @@ public class MapReduceTask extends AbstractYarnTask { ParamUtils.convert(paramsMap)); mapreduceParameters.setOthers(others); } + logger.info("Initialize mapreduce task params {}", JSONUtils.toPrettyJsonString(mapreduceParameters)); } /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java index 8adf717300..f34934f84d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-openmldb/src/main/java/org/apache/dolphinscheduler/plugin/task/openmldb/OpenmldbTask.java @@ -63,10 +63,10 @@ public class OpenmldbTask extends PythonTask { @Override public void init() { - logger.info("openmldb task params {}", taskRequest.getTaskParams()); openmldbParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), OpenmldbParameters.class); + logger.info("Initialize openmldb task params {}", JSONUtils.toPrettyJsonString(openmldbParameters)); if (openmldbParameters == null || !openmldbParameters.checkParameters()) { throw new TaskException("openmldb task params is not valid"); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java index 5f6c452ac9..a2ff351b88 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pigeon/src/main/java/org/apache/dolphinscheduler/plugin/task/pigeon/PigeonTask.java @@ -73,12 +73,12 @@ public class PigeonTask extends AbstractRemoteTask { } @Override - public void init() { + public void init() throws TaskException { super.init(); - logger.info("PIGEON task params {}", taskExecutionContext.getTaskParams()); parameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), PigeonParameters.class); - if (!parameters.checkParameters()) { - throw new RuntimeException("datax task params is not valid"); + logger.info("Initialize PIGEON task params {}", JSONUtils.toPrettyJsonString(parameters)); + if (parameters == null || !parameters.checkParameters()) { + throw new TaskException("datax task params is not valid"); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java index 69d04db280..68b6c29e09 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java @@ -75,14 +75,13 @@ public class ProcedureTask extends AbstractTask { this.taskExecutionContext = taskExecutionContext; - logger.info("procedure task params {}", taskExecutionContext.getTaskParams()); - this.procedureParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ProcedureParameters.class); + logger.info("Initialize procedure task params {}", JSONUtils.toPrettyJsonString(procedureParameters)); // check parameters - if (!procedureParameters.checkParameters()) { - throw new RuntimeException("procedure task params is not valid"); + if (procedureParameters == null || !procedureParameters.checkParameters()) { + throw new TaskException("procedure task params is not valid"); } procedureTaskExecutionContext = diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java index 208797f377..13e9277352 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-python/src/main/java/org/apache/dolphinscheduler/plugin/task/python/PythonTask.java @@ -78,11 +78,11 @@ public class PythonTask extends AbstractTask { @Override public void init() { - logger.info("python task params {}", taskRequest.getTaskParams()); pythonParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), PythonParameters.class); - if (!pythonParameters.checkParameters()) { + logger.info("Initialize python task params {}", JSONUtils.toPrettyJsonString(pythonParameters)); + if (pythonParameters == null || !pythonParameters.checkParameters()) { throw new TaskException("python task params is not valid"); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java index 6a7f4e53e8..31d63e27f3 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-pytorch/src/main/java/org/apache/dolphinscheduler/plugin/task/pytorch/PytorchTask.java @@ -52,11 +52,11 @@ public class PytorchTask extends AbstractTask { @Override public void init() { - logger.info("python task params {}", taskExecutionContext.getTaskParams()); pytorchParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), PytorchParameters.class); + logger.info("Initialize pytorch task params {}", JSONUtils.toPrettyJsonString(taskExecutionContext)); - if (!pytorchParameters.checkParameters()) { + if (pytorchParameters == null || !pytorchParameters.checkParameters()) { throw new TaskException("python task params is not valid"); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java index b11827dc43..e49d1acff3 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sagemaker/src/main/java/org/apache/dolphinscheduler/plugin/task/sagemaker/SagemakerTask.java @@ -46,18 +46,20 @@ import com.amazonaws.services.sagemaker.AmazonSageMakerClientBuilder; import com.amazonaws.services.sagemaker.model.StartPipelineExecutionRequest; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.PropertyNamingStrategy; +import com.fasterxml.jackson.databind.json.JsonMapper; /** * SagemakerTask task, Used to start Sagemaker pipeline */ public class SagemakerTask extends AbstractRemoteTask { - private static final ObjectMapper objectMapper = - new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false) - .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true) - .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true) - .configure(REQUIRE_SETTERS_FOR_GETTERS, true) - .setPropertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy()); + private static final ObjectMapper objectMapper = JsonMapper.builder() + .configure(FAIL_ON_UNKNOWN_PROPERTIES, false) + .configure(ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true) + .configure(READ_UNKNOWN_ENUM_VALUES_AS_NULL, true) + .configure(REQUIRE_SETTERS_FOR_GETTERS, true) + .propertyNamingStrategy(new PropertyNamingStrategy.UpperCamelCaseStrategy()) + .build(); /** * SageMaker parameters */ @@ -80,10 +82,10 @@ public class SagemakerTask extends AbstractRemoteTask { @Override public void init() { - logger.info("Sagemaker task params {}", taskRequest.getTaskParams()); parameters = JSONUtils.parseObject(taskRequest.getTaskParams(), SagemakerParameters.class); + logger.info("Initialize Sagemaker task params {}", JSONUtils.toPrettyJsonString(parameters)); if (parameters == null) { throw new SagemakerTaskException("Sagemaker task params is empty"); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java index 5433f3d828..35d9a42d1b 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-seatunnel/src/main/java/org/apache/dolphinscheduler/plugin/task/seatunnel/SeatunnelTask.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.seatunnel; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; import static org.apache.dolphinscheduler.plugin.task.seatunnel.Constants.CONFIG_OPTIONS; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask; import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor; import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack; @@ -86,9 +87,9 @@ public class SeatunnelTask extends AbstractRemoteTask { @Override public void init() { - logger.info("SeaTunnel task params {}", taskExecutionContext.getTaskParams()); - if (!seatunnelParameters.checkParameters()) { - throw new RuntimeException("SeaTunnel task params is not valid"); + logger.info("Intialize SeaTunnel task params {}", JSONUtils.toPrettyJsonString(seatunnelParameters)); + if (seatunnelParameters == null || !seatunnelParameters.checkParameters()) { + throw new TaskException("SeaTunnel task params is not valid"); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java index 681920c0c1..b1b854497a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-shell/src/main/java/org/apache/dolphinscheduler/plugin/task/shell/ShellTask.java @@ -76,12 +76,12 @@ public class ShellTask extends AbstractTask { @Override public void init() { - logger.info("shell task params {}", taskExecutionContext.getTaskParams()); shellParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ShellParameters.class); + logger.info("Initialize shell task params {}", JSONUtils.toPrettyJsonString(shellParameters)); - if (!shellParameters.checkParameters()) { - throw new RuntimeException("shell task params is not valid"); + if (shellParameters == null || !shellParameters.checkParameters()) { + throw new TaskException("shell task params is not valid"); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java index b71ebc9e19..6796357d8a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-spark/src/main/java/org/apache/dolphinscheduler/plugin/task/spark/SparkTask.java @@ -65,8 +65,6 @@ public class SparkTask extends AbstractYarnTask { @Override public void init() { - logger.info("spark task params {}", taskExecutionContext.getTaskParams()); - sparkParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SparkParameters.class); if (null == sparkParameters) { @@ -82,6 +80,7 @@ public class SparkTask extends AbstractYarnTask { if (sparkParameters.getProgramType() != ProgramType.SQL) { setMainJarName(); } + logger.info("Initialize spark task params {}", JSONUtils.toPrettyJsonString(sparkParameters)); } /** diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java index 7fe98c78f9..5b4d083be6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -110,13 +110,12 @@ public class SqlTask extends AbstractTask { super(taskRequest); this.taskExecutionContext = taskRequest; this.sqlParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqlParameters.class); - - assert sqlParameters != null; - if (taskExecutionContext.getTestFlag() == TEST_FLAG_YES && this.sqlParameters.getDatasource() == 0) { - throw new RuntimeException("unbound test data source"); + logger.info("Initialize sql task parameter {}", JSONUtils.toPrettyJsonString(sqlParameters)); + if (sqlParameters == null || !sqlParameters.checkParameters()) { + throw new TaskException("sql task params is not valid"); } - if (!sqlParameters.checkParameters()) { - throw new RuntimeException("sql task params is not valid"); + if (taskExecutionContext.getTestFlag() == TEST_FLAG_YES && this.sqlParameters.getDatasource() == 0) { + throw new TaskException("unbound test data source"); } sqlTaskExecutionContext = diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java index c2eb473ce7..f62c9d466d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/SqoopTask.java @@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.sqoop; import org.apache.dolphinscheduler.common.log.SensitiveDataConverter; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.plugin.task.api.AbstractYarnTask; +import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; @@ -54,16 +55,15 @@ public class SqoopTask extends AbstractYarnTask { @Override public void init() { - logger.info("sqoop task params {}", taskExecutionContext.getTaskParams()); sqoopParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), SqoopParameters.class); - // check sqoop task params + logger.info("Initialize sqoop task params {}", JSONUtils.toPrettyJsonString(sqoopParameters)); if (null == sqoopParameters) { - throw new IllegalArgumentException("Sqoop Task params is null"); + throw new TaskException("Sqoop Task params is null"); } if (!sqoopParameters.checkParameters()) { - throw new IllegalArgumentException("Sqoop Task params check fail"); + throw new TaskException("Sqoop Task params check fail"); } sqoopTaskExecutionContext = diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java index 3cdea35637..05d0a7ceee 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-zeppelin/src/main/java/org/apache/dolphinscheduler/plugin/task/zeppelin/ZeppelinTask.java @@ -71,11 +71,11 @@ public class ZeppelinTask extends AbstractRemoteTask { @Override public void init() { final String taskParams = taskExecutionContext.getTaskParams(); - logger.info("zeppelin task params:{}", taskParams); this.zeppelinParameters = JSONUtils.parseObject(taskParams, ZeppelinParameters.class); if (this.zeppelinParameters == null || !this.zeppelinParameters.checkParameters()) { throw new ZeppelinTaskException("zeppelin task params is not valid"); } + logger.info("Initialize zeppelin task params:{}", JSONUtils.toPrettyJsonString(taskParams)); this.zClient = getZeppelinClient(); } diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java index d8c815dccc..a005fd1d0c 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java @@ -201,7 +201,7 @@ public abstract class WorkerTaskExecuteRunnable implements Runnable { taskExecutionContext.setTaskAppId(taskAppId); logger.info("Set task appId: {}", taskAppId); - logger.info("End initialize task"); + logger.info("End initialize task {}", JSONUtils.toPrettyJsonString(taskExecutionContext)); } protected void beforeExecute() {