From c8f28ab2ba865a40063486ca0879ef183342086d Mon Sep 17 00:00:00 2001 From: eights Date: Tue, 9 Jun 2020 03:16:35 -0700 Subject: [PATCH 1/5] sqoop task optimization --- .../dolphinscheduler/api/enums/Status.java | 2 +- .../common/enums/SqoopJobType.java | 41 ++++++++ .../common/task/sqoop/SqoopParameters.java | 99 +++++++++++++++++-- .../consumer/TaskPriorityQueueConsumer.java | 32 +++--- .../task/sqoop/generator/CommonGenerator.java | 39 ++++++++ .../sources/MysqlSourceGenerator.java | 14 +-- 6 files changed, 198 insertions(+), 29 deletions(-) create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index ae7300f54d..7ed6addcb5 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -218,7 +218,7 @@ public enum Status { DATA_IS_NOT_VALID(50017,"data {0} not valid", "数据[{0}]无效"), DATA_IS_NULL(50018,"data {0} is null", "数据[{0}]不能为空"), PROCESS_NODE_HAS_CYCLE(50019,"process node has cycle", "流程节点间存在循环依赖"), - PROCESS_NODE_S_PARAMETER_INVALID(50020,"process node %s parameter invalid", "流程节点[%s]参数无效"), + PROCESS_NODE_S_PARAMETER_INVALID(50020,"process node {0} parameter invalid", "流程节点[{0}]参数无效"), PROCESS_DEFINE_STATE_ONLINE(50021, "process definition {0} is already on line", "工作流定义[{0}]已上线"), DELETE_PROCESS_DEFINE_BY_ID_ERROR(50022,"delete process definition by id error", "删除工作流定义错误"), SCHEDULE_CRON_STATE_ONLINE(50023,"the status of schedule {0} is already on line", "调度配置[{0}]已上线"), diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java new file mode 100644 index 0000000000..f1fde27928 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.enums; + +import com.baomidou.mybatisplus.annotation.EnumValue; + +public enum SqoopJobType { + CUSTOM(0, "CUSTOM"), + TEMPLATE(1, "TEMPLATE"); + + SqoopJobType(int code, String descp){ + this.code = code; + this.descp = descp; + } + + @EnumValue + private final int code; + private final String descp; + + public int getCode() { + return code; + } + + public String getDescp() { + return descp; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java index 7f02f42387..8b566a8472 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java @@ -16,6 +16,8 @@ */ package org.apache.dolphinscheduler.common.task.sqoop; +import org.apache.dolphinscheduler.common.enums.SqoopJobType; +import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.utils.StringUtils; @@ -28,6 +30,23 @@ import java.util.List; */ public class SqoopParameters extends AbstractParameters { + /** + * sqoop job type: + * CUSTOM - custom sqoop job + * TEMPLATE - sqoop template job + */ + private String jobType; + + /** + * customJob eq 1, use customShell + */ + private String customShell; + + /** + * sqoop job name - map-reduce job name + */ + private String jobName; + /** * model type */ @@ -53,6 +72,16 @@ public class SqoopParameters extends AbstractParameters { */ private String targetParams; + /** + * hadoop custom param for sqoop job + */ + private List hadoopCustomParams; + + /** + * sqoop advanced param + */ + private List sqoopAdvancedParams; + public String getModelType() { return modelType; } @@ -101,18 +130,74 @@ public class SqoopParameters extends AbstractParameters { this.targetParams = targetParams; } + public String getJobType() { + return jobType; + } + + public void setJobType(String jobType) { + this.jobType = jobType; + } + + public String getJobName() { + return jobName; + } + + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public String getCustomShell() { + return customShell; + } + + public void setCustomShell(String customShell) { + this.customShell = customShell; + } + + public List getHadoopCustomParams() { + return hadoopCustomParams; + } + + public void setHadoopCustomParams(List hadoopCustomParams) { + this.hadoopCustomParams = hadoopCustomParams; + } + + public List getSqoopAdvancedParams() { + return sqoopAdvancedParams; + } + + public void setSqoopAdvancedParams(List sqoopAdvancedParams) { + this.sqoopAdvancedParams = sqoopAdvancedParams; + } + @Override public boolean checkParameters() { - return StringUtils.isNotEmpty(modelType)&& - concurrency != 0 && - StringUtils.isNotEmpty(sourceType)&& - StringUtils.isNotEmpty(targetType)&& - StringUtils.isNotEmpty(sourceParams)&& - StringUtils.isNotEmpty(targetParams); + + boolean sqoopParamsCheck = false; + + if (StringUtils.isEmpty(jobType)) { + return sqoopParamsCheck; + } + + if (SqoopJobType.TEMPLATE.getDescp().equals(jobType)) { + sqoopParamsCheck = StringUtils.isEmpty(customShell) && + StringUtils.isNotEmpty(modelType) && + StringUtils.isNotEmpty(jobName) && + concurrency != 0 && + StringUtils.isNotEmpty(sourceType) && + StringUtils.isNotEmpty(targetType) && + StringUtils.isNotEmpty(sourceParams) && + StringUtils.isNotEmpty(targetParams); + } else if (SqoopJobType.CUSTOM.getDescp().equals(jobType)) { + sqoopParamsCheck = StringUtils.isNotEmpty(customShell) && + StringUtils.isEmpty(jobName); + } + + return sqoopParamsCheck; } @Override public List getResourceFilesList() { - return new ArrayList<>(); + return new ArrayList<>(); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 3314789fdb..da2363fc81 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.consumer; import com.alibaba.fastjson.JSONObject; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; +import org.apache.dolphinscheduler.common.enums.SqoopJobType; import org.apache.dolphinscheduler.common.enums.TaskType; import org.apache.dolphinscheduler.common.enums.UdfType; import org.apache.dolphinscheduler.common.model.TaskNode; @@ -258,29 +259,32 @@ public class TaskPriorityQueueConsumer extends Thread{ /** - * set datax task relation + * set sqoop task relation * @param sqoopTaskExecutionContext sqoopTaskExecutionContext * @param taskNode taskNode */ private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskNode taskNode) { SqoopParameters sqoopParameters = JSONObject.parseObject(taskNode.getParams(), SqoopParameters.class); - SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class); - TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class); + // sqoop job type is template set task relation + if (sqoopParameters.getJobType().equals(SqoopJobType.TEMPLATE.getDescp())) { + SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class); + TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class); - DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource()); - DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource()); + DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource()); + DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource()); - if (dataSource != null){ - sqoopTaskExecutionContext.setDataSourceId(dataSource.getId()); - sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); - sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); - } + if (dataSource != null){ + sqoopTaskExecutionContext.setDataSourceId(dataSource.getId()); + sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); + sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); + } - if (dataTarget != null){ - sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId()); - sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); - sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); + if (dataTarget != null){ + sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId()); + sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); + sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); + } } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java index 4944bac5ba..ffca73544d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java @@ -16,10 +16,17 @@ */ package org.apache.dolphinscheduler.server.worker.task.sqoop.generator; +import org.apache.dolphinscheduler.common.Constants; +import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; +import org.apache.dolphinscheduler.common.utils.CollectionUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + + /** * common script generator */ @@ -32,6 +39,38 @@ public class CommonGenerator { try{ result.append("sqoop ") .append(sqoopParameters.getModelType()); + + //set sqoop job name + result.append(" -D mapred.job.name") + .append(Constants.EQUAL_SIGN) + .append(sqoopParameters.getJobName()); + + //set hadoop custom param + List hadoopCustomParams = sqoopParameters.getHadoopCustomParams(); + if (CollectionUtils.isNotEmpty(hadoopCustomParams)) { + for (Property hadoopCustomParam : hadoopCustomParams) { + String hadoopCustomParamStr = " -D " + hadoopCustomParam.getProp() + + Constants.EQUAL_SIGN + hadoopCustomParam.getValue(); + + if (StringUtils.isNotEmpty(hadoopCustomParamStr)) { + result.append(hadoopCustomParamStr); + } + } + } + + //set sqoop advanced custom param + List sqoopAdvancedParams = sqoopParameters.getSqoopAdvancedParams(); + if (CollectionUtils.isNotEmpty(sqoopAdvancedParams)) { + + for (Property sqoopAdvancedParam : sqoopAdvancedParams) { + String sqoopAdvancedParamStr = " " + sqoopAdvancedParam.getProp() + + " " + sqoopAdvancedParam.getValue(); + if (StringUtils.isNotEmpty(sqoopAdvancedParamStr)) { + result.append(sqoopAdvancedParamStr); + } + } + } + if(sqoopParameters.getConcurrency() >0){ result.append(" -m ") .append(sqoopParameters.getConcurrency()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java index f8e3d57c7d..402f4f8ee0 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java @@ -77,19 +77,19 @@ public class MysqlSourceGenerator implements ISourceGenerator { }else{ srcQuery += " WHERE $CONDITIONS"; } - result.append(" --query \'"+srcQuery+"\'"); + result.append(" --query \'").append(srcQuery).append("\'"); } List mapColumnHive = sourceMysqlParameter.getMapColumnHive(); if(mapColumnHive != null && !mapColumnHive.isEmpty()){ - String columnMap = ""; + StringBuilder columnMap = new StringBuilder(); for(Property item:mapColumnHive){ - columnMap = item.getProp()+"="+ item.getValue()+","; + columnMap.append(item.getProp()).append("=").append(item.getValue()).append(","); } - if(StringUtils.isNotEmpty(columnMap)){ + if(StringUtils.isNotEmpty(columnMap.toString())){ result.append(" --map-column-hive ") .append(columnMap.substring(0,columnMap.length()-1)); } @@ -98,12 +98,12 @@ public class MysqlSourceGenerator implements ISourceGenerator { List mapColumnJava = sourceMysqlParameter.getMapColumnJava(); if(mapColumnJava != null && !mapColumnJava.isEmpty()){ - String columnMap = ""; + StringBuilder columnMap = new StringBuilder(); for(Property item:mapColumnJava){ - columnMap = item.getProp()+"="+ item.getValue()+","; + columnMap.append(item.getProp()).append("=").append(item.getValue()).append(","); } - if(StringUtils.isNotEmpty(columnMap)){ + if(StringUtils.isNotEmpty(columnMap.toString())){ result.append(" --map-column-java ") .append(columnMap.substring(0,columnMap.length()-1)); } From 50da6951fc09b436eabf3fc2ab6338f325c32fd5 Mon Sep 17 00:00:00 2001 From: eights Date: Tue, 9 Jun 2020 03:39:29 -0700 Subject: [PATCH 2/5] sqoop front-end optimization --- .../formModel/tasks/_source/datasource.vue | 2 +- .../dag/_source/formModel/tasks/sqoop.vue | 962 +++++++++++------- .../src/js/module/i18n/locale/en_US.js | 7 + .../src/js/module/i18n/locale/zh_CN.js | 7 + 4 files changed, 612 insertions(+), 366 deletions(-) diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue index a173139d15..05e248f518 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue @@ -131,7 +131,7 @@ }, created () { let supportType = this.supportType || [] - this.typeList = _.cloneDeep(this.store.state.dag.dsTypeListS) + this.typeList = this.data.typeList || _.cloneDeep(this.store.state.dag.dsTypeListS) // Have a specified data source if (supportType.length) { let is = (type) => { diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue index 6594ffcf74..4bed92d27a 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue @@ -1,134 +1,194 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/