From d7d756e7b0165bfcdc2e0bffcfcb45892e23feb5 Mon Sep 17 00:00:00 2001 From: Kerwin <37063904+zhuangchong@users.noreply.github.com> Date: Sat, 26 Mar 2022 20:15:41 +0800 Subject: [PATCH] Fix DataX and Sqoop Task execute NullPointerException (#9192) --- .../master/runner/task/BaseTaskProcessor.java | 3 ++ .../plugin/task/datax/DataxParameters.java | 13 +++++- .../task/sqoop/parameter/SqoopParameters.java | 45 +++++++++++-------- 3 files changed, 41 insertions(+), 20 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 359984ead4..c53a02e886 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -317,6 +317,9 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { map.forEach((code, parameters) -> { DataSource datasource = processService.findDataSourceById(code); + if (Objects.isNull(datasource)) { + return; + } DataSourceParameters dataSourceParameters = new DataSourceParameters(); dataSourceParameters.setType(datasource.getType()); dataSourceParameters.setConnectionParams(datasource.getConnectionParams()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java index e84508221a..d6fd449462 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java @@ -256,17 +256,26 @@ public class DataxParameters extends AbstractParameters { @Override public ResourceParametersHelper getResources() { ResourceParametersHelper resources = super.getResources(); + + if (customConfig == Flag.YES.ordinal()) { + return resources; + } resources.put(ResourceType.DATASOURCE, dataSource); resources.put(ResourceType.DATASOURCE, dataTarget); return resources; } public DataxTaskExecutionContext generateExtendedContext(ResourceParametersHelper parametersHelper) { - DataSourceParameters dbSource = (DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, dataSource); - DataSourceParameters dbTarget = (DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, dataTarget); DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext(); + if (customConfig == Flag.YES.ordinal()) { + return dataxTaskExecutionContext; + } + + DataSourceParameters dbSource = (DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, dataSource); + DataSourceParameters dbTarget = (DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, dataTarget); + if (Objects.nonNull(dbSource)) { dataxTaskExecutionContext.setDataSourceId(dataSource); dataxTaskExecutionContext.setSourcetype(dbSource.getType()); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/parameter/SqoopParameters.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/parameter/SqoopParameters.java index 3a7f31fd2e..b6019aa8ce 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/parameter/SqoopParameters.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/parameter/SqoopParameters.java @@ -206,37 +206,46 @@ public class SqoopParameters extends AbstractParameters { @Override public ResourceParametersHelper getResources() { ResourceParametersHelper resources = super.getResources(); - if (SqoopJobType.TEMPLATE.getDescp().equals(this.getJobType())) { - SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(this.getSourceParams(), SourceMysqlParameter.class); - TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(this.getTargetParams(), TargetMysqlParameter.class); + if (!SqoopJobType.TEMPLATE.getDescp().equals(this.getJobType())) { + return resources; + } + + SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(this.getSourceParams(), SourceMysqlParameter.class); + if (sourceMysqlParameter.getSrcDatasource() != 0) { resources.put(ResourceType.DATASOURCE, sourceMysqlParameter.getSrcDatasource()); + } + + TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(this.getTargetParams(), TargetMysqlParameter.class); + if (targetMysqlParameter.getTargetDatasource() != 0) { resources.put(ResourceType.DATASOURCE, targetMysqlParameter.getTargetDatasource()); } + return resources; } public SqoopTaskExecutionContext generateExtendedContext(ResourceParametersHelper parametersHelper) { SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext(); + if (!SqoopJobType.TEMPLATE.getDescp().equals(this.getJobType())) { + return sqoopTaskExecutionContext; + } - if (SqoopJobType.TEMPLATE.getDescp().equals(this.getJobType())) { - SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(this.getSourceParams(), SourceMysqlParameter.class); - TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(this.getTargetParams(), TargetMysqlParameter.class); + SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(this.getSourceParams(), SourceMysqlParameter.class); + TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(this.getTargetParams(), TargetMysqlParameter.class); - DataSourceParameters dataSource = (DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, sourceMysqlParameter.getSrcDatasource()); - DataSourceParameters dataTarget = (DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, targetMysqlParameter.getTargetDatasource()); + DataSourceParameters dataSource = (DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, sourceMysqlParameter.getSrcDatasource()); + DataSourceParameters dataTarget = (DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, targetMysqlParameter.getTargetDatasource()); - if (Objects.nonNull(dataSource)) { - sqoopTaskExecutionContext.setDataSourceId(sourceMysqlParameter.getSrcDatasource()); - sqoopTaskExecutionContext.setSourcetype(dataSource.getType()); - sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); - } + if (Objects.nonNull(dataSource)) { + sqoopTaskExecutionContext.setDataSourceId(sourceMysqlParameter.getSrcDatasource()); + sqoopTaskExecutionContext.setSourcetype(dataSource.getType()); + sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); + } - if (Objects.nonNull(dataTarget)) { - sqoopTaskExecutionContext.setDataTargetId(targetMysqlParameter.getTargetDatasource()); - sqoopTaskExecutionContext.setTargetType(dataTarget.getType()); - sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); - } + if (Objects.nonNull(dataTarget)) { + sqoopTaskExecutionContext.setDataTargetId(targetMysqlParameter.getTargetDatasource()); + sqoopTaskExecutionContext.setTargetType(dataTarget.getType()); + sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); } return sqoopTaskExecutionContext;