diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 7ad4974bea..94349b7b5d 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -739,6 +739,7 @@ public final class Constants { public static final String ALIAS = "alias"; public static final String CONTENT = "content"; public static final String DEPENDENT_SPLIT = ":||"; + public static final String DEPENDENT_ALL = "ALL"; /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SqoopTaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SqoopTaskExecutionContext.java new file mode 100644 index 0000000000..c74414bb21 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/SqoopTaskExecutionContext.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.entity; + +import java.io.Serializable; + +/** + * master/worker task transport + */ +public class SqoopTaskExecutionContext implements Serializable{ + + /** + * dataSourceId + */ + private int dataSourceId; + + /** + * sourcetype + */ + private int sourcetype; + + /** + * sourceConnectionParams + */ + private String sourceConnectionParams; + + /** + * dataTargetId + */ + private int dataTargetId; + + /** + * targetType + */ + private int targetType; + + /** + * targetConnectionParams + */ + private String targetConnectionParams; + + public int getDataSourceId() { + return dataSourceId; + } + + public void setDataSourceId(int dataSourceId) { + this.dataSourceId = dataSourceId; + } + + public int getSourcetype() { + return sourcetype; + } + + public void setSourcetype(int sourcetype) { + this.sourcetype = sourcetype; + } + + public String getSourceConnectionParams() { + return sourceConnectionParams; + } + + public void setSourceConnectionParams(String sourceConnectionParams) { + this.sourceConnectionParams = sourceConnectionParams; + } + + public int getDataTargetId() { + return dataTargetId; + } + + public void setDataTargetId(int dataTargetId) { + this.dataTargetId = dataTargetId; + } + + public int getTargetType() { + return targetType; + } + + public void setTargetType(int targetType) { + this.targetType = targetType; + } + + public String getTargetConnectionParams() { + return targetConnectionParams; + } + + public void setTargetConnectionParams(String targetConnectionParams) { + this.targetConnectionParams = targetConnectionParams; + } + + @Override + public String toString() { + return "SqoopTaskExecutionContext{" + + "dataSourceId=" + dataSourceId + + ", sourcetype=" + sourcetype + + ", sourceConnectionParams='" + sourceConnectionParams + '\'' + + ", dataTargetId=" + dataTargetId + + ", targetType=" + targetType + + ", targetConnectionParams='" + targetConnectionParams + '\'' + + '}'; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java index 264055f90b..563f5c8459 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/entity/TaskExecutionContext.java @@ -187,6 +187,11 @@ public class TaskExecutionContext implements Serializable{ */ private DependenceTaskExecutionContext dependenceTaskExecutionContext; + /** + * sqoop TaskExecutionContext + */ + private SqoopTaskExecutionContext sqoopTaskExecutionContext; + /** * procedure TaskExecutionContext */ @@ -446,6 +451,14 @@ public class TaskExecutionContext implements Serializable{ this.resources = resources; } + public SqoopTaskExecutionContext getSqoopTaskExecutionContext() { + return sqoopTaskExecutionContext; + } + + public void setSqoopTaskExecutionContext(SqoopTaskExecutionContext sqoopTaskExecutionContext) { + this.sqoopTaskExecutionContext = sqoopTaskExecutionContext; + } + @Override public String toString() { return "TaskExecutionContext{" + @@ -479,6 +492,7 @@ public class TaskExecutionContext implements Serializable{ ", sqlTaskExecutionContext=" + sqlTaskExecutionContext + ", dataxTaskExecutionContext=" + dataxTaskExecutionContext + ", dependenceTaskExecutionContext=" + dependenceTaskExecutionContext + + ", sqoopTaskExecutionContext=" + sqoopTaskExecutionContext + ", procedureTaskExecutionContext=" + procedureTaskExecutionContext + '}'; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java index b426d32502..532a0863a5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTask.java @@ -26,8 +26,8 @@ import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.utils.DependentUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; -import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; @@ -67,20 +67,26 @@ public class DependentTask extends AbstractTask { */ private ProcessService processService; + /** + * taskExecutionContext + */ + private TaskExecutionContext taskExecutionContext; + /** * constructor - * @param props props + * @param taskExecutionContext taskExecutionContext * @param logger logger */ - public DependentTask(TaskProps props, Logger logger) { - super(props, logger); + public DependentTask(TaskExecutionContext taskExecutionContext, Logger logger) { + super(taskExecutionContext, logger); + this.taskExecutionContext = taskExecutionContext; } @Override public void init(){ logger.info("dependent task initialize"); - this.dependentParameters = JSONUtils.parseObject(this.taskProps.getDependence(), + this.dependentParameters = JSONUtils.parseObject(null, DependentParameters.class); if(dependentParameters != null){ for(DependentTaskModel taskModel : dependentParameters.getDependTaskList()){ @@ -91,10 +97,10 @@ public class DependentTask extends AbstractTask { this.processService = SpringApplicationContext.getBean(ProcessService.class); - if(taskProps.getScheduleTime() != null){ - this.dependentDate = taskProps.getScheduleTime(); + if(taskExecutionContext.getScheduleTime() != null){ + this.dependentDate = taskExecutionContext.getScheduleTime(); }else{ - this.dependentDate = taskProps.getTaskStartTime(); + this.dependentDate = taskExecutionContext.getStartTime(); } } @@ -102,13 +108,13 @@ public class DependentTask extends AbstractTask { @Override public void handle() throws Exception { // set the name of the current thread - String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskProps.getTaskAppId()); + String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId()); Thread.currentThread().setName(threadLoggerInfoName); try{ TaskInstance taskInstance = null; while(Stopper.isRunning()){ - taskInstance = processService.findTaskInstanceById(this.taskProps.getTaskInstId()); + taskInstance = processService.findTaskInstanceById(this.taskExecutionContext.getTaskInstanceId()); if(taskInstance == null){ exitStatusCode = -1; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java index a35f67aac1..9f54d089be 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java @@ -25,7 +25,6 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; -import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator; import org.slf4j.Logger; import java.util.Map; @@ -62,7 +61,7 @@ public class SqoopTask extends AbstractYarnTask { protected String buildCommand() throws Exception { //get sqoop scripts SqoopJobGenerator generator = new SqoopJobGenerator(); - String script = generator.generateSqoopJob(sqoopParameters); + String script = generator.generateSqoopJob(sqoopParameters,taskExecutionContext); Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), taskExecutionContext.getDefinedParams(), diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java index 6c1d1fdca8..841654b699 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.worker.task.sqoop.generator; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; /** * Source Generator Interface @@ -25,8 +26,9 @@ public interface ISourceGenerator { /** * generate the source script - * @param sqoopParameters sqoop params - * @return + * @param sqoopParameters sqoopParameters + * @param taskExecutionContext taskExecutionContext + * @return source script */ - String generate(SqoopParameters sqoopParameters); + String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java index be307af5f2..7bdaf49e83 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.worker.task.sqoop.generator; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; /** * Target Generator Interface @@ -24,9 +25,10 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; public interface ITargetGenerator { /** - * generate the target script - * @param sqoopParameters sqoop params - * @return + * generate the target script + * @param sqoopParameters sqoopParameters + * @param taskExecutionContext taskExecutionContext + * @return target script */ - String generate(SqoopParameters sqoopParameters); + String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java index 24c76e027d..4e9cb84ff3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.worker.task.sqoop.generator; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources.HdfsSourceGenerator; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources.HiveSourceGenerator; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources.MysqlSourceGenerator; @@ -60,15 +61,15 @@ public class SqoopJobGenerator { * @param sqoopParameters * @return */ - public String generateSqoopJob(SqoopParameters sqoopParameters){ + public String generateSqoopJob(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext){ createSqoopJobGenerator(sqoopParameters.getSourceType(),sqoopParameters.getTargetType()); if(sourceGenerator == null || targetGenerator == null){ return null; } return commonGenerator.generate(sqoopParameters) - + sourceGenerator.generate(sqoopParameters) - + targetGenerator.generate(sqoopParameters); + + sourceGenerator.generate(sqoopParameters,taskExecutionContext) + + targetGenerator.generate(sqoopParameters,taskExecutionContext); } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java index 47b01363e6..41e56682ae 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java @@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHdfsParameter; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +33,7 @@ public class HdfsSourceGenerator implements ISourceGenerator { private Logger logger = LoggerFactory.getLogger(getClass()); @Override - public String generate(SqoopParameters sqoopParameters) { + public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) { StringBuilder result = new StringBuilder(); try{ SourceHdfsParameter sourceHdfsParameter diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java index 91363e296a..ea12616825 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java @@ -20,6 +20,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHiveParameter; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +33,7 @@ public class HiveSourceGenerator implements ISourceGenerator { private Logger logger = LoggerFactory.getLogger(getClass()); @Override - public String generate(SqoopParameters sqoopParameters) { + public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) { StringBuilder sb = new StringBuilder(); try{ SourceHiveParameter sourceHiveParameter diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java index 404f10bf39..f8e3d57c7d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources; import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.QueryType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; @@ -24,10 +25,9 @@ import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParamete import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; -import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator; -import org.apache.dolphinscheduler.service.process.ProcessService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,17 +41,17 @@ public class MysqlSourceGenerator implements ISourceGenerator { private Logger logger = LoggerFactory.getLogger(getClass()); @Override - public String generate(SqoopParameters sqoopParameters) { + public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) { StringBuilder result = new StringBuilder(); try { SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceMysqlParameter.class); + SqoopTaskExecutionContext sqoopTaskExecutionContext = taskExecutionContext.getSqoopTaskExecutionContext(); + if(sourceMysqlParameter != null){ - ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); - DataSource dataSource= processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource()); - BaseDataSource baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(), - dataSource.getConnectionParams()); + BaseDataSource baseDataSource = DataSourceFactory.getDatasource(DbType.of(sqoopTaskExecutionContext.getSourcetype()), + sqoopTaskExecutionContext.getSourceConnectionParams()); if(baseDataSource != null){ result.append(" --connect ") .append(baseDataSource.getJdbcUrl()) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java index 411e9b4450..64ea75e742 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java @@ -20,6 +20,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHdfsParameter; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +33,7 @@ public class HdfsTargetGenerator implements ITargetGenerator { private Logger logger = LoggerFactory.getLogger(getClass()); @Override - public String generate(SqoopParameters sqoopParameters) { + public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) { StringBuilder result = new StringBuilder(); try{ TargetHdfsParameter targetHdfsParameter = diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java index ad59173ad0..dc5440b529 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java @@ -20,6 +20,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHiveParameter; import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +33,7 @@ public class HiveTargetGenerator implements ITargetGenerator { private Logger logger = LoggerFactory.getLogger(getClass()); @Override - public String generate(SqoopParameters sqoopParameters) { + public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) { StringBuilder result = new StringBuilder(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java index 0e33b176e5..aed8b9e24a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java @@ -17,12 +17,15 @@ package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets; import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -37,7 +40,7 @@ public class MysqlTargetGenerator implements ITargetGenerator { private Logger logger = LoggerFactory.getLogger(getClass()); @Override - public String generate(SqoopParameters sqoopParameters) { + public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) { StringBuilder result = new StringBuilder(); try{ @@ -45,13 +48,13 @@ public class MysqlTargetGenerator implements ITargetGenerator { TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetMysqlParameter.class); + SqoopTaskExecutionContext sqoopTaskExecutionContext = taskExecutionContext.getSqoopTaskExecutionContext(); + if(targetMysqlParameter != null && targetMysqlParameter.getTargetDatasource() != 0){ - ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); - DataSource dataSource= processService.findDataSourceById(targetMysqlParameter.getTargetDatasource()); // get datasource - BaseDataSource baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(), - dataSource.getConnectionParams()); + BaseDataSource baseDataSource = DataSourceFactory.getDatasource(DbType.of(sqoopTaskExecutionContext.getTargetType()), + sqoopTaskExecutionContext.getTargetConnectionParams()); if(baseDataSource != null){ result.append(" --connect ") diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java index c13a7647fe..3477f4ac67 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/dependent/DependentTaskTest.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.utils.dependent.DependentDateUtils; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -76,10 +77,9 @@ public class DependentTaskTest { TaskProps taskProps = new TaskProps(); String dependString = "{\"dependTaskList\":[{\"dependItemList\":[{\"dateValue\":\"today\",\"depTasks\":\"ALL\",\"projectId\":1,\"definitionList\":[{\"label\":\"C\",\"value\":4},{\"label\":\"B\",\"value\":3},{\"label\":\"A\",\"value\":2}],\"cycle\":\"day\",\"definitionId\":4}],\"relation\":\"AND\"}],\"relation\":\"AND\"}"; - taskProps.setTaskInstId(252612); taskProps.setDependence(dependString); taskProps.setTaskStartTime(new Date()); - DependentTask dependentTask = new DependentTask(taskProps, logger); + DependentTask dependentTask = new DependentTask(new TaskExecutionContext(), logger); dependentTask.init(); dependentTask.handle(); Assert.assertEquals(dependentTask.getExitStatusCode(), Constants.EXIT_CODE_SUCCESS );