Browse Source

Fix DataX and Sqoop Task execute NullPointerException (#9192)

3.0.0/version-upgrade
Kerwin 3 years ago committed by GitHub
parent
commit
d7d756e7b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java
  2. 13
      dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java
  3. 45
      dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/parameter/SqoopParameters.java

3
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java

@ -317,6 +317,9 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
map.forEach((code, parameters) -> { map.forEach((code, parameters) -> {
DataSource datasource = processService.findDataSourceById(code); DataSource datasource = processService.findDataSourceById(code);
if (Objects.isNull(datasource)) {
return;
}
DataSourceParameters dataSourceParameters = new DataSourceParameters(); DataSourceParameters dataSourceParameters = new DataSourceParameters();
dataSourceParameters.setType(datasource.getType()); dataSourceParameters.setType(datasource.getType());
dataSourceParameters.setConnectionParams(datasource.getConnectionParams()); dataSourceParameters.setConnectionParams(datasource.getConnectionParams());

13
dolphinscheduler-task-plugin/dolphinscheduler-task-datax/src/main/java/org/apache/dolphinscheduler/plugin/task/datax/DataxParameters.java

@ -256,17 +256,26 @@ public class DataxParameters extends AbstractParameters {
@Override @Override
public ResourceParametersHelper getResources() { public ResourceParametersHelper getResources() {
ResourceParametersHelper resources = super.getResources(); ResourceParametersHelper resources = super.getResources();
if (customConfig == Flag.YES.ordinal()) {
return resources;
}
resources.put(ResourceType.DATASOURCE, dataSource); resources.put(ResourceType.DATASOURCE, dataSource);
resources.put(ResourceType.DATASOURCE, dataTarget); resources.put(ResourceType.DATASOURCE, dataTarget);
return resources; return resources;
} }
public DataxTaskExecutionContext generateExtendedContext(ResourceParametersHelper parametersHelper) { public DataxTaskExecutionContext generateExtendedContext(ResourceParametersHelper parametersHelper) {
DataSourceParameters dbSource = (DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, dataSource);
DataSourceParameters dbTarget = (DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, dataTarget);
DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext(); DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext();
if (customConfig == Flag.YES.ordinal()) {
return dataxTaskExecutionContext;
}
DataSourceParameters dbSource = (DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, dataSource);
DataSourceParameters dbTarget = (DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, dataTarget);
if (Objects.nonNull(dbSource)) { if (Objects.nonNull(dbSource)) {
dataxTaskExecutionContext.setDataSourceId(dataSource); dataxTaskExecutionContext.setDataSourceId(dataSource);
dataxTaskExecutionContext.setSourcetype(dbSource.getType()); dataxTaskExecutionContext.setSourcetype(dbSource.getType());

45
dolphinscheduler-task-plugin/dolphinscheduler-task-sqoop/src/main/java/org/apache/dolphinscheduler/plugin/task/sqoop/parameter/SqoopParameters.java

@ -206,37 +206,46 @@ public class SqoopParameters extends AbstractParameters {
@Override @Override
public ResourceParametersHelper getResources() { public ResourceParametersHelper getResources() {
ResourceParametersHelper resources = super.getResources(); ResourceParametersHelper resources = super.getResources();
if (SqoopJobType.TEMPLATE.getDescp().equals(this.getJobType())) { if (!SqoopJobType.TEMPLATE.getDescp().equals(this.getJobType())) {
SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(this.getSourceParams(), SourceMysqlParameter.class); return resources;
TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(this.getTargetParams(), TargetMysqlParameter.class); }
SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(this.getSourceParams(), SourceMysqlParameter.class);
if (sourceMysqlParameter.getSrcDatasource() != 0) {
resources.put(ResourceType.DATASOURCE, sourceMysqlParameter.getSrcDatasource()); resources.put(ResourceType.DATASOURCE, sourceMysqlParameter.getSrcDatasource());
}
TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(this.getTargetParams(), TargetMysqlParameter.class);
if (targetMysqlParameter.getTargetDatasource() != 0) {
resources.put(ResourceType.DATASOURCE, targetMysqlParameter.getTargetDatasource()); resources.put(ResourceType.DATASOURCE, targetMysqlParameter.getTargetDatasource());
} }
return resources; return resources;
} }
public SqoopTaskExecutionContext generateExtendedContext(ResourceParametersHelper parametersHelper) { public SqoopTaskExecutionContext generateExtendedContext(ResourceParametersHelper parametersHelper) {
SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext(); SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext();
if (!SqoopJobType.TEMPLATE.getDescp().equals(this.getJobType())) {
return sqoopTaskExecutionContext;
}
if (SqoopJobType.TEMPLATE.getDescp().equals(this.getJobType())) { SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(this.getSourceParams(), SourceMysqlParameter.class);
SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(this.getSourceParams(), SourceMysqlParameter.class); TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(this.getTargetParams(), TargetMysqlParameter.class);
TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(this.getTargetParams(), TargetMysqlParameter.class);
DataSourceParameters dataSource = (DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, sourceMysqlParameter.getSrcDatasource()); DataSourceParameters dataSource = (DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, sourceMysqlParameter.getSrcDatasource());
DataSourceParameters dataTarget = (DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, targetMysqlParameter.getTargetDatasource()); DataSourceParameters dataTarget = (DataSourceParameters) parametersHelper.getResourceParameters(ResourceType.DATASOURCE, targetMysqlParameter.getTargetDatasource());
if (Objects.nonNull(dataSource)) { if (Objects.nonNull(dataSource)) {
sqoopTaskExecutionContext.setDataSourceId(sourceMysqlParameter.getSrcDatasource()); sqoopTaskExecutionContext.setDataSourceId(sourceMysqlParameter.getSrcDatasource());
sqoopTaskExecutionContext.setSourcetype(dataSource.getType()); sqoopTaskExecutionContext.setSourcetype(dataSource.getType());
sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
} }
if (Objects.nonNull(dataTarget)) { if (Objects.nonNull(dataTarget)) {
sqoopTaskExecutionContext.setDataTargetId(targetMysqlParameter.getTargetDatasource()); sqoopTaskExecutionContext.setDataTargetId(targetMysqlParameter.getTargetDatasource());
sqoopTaskExecutionContext.setTargetType(dataTarget.getType()); sqoopTaskExecutionContext.setTargetType(dataTarget.getType());
sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
}
} }
return sqoopTaskExecutionContext; return sqoopTaskExecutionContext;

Loading…
Cancel
Save