From 3d6eee5d8065fc6ac7ef73b01a5815ec4fb901ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AD=99=E6=9C=9D=E5=92=8C?= <649790970@qq.com> Date: Mon, 24 Feb 2020 10:30:17 +0800 Subject: [PATCH] #203 add sqoop task (#1974) * add sqoop task * add sqoop task * add sqoop task test * add sqoop task test * add sqoop task test * modify add sqoop task test * modify sqoop task test * modify sqoop task test * modify sqoop task test * modify sqoop task test 2 * modify sqoop task test 3 * modify sqoop task test 3 * modify sqoop task test 3 * modify sqoop task test 4 --- .../common/enums/QueryType.java | 33 + .../common/enums/TaskType.java | 4 +- .../common/task/sqoop/SqoopParameters.java | 117 +++ .../sqoop/sources/SourceHdfsParameter.java | 36 + .../sqoop/sources/SourceHiveParameter.java | 72 ++ .../sqoop/sources/SourceMysqlParameter.java | 136 +++ .../sqoop/targets/TargetHdfsParameter.java | 97 ++ .../sqoop/targets/TargetHiveParameter.java | 120 +++ .../sqoop/targets/TargetMysqlParameter.java | 132 +++ .../common/utils/TaskParametersUtils.java | 3 + .../common/task/EntityTestUtils.java | 81 ++ .../common/task/SqoopParameterEntityTest.java | 52 + .../server/worker/task/AbstractTask.java | 4 + .../server/worker/task/TaskManager.java | 3 + .../server/worker/task/sqoop/SqoopTask.java | 78 ++ .../task/sqoop/generator/CommonGenerator.java | 45 + .../sqoop/generator/ISourceGenerator.java | 32 + .../sqoop/generator/ITargetGenerator.java | 32 + .../sqoop/generator/SqoopJobGenerator.java | 109 ++ .../sources/HdfsSourceGenerator.java | 56 + .../sources/HiveSourceGenerator.java | 61 ++ .../sources/MysqlSourceGenerator.java | 120 +++ .../targets/HdfsTargetGenerator.java | 75 ++ .../targets/HiveTargetGenerator.java | 85 ++ .../targets/MysqlTargetGenerator.java | 93 ++ .../worker/task/sqoop/SqoopTaskTest.java | 139 +++ .../js/conf/home/pages/dag/_source/config.js | 4 + .../js/conf/home/pages/dag/_source/dag.scss | 3 + .../pages/dag/_source/formModel/formModel.vue | 9 + .../dag/_source/formModel/tasks/sqoop.vue | 981 ++++++++++++++++++ .../conf/home/pages/dag/img/toolbar_SQOOP.png | Bin 0 -> 1157 bytes .../src/js/module/i18n/locale/en_US.js | 51 +- .../src/js/module/i18n/locale/zh_CN.js | 48 +- pom.xml | 2 + 34 files changed, 2910 insertions(+), 3 deletions(-) create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/sources/SourceHdfsParameter.java create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/sources/SourceHiveParameter.java create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/sources/SourceMysqlParameter.java create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetHdfsParameter.java create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetHiveParameter.java create mode 100644 dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/EntityTestUtils.java create mode 100644 dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqoopParameterEntityTest.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java create mode 100644 dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java create mode 100644 dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java create mode 100644 dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue create mode 100644 dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_SQOOP.png diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java new file mode 100644 index 0000000000..13820b4bab --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.enums; + +public enum QueryType { + + FORM, + SQL; + + public static QueryType getEnum(int value){ + for (QueryType e:QueryType.values()) { + if(e.ordinal() == value) { + return e; + } + } + //For values out of enum scope + return null; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java index b996c3aec9..b56e72560c 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/TaskType.java @@ -34,6 +34,7 @@ public enum TaskType { * 8 FLINK * 9 HTTP * 10 DATAX + * 11 SQOOP */ SHELL(0, "shell"), SQL(1, "sql"), @@ -45,7 +46,8 @@ public enum TaskType { DEPENDENT(7, "dependent"), FLINK(8, "flink"), HTTP(9, "http"), - DATAX(10, "datax"); + DATAX(10, "datax"), + SQOOP(11, "sqoop"); TaskType(int code, String descp){ this.code = code; diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java new file mode 100644 index 0000000000..fb65df6c1b --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.task.sqoop; + +import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.utils.StringUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * sqoop parameters + */ +public class SqoopParameters extends AbstractParameters { + + /** + * model type + */ + private String modelType; + /** + * concurrency + */ + private int concurrency; + /** + * source type + */ + private String sourceType; + /** + * target type + */ + private String targetType; + /** + * source params + */ + private String sourceParams; + /** + * target params + */ + private String targetParams; + + public String getModelType() { + return modelType; + } + + public void setModelType(String modelType) { + this.modelType = modelType; + } + + public int getConcurrency() { + return concurrency; + } + + public void setConcurrency(int concurrency) { + this.concurrency = concurrency; + } + + public String getSourceType() { + return sourceType; + } + + public void setSourceType(String sourceType) { + this.sourceType = sourceType; + } + + public String getTargetType() { + return targetType; + } + + public void setTargetType(String targetType) { + this.targetType = targetType; + } + + public String getSourceParams() { + return sourceParams; + } + + public void setSourceParams(String sourceParams) { + this.sourceParams = sourceParams; + } + + public String getTargetParams() { + return targetParams; + } + + public void setTargetParams(String targetParams) { + this.targetParams = targetParams; + } + + @Override + public boolean checkParameters() { + return StringUtils.isNotEmpty(modelType)&& + concurrency != 0 && + StringUtils.isNotEmpty(sourceType)&& + StringUtils.isNotEmpty(targetType)&& + StringUtils.isNotEmpty(sourceParams)&& + StringUtils.isNotEmpty(targetParams); + } + + @Override + public List getResourceFilesList() { + return new ArrayList<>(); + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/sources/SourceHdfsParameter.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/sources/SourceHdfsParameter.java new file mode 100644 index 0000000000..07f1157be0 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/sources/SourceHdfsParameter.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.task.sqoop.sources; + +/** + * source hdfs parameter + */ +public class SourceHdfsParameter { + + /** + * export dir + */ + private String exportDir; + + public String getExportDir() { + return exportDir; + } + + public void setExportDir(String exportDir) { + this.exportDir = exportDir; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/sources/SourceHiveParameter.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/sources/SourceHiveParameter.java new file mode 100644 index 0000000000..a37840f9ea --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/sources/SourceHiveParameter.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.task.sqoop.sources; + +/** + * source hive parameter + */ +public class SourceHiveParameter { + + /** + * hive database + */ + private String hiveDatabase; + /** + * hive table + */ + private String hiveTable; + /** + * hive partition key + */ + private String hivePartitionKey; + /** + * hive partition value + */ + private String hivePartitionValue; + + public String getHiveDatabase() { + return hiveDatabase; + } + + public void setHiveDatabase(String hiveDatabase) { + this.hiveDatabase = hiveDatabase; + } + + public String getHiveTable() { + return hiveTable; + } + + public void setHiveTable(String hiveTable) { + this.hiveTable = hiveTable; + } + + public String getHivePartitionKey() { + return hivePartitionKey; + } + + public void setHivePartitionKey(String hivePartitionKey) { + this.hivePartitionKey = hivePartitionKey; + } + + public String getHivePartitionValue() { + return hivePartitionValue; + } + + public void setHivePartitionValue(String hivePartitionValue) { + this.hivePartitionValue = hivePartitionValue; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/sources/SourceMysqlParameter.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/sources/SourceMysqlParameter.java new file mode 100644 index 0000000000..f80d681b59 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/sources/SourceMysqlParameter.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.task.sqoop.sources; + +import org.apache.dolphinscheduler.common.process.Property; + +import java.util.List; + +/** + * source mysql parameter + */ +public class SourceMysqlParameter { + + /** + * src datasource + */ + private int srcDatasource; + /** + * src table + */ + private String srcTable; + /** + * src query type + */ + private int srcQueryType; + /** + * src query sql + */ + private String srcQuerySql; + /** + * src column type + */ + private int srcColumnType; + /** + * src columns + */ + private String srcColumns; + /** + * src condition list + */ + private List srcConditionList; + /** + * map column hive + */ + private List mapColumnHive; + /** + * map column java + */ + private List mapColumnJava; + + public int getSrcDatasource() { + return srcDatasource; + } + + public void setSrcDatasource(int srcDatasource) { + this.srcDatasource = srcDatasource; + } + + public String getSrcTable() { + return srcTable; + } + + public void setSrcTable(String srcTable) { + this.srcTable = srcTable; + } + + public int getSrcQueryType() { + return srcQueryType; + } + + public void setSrcQueryType(int srcQueryType) { + this.srcQueryType = srcQueryType; + } + + public String getSrcQuerySql() { + return srcQuerySql; + } + + public void setSrcQuerySql(String srcQuerySql) { + this.srcQuerySql = srcQuerySql; + } + + public int getSrcColumnType() { + return srcColumnType; + } + + public void setSrcColumnType(int srcColumnType) { + this.srcColumnType = srcColumnType; + } + + public String getSrcColumns() { + return srcColumns; + } + + public void setSrcColumns(String srcColumns) { + this.srcColumns = srcColumns; + } + + public List getSrcConditionList() { + return srcConditionList; + } + + public void setSrcConditionList(List srcConditionList) { + this.srcConditionList = srcConditionList; + } + + public List getMapColumnHive() { + return mapColumnHive; + } + + public void setMapColumnHive(List mapColumnHive) { + this.mapColumnHive = mapColumnHive; + } + + public List getMapColumnJava() { + return mapColumnJava; + } + + public void setMapColumnJava(List mapColumnJava) { + this.mapColumnJava = mapColumnJava; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetHdfsParameter.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetHdfsParameter.java new file mode 100644 index 0000000000..524921dcee --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetHdfsParameter.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.task.sqoop.targets; + +/** + * target hdfs parameter + */ +public class TargetHdfsParameter { + + /** + * target dir + */ + private String targetPath; + /** + * delete target dir + */ + private boolean deleteTargetDir; + /** + * file type + */ + private String fileType; + /** + * compression codec + */ + private String compressionCodec; + /** + * fields terminated + */ + private String fieldsTerminated; + /** + * lines terminated + */ + private String linesTerminated; + + public String getTargetPath() { + return targetPath; + } + + public void setTargetPath(String targetPath) { + this.targetPath = targetPath; + } + + public boolean isDeleteTargetDir() { + return deleteTargetDir; + } + + public void setDeleteTargetDir(boolean deleteTargetDir) { + this.deleteTargetDir = deleteTargetDir; + } + + public String getFileType() { + return fileType; + } + + public void setFileType(String fileType) { + this.fileType = fileType; + } + + public String getCompressionCodec() { + return compressionCodec; + } + + public void setCompressionCodec(String compressionCodec) { + this.compressionCodec = compressionCodec; + } + + public String getFieldsTerminated() { + return fieldsTerminated; + } + + public void setFieldsTerminated(String fieldsTerminated) { + this.fieldsTerminated = fieldsTerminated; + } + + public String getLinesTerminated() { + return linesTerminated; + } + + public void setLinesTerminated(String linesTerminated) { + this.linesTerminated = linesTerminated; + } + +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetHiveParameter.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetHiveParameter.java new file mode 100644 index 0000000000..f9bfde3374 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetHiveParameter.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.task.sqoop.targets; + +/** + * target hive parameter + */ +public class TargetHiveParameter { + + /** + * hive database + */ + private String hiveDatabase; + /** + * hive table + */ + private String hiveTable; + /** + * create hive table + */ + private boolean createHiveTable; + /** + * drop delimiter + */ + private boolean dropDelimiter; + /** + * hive overwrite + */ + private boolean hiveOverWrite; + /** + * replace delimiter + */ + private String replaceDelimiter; + /** + * hive partition key + */ + private String hivePartitionKey; + /** + * hive partition value + */ + private String hivePartitionValue; + + public String getHiveDatabase() { + return hiveDatabase; + } + + public void setHiveDatabase(String hiveDatabase) { + this.hiveDatabase = hiveDatabase; + } + + public String getHiveTable() { + return hiveTable; + } + + public void setHiveTable(String hiveTable) { + this.hiveTable = hiveTable; + } + + public boolean isCreateHiveTable() { + return createHiveTable; + } + + public void setCreateHiveTable(boolean createHiveTable) { + this.createHiveTable = createHiveTable; + } + + public boolean isDropDelimiter() { + return dropDelimiter; + } + + public void setDropDelimiter(boolean dropDelimiter) { + this.dropDelimiter = dropDelimiter; + } + + public boolean isHiveOverWrite() { + return hiveOverWrite; + } + + public void setHiveOverWrite(boolean hiveOverWrite) { + this.hiveOverWrite = hiveOverWrite; + } + + public String getReplaceDelimiter() { + return replaceDelimiter; + } + + public void setReplaceDelimiter(String replaceDelimiter) { + this.replaceDelimiter = replaceDelimiter; + } + + public String getHivePartitionKey() { + return hivePartitionKey; + } + + public void setHivePartitionKey(String hivePartitionKey) { + this.hivePartitionKey = hivePartitionKey; + } + + public String getHivePartitionValue() { + return hivePartitionValue; + } + + public void setHivePartitionValue(String hivePartitionValue) { + this.hivePartitionValue = hivePartitionValue; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java new file mode 100644 index 0000000000..47126ae993 --- /dev/null +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.task.sqoop.targets; + +/** + * target mysql parameter + */ +public class TargetMysqlParameter { + + /** + * target datasource + */ + private int targetDatasource; + /** + * target table + */ + private String targetTable; + /** + * target columns + */ + private String targetColumns; + /** + * fields terminated + */ + private String fieldsTerminated; + /** + * lines terminated + */ + private String linesTerminated; + /** + * pre query + */ + private String preQuery; + /** + * is update + */ + private boolean isUpdate; + /** + * target update key + */ + private String targetUpdateKey; + /** + * target update mode + */ + private String targetUpdateMode; + + public int getTargetDatasource() { + return targetDatasource; + } + + public void setTargetDatasource(int targetDatasource) { + this.targetDatasource = targetDatasource; + } + + public String getTargetTable() { + return targetTable; + } + + public void setTargetTable(String targetTable) { + this.targetTable = targetTable; + } + + public String getTargetColumns() { + return targetColumns; + } + + public void setTargetColumns(String targetColumns) { + this.targetColumns = targetColumns; + } + + public String getFieldsTerminated() { + return fieldsTerminated; + } + + public void setFieldsTerminated(String fieldsTerminated) { + this.fieldsTerminated = fieldsTerminated; + } + + public String getLinesTerminated() { + return linesTerminated; + } + + public void setLinesTerminated(String linesTerminated) { + this.linesTerminated = linesTerminated; + } + + public String getPreQuery() { + return preQuery; + } + + public void setPreQuery(String preQuery) { + this.preQuery = preQuery; + } + + public boolean isUpdate() { + return isUpdate; + } + + public void setUpdate(boolean update) { + isUpdate = update; + } + + public String getTargetUpdateKey() { + return targetUpdateKey; + } + + public void setTargetUpdateKey(String targetUpdateKey) { + this.targetUpdateKey = targetUpdateKey; + } + + public String getTargetUpdateMode() { + return targetUpdateMode; + } + + public void setTargetUpdateMode(String targetUpdateMode) { + this.targetUpdateMode = targetUpdateMode; + } +} diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java index a2ae6a68e3..7a0e069a9a 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/TaskParametersUtils.java @@ -28,6 +28,7 @@ import org.apache.dolphinscheduler.common.task.python.PythonParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters; import org.apache.dolphinscheduler.common.task.spark.SparkParameters; import org.apache.dolphinscheduler.common.task.sql.SqlParameters; +import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.task.subprocess.SubProcessParameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,6 +72,8 @@ public class TaskParametersUtils { return JSONUtils.parseObject(parameter, HttpParameters.class); case DATAX: return JSONUtils.parseObject(parameter, DataxParameters.class); + case SQOOP: + return JSONUtils.parseObject(parameter, SqoopParameters.class); default: return null; } diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/EntityTestUtils.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/EntityTestUtils.java new file mode 100644 index 0000000000..5d867bc4d9 --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/EntityTestUtils.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.task; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.*; + +/** + * entity test utils + */ +public class EntityTestUtils { + + private static final Map OBJECT_MAP = new HashMap<>(); + + private static final String SKIP_METHOD = "getClass,notify,notifyAll,wait,equals,hashCode,clone"; + + static { + OBJECT_MAP.put("java.lang.Long", 1L); + OBJECT_MAP.put("java.lang.String", "test"); + OBJECT_MAP.put("java.lang.Integer", 1); + OBJECT_MAP.put("int", 1); + OBJECT_MAP.put("long", 1L); + OBJECT_MAP.put("java.util.Date", new Date()); + OBJECT_MAP.put("char", '1'); + OBJECT_MAP.put("java.util.Map", new HashMap()); + OBJECT_MAP.put("boolean", true); + } + + public static void run(List classList) + throws IllegalAccessException, InvocationTargetException, InstantiationException { + for (Class temp : classList) { + Object tempInstance = new Object(); + Constructor[] constructors = temp.getConstructors(); + for (Constructor constructor : constructors) { + final Class[] parameterTypes = constructor.getParameterTypes(); + if (parameterTypes.length == 0) { + tempInstance = constructor.newInstance(); + } else { + Object[] objects = new Object[parameterTypes.length]; + for (int i = 0; i < parameterTypes.length; i++) { + objects[i] = OBJECT_MAP.get(parameterTypes[i].getName()); + } + tempInstance = constructor.newInstance(objects); + } + } + + Method[] methods = temp.getMethods(); + for (final Method method : methods) { + if (SKIP_METHOD.contains(method.getName())) { + break; + } + final Class[] parameterTypes = method.getParameterTypes(); + if (parameterTypes.length != 0) { + Object[] objects = new Object[parameterTypes.length]; + for (int i = 0; i < parameterTypes.length; i++) { + objects[i] = OBJECT_MAP.get(parameterTypes[i].getName()); + } + method.invoke(tempInstance, objects); + } else { + method.invoke(tempInstance); + } + } + } + } +} \ No newline at end of file diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqoopParameterEntityTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqoopParameterEntityTest.java new file mode 100644 index 0000000000..5f35e89ddd --- /dev/null +++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/task/SqoopParameterEntityTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.common.task; + +import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; +import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHdfsParameter; +import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHiveParameter; +import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter; +import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHdfsParameter; +import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHiveParameter; +import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter; +import org.junit.Assert; +import org.junit.Test; +import java.util.ArrayList; +import java.util.List; + +/** + * sqoop parameter entity test + */ +public class SqoopParameterEntityTest { + + @Test + public void testEntity(){ + try { + List classList = new ArrayList<>(); + classList.add(SourceMysqlParameter.class); + classList.add(SourceHiveParameter.class); + classList.add(SourceHdfsParameter.class); + classList.add(SqoopParameters.class); + classList.add(TargetMysqlParameter.class); + classList.add(TargetHiveParameter.class); + classList.add(TargetHdfsParameter.class); + EntityTestUtils.run(classList); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java index f2772d0747..5c50df2265 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java @@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.common.task.python.PythonParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters; import org.apache.dolphinscheduler.common.task.spark.SparkParameters; import org.apache.dolphinscheduler.common.task.sql.SqlParameters; +import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.TaskRecordDao; import org.apache.dolphinscheduler.server.utils.ParamUtils; @@ -198,6 +199,9 @@ public abstract class AbstractTask { case DATAX: paramsClass = DataxParameters.class; break; + case SQOOP: + paramsClass = SqoopParameters.class; + break; default: logger.error("not support this task type: {}", taskType); throw new IllegalArgumentException("not support this task type"); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java index 67deb7a3fa..1ab6f2f3da 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/TaskManager.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.server.worker.task.python.PythonTask; import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask; import org.apache.dolphinscheduler.server.worker.task.spark.SparkTask; import org.apache.dolphinscheduler.server.worker.task.sql.SqlTask; +import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopTask; import org.slf4j.Logger; /** @@ -68,6 +69,8 @@ public class TaskManager { return new HttpTask(props, logger); case DATAX: return new DataxTask(props, logger); + case SQOOP: + return new SqoopTask(props, logger); default: logger.error("unsupport task type: {}", taskType); throw new IllegalArgumentException("not support task type"); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java new file mode 100644 index 0000000000..64bc7924d2 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.sqoop; + +import com.alibaba.fastjson.JSON; +import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.task.AbstractParameters; +import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; +import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.server.utils.ParamUtils; +import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask; +import org.apache.dolphinscheduler.server.worker.task.TaskProps; +import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator; +import org.slf4j.Logger; +import java.util.Map; + +/** + * sqoop task extends the shell task + */ +public class SqoopTask extends AbstractYarnTask { + + private SqoopParameters sqoopParameters; + + public SqoopTask(TaskProps props, Logger logger){ + super(props,logger); + } + + @Override + public void init() throws Exception { + logger.info("sqoop task params {}", taskProps.getTaskParams()); + sqoopParameters = + JSON.parseObject(taskProps.getTaskParams(),SqoopParameters.class); + if (!sqoopParameters.checkParameters()) { + throw new RuntimeException("sqoop task params is not valid"); + } + + } + + @Override + protected String buildCommand() throws Exception { + //get sqoop scripts + SqoopJobGenerator generator = new SqoopJobGenerator(); + String script = generator.generateSqoopJob(sqoopParameters); + + Map paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), + taskProps.getDefinedParams(), + sqoopParameters.getLocalParametersMap(), + taskProps.getCmdTypeIfComplement(), + taskProps.getScheduleTime()); + + if(paramsMap != null){ + String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); + logger.info("sqoop script: {}", resultScripts); + return resultScripts; + } + + return null; + } + + @Override + public AbstractParameters getParameters() { + return sqoopParameters; + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java new file mode 100644 index 0000000000..4944bac5ba --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.sqoop.generator; + +import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * common script generator + */ +public class CommonGenerator { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + public String generate(SqoopParameters sqoopParameters) { + StringBuilder result = new StringBuilder(); + try{ + result.append("sqoop ") + .append(sqoopParameters.getModelType()); + if(sqoopParameters.getConcurrency() >0){ + result.append(" -m ") + .append(sqoopParameters.getConcurrency()); + } + }catch (Exception e){ + logger.error(e.getMessage()); + } + + return result.toString(); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java new file mode 100644 index 0000000000..6c1d1fdca8 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.sqoop.generator; + +import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; + +/** + * Source Generator Interface + */ +public interface ISourceGenerator { + + /** + * generate the source script + * @param sqoopParameters sqoop params + * @return + */ + String generate(SqoopParameters sqoopParameters); +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java new file mode 100644 index 0000000000..be307af5f2 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.sqoop.generator; + +import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; + +/** + * Target Generator Interface + */ +public interface ITargetGenerator { + + /** + * generate the target script + * @param sqoopParameters sqoop params + * @return + */ + String generate(SqoopParameters sqoopParameters); +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java new file mode 100644 index 0000000000..24c76e027d --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.sqoop.generator; + +import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; +import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources.HdfsSourceGenerator; +import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources.HiveSourceGenerator; +import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources.MysqlSourceGenerator; +import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets.HdfsTargetGenerator; +import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets.HiveTargetGenerator; +import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets.MysqlTargetGenerator; + +/** + * Sqoop Job Scripts Generator + */ +public class SqoopJobGenerator { + + private static final String MYSQL = "MYSQL"; + private static final String HIVE = "HIVE"; + private static final String HDFS = "HDFS"; + + /** + * target script generator + */ + private ITargetGenerator targetGenerator; + /** + * source script generator + */ + private ISourceGenerator sourceGenerator; + /** + * common script generator + */ + private CommonGenerator commonGenerator; + + public SqoopJobGenerator(){ + commonGenerator = new CommonGenerator(); + } + + private void createSqoopJobGenerator(String sourceType,String targetType){ + sourceGenerator = createSourceGenerator(sourceType); + targetGenerator = createTargetGenerator(targetType); + } + + /** + * get the final sqoop scripts + * @param sqoopParameters + * @return + */ + public String generateSqoopJob(SqoopParameters sqoopParameters){ + createSqoopJobGenerator(sqoopParameters.getSourceType(),sqoopParameters.getTargetType()); + if(sourceGenerator == null || targetGenerator == null){ + return null; + } + + return commonGenerator.generate(sqoopParameters) + + sourceGenerator.generate(sqoopParameters) + + targetGenerator.generate(sqoopParameters); + } + + /** + * get the source generator + * @param sourceType + * @return + */ + private ISourceGenerator createSourceGenerator(String sourceType){ + switch (sourceType){ + case MYSQL: + return new MysqlSourceGenerator(); + case HIVE: + return new HiveSourceGenerator(); + case HDFS: + return new HdfsSourceGenerator(); + default: + return null; + } + } + + /** + * get the target generator + * @param targetType + * @return + */ + private ITargetGenerator createTargetGenerator(String targetType){ + switch (targetType){ + case MYSQL: + return new MysqlTargetGenerator(); + case HIVE: + return new HiveTargetGenerator(); + case HDFS: + return new HdfsTargetGenerator(); + default: + return null; + } + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java new file mode 100644 index 0000000000..47b01363e6 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources; + +import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; +import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHdfsParameter; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.common.utils.StringUtils; +import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * hdfs source generator + */ +public class HdfsSourceGenerator implements ISourceGenerator { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + @Override + public String generate(SqoopParameters sqoopParameters) { + StringBuilder result = new StringBuilder(); + try{ + SourceHdfsParameter sourceHdfsParameter + = JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceHdfsParameter.class); + + if(sourceHdfsParameter != null){ + if(StringUtils.isNotEmpty(sourceHdfsParameter.getExportDir())){ + result.append(" --export-dir ") + .append(sourceHdfsParameter.getExportDir()); + }else{ + throw new Exception("--export-dir is null"); + } + + } + }catch (Exception e){ + logger.error("get hdfs source failed",e); + } + + return result.toString(); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java new file mode 100644 index 0000000000..91363e296a --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources; + +import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; +import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHiveParameter; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * hive source generator + */ +public class HiveSourceGenerator implements ISourceGenerator { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + @Override + public String generate(SqoopParameters sqoopParameters) { + StringBuilder sb = new StringBuilder(); + try{ + SourceHiveParameter sourceHiveParameter + = JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceHiveParameter.class); + if(sourceHiveParameter != null){ + if(StringUtils.isNotEmpty(sourceHiveParameter.getHiveDatabase())){ + sb.append(" --hcatalog-database ").append(sourceHiveParameter.getHiveDatabase()); + } + + if(StringUtils.isNotEmpty(sourceHiveParameter.getHiveTable())){ + sb.append(" --hcatalog-table ").append(sourceHiveParameter.getHiveTable()); + } + + if(StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionKey())&& + StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionValue())){ + sb.append(" --hcatalog-partition-keys ").append(sourceHiveParameter.getHivePartitionKey()) + .append(" --hcatalog-partition-values ").append(sourceHiveParameter.getHivePartitionValue()); + } + } + }catch (Exception e){ + logger.error(e.getMessage()); + } + + return sb.toString(); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java new file mode 100644 index 0000000000..050fef7cc7 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources; + +import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.enums.QueryType; +import org.apache.dolphinscheduler.common.process.Property; +import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; +import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; +import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * mysql source generator + */ +public class MysqlSourceGenerator implements ISourceGenerator { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + @Override + public String generate(SqoopParameters sqoopParameters) { + StringBuilder result = new StringBuilder(); + try { + SourceMysqlParameter sourceMysqlParameter + = JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceMysqlParameter.class); + + if(sourceMysqlParameter != null){ + ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); + DataSource dataSource= processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource()); + BaseDataSource baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(), + dataSource.getConnectionParams()); + if(baseDataSource != null){ + result.append(" --connect ") + .append(baseDataSource.getJdbcUrl()) + .append(" --username ") + .append(baseDataSource.getUser()) + .append(" --password ") + .append(baseDataSource.getPassword()); + + if(sourceMysqlParameter.getSrcQueryType() == QueryType.FORM.ordinal()){ + if(StringUtils.isNotEmpty(sourceMysqlParameter.getSrcTable())){ + result.append(" --table ").append(sourceMysqlParameter.getSrcTable()); + } + + if(StringUtils.isNotEmpty(sourceMysqlParameter.getSrcColumns())){ + result.append(" --columns ").append(sourceMysqlParameter.getSrcColumns()); + } + + }else if(sourceMysqlParameter.getSrcQueryType() == QueryType.SQL.ordinal()){ + if(StringUtils.isNotEmpty(sourceMysqlParameter.getSrcQuerySql())){ + + String srcQuery = sourceMysqlParameter.getSrcQuerySql(); + if(srcQuery.toLowerCase().contains("where")){ + srcQuery += " AND "+"$CONDITIONS"; + }else{ + srcQuery += " WHERE $CONDITIONS"; + } + result.append(" --query \'"+srcQuery+"\'"); + } + } + + List mapColumnHive = sourceMysqlParameter.getMapColumnHive(); + + if(mapColumnHive != null && !mapColumnHive.isEmpty()){ + String columnMap = ""; + for(Property item:mapColumnHive){ + columnMap = item.getProp()+"="+ item.getValue()+","; + } + + if(StringUtils.isNotEmpty(columnMap)){ + result.append(" --map-column-hive ") + .append(columnMap.substring(0,columnMap.length()-1)); + } + } + + List mapColumnJava = sourceMysqlParameter.getMapColumnJava(); + + if(mapColumnJava != null && !mapColumnJava.isEmpty()){ + String columnMap = ""; + for(Property item:mapColumnJava){ + columnMap = item.getProp()+"="+ item.getValue()+","; + } + + if(StringUtils.isNotEmpty(columnMap)){ + result.append(" --map-column-java ") + .append(columnMap.substring(0,columnMap.length()-1)); + } + } + } + } + }catch (Exception e){ + logger.error(e.getMessage()); + } + + return result.toString(); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java new file mode 100644 index 0000000000..411e9b4450 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets; + +import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; +import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHdfsParameter; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * hdfs target generator + */ +public class HdfsTargetGenerator implements ITargetGenerator { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + @Override + public String generate(SqoopParameters sqoopParameters) { + StringBuilder result = new StringBuilder(); + try{ + TargetHdfsParameter targetHdfsParameter = + JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetHdfsParameter.class); + + if(targetHdfsParameter != null){ + + if(StringUtils.isNotEmpty(targetHdfsParameter.getTargetPath())){ + result.append(" --target-dir ").append(targetHdfsParameter.getTargetPath()); + } + + if(StringUtils.isNotEmpty(targetHdfsParameter.getCompressionCodec())){ + result.append(" --compression-codec ").append(targetHdfsParameter.getCompressionCodec()); + } + + if(StringUtils.isNotEmpty(targetHdfsParameter.getFileType())){ + result.append(" ").append(targetHdfsParameter.getFileType()); + } + + if(targetHdfsParameter.isDeleteTargetDir()){ + result.append(" --delete-target-dir"); + } + + if(StringUtils.isNotEmpty(targetHdfsParameter.getFieldsTerminated())){ + result.append(" --fields-terminated-by '").append(targetHdfsParameter.getFieldsTerminated()).append("'"); + } + + if(StringUtils.isNotEmpty(targetHdfsParameter.getLinesTerminated())){ + result.append(" --lines-terminated-by '").append(targetHdfsParameter.getLinesTerminated()).append("'"); + } + + result.append(" --null-non-string 'NULL' --null-string 'NULL'"); + } + }catch(Exception e){ + logger.error(e.getMessage()); + } + + return result.toString(); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java new file mode 100644 index 0000000000..ad59173ad0 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets; + +import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; +import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHiveParameter; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * hive target generator + */ +public class HiveTargetGenerator implements ITargetGenerator { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + @Override + public String generate(SqoopParameters sqoopParameters) { + + StringBuilder result = new StringBuilder(); + + try{ + TargetHiveParameter targetHiveParameter = + JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetHiveParameter.class); + if(targetHiveParameter != null){ + + result.append(" --hive-import "); + + if(StringUtils.isNotEmpty(targetHiveParameter.getHiveDatabase())&& + StringUtils.isNotEmpty(targetHiveParameter.getHiveTable())){ + result.append(" --hive-table ") + .append(targetHiveParameter.getHiveDatabase()) + .append(".") + .append(targetHiveParameter.getHiveTable()); + } + + if(targetHiveParameter.isCreateHiveTable()){ + result.append(" --create-hive-table"); + } + + if(targetHiveParameter.isDropDelimiter()){ + result.append(" --hive-drop-import-delims"); + } + + if(targetHiveParameter.isHiveOverWrite()){ + result.append(" --hive-overwrite -delete-target-dir"); + } + + if(StringUtils.isNotEmpty(targetHiveParameter.getReplaceDelimiter())){ + result.append(" --hive-delims-replacement ").append(targetHiveParameter.getReplaceDelimiter()); + } + + if(StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionKey())&& + StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionValue())){ + result.append(" --hive-partition-key ") + .append(targetHiveParameter.getHivePartitionKey()) + .append(" --hive-partition-value ") + .append(targetHiveParameter.getHivePartitionValue()); + } + + } + }catch(Exception e){ + logger.error(e.getMessage()); + } + + return result.toString(); + } +} diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java new file mode 100644 index 0000000000..0733338812 --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets; + +import org.apache.commons.lang.StringUtils; +import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; +import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter; +import org.apache.dolphinscheduler.common.utils.JSONUtils; +import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; +import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; +import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * mysql target generator + */ +public class MysqlTargetGenerator implements ITargetGenerator { + + private Logger logger = LoggerFactory.getLogger(getClass()); + + @Override + public String generate(SqoopParameters sqoopParameters) { + + StringBuilder result = new StringBuilder(); + try{ + + TargetMysqlParameter targetMysqlParameter = + JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetMysqlParameter.class); + + if(targetMysqlParameter != null && targetMysqlParameter.getTargetDatasource() != 0){ + + ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); + DataSource dataSource= processService.findDataSourceById(targetMysqlParameter.getTargetDatasource()); + // get datasource + BaseDataSource baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(), + dataSource.getConnectionParams()); + + if(baseDataSource != null){ + result.append(" --connect ") + .append(baseDataSource.getJdbcUrl()) + .append(" --username ") + .append(baseDataSource.getUser()) + .append(" --password ") + .append(baseDataSource.getPassword()) + .append(" --table ") + .append(targetMysqlParameter.getTargetTable()); + + if(StringUtils.isNotEmpty(targetMysqlParameter.getTargetColumns())){ + result.append(" --columns ").append(targetMysqlParameter.getTargetColumns()); + } + + if(StringUtils.isNotEmpty(targetMysqlParameter.getFieldsTerminated())){ + result.append(" --fields-terminated-by '").append(targetMysqlParameter.getFieldsTerminated()).append("'"); + } + + if(StringUtils.isNotEmpty(targetMysqlParameter.getLinesTerminated())){ + result.append(" --lines-terminated-by '").append(targetMysqlParameter.getLinesTerminated()).append("'"); + } + + if(targetMysqlParameter.isUpdate()){ + if(StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateKey())&& + StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateMode())){ + result.append(" --update-key ").append(targetMysqlParameter.getTargetUpdateKey()) + .append(" --update-mode ").append(targetMysqlParameter.getTargetUpdateMode()); + } + } + } + } + }catch (Exception e){ + logger.error(e.getMessage()); + } + + return result.toString(); + } +} diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java new file mode 100644 index 0000000000..f8688e7c0c --- /dev/null +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dolphinscheduler.server.worker.task.sqoop; + +import com.alibaba.fastjson.JSONObject; +import org.apache.dolphinscheduler.common.enums.DbType; +import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; +import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHdfsParameter; +import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHiveParameter; +import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter; +import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHdfsParameter; +import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHiveParameter; +import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter; +import org.apache.dolphinscheduler.dao.entity.DataSource; +import org.apache.dolphinscheduler.server.worker.task.TaskProps; +import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; +import org.apache.dolphinscheduler.service.process.ProcessService; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContext; + +import java.util.*; + +/** + * sqoop task test + */ +@RunWith(MockitoJUnitRunner.Silent.class) +public class SqoopTaskTest { + + private static final Logger logger = LoggerFactory.getLogger(SqoopTaskTest.class); + + private ProcessService processService; + private ApplicationContext applicationContext; + private SqoopTask sqoopTask; + + @Before + public void before() throws Exception{ + processService = Mockito.mock(ProcessService.class); + Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource()); + applicationContext = Mockito.mock(ApplicationContext.class); + SpringApplicationContext springApplicationContext = new SpringApplicationContext(); + springApplicationContext.setApplicationContext(applicationContext); + Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); + + TaskProps props = new TaskProps(); + props.setTaskDir("/tmp"); + props.setTaskAppId(String.valueOf(System.currentTimeMillis())); + props.setTaskInstId(1); + props.setTenantCode("1"); + props.setEnvFile(".dolphinscheduler_env.sh"); + props.setTaskStartTime(new Date()); + props.setTaskTimeout(0); + props.setTaskParams("{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}"); + + sqoopTask = new SqoopTask(props,logger); + sqoopTask.init(); + } + + @Test + public void testGenerator(){ + String data1 = "{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\",\\\"deleteTargetDir\\\":true,\\\"fileType\\\":\\\"--as-textfile\\\",\\\"compressionCodec\\\":\\\"\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; + SqoopParameters sqoopParameters1 = JSONObject.parseObject(data1,SqoopParameters.class); + + SqoopJobGenerator generator = new SqoopJobGenerator(); + String script = generator.generateSqoopJob(sqoopParameters1); + String expected = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile --delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'"; + Assert.assertEquals(expected, script); + + String data2 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; + SqoopParameters sqoopParameters2 = JSONObject.parseObject(data2,SqoopParameters.class); + + String script2 = generator.generateSqoopJob(sqoopParameters2); + String expected2 = "sqoop export -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' --lines-terminated-by '\\n' --update-key id --update-mode allowinsert"; + Assert.assertEquals(expected2, script2); + + String data3 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; + SqoopParameters sqoopParameters3 = JSONObject.parseObject(data3,SqoopParameters.class); + + String script3 = generator.generateSqoopJob(sqoopParameters3); + String expected3 = "sqoop export -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date --hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --fields-terminated-by '@' --lines-terminated-by '\\n'"; + Assert.assertEquals(expected3, script3); + + String data4 = "{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}"; + SqoopParameters sqoopParameters4 = JSONObject.parseObject(data4,SqoopParameters.class); + + String script4 = generator.generateSqoopJob(sqoopParameters4); + String expected4 = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --query 'SELECT * FROM person_2 WHERE $CONDITIONS' --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 --create-hive-table --hive-overwrite -delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16"; + Assert.assertEquals(expected4, script4); + + } + + private DataSource getDataSource() { + DataSource dataSource = new DataSource(); + dataSource.setType(DbType.MYSQL); + dataSource.setConnectionParams( + "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}"); + dataSource.setUserId(1); + return dataSource; + } + + @Test + public void testGetParameters() { + Assert.assertNotNull(sqoopTask.getParameters()); + } + + /** + * Method: init + */ + @Test + public void testInit(){ + try { + sqoopTask.init(); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + +} diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js index e8187043bf..be4daa6595 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/config.js @@ -283,6 +283,10 @@ let tasksType = { 'DATAX': { desc: 'DataX', color: '#1fc747' + }, + 'SQOOP': { + desc: 'SQOOP', + color: '#E46F13' } } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss index 420bae8c89..08918c969f 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.scss @@ -104,6 +104,9 @@ .icos-DATAX { background: url("../img/toolbar_DATAX.png") no-repeat 50% 50%; } + .icos-SQOOP { + background: url("../img/toolbar_SQOOP.png") no-repeat 50% 50%; + } .toolbar { width: 60px; height: 100%; diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue index e05de8e880..177b252693 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/formModel.vue @@ -204,6 +204,13 @@ ref="DATAX" :backfill-item="backfillItem"> + +
@@ -229,6 +236,7 @@ import mDependent from './tasks/dependent' import mHttp from './tasks/http' import mDatax from './tasks/datax' + import mSqoop from './tasks/sqoop' import mSubProcess from './tasks/sub_process' import mSelectInput from './_source/selectInput' import mTimeoutAlarm from './_source/timeoutAlarm' @@ -589,6 +597,7 @@ mDependent, mHttp, mDatax, + mSqoop, mSelectInput, mTimeoutAlarm, mPriority, diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue new file mode 100644 index 0000000000..eead745a06 --- /dev/null +++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue @@ -0,0 +1,981 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + + diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_SQOOP.png b/dolphinscheduler-ui/src/js/conf/home/pages/dag/img/toolbar_SQOOP.png new file mode 100644 index 0000000000000000000000000000000000000000..2ab3b6bd4a5c069a9a864431a68f37e4b49ed8fa GIT binary patch literal 1157 zcmV;01bX|4P)Px#1ZP1_K>z@;j|==^1poj532;bRa{vGi!vFvd!vV){sAK>D1R6<1K~zXf?UiXv zTvZguZ^JOvrlgu`z^F7)2EH^>T0>~mVrr>XQ9qCvV$-kJGt46`rKa~Bx8k%>*e`TOwR`JZ#{dH0@s z-+M^7XZ&CKCX%HGr-uE}026``hz6&E(@YQ?{_9$=feFIU^r+46VB!{V=~4IiX`?a} z3-42ZamL&Yo=&c(&wilt-xNSbg&a~y2WX8|J`kAWfAwz@+z;A|W}L36k{OUvY` zRzt_6R}T`GR_NPE2;&GiPaBjI9vzkaj=!`Xy^=diB?acuVOB9A2$6WS&wdf64^yEY zh5lE#qN$L3Y-XIgD7yweWESDPt-saK&IF;)cJWW!1tthbC*Sd z(}WC+rg6Dt&W5vy(>Tlhq&b#IBtSyw8HcZJK( zLbnhi)x=cDzO4n3T~LA|ju8?-C1vlk6ASq5)VY}E*4~dl+FMt@pQbc7{!m$6*{;GT zn#9wo-}HAUh=(K_ajy@5RY_u)ZEx%Z&pR(~0K2T>bCT?(A)x5_lI8fmBJZ(Pa4UV~ zCAjX`xeNP{2vS{E%laoC0r9r9o}Cb%O;`3>G08yL8(x1l50AX&qCsI(&I2H~?vN6d zAX(_D4KMP|7tl?Jl<>jo-G2BI(0IoUxfVb(1t z<%fT47#uN?h=7Lrl(LR~D&{u?733Gdb=&qm;NsLO>O^U)#&rCFPj*wawfqB@!@B0d zw2aJ0iG%fe0d%GN7zv;i_mMUtlv7+Tge}yjJeh?XQkY*z)H1nBJ*ie@5RtoXBYf5gGbrlC^&7D?y0Vk;{Fz8-wz{ocl?$_#A5OC)e@%W zx%pAI<*7^&l)hpNF`XJIm6oi;>%FW?{_j&ErP80)yj%KeslgdxIyF)%{olA}r3K+% XvR#xuLVs**/common/threadutils/*.java **/common/graph/*.java **/common/queue/*.java + **/common/task/SqoopParameterEntityTest.java **/api/utils/CheckUtilsTest.java **/api/utils/FileUtilsTest.java **/api/utils/FourLetterWordTest.java @@ -727,6 +728,7 @@ **/alert/template/AlertTemplateFactoryTest.java **/alert/template/impl/DefaultHTMLTemplateTest.java **/server/worker/task/datax/DataxTaskTest.java + **/server/worker/task/sqoop/SqoopTaskTest.java **/server/utils/DataxUtilsTest.java