
cherry-pick sqoop task in dev to 1.3.4 (#4168)

* sqoop task optimization

* sqoop front-end optimization

* modify sqoop task UT

* add sqoop task UT to pom

* throw an exception when the sqoop task source type or target type is null

* fix testSqoopTaskTest bug (#3024)

* [FIX-4034][server] fix sqoop import fail (#4036)

* fix #4043, sqoop import query fail

* fix #4043, sqoop task hard code & code style

* add license for SqoopConstants

* add private constructor for SqoopConstants

* fix sqoop mysql password containing special characters

* fix checkstyle

* fix sqoop task log

* remove unused constants

* fix sqoop task jdbc string containing special chars (#4105)

* split sqoop import hive database and table (#4141)

* modify JSONUtils

Co-authored-by: CalvinKirs <acm_master@163.com>
Yelli committed 4 years ago via GitHub
commit 7720a773cf
26 changed files:

1. dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java (2 changed lines)
2. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java (14 changed lines)
3. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java (41 changed lines)
4. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopQueryType.java (30 changed lines)
5. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java (99 changed lines)
6. dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java (2 changed lines)
7. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java (33 changed lines)
8. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopConstants.java (75 changed lines)
9. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java (50 changed lines)
10. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java (62 changed lines)
11. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java (6 changed lines)
12. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java (6 changed lines)
13. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java (54 changed lines)
14. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java (36 changed lines)
15. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java (47 changed lines)
16. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java (112 changed lines)
17. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java (56 changed lines)
18. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java (69 changed lines)
19. dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java (88 changed lines)
20. dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java (4 changed lines)
21. dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java (180 changed lines)
22. dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue (2 changed lines)
23. dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue (962 changed lines)
24. dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js (7 changed lines)
25. dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js (7 changed lines)
26. pom.xml (1 changed line)

dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java (2 changed lines)

@ -217,7 +217,7 @@ public enum Status {
DATA_IS_NOT_VALID(50017,"data {0} not valid", "数据[{0}]无效"),
DATA_IS_NULL(50018,"data {0} is null", "数据[{0}]不能为空"),
PROCESS_NODE_HAS_CYCLE(50019,"process node has cycle", "流程节点间存在循环依赖"),
PROCESS_NODE_S_PARAMETER_INVALID(50020,"process node %s parameter invalid", "流程节点[%s]参数无效"),
PROCESS_NODE_S_PARAMETER_INVALID(50020,"process node {0} parameter invalid", "流程节点[{0}]参数无效"),
PROCESS_DEFINE_STATE_ONLINE(50021, "process definition {0} is already on line", "工作流定义[{0}]已上线"),
DELETE_PROCESS_DEFINE_BY_ID_ERROR(50022,"delete process definition by id error", "删除工作流定义错误"),
SCHEDULE_CRON_STATE_ONLINE(50023,"the status of schedule {0} is already on line", "调度配置[{0}]已上线"),

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java (14 changed lines)

@ -215,6 +215,11 @@ public final class Constants {
*/
public static final String COLON = ":";
/**
* SPACE " "
*/
public static final String SPACE = " ";
/**
* SINGLE_SLASH /
*/
@ -225,6 +230,15 @@ public final class Constants {
*/
public static final String DOUBLE_SLASH = "//";
/**
* SINGLE_QUOTES "'"
*/
public static final String SINGLE_QUOTES = "'";
/**
* DOUBLE_QUOTES "\""
*/
public static final String DOUBLE_QUOTES = "\"";
/**
* SEMICOLON ;
*/
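These quoting constants back the special-character fixes in the generators below; a minimal sketch of the intended use (the URL is illustrative, not from this commit):

// Wrapping the JDBC URL in DOUBLE_QUOTES keeps shell metacharacters such as
// '&' in the connection params from being interpreted by the worker shell.
String jdbcUrl = "jdbc:mysql://host:3306/db?useSSL=false&characterEncoding=utf8";
String connect = Constants.SPACE + "--connect" + Constants.SPACE
        + Constants.DOUBLE_QUOTES + jdbcUrl + Constants.DOUBLE_QUOTES;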

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java (41 changed lines)

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
public enum SqoopJobType {
CUSTOM(0, "CUSTOM"),
TEMPLATE(1, "TEMPLATE");
SqoopJobType(int code, String descp){
this.code = code;
this.descp = descp;
}
@EnumValue
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}
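The front end posts the job type as a plain string, so downstream code matches on descp rather than on the enum constant itself; a sketch:

// TEMPLATE jobs are assembled from source/target params;
// CUSTOM jobs run the user-supplied shell as-is.
String jobType = "TEMPLATE"; // as posted by the front end
boolean isTemplate = SqoopJobType.TEMPLATE.getDescp().equals(jobType); // true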

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java → dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopQueryType.java (30 changed lines)

@ -14,20 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.common.enums;
public enum QueryType {
public enum SqoopQueryType {
FORM(0, "SQOOP_QUERY_FORM"),
SQL(1, "SQOOP_QUERY_SQL");
private final Integer code;
private final String desc;
FORM,
SQL;
SqoopQueryType(Integer code, String desc) {
this.code = code;
this.desc = desc;
}
public Integer getCode() {
return code;
}
public static QueryType getEnum(int value){
for (QueryType e:QueryType.values()) {
if(e.ordinal() == value) {
return e;
}
}
//For values out of enum scope
return null;
public String getDesc() {
return desc;
}
}
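Comparisons that previously relied on QueryType.ordinal() now use the explicit code, so the stored value stays stable even if enum constants are reordered; a sketch:

int srcQueryType = 1; // value carried in the sourceParams JSON
if (srcQueryType == SqoopQueryType.SQL.getCode()) {
    // build a --query based import instead of --table/--columns
}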

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java (99 changed lines)

@ -16,6 +16,8 @@
*/
package org.apache.dolphinscheduler.common.task.sqoop;
import org.apache.dolphinscheduler.common.enums.SqoopJobType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.StringUtils;
@ -28,6 +30,23 @@ import java.util.List;
*/
public class SqoopParameters extends AbstractParameters {
/**
* sqoop job type:
* CUSTOM - custom sqoop job
* TEMPLATE - sqoop template job
*/
private String jobType;
/**
* when the job type is CUSTOM, use customShell
*/
private String customShell;
/**
* sqoop job name - map-reduce job name
*/
private String jobName;
/**
* model type
*/
@ -53,6 +72,16 @@ public class SqoopParameters extends AbstractParameters {
*/
private String targetParams;
/**
* hadoop custom param for sqoop job
*/
private List<Property> hadoopCustomParams;
/**
* sqoop advanced param
*/
private List<Property> sqoopAdvancedParams;
public String getModelType() {
return modelType;
}
@ -101,18 +130,74 @@ public class SqoopParameters extends AbstractParameters {
this.targetParams = targetParams;
}
public String getJobType() {
return jobType;
}
public void setJobType(String jobType) {
this.jobType = jobType;
}
public String getJobName() {
return jobName;
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
public String getCustomShell() {
return customShell;
}
public void setCustomShell(String customShell) {
this.customShell = customShell;
}
public List<Property> getHadoopCustomParams() {
return hadoopCustomParams;
}
public void setHadoopCustomParams(List<Property> hadoopCustomParams) {
this.hadoopCustomParams = hadoopCustomParams;
}
public List<Property> getSqoopAdvancedParams() {
return sqoopAdvancedParams;
}
public void setSqoopAdvancedParams(List<Property> sqoopAdvancedParams) {
this.sqoopAdvancedParams = sqoopAdvancedParams;
}
@Override
public boolean checkParameters() {
return StringUtils.isNotEmpty(modelType)&&
concurrency != 0 &&
StringUtils.isNotEmpty(sourceType)&&
StringUtils.isNotEmpty(targetType)&&
StringUtils.isNotEmpty(sourceParams)&&
StringUtils.isNotEmpty(targetParams);
boolean sqoopParamsCheck = false;
if (StringUtils.isEmpty(jobType)) {
return sqoopParamsCheck;
}
if (SqoopJobType.TEMPLATE.getDescp().equals(jobType)) {
sqoopParamsCheck = StringUtils.isEmpty(customShell) &&
StringUtils.isNotEmpty(modelType) &&
StringUtils.isNotEmpty(jobName) &&
concurrency != 0 &&
StringUtils.isNotEmpty(sourceType) &&
StringUtils.isNotEmpty(targetType) &&
StringUtils.isNotEmpty(sourceParams) &&
StringUtils.isNotEmpty(targetParams);
} else if (SqoopJobType.CUSTOM.getDescp().equals(jobType)) {
sqoopParamsCheck = StringUtils.isNotEmpty(customShell) &&
StringUtils.isEmpty(jobName);
}
return sqoopParamsCheck;
}
@Override
public List<ResourceInfo> getResourceFilesList() {
return new ArrayList<>();
return new ArrayList<>();
}
}
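A sketch of the new validation contract, using the setters shown above:

SqoopParameters custom = new SqoopParameters();
custom.setJobType(SqoopJobType.CUSTOM.getDescp());
custom.setCustomShell("sqoop import --connect ...");
// CUSTOM requires a non-empty customShell and no jobName
boolean valid = custom.checkParameters(); // true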

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java (2 changed lines)

@ -106,7 +106,7 @@ public class TargetMysqlParameter {
this.preQuery = preQuery;
}
public boolean isUpdate() {
public boolean getIsUpdate() {
return isUpdate;
}
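The getter rename looks like a JSON round-tripping fix: under the standard bean-property convention a boolean accessor isUpdate() maps to the property "update", while getIsUpdate() preserves the "isUpdate" key the front end sends (an assumption about the mapper's naming rules, not stated in this commit):

// field: private boolean isUpdate;
// isUpdate()    -> serialized as "update"    (bean convention)
// getIsUpdate() -> serialized as "isUpdate"  (matches the UI payload)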

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java (33 changed lines)

@ -21,6 +21,7 @@ import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.dolphinscheduler.common.enums.SqoopJobType;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType;
import org.apache.dolphinscheduler.common.model.TaskNode;
@ -33,6 +34,7 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
@ -275,29 +277,32 @@ public class TaskPriorityQueueConsumer extends Thread{
/**
* set datax task relation
* set sqoop task relation
* @param sqoopTaskExecutionContext sqoopTaskExecutionContext
* @param taskNode taskNode
*/
private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskNode taskNode) {
SqoopParameters sqoopParameters = JSONObject.parseObject(taskNode.getParams(), SqoopParameters.class);
SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class);
TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class);
// when the sqoop job type is TEMPLATE, set the task relation
if (sqoopParameters.getJobType().equals(SqoopJobType.TEMPLATE.getDescp())) {
SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class);
TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class);
DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
if (dataSource != null){
sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
}
if (dataSource != null){
sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
}
if (dataTarget != null){
sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
if (dataTarget != null){
sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
}
}
}

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopConstants.java (75 changed lines)

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop;
public final class SqoopConstants {
private SqoopConstants() {
}
//sqoop general param
public static final String SQOOP = "sqoop";
public static final String SQOOP_MR_JOB_NAME = "mapred.job.name";
public static final String SQOOP_PARALLELISM = "-m";
public static final String FIELDS_TERMINATED_BY = "--fields-terminated-by";
public static final String LINES_TERMINATED_BY = "--lines-terminated-by";
public static final String FIELD_NULL_PLACEHOLDER = "--null-non-string 'NULL' --null-string 'NULL'";
//sqoop db
public static final String DB_CONNECT = "--connect";
public static final String DB_USERNAME = "--username";
public static final String DB_PWD = "--password";
public static final String TABLE = "--table";
public static final String COLUMNS = "--columns";
public static final String QUERY_WHERE = "where";
public static final String QUERY = "--query";
public static final String QUERY_CONDITION = "AND \\$CONDITIONS";
public static final String QUERY_WITHOUT_CONDITION = "WHERE \\$CONDITIONS";
public static final String MAP_COLUMN_HIVE = "--map-column-hive";
public static final String MAP_COLUMN_JAVA = "--map-column-java";
//sqoop hive source
public static final String HCATALOG_DATABASE = "--hcatalog-database";
public static final String HCATALOG_TABLE = "--hcatalog-table";
public static final String HCATALOG_PARTITION_KEYS = "--hcatalog-partition-keys";
public static final String HCATALOG_PARTITION_VALUES = "--hcatalog-partition-values";
//sqoop hdfs
public static final String HDFS_EXPORT_DIR = "--export-dir";
public static final String TARGET_DIR = "--target-dir";
public static final String COMPRESSION_CODEC = "--compression-codec";
//sqoop hive
public static final String HIVE_IMPORT = "--hive-import";
public static final String HIVE_DATABASE = "--hive-database";
public static final String HIVE_TABLE = "--hive-table";
public static final String CREATE_HIVE_TABLE = "--create-hive-table";
public static final String HIVE_DROP_IMPORT_DELIMS = "--hive-drop-import-delims";
public static final String HIVE_OVERWRITE = "--hive-overwrite";
public static final String DELETE_TARGET_DIR = "--delete-target-dir";
public static final String HIVE_DELIMS_REPLACEMENT = "--hive-delims-replacement";
public static final String HIVE_PARTITION_KEY = "--hive-partition-key";
public static final String HIVE_PARTITION_VALUE = "--hive-partition-value";
//sqoop update model
public static final String UPDATE_KEY = "--update-key";
public static final String UPDATE_MODE = "--update-mode";
}
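A sketch of how these fragments line up in a full TEMPLATE import (illustrative values only):

// sqoop import -D mapred.job.name=ms2hive -m 1
//   --connect "jdbc:mysql://host:3306/db" --username root --password "pwd"
//   --table t_ds_user --hive-import --hive-database dw --hive-table ods_user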

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java (50 changed lines)

@ -14,63 +14,73 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop;
import com.alibaba.fastjson.JSON;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
import org.slf4j.Logger;
import java.util.Map;
import org.slf4j.Logger;
/**
* sqoop task extends the yarn task
*/
public class SqoopTask extends AbstractYarnTask {
/**
* sqoop task params
*/
private SqoopParameters sqoopParameters;
/**
* taskExecutionContext
*/
private TaskExecutionContext taskExecutionContext;
private final TaskExecutionContext sqoopTaskExecutionContext;
public SqoopTask(TaskExecutionContext taskExecutionContext, Logger logger){
super(taskExecutionContext,logger);
this.taskExecutionContext = taskExecutionContext;
public SqoopTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.sqoopTaskExecutionContext = taskExecutionContext;
}
@Override
public void init() throws Exception {
logger.info("sqoop task params {}", taskExecutionContext.getTaskParams());
public void init() {
logger.info("sqoop task params {}", sqoopTaskExecutionContext.getTaskParams());
sqoopParameters =
JSON.parseObject(taskExecutionContext.getTaskParams(),SqoopParameters.class);
if (!sqoopParameters.checkParameters()) {
throw new RuntimeException("sqoop task params is not valid");
JSONUtils.parseObject(sqoopTaskExecutionContext.getTaskParams(), SqoopParameters.class);
//check sqoop task params
if (null == sqoopParameters) {
throw new IllegalArgumentException("Sqoop Task params is null");
}
if (!sqoopParameters.checkParameters()) {
throw new IllegalArgumentException("Sqoop Task params check fail");
}
}
@Override
protected String buildCommand() throws Exception {
protected String buildCommand() {
//get sqoop scripts
SqoopJobGenerator generator = new SqoopJobGenerator();
String script = generator.generateSqoopJob(sqoopParameters,taskExecutionContext);
String script = generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext);
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
sqoopParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sqoopTaskExecutionContext.getDefinedParams()),
sqoopTaskExecutionContext.getDefinedParams(),
sqoopParameters.getLocalParametersMap(),
CommandType.of(sqoopTaskExecutionContext.getCmdTypeIfComplement()),
sqoopTaskExecutionContext.getScheduleTime());
if(paramsMap != null){
String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
if (paramsMap != null) {
String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
logger.info("sqoop script: {}", resultScripts);
return resultScripts;
}
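init() now fails fast with IllegalArgumentException instead of a generic RuntimeException; a sketch of the two failure modes:

// 1. taskParams is not parseable JSON -> JSONUtils.parseObject returns null
//    -> IllegalArgumentException("Sqoop Task params is null")
// 2. JSON parses but required fields are missing -> checkParameters() is false
//    -> IllegalArgumentException("Sqoop Task params check fail")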

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java (62 changed lines)

@ -14,9 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -25,21 +33,53 @@ import org.slf4j.LoggerFactory;
*/
public class CommonGenerator {
private Logger logger = LoggerFactory.getLogger(getClass());
private static final Logger logger = LoggerFactory.getLogger(CommonGenerator.class);
public String generate(SqoopParameters sqoopParameters) {
StringBuilder result = new StringBuilder();
try{
result.append("sqoop ")
.append(sqoopParameters.getModelType());
if(sqoopParameters.getConcurrency() >0){
result.append(" -m ")
.append(sqoopParameters.getConcurrency());
StringBuilder commonSb = new StringBuilder();
try {
//sqoop task model
commonSb.append(SqoopConstants.SQOOP)
.append(Constants.SPACE)
.append(sqoopParameters.getModelType());
//sqoop map-reduce job name
commonSb.append(Constants.SPACE).append(Constants.D).append(Constants.SPACE)
.append(String.format("%s%s%s", SqoopConstants.SQOOP_MR_JOB_NAME,
Constants.EQUAL_SIGN, sqoopParameters.getJobName()));
//hadoop custom param
List<Property> hadoopCustomParams = sqoopParameters.getHadoopCustomParams();
if (CollectionUtils.isNotEmpty(hadoopCustomParams)) {
for (Property hadoopCustomParam : hadoopCustomParams) {
String hadoopCustomParamStr = String.format("%s%s%s", hadoopCustomParam.getProp(),
Constants.EQUAL_SIGN, hadoopCustomParam.getValue());
commonSb.append(Constants.SPACE).append(Constants.D)
.append(Constants.SPACE).append(hadoopCustomParamStr);
}
}
//sqoop custom params
List<Property> sqoopAdvancedParams = sqoopParameters.getSqoopAdvancedParams();
if (CollectionUtils.isNotEmpty(sqoopAdvancedParams)) {
for (Property sqoopAdvancedParam : sqoopAdvancedParams) {
commonSb.append(Constants.SPACE).append(sqoopAdvancedParam.getProp())
.append(Constants.SPACE).append(sqoopAdvancedParam.getValue());
}
}
//sqoop parallelism
if (sqoopParameters.getConcurrency() > 0) {
commonSb.append(Constants.SPACE).append(SqoopConstants.SQOOP_PARALLELISM)
.append(Constants.SPACE).append(sqoopParameters.getConcurrency());
}
}catch (Exception e){
logger.error(e.getMessage());
} catch (Exception e) {
logger.error(String.format("Sqoop task general param build failed: [%s]", e.getMessage()));
}
return result.toString();
return commonSb.toString();
}
}
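For a TEMPLATE job the common fragment now carries the map-reduce job name plus any pass-through params; a sketch of the expected output (assuming the usual bean setters for modelType, jobName and concurrency):

SqoopParameters p = new SqoopParameters();
p.setJobType(SqoopJobType.TEMPLATE.getDescp());
p.setModelType("import");
p.setJobName("ms2hive");
p.setConcurrency(1);
// new CommonGenerator().generate(p) ->
// "sqoop import -D mapred.job.name=ms2hive -m 1"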

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java (6 changed lines)

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
@ -26,9 +27,10 @@ public interface ISourceGenerator {
/**
* generate the source script
* @param sqoopParameters sqoopParameters
*
* @param sqoopParameters sqoopParameters
* @param taskExecutionContext taskExecutionContext
* @return source script
*/
String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext);
String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext);
}

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java (6 changed lines)

@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
@ -26,9 +27,10 @@ public interface ITargetGenerator {
/**
* generate the target script
* @param sqoopParameters sqoopParameters
*
* @param sqoopParameters sqoopParameters
* @param taskExecutionContext taskExecutionContext
* @return target script
*/
String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext);
String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext);
}

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java (54 changed lines)

@ -14,8 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
import org.apache.dolphinscheduler.common.enums.SqoopJobType;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources.HdfsSourceGenerator;
@ -45,40 +47,51 @@ public class SqoopJobGenerator {
/**
* common script generator
*/
private CommonGenerator commonGenerator;
private final CommonGenerator commonGenerator;
public SqoopJobGenerator(){
public SqoopJobGenerator() {
commonGenerator = new CommonGenerator();
}
private void createSqoopJobGenerator(String sourceType,String targetType){
private void createSqoopJobGenerator(String sourceType, String targetType) {
sourceGenerator = createSourceGenerator(sourceType);
targetGenerator = createTargetGenerator(targetType);
}
/**
* get the final sqoop scripts
* @param sqoopParameters
* @return
*
* @param sqoopParameters sqoop params
* @return sqoop scripts
*/
public String generateSqoopJob(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext){
createSqoopJobGenerator(sqoopParameters.getSourceType(),sqoopParameters.getTargetType());
if(sourceGenerator == null || targetGenerator == null){
return null;
public String generateSqoopJob(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
String sqoopScripts = "";
if (SqoopJobType.TEMPLATE.getDescp().equals(sqoopParameters.getJobType())) {
createSqoopJobGenerator(sqoopParameters.getSourceType(), sqoopParameters.getTargetType());
if (sourceGenerator == null || targetGenerator == null) {
throw new RuntimeException("sqoop task source type or target type is null");
}
sqoopScripts = String.format("%s%s%s", commonGenerator.generate(sqoopParameters),
sourceGenerator.generate(sqoopParameters, taskExecutionContext),
targetGenerator.generate(sqoopParameters, taskExecutionContext));
} else if (SqoopJobType.CUSTOM.getDescp().equals(sqoopParameters.getJobType())) {
sqoopScripts = sqoopParameters.getCustomShell().replaceAll("\\r\\n", "\n");
}
return commonGenerator.generate(sqoopParameters)
+ sourceGenerator.generate(sqoopParameters,taskExecutionContext)
+ targetGenerator.generate(sqoopParameters,taskExecutionContext);
return sqoopScripts;
}
/**
* get the source generator
* @param sourceType
* @return
*
* @param sourceType sqoop source type
* @return sqoop source generator
*/
private ISourceGenerator createSourceGenerator(String sourceType){
switch (sourceType){
private ISourceGenerator createSourceGenerator(String sourceType) {
switch (sourceType) {
case MYSQL:
return new MysqlSourceGenerator();
case HIVE:
@ -92,11 +105,12 @@ public class SqoopJobGenerator {
/**
* get the target generator
* @param targetType
* @return
*
* @param targetType sqoop target type
* @return sqoop target generator
*/
private ITargetGenerator createTargetGenerator(String targetType){
switch (targetType){
private ITargetGenerator createTargetGenerator(String targetType) {
switch (targetType) {
case MYSQL:
return new MysqlTargetGenerator();
case HIVE:

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java (36 changed lines)

@ -14,14 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHdfsParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -30,28 +34,30 @@ import org.slf4j.LoggerFactory;
*/
public class HdfsSourceGenerator implements ISourceGenerator {
private Logger logger = LoggerFactory.getLogger(getClass());
private static final Logger logger = LoggerFactory.getLogger(HdfsSourceGenerator.class);
@Override
public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
StringBuilder result = new StringBuilder();
try{
public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
StringBuilder hdfsSourceSb = new StringBuilder();
try {
SourceHdfsParameter sourceHdfsParameter
= JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceHdfsParameter.class);
if(sourceHdfsParameter != null){
if(StringUtils.isNotEmpty(sourceHdfsParameter.getExportDir())){
result.append(" --export-dir ")
.append(sourceHdfsParameter.getExportDir());
}else{
throw new Exception("--export-dir is null");
= JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceHdfsParameter.class);
if (null != sourceHdfsParameter) {
if (StringUtils.isNotEmpty(sourceHdfsParameter.getExportDir())) {
hdfsSourceSb.append(Constants.SPACE).append(SqoopConstants.HDFS_EXPORT_DIR)
.append(Constants.SPACE).append(sourceHdfsParameter.getExportDir());
} else {
throw new IllegalArgumentException("Sqoop hdfs export dir is null");
}
}
}catch (Exception e){
logger.error("get hdfs source failed",e);
} catch (Exception e) {
logger.error(String.format("Sqoop hdfs source parmas build failed: [%s]", e.getMessage()));
}
return result.toString();
return hdfsSourceSb.toString();
}
}

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java (47 changed lines)

@ -14,14 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHiveParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -30,33 +34,40 @@ import org.slf4j.LoggerFactory;
*/
public class HiveSourceGenerator implements ISourceGenerator {
private Logger logger = LoggerFactory.getLogger(getClass());
private static final Logger logger = LoggerFactory.getLogger(HiveSourceGenerator.class);
@Override
public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
StringBuilder sb = new StringBuilder();
try{
public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
StringBuilder hiveSourceSb = new StringBuilder();
try {
SourceHiveParameter sourceHiveParameter
= JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceHiveParameter.class);
if(sourceHiveParameter != null){
if(StringUtils.isNotEmpty(sourceHiveParameter.getHiveDatabase())){
sb.append(" --hcatalog-database ").append(sourceHiveParameter.getHiveDatabase());
= JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceHiveParameter.class);
if (null != sourceHiveParameter) {
if (StringUtils.isNotEmpty(sourceHiveParameter.getHiveDatabase())) {
hiveSourceSb.append(Constants.SPACE).append(SqoopConstants.HCATALOG_DATABASE)
.append(Constants.SPACE).append(sourceHiveParameter.getHiveDatabase());
}
if(StringUtils.isNotEmpty(sourceHiveParameter.getHiveTable())){
sb.append(" --hcatalog-table ").append(sourceHiveParameter.getHiveTable());
if (StringUtils.isNotEmpty(sourceHiveParameter.getHiveTable())) {
hiveSourceSb.append(Constants.SPACE).append(SqoopConstants.HCATALOG_TABLE)
.append(Constants.SPACE).append(sourceHiveParameter.getHiveTable());
}
if(StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionKey())&&
StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionValue())){
sb.append(" --hcatalog-partition-keys ").append(sourceHiveParameter.getHivePartitionKey())
.append(" --hcatalog-partition-values ").append(sourceHiveParameter.getHivePartitionValue());
if (StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionKey())
&& StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionValue())) {
hiveSourceSb.append(Constants.SPACE).append(SqoopConstants.HCATALOG_PARTITION_KEYS)
.append(Constants.SPACE).append(sourceHiveParameter.getHivePartitionKey())
.append(Constants.SPACE).append(SqoopConstants.HCATALOG_PARTITION_VALUES)
.append(Constants.SPACE).append(sourceHiveParameter.getHivePartitionValue());
}
}
}catch (Exception e){
logger.error(e.getMessage());
} catch (Exception e) {
logger.error(String.format("Sqoop hive source params build failed: [%s]", e.getMessage()));
}
return sb.toString();
return hiveSourceSb.toString();
}
}
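A sketch of a typical hive source fragment produced by this generator (illustrative values):

// --hcatalog-database dw --hcatalog-table ods_user
//   --hcatalog-partition-keys dt --hcatalog-partition-values 2020-12-01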

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java (112 changed lines)

@ -14,106 +14,118 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.QueryType;
import org.apache.dolphinscheduler.common.enums.SqoopQueryType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* mysql source generator
*/
public class MysqlSourceGenerator implements ISourceGenerator {
private Logger logger = LoggerFactory.getLogger(getClass());
private static final Logger logger = LoggerFactory.getLogger(MysqlSourceGenerator.class);
@Override
public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
StringBuilder result = new StringBuilder();
try {
SourceMysqlParameter sourceMysqlParameter
= JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceMysqlParameter.class);
public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
StringBuilder mysqlSourceSb = new StringBuilder();
try {
SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class);
SqoopTaskExecutionContext sqoopTaskExecutionContext = taskExecutionContext.getSqoopTaskExecutionContext();
if(sourceMysqlParameter != null){
if (null != sourceMysqlParameter) {
BaseDataSource baseDataSource = DataSourceFactory.getDatasource(DbType.of(sqoopTaskExecutionContext.getSourcetype()),
sqoopTaskExecutionContext.getSourceConnectionParams());
if(baseDataSource != null){
result.append(" --connect ")
.append(baseDataSource.getJdbcUrl())
.append(" --username ")
.append(baseDataSource.getUser())
.append(" --password ")
.append(baseDataSource.getPassword());
if(sourceMysqlParameter.getSrcQueryType() == QueryType.FORM.ordinal()){
if(StringUtils.isNotEmpty(sourceMysqlParameter.getSrcTable())){
result.append(" --table ").append(sourceMysqlParameter.getSrcTable());
sqoopTaskExecutionContext.getSourceConnectionParams());
if (null != baseDataSource) {
mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.DB_CONNECT)
.append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(baseDataSource.getJdbcUrl()).append(Constants.DOUBLE_QUOTES)
.append(Constants.SPACE).append(SqoopConstants.DB_USERNAME)
.append(Constants.SPACE).append(baseDataSource.getUser())
.append(Constants.SPACE).append(SqoopConstants.DB_PWD)
.append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(baseDataSource.getPassword()).append(Constants.DOUBLE_QUOTES);
//sqoop table & sql query
if (sourceMysqlParameter.getSrcQueryType() == SqoopQueryType.FORM.getCode()) {
if (StringUtils.isNotEmpty(sourceMysqlParameter.getSrcTable())) {
mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.TABLE)
.append(Constants.SPACE).append(sourceMysqlParameter.getSrcTable());
}
if(StringUtils.isNotEmpty(sourceMysqlParameter.getSrcColumns())){
result.append(" --columns ").append(sourceMysqlParameter.getSrcColumns());
if (StringUtils.isNotEmpty(sourceMysqlParameter.getSrcColumns())) {
mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.COLUMNS)
.append(Constants.SPACE).append(sourceMysqlParameter.getSrcColumns());
}
} else if (sourceMysqlParameter.getSrcQueryType() == SqoopQueryType.SQL.getCode()
&& StringUtils.isNotEmpty(sourceMysqlParameter.getSrcQuerySql())) {
}else if(sourceMysqlParameter.getSrcQueryType() == QueryType.SQL.ordinal()
&& StringUtils.isNotEmpty(sourceMysqlParameter.getSrcQuerySql())){
String srcQuery = sourceMysqlParameter.getSrcQuerySql();
if(srcQuery.toLowerCase().contains("where")){
srcQuery += " AND "+"$CONDITIONS";
}else{
srcQuery += " WHERE $CONDITIONS";
}
result.append(" --query \'").append(srcQuery).append("\'");
mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.QUERY)
.append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(srcQuery);
if (srcQuery.toLowerCase().contains(SqoopConstants.QUERY_WHERE)) {
mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.QUERY_CONDITION).append(Constants.DOUBLE_QUOTES);
} else {
mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.QUERY_WITHOUT_CONDITION).append(Constants.DOUBLE_QUOTES);
}
}
List<Property> mapColumnHive = sourceMysqlParameter.getMapColumnHive();
//sqoop hive map column
List<Property> mapColumnHive = sourceMysqlParameter.getMapColumnHive();
if(mapColumnHive != null && !mapColumnHive.isEmpty()){
if (null != mapColumnHive && !mapColumnHive.isEmpty()) {
StringBuilder columnMap = new StringBuilder();
for(Property item:mapColumnHive){
columnMap.append(item.getProp()).append("=").append(item.getValue()).append(",");
for (Property item : mapColumnHive) {
columnMap.append(item.getProp()).append(Constants.EQUAL_SIGN).append(item.getValue()).append(Constants.COMMA);
}
if(StringUtils.isNotEmpty(columnMap.toString())){
result.append(" --map-column-hive ")
.append(columnMap.substring(0,columnMap.length() - 1));
if (StringUtils.isNotEmpty(columnMap.toString())) {
mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.MAP_COLUMN_HIVE)
.append(Constants.SPACE).append(columnMap.substring(0, columnMap.length() - 1));
}
}
List<Property> mapColumnJava = sourceMysqlParameter.getMapColumnJava();
//sqoop map column java
List<Property> mapColumnJava = sourceMysqlParameter.getMapColumnJava();
if(mapColumnJava != null && !mapColumnJava.isEmpty()){
if (null != mapColumnJava && !mapColumnJava.isEmpty()) {
StringBuilder columnMap = new StringBuilder();
for(Property item:mapColumnJava){
columnMap.append(item.getProp()).append("=").append(item.getValue()).append(",");
for (Property item : mapColumnJava) {
columnMap.append(item.getProp()).append(Constants.EQUAL_SIGN).append(item.getValue()).append(Constants.COMMA);
}
if(StringUtils.isNotEmpty(columnMap.toString())){
result.append(" --map-column-java ")
.append(columnMap.substring(0,columnMap.length() - 1));
if (StringUtils.isNotEmpty(columnMap.toString())) {
mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.MAP_COLUMN_JAVA)
.append(Constants.SPACE).append(columnMap.substring(0, columnMap.length() - 1));
}
}
}
}
}catch (Exception e){
logger.error(e.getMessage());
} catch (Exception e) {
logger.error(String.format("Sqoop task mysql source params build failed: [%s]", e.getMessage()));
}
return result.toString();
return mysqlSourceSb.toString();
}
}
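The rewritten query handling wraps the whole --query value in double quotes and appends the $CONDITIONS marker sqoop requires; a sketch of both branches:

// srcQuerySql: select id from t_ds_user where id > 0
//   -> --query "select id from t_ds_user where id > 0 AND \$CONDITIONS"
// srcQuerySql: select id from t_ds_user
//   -> --query "select id from t_ds_user WHERE \$CONDITIONS"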

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java (56 changed lines)

@ -14,14 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHdfsParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -30,47 +34,53 @@ import org.slf4j.LoggerFactory;
*/
public class HdfsTargetGenerator implements ITargetGenerator {
private Logger logger = LoggerFactory.getLogger(getClass());
private static final Logger logger = LoggerFactory.getLogger(HdfsTargetGenerator.class);
@Override
public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
StringBuilder result = new StringBuilder();
try{
public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
StringBuilder hdfsTargetSb = new StringBuilder();
try {
TargetHdfsParameter targetHdfsParameter =
JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetHdfsParameter.class);
JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetHdfsParameter.class);
if(targetHdfsParameter != null){
if (null != targetHdfsParameter) {
if(StringUtils.isNotEmpty(targetHdfsParameter.getTargetPath())){
result.append(" --target-dir ").append(targetHdfsParameter.getTargetPath());
if (StringUtils.isNotEmpty(targetHdfsParameter.getTargetPath())) {
hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.TARGET_DIR)
.append(Constants.SPACE).append(targetHdfsParameter.getTargetPath());
}
if(StringUtils.isNotEmpty(targetHdfsParameter.getCompressionCodec())){
result.append(" --compression-codec ").append(targetHdfsParameter.getCompressionCodec());
if (StringUtils.isNotEmpty(targetHdfsParameter.getCompressionCodec())) {
hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.COMPRESSION_CODEC)
.append(Constants.SPACE).append(targetHdfsParameter.getCompressionCodec());
}
if(StringUtils.isNotEmpty(targetHdfsParameter.getFileType())){
result.append(" ").append(targetHdfsParameter.getFileType());
if (StringUtils.isNotEmpty(targetHdfsParameter.getFileType())) {
hdfsTargetSb.append(Constants.SPACE).append(targetHdfsParameter.getFileType());
}
if(targetHdfsParameter.isDeleteTargetDir()){
result.append(" --delete-target-dir");
if (targetHdfsParameter.isDeleteTargetDir()) {
hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.DELETE_TARGET_DIR);
}
if(StringUtils.isNotEmpty(targetHdfsParameter.getFieldsTerminated())){
result.append(" --fields-terminated-by '").append(targetHdfsParameter.getFieldsTerminated()).append("'");
if (StringUtils.isNotEmpty(targetHdfsParameter.getFieldsTerminated())) {
hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.FIELDS_TERMINATED_BY)
.append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetHdfsParameter.getFieldsTerminated()).append(Constants.SINGLE_QUOTES);
}
if(StringUtils.isNotEmpty(targetHdfsParameter.getLinesTerminated())){
result.append(" --lines-terminated-by '").append(targetHdfsParameter.getLinesTerminated()).append("'");
if (StringUtils.isNotEmpty(targetHdfsParameter.getLinesTerminated())) {
hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.LINES_TERMINATED_BY)
.append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetHdfsParameter.getLinesTerminated()).append(Constants.SINGLE_QUOTES);
}
result.append(" --null-non-string 'NULL' --null-string 'NULL'");
hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.FIELD_NULL_PLACEHOLDER);
}
}catch(Exception e){
logger.error(e.getMessage());
} catch (Exception e) {
logger.error(String.format("Sqoop hdfs target params build failed: [%s]", e.getMessage()));
}
return result.toString();
return hdfsTargetSb.toString();
}
}
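A sketch of a typical HDFS target fragment produced by this generator (illustrative values):

// --target-dir /tmp/ods_user --delete-target-dir
//   --fields-terminated-by ',' --lines-terminated-by '\n'
//   --null-non-string 'NULL' --null-string 'NULL'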

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java (69 changed lines)

@ -14,14 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHiveParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -30,57 +34,58 @@ import org.slf4j.LoggerFactory;
*/
public class HiveTargetGenerator implements ITargetGenerator {
private Logger logger = LoggerFactory.getLogger(getClass());
private static final Logger logger = LoggerFactory.getLogger(HiveTargetGenerator.class);
@Override
public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
StringBuilder result = new StringBuilder();
StringBuilder hiveTargetSb = new StringBuilder();
try{
try {
TargetHiveParameter targetHiveParameter =
JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetHiveParameter.class);
if(targetHiveParameter != null){
result.append(" --hive-import ");
JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetHiveParameter.class);
if (null != targetHiveParameter) {
hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_IMPORT);
if(StringUtils.isNotEmpty(targetHiveParameter.getHiveDatabase())&&
StringUtils.isNotEmpty(targetHiveParameter.getHiveTable())){
result.append(" --hive-table ")
.append(targetHiveParameter.getHiveDatabase())
.append(".")
.append(targetHiveParameter.getHiveTable());
if (StringUtils.isNotEmpty(targetHiveParameter.getHiveDatabase())
&& StringUtils.isNotEmpty(targetHiveParameter.getHiveTable())) {
hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_DATABASE)
.append(Constants.SPACE).append(targetHiveParameter.getHiveDatabase())
.append(Constants.SPACE).append(SqoopConstants.HIVE_TABLE)
.append(Constants.SPACE).append(targetHiveParameter.getHiveTable());
}
if(targetHiveParameter.isCreateHiveTable()){
result.append(" --create-hive-table");
if (targetHiveParameter.isCreateHiveTable()) {
hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.CREATE_HIVE_TABLE);
}
if(targetHiveParameter.isDropDelimiter()){
result.append(" --hive-drop-import-delims");
if (targetHiveParameter.isDropDelimiter()) {
hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_DROP_IMPORT_DELIMS);
}
if(targetHiveParameter.isHiveOverWrite()){
result.append(" --hive-overwrite -delete-target-dir");
if (targetHiveParameter.isHiveOverWrite()) {
hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_OVERWRITE)
.append(Constants.SPACE).append(SqoopConstants.DELETE_TARGET_DIR);
}
if(StringUtils.isNotEmpty(targetHiveParameter.getReplaceDelimiter())){
result.append(" --hive-delims-replacement ").append(targetHiveParameter.getReplaceDelimiter());
if (StringUtils.isNotEmpty(targetHiveParameter.getReplaceDelimiter())) {
hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_DELIMS_REPLACEMENT)
.append(Constants.SPACE).append(targetHiveParameter.getReplaceDelimiter());
}
if(StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionKey())&&
StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionValue())){
result.append(" --hive-partition-key ")
.append(targetHiveParameter.getHivePartitionKey())
.append(" --hive-partition-value ")
.append(targetHiveParameter.getHivePartitionValue());
if (StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionKey())
&& StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionValue())) {
hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_PARTITION_KEY)
.append(Constants.SPACE).append(targetHiveParameter.getHivePartitionKey())
.append(Constants.SPACE).append(SqoopConstants.HIVE_PARTITION_VALUE)
.append(Constants.SPACE).append(targetHiveParameter.getHivePartitionValue());
}
}
}catch(Exception e){
logger.error(e.getMessage());
} catch (Exception e) {
logger.error(String.format("Sqoop hive target params build failed: [%s]", e.getMessage()));
}
return result.toString();
return hiveTargetSb.toString();
}
}

dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java (88 changed lines)

@ -14,21 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -37,59 +38,74 @@ import org.slf4j.LoggerFactory;
*/
public class MysqlTargetGenerator implements ITargetGenerator {
    private static final Logger logger = LoggerFactory.getLogger(MysqlTargetGenerator.class);

    @Override
    public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {

        StringBuilder mysqlTargetSb = new StringBuilder();

        try {
            TargetMysqlParameter targetMysqlParameter =
                JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class);

            SqoopTaskExecutionContext sqoopTaskExecutionContext = taskExecutionContext.getSqoopTaskExecutionContext();

            if (null != targetMysqlParameter && targetMysqlParameter.getTargetDatasource() != 0) {

                // get datasource
                BaseDataSource baseDataSource = DataSourceFactory.getDatasource(DbType.of(sqoopTaskExecutionContext.getTargetType()),
                    sqoopTaskExecutionContext.getTargetConnectionParams());

                if (null != baseDataSource) {

                    mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.DB_CONNECT)
                        .append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(baseDataSource.getJdbcUrl()).append(Constants.DOUBLE_QUOTES)
                        .append(Constants.SPACE).append(SqoopConstants.DB_USERNAME)
                        .append(Constants.SPACE).append(baseDataSource.getUser())
                        .append(Constants.SPACE).append(SqoopConstants.DB_PWD)
                        .append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(baseDataSource.getPassword()).append(Constants.DOUBLE_QUOTES)
                        .append(Constants.SPACE).append(SqoopConstants.TABLE)
                        .append(Constants.SPACE).append(targetMysqlParameter.getTargetTable());

                    if (StringUtils.isNotEmpty(targetMysqlParameter.getTargetColumns())) {
                        mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.COLUMNS)
                            .append(Constants.SPACE).append(targetMysqlParameter.getTargetColumns());
                    }

                    if (StringUtils.isNotEmpty(targetMysqlParameter.getFieldsTerminated())) {
                        mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.FIELDS_TERMINATED_BY);
                        if (targetMysqlParameter.getFieldsTerminated().contains("'")) {
                            mysqlTargetSb.append(Constants.SPACE).append(targetMysqlParameter.getFieldsTerminated());
                        } else {
                            mysqlTargetSb.append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetMysqlParameter.getFieldsTerminated()).append(Constants.SINGLE_QUOTES);
                        }
                    }

                    if (StringUtils.isNotEmpty(targetMysqlParameter.getLinesTerminated())) {
                        mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.LINES_TERMINATED_BY);
                        if (targetMysqlParameter.getLinesTerminated().contains(Constants.SINGLE_QUOTES)) {
                            mysqlTargetSb.append(Constants.SPACE).append(targetMysqlParameter.getLinesTerminated());
                        } else {
                            mysqlTargetSb.append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetMysqlParameter.getLinesTerminated()).append(Constants.SINGLE_QUOTES);
                        }
                    }

                    if (targetMysqlParameter.getIsUpdate()
                        && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateKey())
                        && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateMode())) {
                        mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.UPDATE_KEY)
                            .append(Constants.SPACE).append(targetMysqlParameter.getTargetUpdateKey())
                            .append(Constants.SPACE).append(SqoopConstants.UPDATE_MODE)
                            .append(Constants.SPACE).append(targetMysqlParameter.getTargetUpdateMode());
                    }
                }
            }
        } catch (Exception e) {
            logger.error(String.format("Sqoop mysql target params build failed: [%s]", e.getMessage()));
        }

        return mysqlTargetSb.toString();
    }
}
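Note on the delimiter handling above: the generator wraps a --fields-terminated-by or --lines-terminated-by value in single quotes only when the value does not already contain one, so pre-quoted user input passes through untouched. A minimal standalone sketch of that rule (the class and method names here are illustrative, not part of the codebase):

public final class DelimiterQuotingSketch {

    private DelimiterQuotingSketch() {
    }

    // Mirrors the rule in MysqlTargetGenerator: leave the value untouched if it
    // already contains a single quote (assumed pre-quoted by the user),
    // otherwise wrap it in single quotes for the generated sqoop command line.
    static String quoteDelimiter(String delimiter) {
        if (delimiter.contains("'")) {
            return delimiter;
        }
        return "'" + delimiter + "'";
    }

    public static void main(String[] args) {
        System.out.println(quoteDelimiter("@"));     // prints '@'
        System.out.println(quoteDelimiter("'\\n'")); // already quoted, printed as-is: '\n'
    }
}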

4
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java

@@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Before;
@@ -233,8 +232,7 @@ public class TaskPriorityQueueConsumerTest {
dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
dataSource.setCreateTime(new Date());
dataSource.setUpdateTime(new Date());
Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
Thread.sleep(10000);
}
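The switch from Mockito.when(...).thenReturn(...) to Mockito.doReturn(...).when(...) records the stub without invoking the real findDataSourceById during stubbing, which matters when the mock is a spy or the stubbed method has side effects. A minimal sketch of the difference (the Repo class is hypothetical, for illustration only):

import org.mockito.Mockito;

public class DoReturnSketch {

    static class Repo {
        Object findById(int id) {
            throw new IllegalStateException("real call touched the database");
        }
    }

    public static void main(String[] args) {
        Repo spy = Mockito.spy(new Repo());
        // Mockito.when(spy.findById(1)) would execute the real method first and throw;
        // doReturn(...).when(spy) registers the stub without running findById at all.
        Mockito.doReturn("stubbed").when(spy).findById(1);
        System.out.println(spy.findById(1)); // stubbed
    }
}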

180
dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java

@@ -14,17 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.sqoop;

import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Date;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -35,8 +37,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
/**
* sqoop task test
*/
@@ -45,71 +45,137 @@ public class SqoopTaskTest {
private static final Logger logger = LoggerFactory.getLogger(SqoopTaskTest.class);
private SqoopTask sqoopTask;

@Before
public void before() {
ProcessService processService = Mockito.mock(ProcessService.class);
ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskAppId(String.valueOf(System.currentTimeMillis()));
taskExecutionContext.setTenantCode("1");
taskExecutionContext.setEnvFile(".dolphinscheduler_env.sh");
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setTaskTimeout(0);
taskExecutionContext.setTaskParams("{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,"
+ "\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\","
+ "\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\","
+ "\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],"
+ "\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\""
+ ",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,"
+ "\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\","
+ "\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}");
sqoopTask = new SqoopTask(taskExecutionContext, logger);
//test sqoop task init method
sqoopTask.init();
}
/**
* test SqoopJobGenerator
*/
@Test
public void testGenerator() {
TaskExecutionContext mysqlTaskExecutionContext = getMysqlTaskExecutionContext();
//sqoop TEMPLATE job
//import mysql to HDFS with hadoop
String mysqlToHdfs =
"{\"jobName\":\"sqoop_import\",\"hadoopCustomParams\":[{\"prop\":\"mapreduce.map.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"4096\"}],"
+ "\"sqoopAdvancedParams\":[{\"prop\":\"--direct\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}],\"jobType\":\"TEMPLATE\",\"concurrency\":1,"
+ "\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\","
+ "\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\","
+ "\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\","
+ "\\\"deleteTargetDir\\\":true,\\\"fileType\\\":\\\"--as-textfile\\\",\\\"compressionCodec\\\":\\\"\\\",\\\"fieldsTerminated\\\":\\\"@\\\","
+ "\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
SqoopParameters mysqlToHdfsParams = JSONUtils.parseObject(mysqlToHdfs, SqoopParameters.class);
SqoopJobGenerator generator = new SqoopJobGenerator();
String mysqlToHdfsScript = generator.generateSqoopJob(mysqlToHdfsParams, mysqlTaskExecutionContext);
String mysqlToHdfsExpected =
"sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=4096 --direct -m 1 --connect \"jdbc:mysql://192.168.0.111:3306/test\" "
+ "--username kylo --password \"123456\" --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile "
+ "--delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'";
Assert.assertEquals(mysqlToHdfsExpected, mysqlToHdfsScript);
//export hdfs to mysql using update mode
String hdfsToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\","
+ "\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\","
+ "\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\","
+ "\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\","
+ "\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
SqoopParameters hdfsToMysqlParams = JSONUtils.parseObject(hdfsToMysql, SqoopParameters.class);
String hdfsToMysqlScript = generator.generateSqoopJob(hdfsToMysqlParams, mysqlTaskExecutionContext);
String hdfsToMysqlScriptExpected =
"sqoop export -D mapred.job.name=sqoop_import -m 1 --export-dir /ods/tmp/test/person7 --connect \"jdbc:mysql://192.168.0.111:3306/test\" "
+ "--username kylo --password \"123456\" --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' "
+ "--lines-terminated-by '\\n' --update-key id --update-mode allowinsert";
Assert.assertEquals(hdfsToMysqlScriptExpected, hdfsToMysqlScript);
//export hive to mysql
String hiveToMysql =
"{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\","
+ "\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\","
+ "\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\","
+ "\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\","
+ "\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\","
+ "\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
SqoopParameters hiveToMysqlParams = JSONUtils.parseObject(hiveToMysql, SqoopParameters.class);
String hiveToMysqlScript = generator.generateSqoopJob(hiveToMysqlParams, mysqlTaskExecutionContext);
String hiveToMysqlExpected =
"sqoop export -D mapred.job.name=sqoop_import -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date "
+ "--hcatalog-partition-values 2020-02-17 --connect \"jdbc:mysql://192.168.0.111:3306/test\" --username kylo --password \"123456\" --table person_3 "
+ "--fields-terminated-by '@' --lines-terminated-by '\\n'";
Assert.assertEquals(hiveToMysqlExpected, hiveToMysqlScript);
//import mysql to hive
String mysqlToHive =
"{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\","
+ "\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\","
+ "\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],"
+ "\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\","
+ "\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,"
+ "\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}";
SqoopParameters mysqlToHiveParams = JSONUtils.parseObject(mysqlToHive, SqoopParameters.class);
String mysqlToHiveScript = generator.generateSqoopJob(mysqlToHiveParams, mysqlTaskExecutionContext);
String mysqlToHiveExpected =
"sqoop import -D mapred.job.name=sqoop_import -m 1 --connect \"jdbc:mysql://192.168.0.111:3306/test\" --username kylo --password \"123456\" "
+ "--query \"SELECT * FROM person_2 WHERE \\$CONDITIONS\" --map-column-java id=Integer --hive-import --hive-database stg --hive-table person_internal_2 "
+ "--create-hive-table --hive-overwrite --delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16";
Assert.assertEquals(mysqlToHiveExpected, mysqlToHiveScript);
//sqoop CUSTOM job
String sqoopCustomString = "{\"jobType\":\"CUSTOM\",\"localParams\":[],\"customShell\":\"sqoop import\"}";
SqoopParameters sqoopCustomParams = JSONUtils.parseObject(sqoopCustomString, SqoopParameters.class);
String sqoopCustomScript = generator.generateSqoopJob(sqoopCustomParams, new TaskExecutionContext());
String sqoopCustomExpected = "sqoop import";
Assert.assertEquals(sqoopCustomExpected, sqoopCustomScript);
}
/**
* get a taskExecutionContext that includes a mysql datasource
*
* @return TaskExecutionContext
*/
private TaskExecutionContext getMysqlTaskExecutionContext() {
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext();
String mysqlSourceConnectionParams =
"{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}";
String mysqlTargetConnectionParams =
"{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}";
sqoopTaskExecutionContext.setDataSourceId(2);
sqoopTaskExecutionContext.setDataTargetId(2);
sqoopTaskExecutionContext.setSourcetype(0);
sqoopTaskExecutionContext.setTargetConnectionParams(mysqlTargetConnectionParams);
sqoopTaskExecutionContext.setSourceConnectionParams(mysqlSourceConnectionParams);
sqoopTaskExecutionContext.setTargetType(0);
taskExecutionContext.setSqoopTaskExecutionContext(sqoopTaskExecutionContext);
return taskExecutionContext;
}
@Test
@@ -121,7 +187,7 @@ public class SqoopTaskTest {
* Method: init
*/
@Test
public void testInit() {
try {
sqoopTask.init();
} catch (Exception e) {

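The tests above now parse task parameters through the project's JSONUtils.parseObject rather than fastjson's JSON.parseObject. A minimal Jackson-based sketch of the shape such a helper usually takes (this is an assumption for illustration, not the project's actual implementation):

import com.fasterxml.jackson.databind.ObjectMapper;

public final class JsonUtilsSketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    private JsonUtilsSketch() {
    }

    // Assumed behavior: return null on malformed input rather than throwing,
    // so callers such as the generators above can fail soft and just log.
    public static <T> T parseObject(String json, Class<T> clazz) {
        try {
            return MAPPER.readValue(json, clazz);
        } catch (Exception e) {
            return null;
        }
    }
}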
2
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue

@@ -131,7 +131,7 @@
},
created () {
let supportType = this.supportType || []
this.typeList = this.data.typeList || _.cloneDeep(this.store.state.dag.dsTypeListS)
// Have a specified data source
if (supportType.length) {
let is = (type) => {

962
dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue

File diff suppressed because it is too large

7
dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js

@@ -547,6 +547,9 @@ export default {
'Whether directory': 'Whether directory',
Yes: 'Yes',
No: 'No',
'Hadoop Custom Params': 'Hadoop Params',
'Sqoop Advanced Parameters': 'Sqoop Params',
'Sqoop Job Name': 'Job Name',
'Please enter Mysql Database(required)': 'Please enter Mysql Database(required)',
'Please enter Mysql Table(required)': 'Please enter Mysql Table(required)',
'Please enter Columns (Comma separated)': 'Please enter Columns (Comma separated)',
@@ -561,6 +564,8 @@ export default {
'Please enter Lines Terminated': 'Please enter Lines Terminated',
'Please enter Concurrency': 'Please enter Concurrency',
'Please enter Update Key': 'Please enter Update Key',
'Please enter Job Name(required)': 'Please enter Job Name(required)',
'Please enter Custom Shell(required)': 'Please enter Custom Shell(required)',
Direct: 'Direct',
Type: 'Type',
ModelType: 'ModelType',
@@ -594,6 +599,8 @@ export default {
'All Columns': 'All Columns',
'Some Columns': 'Some Columns',
'Branch flow': 'Branch flow',
'Custom Job': 'Custom Job',
'Custom Script': 'Custom Script',
'Cannot select the same node for successful branch flow and failed branch flow': 'Cannot select the same node for successful branch flow and failed branch flow',
'Successful branch flow and failed branch flow are required': 'conditions node Successful and failed branch flow are required',
'No resources exist': 'No resources exist',

7
dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js

@@ -545,6 +545,9 @@ export default {
'Whether directory': '是否文件夹',
Yes: '是',
No: '否',
'Hadoop Custom Params': 'Hadoop参数',
'Sqoop Advanced Parameters': 'Sqoop参数',
'Sqoop Job Name': '任务名称',
'Please enter Mysql Database(required)': '请输入Mysql数据库(必填)',
'Please enter Mysql Table(required)': '请输入Mysql表名(必填)',
'Please enter Columns (Comma separated)': '请输入列名 , 隔开',
@@ -559,6 +562,8 @@ export default {
'Please enter Lines Terminated': '请输入行分隔符',
'Please enter Concurrency': '请输入并发度',
'Please enter Update Key': '请输入更新列',
'Please enter Job Name(required)': '请输入任务名称(必填)',
'Please enter Custom Shell(required)': '请输入自定义脚本',
Direct: '流向',
Type: '类型',
ModelType: '模式',
@@ -592,6 +597,8 @@ export default {
'All Columns': '全表导入',
'Some Columns': '选择列',
'Branch flow': '分支流转',
'Custom Job': '自定义任务',
'Custom Script': '自定义脚本',
'Cannot select the same node for successful branch flow and failed branch flow': '成功分支流转和失败分支流转不能选择同一个节点',
'Successful branch flow and failed branch flow are required': 'conditions节点成功和失败分支流转必填',
'No resources exist': '不存在资源',

1
pom.xml

@@ -819,6 +819,7 @@
<include>**/server/worker/task/spark/SparkTaskTest.java</include>
<include>**/server/worker/task/EnvFileTest.java</include>
<include>**/server/worker/task/spark/SparkTaskTest.java</include>
<include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
<include>**/server/worker/EnvFileTest.java</include>
<include>**/service/process/ProcessServiceTest.java</include>
<include>**/service/quartz/cron/CronUtilsTest.java</include>
